本文共 6533 字,大约阅读时间需要 21 分钟。
RPC的全称是Remote Procedure Call,它是一种进程间通信方式。允许像调用本地服务一样调用远程服务,它的具体实现可以不同,如Spring的HTTP Invoker,Facebook的Thrift二进制私有协议通信。
它在80年代由Bruce Jay Nelson提出,它的定义如下:
核心技术点:
1)远程服务提供者需要以某种形式提供服务调用的相关信息,包括但不限于接口定义、数据结构、或者中间态的服务定义文件,例如Thrift的IDL文件,WS-RPC的WSDL文件定义,甚至也可以是服务端接口的说明文档。服务调用者需要通过一定的途径获取远程服务的调用的相关信息,例如服务端接口定义jar包导入,获取服务端IDL文件等。
2)远程代理对象:服务调用者调用的服务实际上是远程服务的本地代理,对于java语言,它的实现就是JDK动态代理,通过动态代理的拦截机制,将本地调用封装成远程服务调用。
3)通信:RPC与具体的协议无关,例如Spring远程调用支持HTTP Invoker、RMI Invoke,MessagePack使用的是私有的二进制压缩协议。
4)序列化:远程通信,需要将对象转换成二进制码流进行传输,不同的框架支持的数据类型,数据包大小,异常类型,以及性能都不同,不同的RPC框架应用场景不同,因此技术选择会存在很大差异,一些比较好的RPC框架,支持多种序列化方式,甚至支持用户自定义序列化框架(Hadoop Avro)。
一个RPC框架最主要有三部分:
首先定义个服务端接口:
public interface ZService { String echo(String message);}
实现类:
public class ZServiceImpl implements ZService{ @Override public String echo(String message) { // TODO Auto-generated method stub return "打印:"+message; }}
RPC服务发布者:
import java.io.IOException;import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.Method;import java.net.InetSocketAddress;import java.net.ServerSocket;import java.net.Socket;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; public class RpcPublisher { //返回到虚拟机的最大可用的处理器数量 static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public static void publish(String hostName, int port) throws IOException { ServerSocket server = new ServerSocket(); server.bind(new InetSocketAddress(hostName,port)); try { while (true){ //server监听客户端请求连接,由线程池执行 executorService.execute(new SupportTask(server.accept())); } }finally { server.close(); } } private static class SupportTask implements Runnable{ Socket socket = null; public SupportTask(Socket socket){ this.socket = socket; } public void run() { ObjectInputStream inputStream = null; ObjectOutputStream outputStream = null; try { inputStream = new ObjectInputStream(socket.getInputStream()); //获取客户端发送的类名 String interfaceName = inputStream.readUTF(); Class service = Class.forName(interfaceName); //获取客户端发送的方法名 String methodName = inputStream.readUTF(); //获取客户端发送的参数类型 Class [] parameterTypes = (Class []) inputStream.readObject(); //获取参数 Object[] args = (Object[]) inputStream.readObject(); //invoke调用对应的方法 Method method = service.getMethod(methodName, parameterTypes); Object result = method.invoke(service.newInstance(), args); //返回执行结果给客户端 outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeObject(result); } catch (Exception e) { e.printStackTrace(); } finally { if(inputStream != null){ try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if(outputStream != null){ try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } } if (socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } }}
服务发布者主要职责:
RPC客户端本地服务代理:
import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.InetSocketAddress;import java.net.Socket;public class RpcImporter{ @SuppressWarnings("unchecked") public S importer(final Class serviceClass, final InetSocketAddress address) { return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class [] { serviceClass.getInterfaces()[0] }, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Socket socket = null; ObjectOutputStream outputStream = null; ObjectInputStream inputStream = null; try { socket = new Socket(); socket.connect(address); // 向远程发布者提供远程服务的信息 outputStream = new ObjectOutputStream(socket.getOutputStream()); outputStream.writeUTF(serviceClass.getName()); outputStream.writeUTF(method.getName()); outputStream.writeObject(method.getParameterTypes()); outputStream.writeObject(args); // 接收远程服务的返回 inputStream = new ObjectInputStream(socket.getInputStream()); return inputStream.readObject(); } finally { if (socket != null) { socket.close(); } if (inputStream != null) { inputStream.close(); } if (outputStream != null) { outputStream.close(); } } } }); }}
本地服务代理的主要功能如下:
启动:
import java.io.IOException;import java.net.InetSocketAddress;public class Test { public static void main(String[] args) { //异步启动服务 new Thread(new Runnable() { public void run() { try { RpcPublisher.publish("localhost",9988); } catch (IOException e) { e.printStackTrace(); } } }).start(); //创建rpc客户端代理 RpcImporterimporter = new RpcImporter (); //构建rpc请求参数,发起rpc调用 ZService localhost = importer.importer(ZServiceImpl.class, new InetSocketAddress("localhost", 9999)); //将调用结果输出到控制台 String result = localhost.echo("远程服务调用信息……"); System.out.println(result); }}
参考《分布式服务框架原理与实践》
转载地址:http://drmsz.baihongyu.com/