前言 昨天看了一篇关于用几行代码实现RPC框架的博客http://javatar.iteye.com/blog/1123915 ,收获很大,于是我想在这篇博客的基础上理一理思路,尽可能的多加一点注释,进一步降低学习RPC框架原理的门槛。
原理图 先上一个原理图,读者可根据此图来帮助理解后续的代码。
代码 RpcFramework核心类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.ServerSocket; import java.net.Socket; public class RpcFramework { /** * 暴露服务 * * @param service 服务实现 * @param port 服务端口 * @throws Exception */ public static void export(final Object service, int port) throws Exception { System.out.println("Export service " + service.getClass().getName() + " on port " + port); ServerSocket server = new ServerSocket(port); //一直轮询,相比while(true),这种方式性能更佳 for(;;) { try { //此处阻塞一直等到有consumer请求过来 final Socket socket = server.accept(); //每来一个消费请求就开启一个新的线程 new Thread(() -> { try { try { ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { //consumer会分三次发送所需要的方法信息,这里的readUTF(),readObject()都会发生阻塞 String methodName = input.readUTF(); Class<?>[] parameterTypes = (Class<?>[])input.readObject(); Object[] arguments = (Object[])input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { //获取到目标方法 Method method = service.getClass().getMethod(methodName, parameterTypes); //通过反射执行目标方法并返回结果 Object result = method.invoke(service, arguments); //将执行结果返回给consumer output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { output.close(); } } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { e.printStackTrace(); } }).start(); } catch (Exception e) { e.printStackTrace(); } } } /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { System.out.println("Get remote service " + interfaceClass.getName() + " from server " + host + ":" + port); //通过JDK动态代理的方式直接返回给调用refer方法的调用者一个被动态代理处理过的 对象 return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() { @Override //调用该对象的每个方法都会先去调用下面的逻辑 public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { //当方法真实被调用的时候才会发起RPC远程请求provider执行服务 Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { //分三次发送方法所需要的信息 output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { //得到服务执行的最终结果 Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
服务接口1 2 3 4 5 public interface HelloService { String hello(String name); }
服务接口实现
1 2 3 4 5 6 7 8 9 public class HelloServiceImpl implements HelloService { @Override public String hello(String name) { System.out.println("被调用了"); return "Hello" + name; } }
provider引导类
1 2 3 4 5 6 public class RpcProvider { public static void main(String[] args) throws Exception { HelloService service = new HelloServiceImpl(); RpcFramework.export(service, 1234); } }
consumer引导类
1 2 3 4 5 6 7 8 9 10 11 public class RpcConsumer { public static void main(String[] args) throws Exception { //此时获取到的service是被JDK动态代理包装后的service,在调用方法的时候会进行远程调用 HelloService service = RpcFramework.refer(HelloService.class, "127.0.0.1", 1234); for (int i = 0; i < Integer.MAX_VALUE; i ++) { String hello = service.hello("World" + i); System.out.println(hello); Thread.sleep(1000); } } }
下图是provider项目的类结构图
下图是consumer项目的类结构图
总结 简单总结了一下简易RPC框架,题主最近正在学习dubbo原理,故并没有延展讲太多分布式内容,希望随着学习的深入以后能写一篇关于分布式的文章,共勉!