AnthonyZero's Bolg

基于netty撸一个mini RPC

思路

RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序 上请求服务,而不需要了解底层网络实现的技术。常见的RPC 框架有: 源自阿里的Dubbo, Spring 旗下的Spring Cloud,Google 出品的grpc 等等。

minirpc.png

按照上面的步骤整理出自己的思路(大体实现流程):

  1. 服务端启动,加载Netty服务器,通过Spring将带有RPCServer注解的实现类注册到serviceMap中。
  2. 启动客户端,通过java的动态代理机制为我们UserService创建代理对象,在代理对象执行方法的时候实际上已经被我们定制的方法拦截。
  3. 在拦截的逻辑里面,我们在获取到调用的方法,类名,参数集合,参数类型集合后封装到一个JavaBean——Request中去,然后我们(Netty客户端)将Request序列化后传输给Netty服务端(这里也就是服务提供者)
  4. 传输数据之后,让客户端当前线程wait 等待Netty客户端收到响应
  5. Netty服务端收到数据之后,将数据反序列化为Request对象,通过Request对象到serviceMap找到对应实现进行反射调用,将执行结果封装成Response返回。
  6. 客户端拿到Response返回 唤醒当前线程,拿到最终结果。

具体代码实现

请求和响应Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Data
public class Request {   
private String requestId;   
private String className;   
private String methodName;   
private Class<?>[] parameterTypes;   
private Object[] parameters;
}

@Data
public class Response {   
private String requestId;   
private Throwable error;   
private Object result; //返回结果(客户端需要的结果)
}

客户端JDK代理

创建代理对象,封装Request对象,传输数据(内部同时启动Netty客户端)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public <T>T createProxy(Class<?> clazz){
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[] { clazz }, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setRequestId(UUID.randomUUID().toString());
request.setParameterTypes(method.getParameterTypes());
//启动客户端 发送数据
ClientHandler client = new ClientHandler(address, port);
Response response = client.send(request);
if (response.getError() != null){
throw response.getError();
}
else{
return response.getResult();
}
}
});
}

客户端Handler业务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class ClientHandler extends SimpleChannelInboundHandler<Response> {

private Response response;
private Object obj = new Object();
private String address;
private int port;
public ClientHandler(String address, int port) {
this.address = address;
this.port = port;
}
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
this.response = response; //返回结果
synchronized (obj) {
obj.notify();
}
}
// 发送数据 等待client段收到数据 唤醒(代理类中调用此方法 拿到rpc的结果)
public Response send(Request request) throws InterruptedException {
ChannelFuture channelFuture = initClient();
channelFuture.channel().writeAndFlush(request);
synchronized (obj) {
obj.wait();
}
return this.response;
}
// 初始化客户端
public ChannelFuture initClient() {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new ResponseDecoder());
pipeline.addLast(new RequestEncoder());
pipeline.addLast(ClientHandler.this);
}
});
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(address, port)).sync();
return channelFuture;
} catch (Exception e) {
e.printStackTrace();;
group.shutdownGracefully();
}
return null;
}
}

这里ResponseDecoder与RequestEncoder是对Response与Request进行的反序列化与序列化,采用的谷歌的Protostuff序列化框架实现。

注解扫描

Netty服务端在开启服务的时候扫描所有的service实现类,将其装进我们的serviceMap。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* RPC 请求注解(标注在服务实现类上)
*/
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface RPCService {
Class<?> value();
}

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RPCService.class);
for (Map.Entry<String,Object> entry: beansWithAnnotation.entrySet()) {
String interfaceName = entry.getValue().getClass().getAnnotation(RPCService.class).value().getName();
serviceMap.put(interfaceName, entry.getValue()); //接口名 -》 接口实现类
}
System.out.println("代理类实现集合(用户服务端调用):" + serviceMap);
//启动netty 服务端
startServer(SysConstant.PORT);
}

服务端Handler处理逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ServerHandler extends SimpleChannelInboundHandler<Request> {

private Map<String,Object> serviceMap; //服务注册存放
public ServerHandler(Map<String, Object> serviceMap) {
this.serviceMap = serviceMap;
}

protected void channelRead0(ChannelHandlerContext channelHandlerContext, Request request) throws Exception {
Response response = new Response();
//调用请求类的请求方法执行并返回执行结果
Object invoke = null;
try {
Object requestBean = serviceMap.get(request.getClassName());
Class<?> requestClass = Class.forName(request.getClassName());
Method method = requestClass.getMethod(request.getMethodName(), request.getParameterTypes()); //获取执行方法
invoke = method.invoke(requestBean, request.getParameters()); //调用
response.setRequestId(request.getRequestId());
response.setResult(invoke); //结果
} catch (Exception e) {
response.setError(e.getCause());
response.setRequestId(request.getRequestId());
}
System.out.println(request + "->" + response);
channelHandlerContext.writeAndFlush(response);
}
}

结果演示

UserService接口的getUser方法实现

1
2
3
4
5
6
7
8
9
10
11
@RPCService(UserService.class)
public class UserServiceImpl implements UserService {
public User getUser(String userName) {
User user = new User();
user.setAge(24);
user.setUserName(userName);
user.setPassword("123456");
user.setSalary("15000");
return user;
}
}

启动NettyServerBootstrap(启动Netty服务端,加载Spring IOC)

1
2
3
4
public static void main(String[] args) {
System.out.println(">>>>> netty server 正在启动 <<<<<");
new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
}

启动NettyClientBootstrap (Netty客户端,发送请求)

1
2
3
4
5
6
7
8
9
10
11
12
public class NettyClientBootstrap {

public static void main(String[] args) throws InterruptedException {
RPCProxy proxy = new RPCProxy(SysConstant.HOST, SysConstant.PORT);
UserService userService = proxy.createProxy(UserService.class);

User user = userService.getUser("anthonyzero");
System.out.println("RPC返回结果:" + user);
}
}

控制台客户端输出:RPC返回结果:User(userName=anthonyzero, age=24, password=123456, salary=15000)

完整代码请移步到我的GitHub仓库中:Github