Java基础:SocketChannel和Selector在ZooKeeper中应用
ZooKeeper的启动从QuorumPeerMain类的main函数开始:
调用顺序是: Main -> initializeAndRun-> runFromConfig
一、默认的NIOServerCnxnFactory通信方式其中runFromConfig主要做了两件事情:
(1)初始化客户端与服务端的网络通信处理类ServerCnxnFactory:
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
而createFactory函数的内部实现是:
if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { return(ServerCnxnFactory) Class.forName(serverCnxnFactoryName).newInstance(); } catch (Exception e) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName); ioe.initCause(e); throw ioe; } 可以看到默认初始化的ServerCnxnFactory是NIOServerCnxnFactory,这是Java NIO方式的网络通信,另外还有NettyServerCnxnFactory类提供Netty通信方式。(2)启动QuorumPeer:
quorumPeer.start();
内部会调用初始化好的ServerCnxnFactory类的start函数:
cnxnFactory.start();
二、NIOServerCnxnFactory.start()方法 @Override publicvoidstart() { stopped = false; if (workerPool == null){ workerPool = new WorkerService( "NIOWorker", numWorkerThreads, false); } //先启动一堆selector线程 for(SelectorThread thread : selectorThreads) { if (thread.getState() == Thread.State.NEW) { thread.start(); } } //再启动accept线程 if (acceptThread.getState() == Thread.State.NEW) { acceptThread.start(); } //最后启动expire线程 if (expirerThread.getState() ==Thread.State.NEW){ expirerThread.start(); } }三、NIOServerCnxnFactory中几个线程类的关系
NIOServerCnxnFactory中包含了三个类:
(1)AbstractSelectThread:SelectorThread类和AcceptThread类的共同父类,维护了一个selector选择器对象。
(2)AcceptThread:管理新的ZooKeeper client连接请求,实际上就是利用父类的选择器selector监听ServerSocketChannel的“SelectionKey.OP_ACCEPT”事件,一旦来新的请求,负责建立好和客户端的连接SocketChannel,并从SelectorThread线程池分配一个线程,将该SocketChannel连接放入SelectorThread线程维护的acceptedQueue队列中。
(3)SelectorThread:监听AcceptThread分配的已经建立的SocketChannel连接上发生的数据读写事件,并执行实际数据读写。实际上就是利用父类的选择器selector监听在acceptedQueue队列中建立好的连接的数据读写事件,一旦读写事件发生,调用handleIO函数处理读写请求。
下面是读写事件监听线程的线程池selectorThreads的初始化:
for(inti=0; i<numSelectorThreads;++i) { selectorThreads.add(newSelectorThread(i)); }下面是新连接管理线程acceptThread的初始化: this.ss =ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); acceptThread= newAcceptThread(ss, addr, selectorThreads);
然后NIOServerCnxnFactory.start()将会启动这些线程。
AcceptThread的run函数很简单,就是监听连接SelectionKey.op_accept事件,然后建立SocketChannel连接,并分配一个SelectorThread线程来处理该连接,具体就不详述了。
SelectorThread的run函数核心就是监听到SelectionKey.OP_READ事件后执行handleIO函数,具体如何和客户端进行数据交互计划放在其他文章中介绍。
相关热词:
本站内容来源于网络,如有侵权请与我们联系,我们会及时删除,我们深感抱歉!
注:本站所有信息仅供用于网络技术学习参考,学习中请遵循相关法律法规!
本文地址: https://www.juheyunku.com/jiaob/bjc/9461.shtml
相关文章
热门TAG
命令 外链 企业网站 白帽 php 织梦教程 dedecms修改内容 javascript 织梦 功能 标签 调用 详解 技巧 权重 服务器 网站流量 Dedecms 织梦cms HTML tags标签 python jquery教程 jquery windows 蜘蛛 搜索引擎 网站收录 JSP 实例解析最新文章
-
Socket编程实践(16) TCP/IP各层
时间:2020-12-26
-
SocketIT之家实践(16) TCP/IP各
时间:2020-12-26
-
SoC嵌入式软件架构设计之
时间:2020-12-26
-
socket通信简介
时间:2020-12-26
-
理解glibcmalloc
时间:2020-12-26
热门文章
-
SoC嵌入式软件架构设计之四 :内存空间规
时间:2020-12-26
-
Socket编程实践(16) TCP/IP各层报文(1)
时间:2020-12-26
-
socket通信简介
时间:2020-12-26
-
理解glibcmalloc
时间:2020-12-26
-
SocketIT之家实践(16) TCP/IP各层报文(1)
时间:2020-12-26
