0%

Java通过RMI实现手写RPC框架

参考视频:https://www.bilibili.com/video/av30168877/?p=3
参考文章:https://blog.csdn.net/shan9liang/article/details/8995023

1.RPC与RMI

  • RMI(remote method invocation,面向对象的远程方法调用)
  • RPC(remote procedure call,远程过程调用)

RPC是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。在RPC中,当一个请求到达RPC服务器时,这个请求就包含了一个参数集和一个文本值,通常形成“classname.methodname”的形式。这就向RPC服务器表明,被请求的方法在为 “classname”的类中,名叫“methodname”。然后RPC服务器就去搜索与之相匹配的类和方法,并把它作为那种方法参数类型的输入。这里的参数类型是与RPC请求中的类型是匹配的。一旦匹配成功,这个方法就被调用了,其结果被编码后返回客户方。

Java RMI 指的是远程方法调用 (Remote Method Invocation)。它是一种机制,能够让在某个 Java 虚拟机上的对象调用另一个 Java 虚拟机中的对象上的方法。可以用此方法调用的任何对象必须实现该远程接口。

RPC与RMI区别于联系

  • RPC 跨语言,而 RMI只支持Java。
  • RMI 调用远程对象方法,允许方法返回 Java 对象以及基本数据类型,而RPC 不支持对象的概念,传送到 RPC 服务的消息由外部数据表示 (External Data Representation, XDR) 语言表示,这种语言抽象了字节序类和数据类型结构之间的差异。只有由 XDR 定义的数据类型才能被传递, 可以说 RMI 是面向对象方式的 Java RPC 。
  • 在方法调用上,RMI中,远程接口使每个远程方法都具有方法签名。如果一个方法在服务器上执行,但是没有相匹配的签名被添加到这个远程接口上,那么这个新方法就不能被RMI客户方所调用。

代码实现:

1.Server端方法接口与实现类
HelloService.java

1
2
3
4
5
package server;

public interface HelloService {
String sayHi(String name);//hi name
}

HelloServiceImpl.java

1
2
3
4
5
6
7
8
package server;

public class HelloServiceImpl implements HelloService {
@Override
public String sayHi(String name) {
return "Hi " + name;
}
}

2.远程调用的函数注册中心接口及其实现
RegisterServerCenter.java

1
2
3
4
5
6
7
8
9
10
11
12
package server;

public interface RegisterServerCenter {
//服务启动
void start();

//服务终止
void stop();

//服务注册
void register(Class service, Class serviceImpl);
}

RegisterServerCenterImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package server;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RegisterServerCenterImpl implements RegisterServerCenter {

//以哈希表的形式存储注册的远程调用函数
private static Map<String, Class> serviceRegister = new HashMap<>();

//远程调用端口号
private static int port;

//创建一个定长线程池,可控制线程最大并发数
//java.lang.Runtime.availableProcessors() 方法返回到Java虚拟机的可用的处理器数量
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

//程序启动和关闭的标记
private static boolean isRunning = false;

public RegisterServerCenterImpl(int port) {
this.port = port;
}

@Override
public void start() {
ServerSocket serverSocket = null;
Socket socket;
try {
serverSocket = new ServerSocket();

//与ServerSocket与指定端口号绑定
serverSocket.bind(new InetSocketAddress(port));
} catch (IOException e) {
e.printStackTrace();
}
isRunning = true;
while (true) {
try {
System.out.println("---- start server ----");

//等待请求
socket = serverSocket.accept();

//启动线程完成请求
executorService.execute(new ServiceTask(socket));
} catch (IOException e) {
e.printStackTrace();
}
}


}

@Override
public void stop() {
isRunning = false;

//关闭线程池
executorService.shutdown();
}

@Override
public void register(Class service, Class serviceImpl) {

//将可远程调用注册到map中
serviceRegister.put(service.getName(), serviceImpl);
}

private static class ServiceTask implements Runnable {

private Socket socket;

ServiceTask(Socket socket) {
this.socket = socket;
}

@Override
public void run() {
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
//接收到请求
inputStream = new ObjectInputStream(socket.getInputStream());

//获取类名、方法名、参数类型、参数值
String serviceName = inputStream.readUTF();
String methodName = inputStream.readUTF();
Class[] paramType = (Class[]) inputStream.readObject();
Object[] args = (Object[]) inputStream.readObject();

//通过服务注册表,获取类、获取方法,执行方法获取结果
Class serviceClass = serviceRegister.get(serviceName);
Method method = serviceClass.getMethod(methodName, paramType);
Object result = method.invoke(serviceClass.newInstance(), args);

//返回结果
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(result);
} catch (IOException | ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) {
e.printStackTrace();
} finally {
try {
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
if (socket != null) {
socket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}

3.Client端代码
Client.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package client;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Objects;

public class Client {

/*
* a:类加载器 : 需要代理哪个类(例如HelloService接口),
* 就需要将HelloService的类加载器 传入第一个参数
* b:需要代理的对象,具备哪些方法 --接口
* 单继承,多实现 A implements B接口,c接口
* String str = new String();
* String[] str = new String[]{"aaa","bb","cc"} ;
*/
public static <T> T getRemoteProxyObj(Class service, InetSocketAddress inetSocketAddress) {
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket();
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
//与端口建立连接
socket.connect(inetSocketAddress);
outputStream = new ObjectOutputStream(socket.getOutputStream());

//按顺序将参数传给server端
outputStream.writeUTF(service.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);

//获取返回的结果
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} finally {
Objects.requireNonNull(inputStream).close();
Objects.requireNonNull(outputStream).close();
socket.close();
}

}
});
}
}

4.服务端启动类
RPCServerTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package test;

import server.HelloService;
import server.HelloServiceImpl;
import server.RegisterServerCenter;
import server.RegisterServerCenterImpl;

public class RPCServerTest {
public static void main(String[] args) {

//开启一个线程
new Thread(new Runnable() {
@Override
public void run() {
//服务中心
RegisterServerCenter server = new RegisterServerCenterImpl(9999);
//将HelloService接口及实现类 注册到 服务中心
server.register(HelloService.class, HelloServiceImpl.class);
server.start();
}
}).start();//start()
}
}

5.客户端启动类
ClientRPCTest.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package test;

import client.Client;
import server.HelloService;

import java.net.InetSocketAddress;

public class ClientRPCTest {
public static void main(String[] args) throws ClassNotFoundException {
//通过类反射机制类参数
HelloService service = Client.getRemoteProxyObj(Class.forName("server.HelloService"), new InetSocketAddress("127.0.0.1", 9999));
System.out.println((service.sayHi("zhangsan")));
}
}

此时启动两个启动类便可实现RPC远程方法调用