手写RPC框架
程序员文章站
2022-07-12 16:39:07
...
服务接口
public interface EchoService {
String echo(String ping);
}
服务端实现
public class EchoServiceImpl implements EchoService {
@Override
public String echo(String ping) {
return ping != null ? ping + " ---> Hello, thank you." : "Thank you very much.";
}
}
暴露服务
public class RpcExporter {
static Logger logger = Logger.global;
static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
public static void exporter(String hostName, Integer port) throws IOException {
ServerSocket server = new ServerSocket();
server.bind(new InetSocketAddress(hostName, port));
try {
while (true) {
executor.execute(new ExporterTask(server.accept()));
}
} catch (Throwable throwable) {
throwable.printStackTrace();
} finally {
try {
server.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static class ExporterTask implements Runnable {
Socket client = null;
public ExporterTask(Socket client) {
this.client = client;
}
private Class getImplClazz(Class clazz) {
Reflections reflections = new Reflections("com.maxiaoba.sample");
Set<Class> subTypesOf = reflections.getSubTypesOf(clazz);
return subTypesOf.iterator().next();
}
@Override
public void run() {
ObjectInputStream input = null;
ObjectOutputStream output = null;
try {
input = new ObjectInputStream(client.getInputStream());
String interfaceName = input.readUTF();
Class<?> service = getImplClazz(Class.forName(interfaceName));
String methodName = input.readUTF();
Class<?>[] parameterTypes = (Class<?>[]) input.readObject();
Object[] arguments = (Object[]) input.readObject();
Method method = service.getMethod(methodName, parameterTypes);
Object result = method.invoke(service.newInstance(), arguments);
output = new ObjectOutputStream(client.getOutputStream());
output.writeObject(result);
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (input != null) {
input.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (output != null) {
output.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端
public class RpcImporter<S> {
public S importer(Class<?> serverClass, InetSocketAddress socketAddress) {
return (S) Proxy.newProxyInstance(serverClass.getClassLoader(), new Class<?>[]{ serverClass }, (proxy, method, args) -> {
Socket socket = null;
ObjectInputStream inputStream = null;
ObjectOutputStream outputStream = null;
try {
socket = new Socket();
socket.connect(socketAddress);
outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(serverClass.getName());
outputStream.writeUTF(method.getName());
outputStream.writeObject(method.getParameterTypes());
outputStream.writeObject(args);
inputStream = new ObjectInputStream(socket.getInputStream());
return inputStream.readObject();
} finally {
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
}
});
}
}
启动服务端,调用服务端接口
public class Main {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
RpcExporter.exporter("localhost", 8088);
} catch (Exception e) {
e.printStackTrace();
}
}
});
// 启动服务端
thread.start();
RpcImporter<EchoService> importer = new RpcImporter<>();
// 客户端只需要以来接口即可
EchoService echoService = importer.importer(EchoService.class, new InetSocketAddress("localhost", 8088));
String echo = echoService.echo("Are you ok");
thread.stop();
System.out.println(echo);
}
}
上一篇: 分布式部署
下一篇: Hadoop单节点(伪分布式)安装部署