博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RPC架构
阅读量:566 次
发布时间:2019-03-09

本文共 6533 字,大约阅读时间需要 21 分钟。

    RPC的全称是Remote Procedure Call,它是一种进程间通信方式。允许像调用本地服务一样调用远程服务,它的具体实现可以不同,如Spring的HTTP Invoker,Facebook的Thrift二进制私有协议通信。

它在80年代由Bruce Jay Nelson提出,它的定义如下:

  • 简单:RPC概念的语义十分清晰以及简单,这样建立分布式计算就更容易。
  • 高效:过程调用看起来十分简单,而且高效。
  • 通用:在单机计算中往往是算法和api,跨进程最重要的是通用的通信机制。

RPC原理 

核心技术点:

       1)远程服务提供者需要以某种形式提供服务调用的相关信息,包括但不限于接口定义、数据结构、或者中间态的服务定义文件,例如Thrift的IDL文件,WS-RPC的WSDL文件定义,甚至也可以是服务端接口的说明文档。服务调用者需要通过一定的途径获取远程服务的调用的相关信息,例如服务端接口定义jar包导入,获取服务端IDL文件等。

       2)远程代理对象:服务调用者调用的服务实际上是远程服务的本地代理,对于java语言,它的实现就是JDK动态代理,通过动态代理的拦截机制,将本地调用封装成远程服务调用。

       3)通信:RPC与具体的协议无关,例如Spring远程调用支持HTTP Invoker、RMI Invoke,MessagePack使用的是私有的二进制压缩协议。

       4)序列化:远程通信,需要将对象转换成二进制码流进行传输,不同的框架支持的数据类型,数据包大小,异常类型,以及性能都不同,不同的RPC框架应用场景不同,因此技术选择会存在很大差异,一些比较好的RPC框架,支持多种序列化方式,甚至支持用户自定义序列化框架(Hadoop Avro)。

实现

一个RPC框架最主要有三部分:

  1. 服务提供者:运行在服务端,负责提供接口的定义以及服务具体实现类。
  2. 服务发布者:运行在RPC服务端,负责将本地服务发布成远程服务,供其他消费者使用。
  3. 本地服务代理:运行在客户端,通过代理调用远程服务提供者,然后将结果进行封装返回给本地消费者。

首先定义个服务端接口:

public interface ZService {	 String echo(String message);}

实现类:

public class ZServiceImpl implements ZService{	@Override	public String echo(String message) {		// TODO Auto-generated method stub		return "打印:"+message;	}}

RPC服务发布者:

import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class RpcPublisher {    //返回到虚拟机的最大可用的处理器数量    static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());     public static void publish(String hostName, int port) throws IOException {         ServerSocket server = new ServerSocket();        server.bind(new InetSocketAddress(hostName,port));        try {            while (true){                //server监听客户端请求连接,由线程池执行                executorService.execute(new SupportTask(server.accept()));            }        }finally {            server.close();        }     }     private static class SupportTask implements Runnable{        Socket socket = null;        public SupportTask(Socket socket){            this.socket = socket;        }         public void run() {            ObjectInputStream inputStream = null;            ObjectOutputStream outputStream = null;            try {                inputStream = new ObjectInputStream(socket.getInputStream());                //获取客户端发送的类名                String interfaceName = inputStream.readUTF();                Class
service = Class.forName(interfaceName); //获取客户端发送的方法名 String methodName = inputStream.readUTF(); //获取客户端发送的参数类型 Class
[] parameterTypes = (Class
[]) inputStream.readObject(); //获取参数 Object[] args = (Object[]) inputStream.readObject(); //invoke调用对应的方法 Method method = service.getMethod(methodName, parameterTypes); Object result = method.invoke(service.newInstance(), args); //返回执行结果给客户端 outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { if(inputStream != null){ try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if(outputStream != null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }}

服务发布者主要职责:

  1. 作为服务端,监听客户端的TCP连接,接收到新的客户端连接后,将其封装成Task,由线程池执行;
  2. 将客户端发送的码流反序列化成对象,反射调用服务实现者,获取执行结果;
  3. 将执行结果对象序列化,通过Socket发送给客户端;
  4. 远程服务执行完毕后,关闭Socket等连接资源,防止句柄泄露。
     

 RPC客户端本地服务代理:

import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.InetSocketAddress;import java.net.Socket;public class RpcImporter {	@SuppressWarnings("unchecked")	public S importer(final Class
serviceClass, final InetSocketAddress address) { return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class
[] { serviceClass.getInterfaces()[0] }, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; try { socket = new Socket(); socket.connect(address); // 向远程发布者提供远程服务的信息 outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeUTF(serviceClass.getName()); outputStream.writeUTF(method.getName()); outputStream.writeObject(method.getParameterTypes()); outputStream.writeObject(args); // 接收远程服务的返回 inputStream = new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); } finally { if (socket != null) { socket.close(); } if (inputStream != null) { inputStream.close(); } if (outputStream != null) { outputStream.close(); } } } }); }}

本地服务代理的主要功能如下:

  1. 将本地的接口调用转换成JDK动态代理,在动态代理中实现接口的远程调用。
  2. 创建Socket客户端,根据指定地址连接远程服务提供者。
  3. 将远程服务调用所需要的接口类,方法名,参数列表等编码后发送给服务提供者。
  4. 同步阻塞等待服务端返回响应,获取响应结果后返回。
     

启动:

import java.io.IOException;import java.net.InetSocketAddress;public class Test {    public static void main(String[] args) {         //异步启动服务        new Thread(new Runnable() {            public void run() {                try {                    RpcPublisher.publish("localhost",9988);                } catch (IOException e) {                    e.printStackTrace();                }            }        }).start();        //创建rpc客户端代理        RpcImporter
importer = new RpcImporter
(); //构建rpc请求参数,发起rpc调用 ZService localhost = importer.importer(ZServiceImpl.class, new InetSocketAddress("localhost", 9999)); //将调用结果输出到控制台 String result = localhost.echo("远程服务调用信息……"); System.out.println(result); }}

 

 

参考《分布式服务框架原理与实践》

转载地址:http://drmsz.baihongyu.com/

你可能感兴趣的文章