简单RPC实现

本文代码请戳:简单RPC-Zeus

一、什么是RPC

假如一个我们有一个电子商城系统,电子商城系统因为业务的拓展水平拆分为不同的子系统,例如库存中心、订单中心、营销中心等等。不同的子系统构成分布在不同的集群上构成一个分布式系统,而这些子系统只能通过网络进行交互,而通过网络进行交互,不同系统之间进行服务的消费以及提供,就是RPC系统。

二、RPC的调用流程

业界的RPC框架非常多,比如阿里巴巴的Dubbo,FaceBook的Thirft等等。RPC的实现非常复杂,但是具体的流程可以简化如下:

  • 服务消费方Consumer以本地调用的方式进行服务调用。
  • 将需要传输的接口、方法、参数等等进行序列化成网络传输的消息体。
  • 发送具体的消息体到服务提供方Provider的地址。
  • 服务提供方进行反序列化,调用本地服务。
  • 本地服务执行结果进行序列化并返回。
  • 服务消费方接受到服务提供方提供的数据进行反序列化,得到结果。

三、如何进行透明远程服务调用

解决方式是通过动态代理,使用动态代理屏蔽网络处理的具体细节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* Created by buzheng on 18/3/23.
* 动态代理类
*/
public class ProxyHandler<T extends IRpcService> implements InvocationHandler {

private Class<? extends IRpcService> service;

private InetSocketAddress addr;

public ProxyHandler(Class<? extends IRpcService> service, InetSocketAddress addr) {
this.service = service;
this.addr = addr;
}

public <T> T getProxyInstance() {
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, this);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
socket = new Socket();
socket.connect(addr);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(service.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);
outputStream.flush();
inputStream = new ObjectInputStream(socket.getInputStream());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null) {
socket.close();
}

}
return inputStream.readObject();
}
}

四、如何进行服务发布

服务的发布与下线需要一个服务注册表,服务注册表能够跟服务提供方保持心跳,维护服务提供方的上下线状态。并将服务注册表的数据同步给服务消费方。而Zookeeper是一个比较适用的服务注册表的实现方式。当服务上线的时候,服务会生在ZK生成自己的服务目录路径,路径对服务的版本、服务名称、服务地址、服务端口进行创建。同时ZK提供心跳检测,它会定时给服务提供者发送一个请求,如果长期得不到响应,会任务服务提供者已经不存在,将其从服务注册表中剔除。

五、一个简单的RPC实现

1、服务注册中心

服务注册中心,提供服务的注册功能,将服务注册在Map中,当获取到请求的时候,可以将请求的接口转换为对应的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Created by buzheng on 18/3/23.
*/
public interface Server {

/**
* 服务端端口
*/
Integer PORT = 9999;

/**
* 服务端启动
*
* @throws IOException
*/
void start() throws IOException;

/**
* 停止服务端
*
* @throws IOException
*/
void stop() throws IOException;

/**
* 服务启动注册
*
* @param iRpcService
* @param IRpcServiceImpl
*/
void register(Class<? extends IRpcService> iRpcService, Class<? extends IRpcService> IRpcServiceImpl);
}

具体实现类如下:

将请求转换为一个Runnable,并交给线程池执行,避免请求过多的时候创建与销毁线程的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* Created by buzheng on 18/3/23.
*/
public class ServiceCenter implements Server {

private boolean isEndLoop = false;

/**
* 服务线程池执行
*/
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 50, 200, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());

/**
* 一个注册服务的缓存注册表 类似Spring的单例注册表
*/
public static final ConcurrentHashMap<String, Class<? extends IRpcService>> registerFactory = new ConcurrentHashMap<>();


@Override
public void start() throws IOException {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(PORT));
while (!isEndLoop) {
threadPoolExecutor.execute(new Task(serverSocket.accept()));
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}


}

@Override
public void stop() throws IOException {
/**
* 执行完任务后再关闭容器
*/
threadPoolExecutor.shutdown();
}

@Override
public void register(Class<? extends IRpcService> iRpcService, Class<? extends IRpcService> IRpcServiceImpl) {
registerFactory.put(iRpcService.getName(), IRpcServiceImpl);
}
}
2、获取服务注册中心的对应接口,进行服务的反射调用并刷回请求的流中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* Created by buzheng on 18/3/23.
* 方法请求
*/
public class Task implements Runnable {

Socket client;

public Task(Socket client) {
this.client = client;
}

@Override
public void run() {
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
inputStream = new ObjectInputStream(client.getInputStream());
String serviceName = inputStream.readUTF();
String methodName = inputStream.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) inputStream.readObject();
Object[] arguments = (Object[]) inputStream.readObject();
Class<?> serviceProvider = ServiceCenter.registerFactory.get(serviceName);
Method method = serviceProvider.getMethod(methodName, parameterTypes);
Object result = method.invoke(serviceProvider.newInstance(), arguments);
outputStream = new ObjectOutputStream(client.getOutputStream());
outputStream.writeObject(result);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (inputStream != null) {
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
3、进行本地透明代理调用远端服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* Created by buzheng on 18/3/23.
* 动态代理获取远端接口
*/
public class ZeusClient {
public static <T extends IRpcService> T getRemoteProxy(Class<? extends IRpcService> serviceInterface, InetSocketAddress addr) {
ProxyHandler proxyHandler = new ProxyHandler(serviceInterface, addr);
return (T) proxyHandler.getProxyInstance();
}
}

/**
* Created by buzheng on 18/3/23.
* 动态代理类
*/
public class ProxyHandler<T extends IRpcService> implements InvocationHandler {

private Class<? extends IRpcService> service;

private InetSocketAddress addr;

public ProxyHandler(Class<? extends IRpcService> service, InetSocketAddress addr) {
this.service = service;
this.addr = addr;
}

public <T> T getProxyInstance() {
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, this);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = null;
ObjectOutputStream outputStream = null;
ObjectInputStream inputStream = null;
try {
socket = new Socket();
socket.connect(addr);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(service.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);
outputStream.flush();
inputStream = new ObjectInputStream(socket.getInputStream());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (socket != null) {
socket.close();
}

}
return inputStream.readObject();
}
}
4、编写对外服务提供接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Created by buzheng on 18/3/23.
* 对外暴露的接口
*/
public interface ZeusApi extends IRpcService {
/**
* 测试接口
*
* @return
*/
String zeus();
}


/**
* Created by buzheng on 18/3/23.
* 服务实现类
*/
public class ZeusApiImpl implements ZeusApi {

private static final long serialVersionUID = 10086L;

@Override
public String zeus() {
return "this is the zeus api impl";
}
}
5、测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testApi2() throws Exception {
ZeusApi zeusApi = ZeusClient.getRemoteProxy(ZeusApi.class, new InetSocketAddress(InetAddress.getLocalHost(), 9999));

String s = zeusApi.zeus();
System.out.println(s);
}

@Test
public void testService1(String[] args) throws Exception {
ServiceCenter serviceCenter = new ServiceCenter();
serviceCenter.register(ZeusApi.class, ZeusApiImpl.class);
serviceCenter.start();
}
坚持原创技术分享,您的支持将鼓励我继续创作!