lanicc

ZooKeeper Source Reading: ServerCnxnFactory

2021-08-29

ServerCnxnFactory is the factory responsible for network I/O between the ZooKeeper server and its clients. It has two implementations, analyzed in turn below.

NIOServerCnxnFactory

First, a look at the javadoc:

NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using NIO non-blocking socket calls. Communication between threads is handled via queues. - 1 accept thread, which accepts new connections and assigns to a selector thread - 1-N selector threads, each of which selects on 1/N of the connections. The reason the factory supports more than one selector thread is that with large numbers of connections, select() itself can become a performance bottleneck. - 0-M socket I/O worker threads, which perform basic socket reads and writes. If configured with 0 worker threads, the selector threads do the socket I/O directly. - 1 connection expiration thread, which closes idle connections; this is necessary to expire connections on which no session is established. Typical (default) thread counts are: on a 32 core machine, 1 accept thread, 1 connection expiration thread, 4 selector threads, and 64 worker threads.

In short: NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using NIO non-blocking socket calls, with the threads communicating via queues.

  • 1 accept thread, which accepts new connections and assigns each to a selector thread.
  • 1-N selector threads, each responsible for select() on its share of the accepted connections. Multiple selector threads are supported because, with a large number of connections, select() itself can become a performance bottleneck.
  • 0-M worker threads, which perform the actual socket reads and writes. If the worker thread count is configured as 0, the selector threads do the socket I/O directly.
  • 1 connection expiration thread, which closes idle connections; this is needed to expire connections on which no session has been established.

Typical (default) thread counts: on a 32-core machine, 1 accept thread, 1 connection expiration thread, 4 selector threads, and 64 worker threads.
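
The accept-to-selector handoff described above can be sketched with plain queues. Everything below (class and method names, integers standing in for connections) is illustrative, not ZooKeeper code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the accept -> selector handoff: one dispatcher
// round-robins "connections" onto per-selector queues, and each selector
// thread would drain only its own queue.
public class ThreadModelSketch {
    static List<BlockingQueue<Integer>> queues(int nSelectors) {
        List<BlockingQueue<Integer>> qs = new ArrayList<>();
        for (int i = 0; i < nSelectors; i++) {
            qs.add(new LinkedBlockingQueue<>());
        }
        return qs;
    }

    // Round-robin assignment, as the accept thread does with its selector iterator.
    static void dispatch(List<BlockingQueue<Integer>> qs, int nConns) {
        for (int c = 0; c < nConns; c++) {
            qs.get(c % qs.size()).offer(c);
        }
    }

    public static void main(String[] args) {
        List<BlockingQueue<Integer>> qs = queues(4);
        dispatch(qs, 10);
        // 10 connections over 4 selectors: queue sizes are 3, 3, 2, 2
        for (BlockingQueue<Integer> q : qs) {
            System.out.println(q.size());
        }
    }
}
```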

Let's look at each thread's implementation.

AcceptThread

public void run() {
    try {
        while (!stopped && !acceptSocket.socket().isClosed()) {
            try {
                select();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
    } finally {
        closeSelector();
        // This will wake up the selector threads, and tell the
        // worker thread pool to begin shutdown.
        if (!reconfiguring) {
            NIOServerCnxnFactory.this.stop();
        }
        LOG.info("accept thread exitted run method");
    }
}

So the run method just keeps calling select in a loop.

Here is select:

private void select() {
    try {
        selector.select();

        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selectedKeys.remove();

            if (!key.isValid()) {
                continue;
            }
            if (key.isAcceptable()) {
                if (!doAccept()) {
                    // If unable to pull a new connection off the accept
                    // queue, pause accepting to give us time to free
                    // up file descriptors and so the accept thread
                    // doesn't spin in a tight loop.
                    pauseAccept(10);
                }
            } else {
                LOG.warn("Unexpected ops in accept select {}", key.readyOps());
            }
        }
    } catch (IOException e) {
        LOG.warn("Ignoring IOException while selecting", e);
    }
}

When the selector's selected-key set is non-empty, it iterates over the keys and calls doAccept for each acceptable one.

private boolean doAccept() {
    boolean accepted = false;
    SocketChannel sc = null;
    try {
        sc = acceptSocket.accept();
        accepted = true;
        if (limitTotalNumberOfCnxns()) {
            throw new IOException("Too many connections max allowed is " + maxCnxns);
        }
        InetAddress ia = sc.socket().getInetAddress();
        int cnxncount = getClientCnxnCount(ia);

        if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
            throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
        }

        LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

        sc.configureBlocking(false);

        // Round-robin assign this connection to a selector thread
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectorThreads.iterator();
        }
        SelectorThread selectorThread = selectorIterator.next();
        if (!selectorThread.addAcceptedConnection(sc)) {
            throw new IOException("Unable to add connection to selector queue"
                + (stopped ? " (shutdown in progress)" : ""));
        }
        acceptErrorLogger.flush();
    } catch (IOException e) {
        // accept, maxClientCnxns, configureBlocking
        ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
        acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
        fastCloseSock(sc);
    }
    return accepted;
}

  • acceptSocket.accept() accepts the new connection
  • limitTotalNumberOfCnxns() checks whether the total connection count exceeds the allowed maximum (maxCnxns)
  • the cnxncount check enforces the per-IP limit (maxClientCnxns)
  • selectorIterator.next() picks a selector thread in round-robin fashion
  • addAcceptedConnection(sc) hands the SocketChannel over to that selector thread
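
The round-robin step relies on an iterator that is re-created whenever it is exhausted. A minimal standalone sketch of that idiom (names made up for illustration):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of the resetting-iterator round-robin used in doAccept(): when
// the iterator runs out, restart it from the beginning of the fixed list.
public class RoundRobinSketch {
    private final List<String> threads;
    private Iterator<String> it;

    RoundRobinSketch(List<String> threads) {
        this.threads = threads;
        this.it = threads.iterator();
    }

    String next() {
        if (!it.hasNext()) {
            it = threads.iterator(); // wrap around
        }
        return it.next();
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(Arrays.asList("s0", "s1", "s2"));
        for (int i = 0; i < 5; i++) {
            System.out.println(rr.next()); // s0 s1 s2 s0 s1
        }
    }
}
```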

SelectorThread

First, look at the addAcceptedConnection method:

public boolean addAcceptedConnection(SocketChannel accepted) {
    if (stopped || !acceptedQueue.offer(accepted)) {
        return false;
    }
    wakeupSelector();
    return true;
}

  • acceptedQueue.offer(accepted) enqueues the accepted client SocketChannel
  • wakeupSelector() wakes up the selector thread
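
The offer-then-wakeup pattern works because java.nio's Selector.wakeup() is "remembered": it makes the next select() call return immediately even when no channel is ready, so it is safe to enqueue work for a selector thread that may be blocked in select(). A tiny demonstration:

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Demonstrates Selector.wakeup(): calling it before select() makes the
// subsequent select() return at once instead of blocking.
public class WakeupDemo {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        selector.wakeup();          // remembered even though no select() is running yet
        long start = System.nanoTime();
        selector.select();          // returns immediately due to the pending wakeup
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("select() returned after ~" + elapsedMs + " ms");
        selector.close();
    }
}
```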

Now the selector thread's core method, run:

public void run() {
    try {
        while (!stopped) {
            try {
                select();
                processAcceptedConnections();
                processInterestOpsUpdateRequests();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        // Close connections still pending on the selector. Any others
        // with in-flight work, let drain out of the work queue.
        for (SelectionKey key : selector.keys()) {
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
            if (cnxn.isSelectable()) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
            }
            cleanupSelectionKey(key);
        }
        SocketChannel accepted;
        while ((accepted = acceptedQueue.poll()) != null) {
            fastCloseSock(accepted);
        }
        updateQueue.clear();
    } finally {
        closeSelector();
        // This will wake up the accept thread and the other selector
        // threads, and tell the worker thread pool to begin shutdown.
        NIOServerCnxnFactory.this.stop();
        LOG.info("selector thread exitted run method");
    }
}

  • select() picks up sockets with pending read/write events and hands them to worker threads (if the worker thread count is 0, the selector thread performs the I/O itself)
  • processAcceptedConnections() registers newly accepted connections for OP_READ
  • processInterestOpsUpdateRequests() applies queued interest-ops changes

WorkerService

Its core method is schedule:

public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.run();
    }
}

As you can see, when the number of worker threads is not greater than 0, the work is executed directly on the calling thread.
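
The worker-selection expression ((int) (id % size) + size) % size deserves a closer look: unlike a plain %, it maps negative ids into [0, size-1], matching Math.floorMod. A small check (the class name is illustrative):

```java
// Demonstrates the double-modulo trick schedule() uses to map any
// (possibly negative) connection id onto a worker index in [0, size-1].
public class WorkerIndexDemo {
    static int workerNum(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        System.out.println(workerNum(7, 4));   // 3
        System.out.println(workerNum(-3, 4));  // 1 (plain -3 % 4 would be -3)
        System.out.println(workerNum(-3, 4) == Math.floorMod(-3, 4)); // true
    }
}
```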

ConnectionExpirerThread

public void run() {
    try {
        while (!stopped) {
            long waitTime = cnxnExpiryQueue.getWaitTime();
            if (waitTime > 0) {
                Thread.sleep(waitTime);
                continue;
            }
            for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
                ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
                conn.close(ServerCnxn.DisconnectReason.CONNECTION_EXPIRED);
            }
        }

    } catch (InterruptedException e) {
        LOG.info("ConnnectionExpirerThread interrupted");
    }
}

ConnectionExpirerThread simply closes the connections that expire out of cnxnExpiryQueue.
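
cnxnExpiryQueue is an ExpiryQueue, which groups connections into time buckets so the expirer thread can poll a whole bucket at once. A simplified sketch of the round-up-to-next-interval idea (this is an assumption about the internal rounding, not the actual class):

```java
// Sketch of time bucketing: each connection's expiry time is rounded up
// to the next multiple of the expiration interval, so connections that
// expire within the same interval share one bucket.
public class ExpiryBucketDemo {
    static long roundToNextInterval(long time, long interval) {
        return (time / interval + 1) * interval;
    }

    public static void main(String[] args) {
        long interval = 10_000; // e.g. sessionlessCnxnTimeout in ms
        // Touches at t=3s and t=7s land in the same bucket (10s) ...
        System.out.println(roundToNextInterval(3_000, interval));  // 10000
        System.out.println(roundToNextInterval(7_000, interval));  // 10000
        // ... while t=12s lands in the next one (20s).
        System.out.println(roundToNextInterval(12_000, interval)); // 20000
    }
}
```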

Startup flow

Configuration

@Override
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    initMaxCnxns();
    sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    // We also use the sessionlessCnxnTimeout as expiring interval for
    // cnxnExpiryQueue. These don't need to be the same, but the expiring
    // interval passed into the ExpiryQueue() constructor below should be
    // less than or equal to the timeout.
    cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    // 32 cores sweet spot seems to be 4 selector threads
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    String logMsg = "Configuring NIO connection handler with "
        + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
        + numSelectorThreads + " selector thread(s), "
        + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
        + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
    LOG.info(logMsg);
    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    listenBacklog = backlog;
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port {}", addr);
    if (listenBacklog == -1) {
        ss.socket().bind(addr);
    } else {
        ss.socket().bind(addr, listenBacklog);
    }
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}

Parameters initialized here:

  • maxClientCnxns: maximum number of connections allowed from a single IP
  • maxCnxns: maximum total number of connections allowed
  • sessionlessCnxnTimeout: timeout (default 10000 ms) after which a connection with no established session is closed; it is also used as the expiry interval for cnxnExpiryQueue
  • cnxnExpiryQueue: the expiry queue for idle connections
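
The default selector-thread count computed in configure() is max(sqrt(numCores / 2), 1). A quick check of how it scales with core count (the class name is illustrative):

```java
// Reproduces the default-selector-thread formula from configure():
// Math.max((int) Math.sqrt((float) numCores / 2), 1)
public class SelectorDefaultDemo {
    static int defaultSelectorThreads(int numCores) {
        return Math.max((int) Math.sqrt((float) numCores / 2), 1);
    }

    public static void main(String[] args) {
        System.out.println(defaultSelectorThreads(1));  // 1
        System.out.println(defaultSelectorThreads(8));  // 2
        System.out.println(defaultSelectorThreads(32)); // 4 (the "sweet spot" in the source comment)
    }
}
```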

It also creates the various threads.

Startup

@Override
public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        zks.startdata();
        zks.startup();
    }
}

The core is in start:

@Override
public void start() {
    stopped = false;
    if (workerPool == null) {
        workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
    }
    for (SelectorThread thread : selectorThreads) {
        if (thread.getState() == Thread.State.NEW) {
            thread.start();
        }
    }
    // ensure thread is started once and only once
    if (acceptThread.getState() == Thread.State.NEW) {
        acceptThread.start();
    }
    if (expirerThread.getState() == Thread.State.NEW) {
        expirerThread.start();
    }
}

It creates the workerPool and starts each of the threads.

Implementation details

maxClientCnxns

This parameter caps the number of connections allowed from a single IP.

In NIOServerCnxnFactory it is implemented with ipMap:

// ipMap is used to limit connections per IP
private final ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>();

The key is an IP address; the value is the set of NIOServerCnxn instances from that IP.

When a selector thread runs processAcceptedConnections, it creates a NIOServerCnxn object (cnxn) for the new connection and adds it to ipMap.

When the accept thread runs doAccept, it validates the current connection count for the client's IP, which is how maxClientCnxns is enforced.
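
The per-IP accounting can be sketched as follows. The real factory stores a Set<NIOServerCnxn> per address; this sketch uses a bare counter to keep the limit check visible, and all names besides ipMap/maxClientCnxns are made up:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified sketch of per-IP connection accounting and the limit check
// that doAccept() performs before handing a connection to a selector.
public class PerIpLimitDemo {
    private final Map<InetAddress, AtomicInteger> ipMap = new ConcurrentHashMap<>();
    private final int maxClientCnxns;

    PerIpLimitDemo(int maxClientCnxns) {
        this.maxClientCnxns = maxClientCnxns;
    }

    boolean tryAccept(InetAddress ia) {
        AtomicInteger count = ipMap.computeIfAbsent(ia, k -> new AtomicInteger());
        if (maxClientCnxns > 0 && count.get() >= maxClientCnxns) {
            return false; // doAccept() would throw "Too many connections from ..."
        }
        count.incrementAndGet();
        return true;
    }

    public static void main(String[] args) throws UnknownHostException {
        PerIpLimitDemo demo = new PerIpLimitDemo(2);
        InetAddress ia = InetAddress.getByName("127.0.0.1");
        System.out.println(demo.tryAccept(ia)); // true
        System.out.println(demo.tryAccept(ia)); // true
        System.out.println(demo.tryAccept(ia)); // false: limit of 2 reached
    }
}
```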

IOWorkerRequest

When a selector thread runs handleIO, it creates an IOWorkerRequest and hands it to the workerPool for processing.

IOWorkerRequest.doWork mainly does the following:

  • calls NIOServerCnxn.doIO to perform the socket read/write
  • refreshes the NIOServerCnxn's expiration time

NIOServerCnxn

This class handles communication with a client; each client corresponds to one NIOServerCnxn.
NIOServerCnxn.doIO handles both read and write events.

Read events

private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            handleFailedRead();
        }
    }

    if (incomingBuffer.remaining() == 0) { // have we read length bytes?
        incomingBuffer.flip();
        packetReceived(4 + incomingBuffer.remaining());
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}

  • if the connection is not yet initialized, readConnectRequest() handles the connect request and ends up calling zkServer.processConnectRequest
  • otherwise readRequest() handles a regular request and ends up calling zkServer.processPacket

So all requests are ultimately handed off to zkServer for processing.
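
The framing readPayload() deals with is a 4-byte big-endian length prefix followed by the payload (lenBuffer reads the length, then incomingBuffer is sized to the payload). A self-contained sketch of the same two-buffer approach, with a stand-in channel and message rather than ZooKeeper code:

```java
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

// Reads one length-prefixed frame: a 4-byte big-endian length, then that
// many payload bytes, mirroring the lenBuffer/incomingBuffer pair.
public class FrameReadDemo {
    static String readFrame(ReadableByteChannel ch) throws Exception {
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
        while (lenBuffer.hasRemaining()) {
            ch.read(lenBuffer);
        }
        lenBuffer.flip();
        ByteBuffer incomingBuffer = ByteBuffer.allocate(lenBuffer.getInt());
        while (incomingBuffer.hasRemaining()) {
            ch.read(incomingBuffer);
        }
        incomingBuffer.flip();
        return StandardCharsets.UTF_8.decode(incomingBuffer).toString();
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer framed = ByteBuffer.allocate(4 + payload.length);
        framed.putInt(payload.length).put(payload);
        ReadableByteChannel ch = Channels.newChannel(new ByteArrayInputStream(framed.array()));
        System.out.println(readFrame(ch)); // hello
    }
}
```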

Write events

NettyServerCnxnFactory

NettyServerCnxnFactory uses Netty for the underlying network communication, which means relying on Netty's channel pipeline.
CnxnChannelHandler is the most important channel handler in NettyServerCnxnFactory's use of Netty.

Compared with NIOServerCnxnFactory, let's see how NettyServerCnxnFactory.CnxnChannelHandler implements the same features.

maxClientCnxns

In CnxnChannelHandler's channelActive method, this limit is checked:

InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
    ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
    LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
    channel.close();
    return;
}

NIOServerCnxnFactory一样,NettyServerCnxnFactory内部也有一个ipMap,用来记录同一个IP地址的连接数量

private final Map<InetAddress, AtomicInteger> ipMap = new ConcurrentHashMap<>();

channelRead

When a read event occurs, Netty calls CnxnChannelHandler.channelRead, which passes the message to NettyServerCnxn; the processing flow is similar to NIOServerCnxn's.

Summary

  • I wanted to compare NIOServerCnxnFactory's threading model with Netty's and draw a diagram, but I'm not good at drawing, and although I've read the Netty code before, my memory of it is fuzzy, so I'll set this aside for now.
  • Browsing the history of NIOServerCnxnFactory shows how the class evolved; the first commit was in 2012, when the design did not yet have today's complex threading model. Why did it grow so complex, and why not just use the Netty implementation?
  • How is encoding/decoding done in each of the two implementations?

[Figure: NIOServerCnxnFactory threading model]

[Figure: NettyServerCnxnFactory threading model]

[Figure: Netty threading model]

Figures from: https://segmentfault.com/a/1190000040392205