`

第六章:小朱笔记hadoop之源码分析-ipc分析 第三节:Server类分析

 
阅读更多

第六章:小朱笔记hadoop之源码分析-ipc分析

第三节:Server类分析

      启动Listener进程,如果收到需要建立连接的请求,将建立连接,然后在上面捕获读操作的命令。收到命令之后,将把解析客户端发过来信息的工作委派给Connection。Connection把信息封装到Call对象中,放入队列中,待Handler处理。启动指定数目的Handler线程,处理客户端对指定方法调用的请求,然后把结果返回给客户端。

(1)nio的reactor模式

 (a)一个线程来处理所有连接(使用一个Selector)
 (b)一组线程来读取已经建立连接的数据(多个Selector,这里的线程数一般和cpu的核数相当);
 (c)一个线程池(这个线程池大小可以根据业务需求进行设置)
 (d)一个线程处理所有的连接的数据的写操作(一个Selector)

 

(2)RPC Server主要流程

         RPC Server作为服务提供者由两个部分组成:接收Call调用和处理Call调用。

 接收Call调用负责接收来自RPC Client的调用请求,编码成Call对象后放入到Call队列中。这一过程由Listener线程完成。具体步骤:

(a)Listener线程监视RPC Client发送过来的数据。
(b)当有数据可以接收时,调用Connection的readAndProcess方法。
(c)Connection边接收边对数据进行处理,如果接收到一个完整的Call包,则构建一个Call对象PUSH到Call队列中,由Handler线程来处理Call队列中的所有Call。
(d)处理Call调用负责处理Call队列中的每个调用请求,由Handler线程完成:
(e)Handler线程监听Call队列,如果Call队列非空,按FIFO规则从Call队列取出Call。
(f)将Call交给RPC.Server处理。
(g)借助JDK提供的Method,完成对目标方法的调用,目标方法由具体的业务逻辑实现。
(h)返回响应。Server.Handler按照异步非阻塞的方式向RPC Client发送响应,如果有未发送出数据,则交由Server.Responder来完成。

 

 

 

 

(3)server类的结构

       

/**
 * An abstract IPC service. IPC calls take a single {@link Writable} as a
 * parameter, and return a {@link Writable} as their value. A service runs on a
 * port and is defined by a parameter class and a value class.
 * 
 * @see Client
 */
public abstract class Server {
    private final boolean authorize;
    private boolean isSecurityEnabled;

    /**
     * The first four bytes of Hadoop RPC connections
     */
    public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());

    // 1 : Introduce ping and server does not throw away RPCs
    // 3 : Introduce the protocol into the RPC connection header
    // 4 : Introduced SASL security layer
    public static final byte CURRENT_VERSION = 4;

    /**
     * How many calls/handler are allowed in the queue.
     */
    private static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
    private static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = "ipc.server.handler.queue.size";

    /**
     * Initial and max size of response buffer
     */
    static int INITIAL_RESP_BUF_SIZE = 10240;
    static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size";
    static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024 * 1024;

    public static final Log LOG = LogFactory.getLog(Server.class);
    private static final Log AUDITLOG = LogFactory.getLog("SecurityLogger." + Server.class.getName());
    private static final String AUTH_FAILED_FOR = "Auth failed for ";
    private static final String AUTH_SUCCESSFULL_FOR = "Auth successfull for ";

    private static final ThreadLocal<Server> SERVER = new ThreadLocal<Server>();

    private static final Map<String, Class<?>> PROTOCOL_CACHE = new ConcurrentHashMap<String, Class<?>>();

    private String bindAddress; // 服务端绑定的地址
    private int port; // port we listen on 服务端监听端口
    private int handlerCount; // number of handler threads处理线程的数量
    private int readThreads; // number of read threads
    private Class<? extends Writable> paramClass; // class of call
                                                  // parameters调用的参数的类,必须实现Writable序列化接口
    private int maxIdleTime; // the maximum idle time after 当一个客户端断开连接后的最大空闲时间
                             // which a client may be disconnected
    private int thresholdIdleConnections; // the number of idle
                                          // connections可维护的最大连接数量
                                          // after which we will start
                                          // cleaning up idle
                                          // connections
    int maxConnectionsToNuke; // the max number of
                              // connections to nuke
                              // during a cleanup

    protected RpcInstrumentation rpcMetrics; // 维护RPC统计数据

    private Configuration conf;
    private SecretManager<TokenIdentifier> secretManager;

    private int maxQueueSize;// 处理器Handler实例队列大小
    private final int maxRespSize;
    private int socketSendBufferSize; // Socket Buffer大小
    private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm

    volatile private boolean running = true; // true while server runs
    private BlockingQueue<Call> callQueue; // queued calls// 维护调用实例的队列

    private List<Connection> connectionList = Collections.synchronizedList(new LinkedList<Connection>());// 维护客户端连接的列表
    // maintain a list
    // of client connections
    private Listener listener = null;// // 监听Server Socket的线程,为处理器Handler线程创建任务
    private Responder responder = null;// // 响应客户端RPC调用的线程,向客户端调用发送响应信息
    private int numConnections = 0;
    private Handler[] handlers = null; // // 处理器Handler线程数组
     /**
     * This is set to Call object before Handler invokes an RPC and reset after
     * the call returns.
     */
    private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
........

}

 
       这里的Server类是个抽象类,唯一抽象的地方,就是
public abstract Writable call(Writable param, long receiveTime) throws IOException;由RPC.server来实现。

 


(a)Call
    用以存储客户端发来的请求,这个请求会放入一个BlockQueue中;

 

    /**
     * A call queued for handling.
     * 
     * 该类Server端使用队列维护的调用实体类
     * */
    private static class Call {
        private int id; // 客户端调用Call的ID
        private Writable param; // 客户端调用传递的参数
        private Connection connection; // 到客户端的连接实例
        private long timestamp; // 向客户端调用发送响应的时间戳
        private ByteBuffer response; // 向客户端调用响应的字节缓冲区
     .......
    }

 
(b)Listener
 监听类,用以监听客户端发来的请求。同时Listener下面还有一个静态类,Listener.Reader,当监听器监听到用户请求,便用让Reader读取用户请求。Listener主要负责Socket的监听以及Connection的建立,同时监控ClientSocket的数据可读事件,通知Connection进行processData,收到完成请求包以后,封装为一个Call对象(包含Connection对象,从网络流中读取的参数信息,调用方法信息),将其放入队列。

 

 

 

    /**
     * Listens on the socket. Creates jobs for the handler threads
     * 
     * 用来监听服务器Socket,并未Handler处理器线程创建处理任务
     * 
     * 在启动Listener线程时,服务端会一直等待客户端的连接,下面贴出Server.Listener类的run()方法:
     * */
    private class Listener extends Thread {

        private ServerSocketChannel acceptChannel = null; // the accept channel
        private Selector selector = null; // the selector that we use for the
                                          // server
        private Reader[] readers = null;
        private int currentReader = 0;
        private InetSocketAddress address; // the address we bind at
        private Random rand = new Random();
        private long lastCleanupRunTime = 0; // the last time when a cleanup
                                             // connec-
                                             // -tion (for idle connections) ran
        private long cleanupInterval = 10000; // the minimum interval between
                                              // two cleanup runs
        private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
        private ExecutorService readPool;

        public Listener() throws IOException {
            address = new InetSocketAddress(bindAddress, port);// 根据bindAddress和port创建一个Socket地址
            // Create a new server socket and set to non blocking mode
            acceptChannel = ServerSocketChannel.open(); // 创建一个Server
                                                        // Socket通道(ServerSocketChannel)
            acceptChannel.configureBlocking(false); // 设置Server Socket通道为非阻塞模式

            // Bind the server socket to the local host and port
            bind(acceptChannel.socket(), address, backlogLength);
            port = acceptChannel.socket().getLocalPort(); // Could be an
                                                          // ephemeral port //
                                                          // Socket绑定端口
            // create a selector;
            selector = Selector.open(); // 创建一个选择器(使用选择器,可以使得指定的通道多路复用)
            readers = new Reader[readThreads];
            readPool = Executors.newFixedThreadPool(readThreads);
            // 启动多个reader线程,为了防止请求多时服务端响应延时的问题
            for (int i = 0; i < readThreads; i++) {
                Selector readSelector = Selector.open();
                Reader reader = new Reader(readSelector);
                readers[i] = reader;
                readPool.execute(reader);
            }

            // Register accepts on the server socket with the selector.
            // / // 注册连接事件
            acceptChannel.register(selector, SelectionKey.OP_ACCEPT);// 向通道acceptChannel注册上述selector选择器,选择器的键为Server
                                                                     // Socket接受的操作集合
            this.setName("IPC Server listener on " + port); // 设置监听线程名称
            this.setDaemon(true); // 设置为后台线程
        }

 

 

 


(c)Responder
        响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。它不断地检查响应队列中是否有调用信息,如果有的话,就把调用的结果返回给客户端。

       

    // Sends responses of RPC back to clients.
    // 该线程类实现发送RPC响应到客户端
    // 通过线程执行可以看到,调用的相应数据的处理,是在服务器运行过程中处理的,而且分为两种情况:
    //
    // 1、一种情况是:如果某些调用超过了指定的时限而一直未被处理,这些调用被视为过期,服务器不会再为这些调用处理,而是直接清除掉;
    //
    // 2、另一种情况是:如果所选择的通道上,已经注册的调用是合法的,并且通道可写,会直接将调用的相应数据写入到通道,等待客户端读取。
    private class Responder extends Thread {
        private Selector writeSelector;
        private int pending; // connections waiting to register

        final static int PURGE_INTERVAL = 900000; // 15mins

        Responder() throws IOException {
            this.setName("IPC Server Responder");
            this.setDaemon(true);
            writeSelector = Selector.open(); // create a selector
            pending = 0;
        }

        @Override
        public void run() {
            LOG.info(getName() + ": starting");
            SERVER.set(Server.this);
            long lastPurgeTime = 0; // last check for old calls.

            while (running) {
                try {
                    waitPending(); // If a channel is being registered, wait.//
                                   // 等待一个通道中,接收到来的调用进行注册
                    writeSelector.select(PURGE_INTERVAL);// 设置超时时限
                    Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
                    while (iter.hasNext()) {// 迭代选择器writeSelector选择的key集合
                        SelectionKey key = iter.next();
                        iter.remove();
                        try {
                            if (key.isValid() && key.isWritable()) {
                                doAsyncWrite(key); // 执行异步写操作,向通道中写入调用执行的响应数据
                            }
                        } catch (IOException e) {
                            LOG.info(getName() + ": doAsyncWrite threw exception " + e);
                        }
                    }
                    long now = System.currentTimeMillis();
                    if (now < lastPurgeTime + PURGE_INTERVAL) {
                        continue;
                    }
                    lastPurgeTime = now;
                    //
                    // If there were some calls that have not been sent out for
                    // a
                    // long time, discard them.
                    //
                    LOG.debug("Checking for old call responses.");
                    ArrayList<Call> calls;

                    // get the list of channels from list of keys.
                    // 如果存在一些一直没有被发送出去的调用,这是时间限制为lastPurgeTime + PURGE_INTERVAL
                    // 则这些调用被视为过期调用,进行清除
                    synchronized (writeSelector.keys()) {
                        calls = new ArrayList<Call>(writeSelector.keys().size());
                        iter = writeSelector.keys().iterator();
                        while (iter.hasNext()) {
                            SelectionKey key = iter.next();
                            Call call = (Call) key.attachment();
                            if (call != null && key.channel() == call.connection.channel) {
                                calls.add(call);
                            }
                        }
                    }

                    for (Call call : calls) {
                        try {
                            doPurge(call, now);
                        } catch (IOException e) {
                            LOG.warn("Error in purging old calls " + e);
                        }
                    }
                } catch (OutOfMemoryError e) {
                    //
                    // we can run out of memory if we have too many threads
                    // log the event and sleep for a minute and give
                    // some thread(s) a chance to finish
                    //
                    LOG.warn("Out of Memory in server select", e);
                    try {
                        Thread.sleep(60000);
                    } catch (Exception ie) {
                    }
                } catch (Exception e) {
                    LOG.warn("Exception in Responder " + StringUtils.stringifyException(e));
                }
            }
            LOG.info("Stopping " + this.getName());
        }

        // 当某个通道上可写的时候,可以执行异步写响应数据的操作,实现方法为:
        private void doAsyncWrite(SelectionKey key) throws IOException {
            Call call = (Call) key.attachment();
            if (call == null) {
                return;
            }
            if (key.channel() != call.connection.channel) {
                throw new IOException("doAsyncWrite: bad channel");
            }

            synchronized (call.connection.responseQueue) {
                if (processResponse(call.connection.responseQueue, false)) {// 调用processResponse处理与调用关联的响应数据
                    try {
                        key.interestOps(0);
                    } catch (CancelledKeyException e) {
                        /*
                         * The Listener/reader might have closed the socket. We
                         * don't explicitly cancel the key, so not sure if this
                         * will ever fire. This warning could be removed.
                         */
                        LOG.warn("Exception while changing ops : " + e);
                    }
                }
            }
        }

        //
        // Remove calls that have been pending in the responseQueue
        // for a long time.
        //
        /**
         * 如果未被处理响应的调用在队列中滞留超过指定时限,要定时清除掉
         */
        private void doPurge(Call call, long now) throws IOException {
            LinkedList<Call> responseQueue = call.connection.responseQueue;
            synchronized (responseQueue) {
                Iterator<Call> iter = responseQueue.listIterator(0);
                while (iter.hasNext()) {
                    call = iter.next();
                    if (now > call.timestamp + PURGE_INTERVAL) {
                        closeConnection(call.connection);
                        break;
                    }
                }
            }
        }

        // Processes one response. Returns true if there are no more pending
        // data for this channel.
        //
        /**
         * 处理一个通道上调用的响应数据 如果一个通道空闲,返回true
         */
        private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
            boolean error = true;
            boolean done = false; // there is more data for this channel. //
                                  // 一个通道channel有更多的数据待读取
            int numElements = 0;
            Call call = null;
            try {
                synchronized (responseQueue) {
                    //
                    // If there are no items for this channel, then we are done
                    //
                    numElements = responseQueue.size();
                    if (numElements == 0) {
                        error = false;
                        return true; // no more data for this channel.
                    }
                    //
                    // Extract the first call
                    // // 从队列中取出第一个调用call
                    call = responseQueue.removeFirst();
                    SocketChannel channel = call.connection.channel; // 获取该调用对应的通道channel
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection);
                    }
                    //
                    // Send as much data as we can in the non-blocking fashion
                    //
                    int numBytes = channelWrite(channel, call.response);// 向通道channel中写入响应信息(响应信息位于call.response字节缓冲区中)
                    if (numBytes < 0) {// 如果写入字节数为0,说明已经没有字节可写,返回
                        return true;
                    }
                    if (!call.response.hasRemaining()) { // 如果call.response字节缓冲区中没有响应字节数据,说明已经全部写入到相关量的通道中
                        call.connection.decRpcCount();// 该调用call对应的RPC连接计数减1
                        if (numElements == 1) { // 最后一个调用已经处理完成
                            done = true; // 该通道channel没有更多的数据
                        } else {
                            done = false; // 否则,还存在尚未处理的调用,要向给通道发送数据
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote " + numBytes
                                    + " bytes.");
                        }
                    } else {
                        //
                        // If we were unable to write the entire response out,
                        // then
                        // insert in Selector queue.
                        // // 如果call.response字节缓冲区中还存在未被写入通道响应字节数据
                        // /; // 如果不能够将全部的响应字节数据写入到通道中,需要暂时插入到Selector选择其队列中
                        call.connection.responseQueue.addFirst(call);

                        if (inHandler) { // 如果指定:现在就对调用call进行处理(该调用的响应还没有进行处理)
                            call.timestamp = System.currentTimeMillis(); // 设置调用时间戳
                            incPending(); // 增加未被处理响应信息的调用计数
                            try {
                                // Wakeup the thread blocked on select, only
                                // then can the call
                                // to channel.register() complete.
                                writeSelector.wakeup();// 唤醒阻塞在该通道writeSelector上的线程
                                channel.register(writeSelector, SelectionKey.OP_WRITE, call);// 调用call注册通道writeSelector
                            } catch (ClosedChannelException e) {
                                // Its ok. channel might be closed else where.
                                done = true;
                            } finally {
                                decPending();// 经过上面处理,不管在处理过程中正常处理,或是发生通道已关闭异常,最后,都将设置该调用完成,更新计数
                            }
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote partial " + numBytes
                                    + " bytes.");
                        }
                    }
                    error = false; // everything went off well
                }
            } finally {
                if (error && call != null) {
                    LOG.warn(getName() + ", call " + call + ": output error");
                    done = true; // error. no more data for this channel.
                    closeConnection(call.connection);
                }
            }
            return done;
        }

        //
        // Enqueue a response from the application.
        //
        void doRespond(Call call) throws IOException {
            synchronized (call.connection.responseQueue) {
                call.connection.responseQueue.addLast(call); // 将执行完成的调用加入队列,准备响应客户端
                if (call.connection.responseQueue.size() == 1) {
                    processResponse(call.connection.responseQueue, true);// 如果队列中只有一个调用,直接进行处理
                }
            }
        }

        private synchronized void incPending() { // call waiting to be enqueued.
            pending++;
        }

        private synchronized void decPending() { // call done enqueueing.
            pending--;
            notify();
        }

        private synchronized void waitPending() throws InterruptedException {
            while (pending > 0) {
                wait();
            }
        }
    }

 
(d)Connection
连接类,真正的客户端请求读取逻辑在这个类中。Connection,代表与Client端的连接,读取客户端的call并放到一个阻塞队列中,Handler负责从这个队列中读取数据并处理

 

 

     /**
     * Reads calls from a connection and queues them for handling.
     * 该类表示服务端一个连接的抽象,主要是读取从Client发送的调用,并把读取到的调用Client.Call实例加入到待处理的队列。
     * 
     * 
     * */
    public class Connection {
        private boolean rpcHeaderRead = false; // if initial rpc header is read/
                                               // 是否初始化签名,并读取了版本信息
        private boolean headerRead = false; // if the connection header that//
                                            // 是否读取了头信息
                                            // follows version is read.

        private SocketChannel channel;
        private ByteBuffer data;
        private ByteBuffer dataLengthBuffer;
        private LinkedList<Call> responseQueue;
        private volatile int rpcCount = 0; // number of outstanding rpcs
        private long lastContact;
        private int dataLength;// 数据长度
        private Socket socket;
        // Cache the remote host & port info so that even if the socket is
        // disconnected, we can say where it used to connect to.
        private String hostAddress;
        private int remotePort;
        private InetAddress addr;

        ConnectionHeader header = new ConnectionHeader();// 连接头信息
        Class<?> protocol;// 协议类
        boolean useSasl;
        SaslServer saslServer;
        private AuthMethod authMethod;
        private boolean saslContextEstablished;
        private boolean skipInitialSaslHandshake;
        private ByteBuffer rpcHeaderBuffer;
        private ByteBuffer unwrappedData;
        private ByteBuffer unwrappedDataLengthBuffer;

        UserGroupInformation user = null;
        public UserGroupInformation attemptingUser = null; // user name before
                                                           // auth

        // Fake 'call' for failed authorization response
        private final int AUTHROIZATION_FAILED_CALLID = -1;
        private final Call authFailedCall = new Call(AUTHROIZATION_FAILED_CALLID, null, this);
        private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
        // Fake 'call' for SASL context setup
        private static final int SASL_CALLID = -33;
        private final Call saslCall = new Call(SASL_CALLID, null, this);
        private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();

        private boolean useWrap = false;

        public Connection(SelectionKey key, SocketChannel channel, long lastContact) {
            this.channel = channel;// Socket通道
            this.lastContact = lastContact;// 最后连接时间
            this.data = null;
            this.dataLengthBuffer = ByteBuffer.allocate(4);
            this.unwrappedData = null;
            this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
            this.socket = channel.socket(); // 获取到与通道channel关联的Socket
            this.addr = socket.getInetAddress();// 获取Socket地址
            if (addr == null) {
                this.hostAddress = "*Unknown*";
            } else {
                this.hostAddress = addr.getHostAddress();
            }
            this.remotePort = socket.getPort();// 获取到远程连接的端口号
            this.responseQueue = new LinkedList<Call>();// 服务端待处理调用的队列
            if (socketSendBufferSize != 0) {
                try {
                    socket.setSendBufferSize(socketSendBufferSize); // 设置Socket
                                                                    // Buffer大小
                } catch (IOException e) {
                    LOG.warn("Connection: unable to set socket send buffer size to " + socketSendBufferSize);
                }
            }
        }
.......
}

 

 


(e)Handler
请求(blockQueueCall)处理类,会循环阻塞读取callQueue中的call对象,并对其进行操作。它从调用队列中获取调用信息,然后反射调用真正的对象,得到结果,然后再把此次调用放到响应队列(response queue)里。

 

     /** Handles queued calls . */
    private class Handler extends Thread {
        public Handler(int instanceNumber) {
            this.setDaemon(true);// 作为后台线程运行
            this.setName("IPC Server handler " + instanceNumber + " on " + port);
        }

        @Override
        public void run() {
            LOG.info(getName() + ": starting");
            SERVER.set(Server.this);// 作为后台线程运行
            ByteArrayOutputStream buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE); // 存放响应信息的缓冲区
            while (running) {
                try {
                    final Call call = callQueue.take(); // pop the queue; maybe
                                                        // blocked here//
                                                        // 出队操作,获取到一个调用Server.Call
                                                        // call

                    if (LOG.isDebugEnabled())
                        LOG.debug(getName() + ": has #" + call.id + " from " + call.connection);

                    String errorClass = null;
                    String error = null;
                    Writable value = null;

                    CurCall.set(call);// 设置当前线程本地变量拷贝的值为出队得到的一个call调用实例
                    try {
                        // Make the call as the user via Subject.doAs, thus
                        // associating
                        // the call with the Subject
                        // 调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中
                        if (call.connection.user == null) {
                            value = call(call.connection.protocol, call.param, call.timestamp);
                        } else {
                            // //
                            // 根据调用Server.Call关联的连接Server.Connection,所对应的用户Subject,来执行IPC调用过程
                            value = call.connection.user.doAs(new PrivilegedExceptionAction<Writable>() {
                                @Override
                                public Writable run() throws Exception {
                                    // make the call
                                    return call(call.connection.protocol, call.param, call.timestamp);

                                }
                            });
                        }
                    } catch (Throwable e) {
                        LOG.info(getName() + ", call " + call + ": error: " + e, e);
                        errorClass = e.getClass().getName();
                        error = StringUtils.stringifyException(e);
                    }
                    CurCall.set(null);
                    synchronized (call.connection.responseQueue) {
                        // setupResponse() needs to be sync'ed together with
                        // responder.doResponse() since setupResponse may use
                        // SASL to encrypt response data and SASL enforces
                        // its own message ordering.
                        setupResponse(buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error);
                        // Discard the large buf and reset it back to
                        // smaller size to freeup heap
                        if (buf.size() > maxRespSize) {
                            LOG.warn("Large response size " + buf.size() + " for call " + call.toString());
                            buf = new ByteArrayOutputStream(INITIAL_RESP_BUF_SIZE);
                        }
                        responder.doRespond(call);
                    }
                } catch (InterruptedException e) {
                    if (running) { // unexpected -- log it
                        LOG.info(getName() + " caught: " + StringUtils.stringifyException(e));
                    }
                } catch (Exception e) {
                    LOG.info(getName() + " caught: " + StringUtils.stringifyException(e));
                }
            }
            LOG.info(getName() + ": exiting");
        }

    }

 

 

 

(4)Server一次完整请求处理流程分析

         demo示例:
        

/**
 * 
 * Description: RPCserver test<br>
 * 
 * Copyright: Copyright (c) 2013 <br>
 * Company: www.renren.com
 * 
 * @author zhuhui{hui.zhu@renren-inc.com} 2013-5-17
 * @version 1.0
 */
public class RPCserver {

    /**
     * @param args
     */
    public static void main(String[] args) {
        Server server;
        try {
            server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
            server.start();
            try {
                server.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

 

 

 

第一步、启动过程

            server = RPC.getServer(new HelloProtocalImp(), "127.0.0.1", 9813, 6, true, new Configuration());
            server.start();

          (1)初始化

           我们发现getServer()是一个创建Server对象的工厂方法,但创建的却是RPC.Server类的对象,并且初始化了listener、 responder

            

protected Server(String bindAddress, int port, 
                  Class<? extends Writable> paramClass, int handlerCount, 
                  Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager) 
    throws IOException {
    this.bindAddress = bindAddress;
    this.conf = conf;
    this.port = port;
    this.paramClass = paramClass;
    this.handlerCount = handlerCount;
    this.socketSendBufferSize = 0;
    this.maxQueueSize = handlerCount * conf.getInt(
                                IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
                                IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
    this.maxRespSize = conf.getInt(IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
                                   IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
    this.readThreads = conf.getInt(
        IPC_SERVER_RPC_READ_THREADS_KEY,
        IPC_SERVER_RPC_READ_THREADS_DEFAULT);
    this.callQueue  = new LinkedBlockingQueue<Call>(maxQueueSize); 
    this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
    this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
    this.thresholdIdleConnections = conf.getInt("ipc.client.idlethreshold", 4000);
    this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
    this.authorize = 
      conf.getBoolean(HADOOP_SECURITY_AUTHORIZATION, false);
    this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
    
    // Start the listener here and let it bind to the port
    listener = new Listener();
    this.port = listener.getAddress().getPort();    
    this.rpcMetrics = RpcInstrumentation.create(serverName, this.port);
    this.tcpNoDelay = conf.getBoolean("ipc.server.tcpnodelay", false);

    // Create the responder here
    responder = new Responder();
    
    if (isSecurityEnabled) {
      SaslRpcServer.init(conf);
    }
  }

   (2)启动listener、 responder、 handlers

 

            

public synchronized void start() {
    responder.start();
    listener.start();
    handlers = new Handler[handlerCount];
    
    for (int i = 0; i < handlerCount; i++) {
      handlers[i] = new Handler(i);
      handlers[i].start();
    }
  }

   responder、listener、handlers三个对象的线程均阻塞了,前两个阻塞在selector.select()方法上,handler阻塞在callQueue.take()方法,都在等待客户端请求。Responder设置了超时时间,为15分钟。而listener还开启了Reader线程,该线程也阻塞了。

 

 

第二步、Listener接收数据

(1)启动run,在 selector.select阻塞,等待初始化注册的SelectionKey.OP_ACCEPT事件

写道
// Register accepts on the server socket with the selector.
//注册连接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);// 向通道acceptChannel注册上述selector选择器,选择器的键为Server
// Socket接受的操作集合
this.setName("IPC Server listener on " + port); // 设置监听线程名称
this.setDaemon(true); // 设置为后台线程

 

 while (running) {
                SelectionKey key = null;
                try {
                    selector.select();// 选择一组key集合,这些选择的key相关联的通道已经为I/O操作做好准备
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        key = iter.next();
                        iter.remove();
                        try {
                            if (key.isValid()) {
                                if (key.isAcceptable())// 如果该key对应的通道已经准备好接收新的Socket连接

                                    doAccept(key); // 调用,接收与该key关联的通道上的连接

                            }
                        } catch (IOException e) {
                        }
                        key = null;
                    }
                } catch (OutOfMemoryError e) {
                    // we can run out of memory if we have too many threads
                    // log the event and sleep for a minute and give
                    // some thread(s) a chance to finish
                    LOG.warn("Out of Memory in server select", e);
                    closeCurrentConnection(key, e);
                    cleanupConnections(true);
                    try {
                        Thread.sleep(60000);
                    } catch (Exception ie) {
                    }
                } catch (Exception e) {
                    closeCurrentConnection(key, e);
                }
                cleanupConnections(false);
            } 

 

 

 

(2)唤醒 reader, 根据key关联的Server Socket通道,接收该通道上Client端到来的连接

 

          /**
         * 根据key关联的Server Socket通道,接收该通道上Client端到来的连接
         */
        void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
            Connection c = null;
            ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取到Server
                                                                              // Socket
                                                                              // 通道
            SocketChannel channel;
            while ((channel = server.accept()) != null) {

                channel.configureBlocking(false);
                channel.socket().setTcpNoDelay(tcpNoDelay);

                Reader reader = getReader();

                try {
                    reader.startAdd(); // 激活readSelector,设置adding为true

                    SelectionKey readKey = reader.registerChannel(channel); // 向选择器selector注册读操作集合,返回键 将读事件设置成兴趣事件

                    c = new Connection(readKey, channel, System.currentTimeMillis()); // 创建连接

                    readKey.attach(c); // 使连接实例与注册到选择器selector相关的读操作集合键相关联
                                       // //将connection对象注入readKey

                    synchronized (connectionList) {
                        connectionList.add(numConnections, c);// 加入Server端连接维护列表
                        numConnections++;
                    }
                    if (LOG.isDebugEnabled())
                        LOG.debug("Server connection from " + c.toString() + "; # active connections: " + numConnections
                                + "; # queued calls: " + callQueue.size());
                } finally {
                    // 设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每个reader都使
                    // 用了wait()方法等待。因篇幅有限,就不贴出源码了。
                    reader.finishAdd();
                }

            }
        }

         // 向选择器selector注册读操作集合,返回键 将读事件设置成兴趣事件
          public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException {
                return channel.register(readSelector, SelectionKey.OP_READ);
            }

 

 

 

   (3)读取数据readAndProcess>>processOneRpc>>processData

 

         // 方法读取远程过程调用的数据,从一个Server.Connection的Socket通道中读取数据,并将调用任务加入到callQueue,转交给Handler线程去处理
        // 上面方法是接收调用数据的核心方法,实现了如何从SocketChannel通道中读取数据。其中processHeader方法与processData方法已经在上面种详细分析了。
        // 另外,作为Server.Connection是连接到客户端的,与客户端调用进行通信,所以一个连接定义了关闭的操作,关闭的时候需要关闭与客户端Socket关联的SocketChannel通道。
        public int readAndProcess() throws IOException, InterruptedException {
            while (true) {
                /*
                 * Read at most one RPC. If the header is not read completely
                 * yet then iterate until we read first RPC or until there is no
                 * data left.
                 */
                int count = -1;
                // 从通道channel中读取字节,加入到dataLengthBuffer字节缓冲区
                if (dataLengthBuffer.remaining() > 0) {
                    count = channelRead(channel, dataLengthBuffer); // 如果通道已经达到了流的末尾,会返回-1的
                    if (count < 0 || dataLengthBuffer.remaining() > 0) // 读取不成功,直接返回读取的字节数(读取失败可能返回0或-1)
                        return count;
                }

                // // 如果版本号信息还没有读取
                if (!rpcHeaderRead) {
                    // Every connection is expected to send the header.
                    if (rpcHeaderBuffer == null) {
                        rpcHeaderBuffer = ByteBuffer.allocate(2);
                    }
                    count = channelRead(channel, rpcHeaderBuffer);
                    if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
                        return count;
                    }
                    int version = rpcHeaderBuffer.get(0);
                    byte[] method = new byte[] { rpcHeaderBuffer.get(1) };
                    authMethod = AuthMethod.read(new DataInputStream(new ByteArrayInputStream(method)));
                    dataLengthBuffer.flip(); // 反转dataLengthBuffer缓冲区
                    // 如果读取到的版本号信息不匹配,返回-1(HEADER =
                    // ByteBuffer.wrap("hrpc".getBytes()),CURRENT_VERSION = 3)
                    if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
                        // Warning is ok since this is not supposed to happen.
                        LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + " got version " + version
                                + " expected version " + CURRENT_VERSION);
                        return -1;
                    }
                    dataLengthBuffer.clear();
                    if (authMethod == null) {
                        throw new IOException("Unable to read authentication method");
                    }
                    if (isSecurityEnabled && authMethod == AuthMethod.SIMPLE) {
                        AccessControlException ae = new AccessControlException("Authentication is required");
                        setupResponse(authFailedResponse, authFailedCall, Status.FATAL, null, ae.getClass().getName(), ae.getMessage());
                        responder.doRespond(authFailedCall);
                        throw ae;
                    }
                    if (!isSecurityEnabled && authMethod != AuthMethod.SIMPLE) {
                        doSaslReply(SaslStatus.SUCCESS, new IntWritable(SaslRpcServer.SWITCH_TO_SIMPLE_AUTH), null, null);
                        authMethod = AuthMethod.SIMPLE;
                        // client has already sent the initial Sasl message and
                        // we
                        // should ignore it. Both client and server should fall
                        // back
                        // to simple auth from now on.
                        skipInitialSaslHandshake = true;
                    }
                    if (authMethod != AuthMethod.SIMPLE) {
                        useSasl = true;
                    }
                    // 成功读取到了版本号信息,清空dataLengthBuffer以便重用,同时设置versionRead为true

                    rpcHeaderBuffer = null;
                    rpcHeaderRead = true;
                    continue;
                }

                if (data == null) {
                    dataLengthBuffer.flip();
                    // 读取数据长度信息,以便分配data字节缓冲区
                    dataLength = dataLengthBuffer.getInt();
                    // 如果是Client端的ping调用,不需要处理数据,清空dataLengthBuffer,返回
                    if (dataLength == Client.PING_CALL_ID) {
                        if (!useWrap) { // covers the !useSasl too
                            dataLengthBuffer.clear();
                            return 0; // ping message
                        }
                    }
                    if (dataLength < 0) {
                        LOG.warn("Unexpected data length " + dataLength + "!! from " + getHostAddress());
                    }
                    data = ByteBuffer.allocate(dataLength);// 分配data数据缓冲区,准备接收调用参数数据
                }
                // 从通道channel中读取字节到data字节缓冲区中
                count = channelRead(channel, data);

                // /// 如果data已经如期读满
                if (data.remaining() == 0) {
                    dataLengthBuffer.clear();
                    data.flip();// 反转dat字节缓冲区,准备从data缓冲区读取数据
                    if (skipInitialSaslHandshake) {
                        data = null;
                        skipInitialSaslHandshake = false;
                        continue;
                    }
                    boolean isHeaderRead = headerRead;
                    if (useSasl) {
                        saslReadAndProcess(data.array());
                    } else {
                        processOneRpc(data.array());
                    }
                    data = null;
                    if (!isHeaderRead) {
                        continue;
                    }
                }
                return count;
            }
        }

 

private void processData(byte[] buf) throws  IOException, InterruptedException {
      DataInputStream dis =
        new DataInputStream(new ByteArrayInputStream(buf));
      int id = dis.readInt();                    // try to read an id
        
      if (LOG.isDebugEnabled())
        LOG.debug(" got #" + id);

      Writable param = ReflectionUtils.newInstance(paramClass, conf);//read param
      param.readFields(dis);    //这个就是client传递过来的Invocation,包含了函数名和参数    
        
      Call call = new Call(id, param, this);  //封装成call  
      callQueue.put(call);   // 将call存入callQueue  
      incRpcCount();  // 增加rpc请求的计数  
      
}

 

 

第三步、Handler执行业务

   (1)从callQueue获取需要处理 Call
            final Call call = callQueue.take(); // pop the queue; maybe blocked here// 出队操作,获取到一个调用Server.Call call 
  (2)反射调用,业务处理,返回数据调用responder.doRespond(call);

 

public Writable call(Class<?> protocol, Writable param, long receivedTime)
    throws IOException {
        …...........

        Invocation call = (Invocation)param;

      Method method =protocol.getMethod(call.getMethodName(),call.getParameterClasses());
       // 通过反射,根据调用方法名和方法参数类型得到Method实例

        method.setAccessible(true);
      // 设置反射的对象在使用时取消Java语言访问检查,提高效率 
        
        Object value = method.invoke(instance, call.getParameters());// 执行调用(instance是调用底层方法的对象,第二个参数是方法调用的参数) 
        ….............
        
}

 

 

 

第四步、Responder返回数据

       该线程类实现发送RPC响应到客户端,通过线程执行可以看到,调用的相应数据的处理,是在服务器运行过程中处理的,而且分为两种情况:一种情况是:如果某些调用超过了指定的时限而一直未被处理,这些调用被视为过期,服务器不会再为这些调用处理,而是直接清除掉;另一种情况是:如果所选择的通道上,已经注册的调用是合法的,并且通道可写,会直接将调用的相应数据写入到通道,等待客户端读取。
       a: Handler在提交结果到Responder时,会再自己的线程里面执行Responder的发送buffer的逻辑,需要注意的是为保证一个buffer是连续的写出的,Handler在将buffer加入到connection.responseQueue中时,会判断responseQueue的size是不是大于等于1,如果是则表明上一个buffer没有发送完则不走发送流程,而是交给Responder来发送;在Handler走的发送逻辑里面,如果buffer发送完成则将其从connection.responseQueue中移除。如果没发送完成则此buffer仍然是connection.responseQueue[0],并在Responder的Selector上注册此Connection的ON_WRITE事件。
      b: Handler提交的数据可能积累在responseQueue上,这些由Responder来发送,Responder的发送逻辑是:如果一个buffer没发送完成会到Responder的Selector上注册此Connection的ON_WRITE事件,Responder循环处理那些可写的Connection,对于一个Connection写完其responseQueue上的数据后就取消其ON_WRITE,对于长时间不可写的Connection采取关闭连接处理。

 

(1)初始化创建

 

       Responder() throws IOException {
            this.setName("IPC Server Responder");
            this.setDaemon(true);
            writeSelector = Selector.open(); // create a selector
            pending = 0;
        }

 

 

(2)发送数据processResponse

 

         /**
         * 处理一个通道上调用的响应数据 如果一个通道空闲,返回true
         */
        private boolean processResponse(LinkedList<Call> responseQueue, boolean inHandler) throws IOException {
            boolean error = true;
            boolean done = false; // there is more data for this channel. //
                                  // 一个通道channel有更多的数据待读取
            int numElements = 0;
            Call call = null;
            try {
                synchronized (responseQueue) {
                    //
                    // If there are no items for this channel, then we are done
                    //
                    numElements = responseQueue.size();
                    if (numElements == 0) {
                        error = false;
                        return true; // no more data for this channel.
                    }
                    //
                    // Extract the first call
                    // // 从队列中取出第一个调用call
                    call = responseQueue.removeFirst();
                    SocketChannel channel = call.connection.channel; // 获取该调用对应的通道channel
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection);
                    }
                    //
                    // Send as much data as we can in the non-blocking fashion
                    //
                    int numBytes = channelWrite(channel, call.response);// 向通道channel中写入响应信息(响应信息位于call.response字节缓冲区中)
                    if (numBytes < 0) {// 如果写入字节数为0,说明已经没有字节可写,返回
                        return true;
                    }
                    if (!call.response.hasRemaining()) { // 如果call.response字节缓冲区中没有响应字节数据,说明已经全部写入到相关量的通道中
                        call.connection.decRpcCount();// 该调用call对应的RPC连接计数减1
                        if (numElements == 1) { // 最后一个调用已经处理完成
                            done = true; // 该通道channel没有更多的数据
                        } else {
                            done = false; // 否则,还存在尚未处理的调用,要向给通道发送数据
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote " + numBytes
                                    + " bytes.");
                        }
                    } else {
                        //
                        // If we were unable to write the entire response out,
                        // then
                        // insert in Selector queue.
                        // // 如果call.response字节缓冲区中还存在未被写入通道响应字节数据
                        // /; // 如果不能够将全部的响应字节数据写入到通道中,需要暂时插入到Selector选择其队列中
                        call.connection.responseQueue.addFirst(call);

                        if (inHandler) { // 如果指定:现在就对调用call进行处理(该调用的响应还没有进行处理)
                            call.timestamp = System.currentTimeMillis(); // 设置调用时间戳
                            incPending(); // 增加未被处理响应信息的调用计数
                            try {
                                // Wakeup the thread blocked on select, only
                                // then can the call
                                // to channel.register() complete.
                                writeSelector.wakeup();// 唤醒阻塞在该通道writeSelector上的线程
                                channel.register(writeSelector, SelectionKey.OP_WRITE, call);// 调用call注册通道writeSelector
                            } catch (ClosedChannelException e) {
                                // Its ok. channel might be closed else where.
                                done = true;
                            } finally {
                                decPending();// 经过上面处理,不管在处理过程中正常处理,或是发生通道已关闭异常,最后,都将设置该调用完成,更新计数
                            }
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug(getName() + ": responding to #" + call.id + " from " + call.connection + " Wrote partial " + numBytes
                                    + " bytes.");
                        }
                    }
                    error = false; // everything went off well
                }
            } finally {
                if (error && call != null) {
                    LOG.warn(getName() + ", call " + call + ": output error");
                    done = true; // error. no more data for this channel.
                    closeConnection(call.connection);
                }
            }
            return done;
        }

 

 

(3)未发送完的数据转交给Responder run()处理

        调用doAsyncWrite方法写数据,如果过期则清理

public void run() {
            LOG.info(getName() + ": starting");
            SERVER.set(Server.this);
            long lastPurgeTime = 0; // last check for old calls.

            while (running) {
                try {
                    waitPending(); // If a channel is being registered, wait.//
                                   // 等待一个通道中,接收到来的调用进行注册
                    writeSelector.select(PURGE_INTERVAL);// 设置超时时限
                    Iterator<SelectionKey> iter = writeSelector.selectedKeys().iterator();
                    while (iter.hasNext()) {// 迭代选择器writeSelector选择的key集合
                        SelectionKey key = iter.next();
                        iter.remove();
                        try {
                            if (key.isValid() && key.isWritable()) {
                                doAsyncWrite(key); // 执行异步写操作,向通道中写入调用执行的响应数据
                            }
                        } catch (IOException e) {
                            LOG.info(getName() + ": doAsyncWrite threw exception " + e);
                        }
                    }
                    long now = System.currentTimeMillis();
                    if (now < lastPurgeTime + PURGE_INTERVAL) {
                        continue;
                    }
                    lastPurgeTime = now;
                    //
                    // If there were some calls that have not been sent out for
                    // a
                    // long time, discard them.
                    //
                    LOG.debug("Checking for old call responses.");
                    ArrayList<Call> calls;

                    // get the list of channels from list of keys.
                    // 如果存在一些一直没有被发送出去的调用,这是时间限制为lastPurgeTime + PURGE_INTERVAL
                    // 则这些调用被视为过期调用,进行清除
                    synchronized (writeSelector.keys()) {
                        calls = new ArrayList<Call>(writeSelector.keys().size());
                        iter = writeSelector.keys().iterator();
                        while (iter.hasNext()) {
                            SelectionKey key = iter.next();
                            Call call = (Call) key.attachment();
                            if (call != null && key.channel() == call.connection.channel) {
                                calls.add(call);
                            }
                        }
                    }

                    for (Call call : calls) {
                        try {
                            doPurge(call, now);
                        } catch (IOException e) {
                            LOG.warn("Error in purging old calls " + e);
                        }
                    }
                } catch (OutOfMemoryError e) {
                    //
                    // we can run out of memory if we have too many threads
                    // log the event and sleep for a minute and give
                    // some thread(s) a chance to finish
                    //
                    LOG.warn("Out of Memory in server select", e);
                    try {
                        Thread.sleep(60000);
                    } catch (Exception ie) {
                    }
                } catch (Exception e) {
                    LOG.warn("Exception in Responder " + StringUtils.stringifyException(e));
                }
            }
            LOG.info("Stopping " + this.getName());
        }

 

 时序图:

 

 类图:



 

  • 大小: 114.7 KB
  • 大小: 147.9 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics