NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using NIO non-blocking socket calls. Communication between threads is handled via queues. - 1 accept thread, which accepts new connections and assigns to a selector thread - 1-N selector threads, each of which selects on 1/N of the connections. The reason the factory supports more than one selector thread is that with large numbers of connections, select() itself can become a performance bottleneck. - 0-M socket I/O worker threads, which perform basic socket reads and writes. If configured with 0 worker threads, the selector threads do the socket I/O directly. - 1 connection expiration thread, which closes idle connections; this is necessary to expire connections on which no session is established. Typical (default) thread counts are: on a 32 core machine, 1 accept thread, 1 connection expiration thread, 4 selector threads, and 64 worker threads.
if (!key.isValid()) { continue; } if (key.isAcceptable()) { if (!doAccept()) { // If unable to pull a new connection off the accept // queue, pause accepting to give us time to free // up file descriptors and so the accept thread // doesn't spin in a tight loop. pauseAccept(10); } } else { LOG.warn("Unexpected ops in accept select {}", key.readyOps()); } } } catch (IOException e) { LOG.warn("Ignoring IOException while selecting", e); } }
// Close connections still pending on the selector. Any others // with in-flight work, let drain out of the work queue. for (SelectionKey key : selector.keys()) { NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment(); if (cnxn.isSelectable()) { cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN); } cleanupSelectionKey(key); } SocketChannel accepted; while ((accepted = acceptedQueue.poll()) != null) { fastCloseSock(accepted); } updateQueue.clear(); } finally { closeSelector(); // This will wake up the accept thread and the other selector // threads, and tell the worker thread pool to begin shutdown. NIOServerCnxnFactory.this.stop(); LOG.info("selector thread exitted run method"); } }
publicvoidschedule(WorkRequest workRequest, long id){ if (stopped) { workRequest.cleanup(); return; }
ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
// If we have a worker thread pool, use that; otherwise, do the work // directly. int size = workers.size(); if (size > 0) { try { // make sure to map negative ids as well to [0, size-1] int workerNum = ((int) (id % size) + size) % size; ExecutorService worker = workers.get(workerNum); worker.execute(scheduledWorkRequest); } catch (RejectedExecutionException e) { LOG.warn("ExecutorService rejected execution", e); workRequest.cleanup(); } } else { // When there is no worker thread pool, do the work directly // and wait for its completion scheduledWorkRequest.run(); } }
可以看到,如果worker线程的数量不大于0,就会直接使用调用线程执行
ConnectionExpirerThread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
publicvoidrun(){ try { while (!stopped) { long waitTime = cnxnExpiryQueue.getWaitTime(); if (waitTime > 0) { Thread.sleep(waitTime); continue; } for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) { ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1); conn.close(ServerCnxn.DisconnectReason.CONNECTION_EXPIRED); } }
@Override publicvoidconfigure(InetSocketAddress addr, int maxcc, int backlog, boolean secure)throws IOException { if (secure) { thrownew UnsupportedOperationException("SSL isn't supported in NIOServerCnxn"); } configureSaslLogin();
maxClientCnxns = maxcc; initMaxCnxns(); sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); // We also use the sessionlessCnxnTimeout as expiring interval for // cnxnExpiryQueue. These don't need to be the same, but the expiring // interval passed into the ExpiryQueue() constructor below should be // less than or equal to the timeout. cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); expirerThread = new ConnectionExpirerThread();
int numCores = Runtime.getRuntime().availableProcessors(); // 32 cores sweet spot seems to be 4 selector threads numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores / 2), 1)); if (numSelectorThreads < 1) { thrownew IOException("numSelectorThreads must be at least 1"); }
@Override publicvoidstart(){ stopped = false; if (workerPool == null) { workerPool = new WorkerService("NIOWorker", numWorkerThreads, false); } for (SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } // ensure thread is started once and only once if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } if (expirerThread.getState() == Thread.State.NEW) { expirerThread.start(); } }
创建workerPool,并启动各种线程
细节分析
maxClientCnxns
这个参数是控制同一个ip允许的最大连接数
在NIOServerCnxnFactory中,使用ipMap实现的
1 2
// ipMap is used to limit connections per IP privatefinal ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>();
privatevoidreadPayload()throws IOException, InterruptedException, ClientCnxnLimitException { if (incomingBuffer.remaining() != 0) { // have we read length bytes? int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok if (rc < 0) { handleFailedRead(); } }
if (incomingBuffer.remaining() == 0) { // have we read length bytes? incomingBuffer.flip(); packetReceived(4 + incomingBuffer.remaining()); if (!initialized) { readConnectRequest(); } else { readRequest(); } lenBuffer.clear(); incomingBuffer = lenBuffer; } }