欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop RPC Server端的简单实现

程序员文章站 2022-03-04 15:44:51
...

Server端的主要负责接收client端发送的请求并处理,最后返回处理结果给客户端。

Hadoop RPC的Server端采用了NIO技术,涉及到channel,selector等概念。Server类中主要有Listener,Connect,Call,Handler,Responder等类。

1、Listener类和Reader类

private class Listener extends Thread {
        private ServerSocketChannel acceptChannel = null;                         
        private Selector selector = null;
        private Reader[] readers = null;
        private int currentReader = 0;
        private InetSocketAddress address;

        public Listener() throws IOException {
            address = new InetSocketAddress(bindAddress, port);
            acceptChannel = ServerSocketChannel.open();
            acceptChannel.configureBlocking(false);
            bind(acceptChannel.socket(), address);
            selector = Selector.open();
            readers = new Reader[readThreads];
            for (int i = 0; i < readThreads; i++) {
                System.out.println(">>>start reader" + i + "......");
                Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port" + port);
                readers[i] = reader;
                reader.start();
            }
            System.out.println(">>>register listener selector on port" + port + "......");
            acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
            this.setName("IPC Server listener on:" + acceptChannel.socket().getLocalPort());
            this.setDaemon(true);
        }

        private class Reader extends Thread {
            private volatile boolean adding = false;
            private final Selector readSelector;

            Reader(String name) throws IOException {
                super(name);
                this.readSelector = Selector.open();
            }

            @Override
            public void run() {
                doRunLoop();
            }

            public synchronized void doRunLoop(){
                while (running){
                    SelectionKey key = null;
                    try {
                        readSelector.select();
                        while(adding){
                            this.wait(1000);
                        }

                        Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
                        while(iter.hasNext()){
                            key = iter.next();
                            iter.remove();
                            if(key.isValid() && key.isReadable()){
                                doRead(key);
                            }
                            key = null;
                        }
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }

            public void doRead(SelectionKey key){
                Connection c = (Connection)key.attachment();
                if(c == null){
                    return;
                }
                int count = 0;
                try {
                    System.out.println(">>>reader read and process " + this.toString() + "......");
                    count = c.readAndProcess();
                } catch (IOException e) {
                    e.printStackTrace();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                if(count < 0) {
                    closeConnection(c);
                    c = null;
                }
            }

            public void startAdd() {
                adding = true;
                readSelector.wakeup();
            }

            public synchronized void finishAdd() {
                adding = false;
                this.notify();
            }

            public synchronized SelectionKey registerChannel(SocketChannel channel) throws ClosedChannelException {
                System.out.println(">>>register reader on channel:"+ this.toString() + "......");
                return channel.register(readSelector, SelectionKey.OP_READ);
            }
        }

        @Override
        public void run() {
            while (running) {
                SelectionKey key = null;
                try {
                    getSelector().select();
                    Iterator<SelectionKey> iter = getSelector().selectedKeys().iterator();
                    while (iter.hasNext()) {
                        key = iter.next();
                        iter.remove();
                        if (key.isValid() && key.isAcceptable()) {
                            doAccept(key);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            synchronized (this) {
                try {
                    acceptChannel.close();
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                selector = null;
                acceptChannel = null;

                while(!connectionList.isEmpty()){
                    closeConnection(connectionList.remove(0));
                }

            }
        }

        void doAccept(SelectionKey key) throws IOException {
            Connection c = null;
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel;
            while ((channel = server.accept()) != null) {
                channel.configureBlocking(false);
                channel.socket().setTcpNoDelay(tcpNoDelay);
                Reader reader = getReader();
                reader.startAdd();
                System.out.println(">>>start add reader" + reader.toString() + "...");
                SelectionKey readKey = reader.registerChannel(channel);
                System.out.println(">>>create connection...");
                c = new Connection(readKey, channel);
                readKey.attach(c);
                synchronized (connectionList) {
                    connectionList.add(numConnections, c);
                    numConnections++;
                }
                reader.finishAdd();
            }
        }

        Reader getReader() {
            currentReader = (currentReader + 1) % readers.length;
            return readers[currentReader];
        }

        synchronized Selector getSelector() {
            return selector;
        }
    }

 

2、Connection类

public class Connection {
        private SocketChannel channel;
        private ByteBuffer dataLengthBuffer;
        private ByteBuffer data;
        private int dataLength;
        private LinkedList<Call> responseQueue;

        public Connection(SelectionKey key, SocketChannel channel) {
            this.channel = channel;
            this.dataLengthBuffer = ByteBuffer.allocate(4);
            this.data = null;
            this.responseQueue = new LinkedList<Call>();
        }

        public int readAndProcess() throws IOException, InterruptedException {
            int count = -1;

            if(dataLengthBuffer.remaining() > 0){
                System.out.println(">>>read the data length from the channel:" + channel.toString() + ".......");
                count = channelRead(channel, dataLengthBuffer);
                if(count < 0 || dataLengthBuffer.remaining() > 0){
                    return count;
                }
            }
            System.out.println(">>>read the data from the channel:" + channel.toString() + ".......");
            if(data == null){
                dataLengthBuffer.flip();
                dataLength = dataLengthBuffer.getInt();
                data = ByteBuffer.allocate(dataLength);
            }

            count = channelRead(channel, data);
            System.out.println(">>>finished reading the data from the channel and prepare to process the rpc.......");
            if(data.remaining() == 0){
                dataLengthBuffer.clear();
                data.flip();
                processOneRpc(data.array());
                data = null;
            }

            return count;
        }

        private void processOneRpc(byte[] buf) throws IOException, InterruptedException {
            final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf));
            int id = 0;
            Writable invocation = null;
            try {
                invocation = new Invocation(Client.class.getMethod("call", Writable.class, Client.ConnectionId.class), new Object[]{});
                id = dis.readInt();
                invocation.readFields(dis);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            }
            System.out.println(">>> create the call according to the data: id#" + id + ":" + invocation.toString());
            Call call = new Call(id, invocation, this);
            callQueue.put(call);
        }

        public void close(){

        }

    }

 

3、Call类

public static class Call {
        private final int callId;             //标识调用的id,在客户端处理返回结果时用到
        private final Writable rpcRequest;    //封装请求
        private final Connection connection;   //连接中包含channel信息
        private ByteBuffer rpcResponse;         //返回结果

        public Call(int id, Writable param, Connection connection) {
            this.callId = id;
            this.rpcRequest = param;
            this.connection = connection;
        }

        public void setResponse(ByteBuffer response){
            this.rpcResponse = response;
        }
    }

 4、Handler类

private class Handler extends Thread{
        public Handler(int instanceNumber){
            this.setDaemon(true);
            this.setName("IPC Server handler " + instanceNumber + "on port" + port);
        }

        @Override
        public void run(){
            ByteArrayOutputStream buf = new ByteArrayOutputStream(10240);
            while(running){
                Writable value = null;
                try {
                    final Call call = callQueue.take();
                    System.out.println(">>>call the service on the server...");
                    value = call(call);
                    synchronized (call.connection.responseQueue){
                        System.out.println(">>>prepare to respond the call...");
                        setupResponse(buf, call, value);
                        responder.doRespond(call);

                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        }
    }

 5、Responder类

private class Responder extends Thread{
        private final Selector writeSelector;
        private int pending;

        final static int PURGE_INTERVAL = 900000; // 15mins

        Responder() throws IOException {
            this.setName("IPC Server Responder");
            this.setDaemon(true);
            writeSelector = Selector.open();
            pending = 0;
        }

        @Override
        public void run(){
            while(running){
                try {
                    waitPending();
                    writeSelector.select(PURGE_INTERVAL);
                    Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
                    while(iter.hasNext()){
                        SelectionKey key = iter.next();
                        iter.remove();
                        if(key.isValid() && key.isWritable()){
                            doAsyncWrite(key);
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            try {
                writeSelector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        void doRespond(Call call){
            synchronized (call.connection.responseQueue){
                call.connection.responseQueue.addLast(call);
                System.out.println(">>>only one response then directly respond the call......");
                if(call.connection.responseQueue.size() == 1){
                    processResponse(call.connection.responseQueue, true);
                }
            }
        }

        private void doAsyncWrite(SelectionKey key) throws IOException {
            Call call = (Call) key.attachment();
            if(call == null){
                return;
            }
            if(key.channel() != call.connection.channel){
                throw new IOException("doAsyncWrite: bad channel");
            }

            synchronized (call.connection.responseQueue){
                System.out.println(">>>doAsyncwrite...........");
                processResponse(call.connection.responseQueue, false);
            }
        }

        private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler){
            boolean done  = false;
            Call call = null;
            int numElements = 0;
            synchronized (responseQueue){
                if((numElements = responseQueue.size()) == 0){
                    return true;
                }

                call = responseQueue.removeFirst();
                SocketChannel channel = call.connection.channel;
                try {
                    int numBytes = channelWrite(channel, call.rpcResponse);
                    if(numBytes < 0){
                        return true;
                    }

                    if(!call.rpcResponse.hasRemaining()){
                        System.out.println(">>>data writing is finished.....");
                        call.rpcResponse = null;
                        if(numElements == 1){
                            done = true;
                        }else{
                            done = false;
                        }
                    }else{
                        System.out.println(">>>data writing is not finished and register writeselector on the channel.....");
                        call.connection.responseQueue.addFirst(call);
                        if(inHandler){
                            incPending();
                            try {
                                writeSelector.wakeup();
                                channel.register(writeSelector, SelectionKey.OP_WRITE, call);
                            }catch (ClosedChannelException e){
                                done = true;
                            }finally {
                                decPending();
                            }
                        }
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            return done;
        }

        private synchronized void incPending(){
            pending++;
        }

        private synchronized void decPending(){
            pending--;
            notify();
        }

        private synchronized void waitPending() throws InterruptedException {
            while(pending > 0){
                wait();
            }
        }
    }

 6、Server类的成员

volatile private boolean running = true;
    private String bindAddress;
    private int port;
    private BlockingDeque<Call> callQueue;
    private int handlerCount;
    private Handler[] handlers = null;
    private Responder responder = null;

    private List<Connection> connectionList =
            Collections.synchronizedList(new LinkedList<Connection>());

    private Listener listener = null;
    private int numConnections = 0;
    private int readThreads;
    private final boolean tcpNoDelay;
    private static int NIO_BUFFER_LIMIT = 8 * 1024;

 Server类的方法

protected Server(String bindAddress, int port, int numReader) throws IOException {
        this.tcpNoDelay = false;
        this.bindAddress = bindAddress;
        this.port = port;
        this.readThreads = numReader;
        this.callQueue = new LinkedBlockingDeque<Call>();
        listener = new Listener();
        responder = new Responder();
        handlerCount = 1;
    }

    public synchronized void start(){
        responder.start();
        listener.start();
        handlers = new Handler[handlerCount];
        for(int i = 0; i < handlerCount; i++){
            handlers[i] = new Handler(i);
            handlers[i].start();
        }

    }

    public synchronized void stop(){
        running = false;
        running = false;
        if (handlers != null) {
            for (int i = 0; i < handlerCount; i++) {
                if (handlers[i] != null) {
                    handlers[i].interrupt();
                }
            }
        }
        listener.interrupt();
        responder.interrupt();
        notifyAll();
    }

    public static void bind(ServerSocket socket, InetSocketAddress address) throws IOException {
        socket.bind(address);
        if (!socket.isBound()) {
            throw new BindException("could not find a free port...");
        }
    }

    private void closeConnection(Connection connection){
        synchronized (connectionList){
            if(connectionList.remove(connection))
                numConnections--;
        }
        connection.close();
    }

    private int channelRead(ReadableByteChannel channel, ByteBuffer buffer) throws IOException {
       int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
               channel.read(buffer) : channelIO(channel, null, buffer);
        return count;
    }

    private static int channelIO(ReadableByteChannel readCh,
                                WritableByteChannel writeCh,
                                ByteBuffer buf) throws IOException {
        int originalLimit = buf.limit();
        int initialRemaining = buf.remaining();
        int ret = 0;

        while(buf.remaining() > 0){
            try {
                int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
                buf.limit(buf.position() + ioSize);
                ret = (readCh == null) ? writeCh.write(buf) : readCh.read(buf);
            } finally {
                buf.limit(originalLimit);
            }
        }

        int nBytes = initialRemaining - buf.remaining();
        return (nBytes > 0) ? nBytes : ret;
    }

    private int channelWrite(SocketChannel channel, ByteBuffer rpcResponse) throws IOException {
        int count = (rpcResponse.remaining() <= NIO_BUFFER_LIMIT)?
                channel.write(rpcResponse):channelIO(null, channel, rpcResponse);
        return count;
    }

    private void setupResponse(ByteArrayOutputStream responseBuf, Call call, Writable rv){
        responseBuf.reset();
        DataOutputStream out = new DataOutputStream(responseBuf);
        try {
            final DataOutputBuffer buf = new DataOutputBuffer();
            rv.write(buf);
            byte[] data = buf.getData();
            int fullLength = buf.getLength();
//            out.writeInt(fullLength);
            out.writeInt(call.callId);
            out.write(data, 0, buf.getLength());
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(">>>set response of the call#" + call.callId + "........");
        call.setResponse(ByteBuffer.wrap(responseBuf.toByteArray()));
    }

    public Writable call(Call call){
        return call.rpcRequest;
    }

 

相关标签: hadoop RPC Server