您的当前位置:首页正文

Zookeeper学习之ZooKeeper源码分析

2020-02-15 来源:品趣旅游知识分享网
Zookeeper学习之ZooKeeper源码分析

⼀、宏观分析ZooKeeper源码结构

  ZooKeeper宏观分析源码,如下图所⽰:

        

  要想分析源码,⾸先需要宏观分析整个ZooKeeper结构,要知道ZooKeeper分为两部分:服务端集群、客户端。  其中服务端:

每台ZooKeeper服务器都有三个状态:初始化、运⾏中、结束关机。因此当服务器都处于运⾏时,构成⼀个zookeeper集群,那么就能够对外提供服务(单机也可以运⾏);

服务端启动服务后,进⾏初始化构成可⽤集群;  对于客户端:

客户端封装出API操作层,这样任何访问都基于同⼀API;客户端的API要遵循⼀定的协议,进⾏消息协议封装;⽹络通讯要实现序列化、反序列化以及连接建⽴;

  当然,客户端提供的这部分协议封装、序列化/反序列化、建⽴连接能⼒,服务端也同时需要具备。我们可以通过写伪服务端拦截请求进⾏查看,代码如下:

public class SoecktLister {

public static void main(String[] args) throws Exception { ServerSocket serverSocket = new ServerSocket(2181); Socket accept = serverSocket.accept(); byte[] result = new byte[2048];

accept.getInputStream().read(result);

ByteBuffer bb = ByteBuffer.wrap(result);

ByteBufferInputStream bbis = new ByteBufferInputStream(bb); BinaryInputArchive bia = BinaryInputArchive.getArchive(bbis); RequestHeader header2 = new RequestHeader(); header2.deserialize(bia, \"header\"); System.out.println(header2); bbis.close(); }}

  然后通过客户端进⾏访问:

public class ZooKeeperTest { private ZooKeeper zooKeeper;

public ZooKeeperTest() { try {

zooKeeper= new ZooKeeper(\"localhost:2181\ 5000,

null, false);

} catch (IOException e) { e.printStackTrace(); } }

public void add(String path,String data){ try {

String newPath = zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException e) { e.printStackTrace();

} catch (InterruptedException e) { e.printStackTrace(); } }

public static void main(String[] args) {

ZooKeeperTest zooKeeperTest=new ZooKeeperTest(); zooKeeperTest.add(\"/monkey2\ }}

  于是服务端可以收到请求:

RequestHeader{protocolVersion=45, lastZxidSeen=0, timeOut=0, sessionId=21474836480000, passwd=[]}

  其实这些内容就是⼀次简单请求消息的协议包装。

⼆、服务端源码分析

  1、服务端初始化

  根据ZooKeeper启动脚本./zkServer.sh start -server ip:port,打开脚本可以看到服务端启动⼊⼝:org.apache.zookeeper.server.quorum.QuorumPeerMain。

  注意:服务端的数据存放结构是:org.apache.zookeeper.server.DataTree,dataTree是放在ZKDataBasse中的。  服务端启动后,会依次进⾏配置⽂件zoo.cfg加载、数据加载、通讯建⽴、leader选举,代码如下:

@Override

public synchronized void start() { if (!getView().containsKey(myid)) {

throw new RuntimeException(\"My id \" + myid + \" not in the peer list\"); }

loadDataBase(); //加载数据 znode数据加载:读取硬盘快照⽂件(data⽬录下) startServerCnxnFactory(); //⽹络通讯建⽴ try {

adminServer.start();

} catch (AdminServerException e) {

LOG.warn(\"Problem starting AdminServer\ System.out.println(e); }

startLeaderElection(); //选举

startJvmPauseMonitor(); super.start();    //此刻调⽤线程run⽅法}

  注:在调⽤此之前已经进⾏了配置加载,如代码:

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException { try {

ManagedUtil.registerLog4jMBeans(); } catch (JMException e) {

LOG.warn(\"Unable to register log4j JMX control\ }

LOG.info(\"Starting quorum peer\"); MetricsProvider metricsProvider; try {

metricsProvider = MetricsProviderBootstrap.startMetricsProvider( config.getMetricsProviderClassName(), config.getMetricsProviderConfiguration()); } catch (MetricsProviderLifeCycleException error) {

throw new IOException(\"Cannot boot MetricsProvider \" + config.getMetricsProviderClassName(), error); } try {

ServerMetrics.metricsProviderInitialized(metricsProvider); ServerCnxnFactory cnxnFactory = null;

ServerCnxnFactory secureCnxnFactory = null; if (config.getClientPortAddress() != null) {

cnxnFactory = ServerCnxnFactory.createFactory();

cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false); }

if (config.getSecureClientPortAddress() != null) {

secureCnxnFactory = ServerCnxnFactory.createFactory();

secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true); }

quorumPeer = getQuorumPeer();

quorumPeer.setTxnFactory(new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir())); quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());

quorumPeer.enableLocalSessionsUpgrading(config.isLocalSessionsUpgradingEnabled()); //quorumPeer.setQuorumPeers(config.getAllMembers()); quorumPeer.setElectionType(config.getElectionAlg()); quorumPeer.setMyid(config.getServerId());

quorumPeer.setTickTime(config.getTickTime());

quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); quorumPeer.setInitLimit(config.getInitLimit()); quorumPeer.setSyncLimit(config.getSyncLimit());

quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit()); quorumPeer.setObserverMasterPort(config.getObserverMasterPort()); quorumPeer.setConfigFileName(config.getConfigFilename());

quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog()); quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory())); quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false); if (config.getLastSeenQuorumVerifier() != null) {

quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false); }

quorumPeer.initConfigInZKDatabase();

quorumPeer.setCnxnFactory(cnxnFactory);

quorumPeer.setSecureCnxnFactory(secureCnxnFactory); quorumPeer.setSslQuorum(config.isSslQuorum());

quorumPeer.setUsePortUnification(config.shouldUsePortUnification()); quorumPeer.setLearnerType(config.getPeerType());

quorumPeer.setSyncEnabled(config.getSyncEnabled());

quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); if (config.sslQuorumReloadCertFiles) {

quorumPeer.getX509Util().enableCertFileReloading(); }

// sets quorum sasl authentication configurations

quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if (quorumPeer.isQuorumSaslAuthEnabled()) {

quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);

quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); }

quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); quorumPeer.initialize();

if (config.jvmPauseMonitorToRun) {

quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config)); }

quorumPeer.start();    //此刻是调⽤quorumPeer的start⽅法,并不是启动quorumPeer线程,真正线程的启动在start⽅法中的super.start() quorumPeer.join();      //等待服务端初始化完成 } catch (InterruptedException e) { // warn, but generally this is ok

LOG.warn(\"Quorum Peer interrupted\ } finally {

if (metricsProvider != null) { try {

metricsProvider.stop(); } catch (Throwable error) {

LOG.warn(\"Error while stopping metrics\ } } }}

  服务端启动详细流程如下图所⽰:

  

  2、服务端请求响应

  然后就是服务端对外提供服务响应请求,如下图所⽰(响应写操作):

    

   以上流程就遵从了ZooKeeper的Zab⼀致性协议,Zab协议 的全称是 Zookeeper Atomic Broadcast (Zookeeper原⼦⼴播),Zookeeper 是通过 Zab 协议来保证分布式事务的最终⼀致性。  Zab协议详情以及选举规则请参考:

三、客户端源码分析

  1、客户端初始化

  客户端启动流程如下:

        

  ⼀开始客户端会进⾏集群的解析和⽹络初始化(ClientCncx对象),同时ClientCncx对象会创建SendThread和EventThread两个线程,⽤于对request/response以及watcher event进⾏管理。其代码如下:

public ClientCnxn( String chrootPath,

HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,

ClientWatchManager watcher,

ClientCnxnSocket clientCnxnSocket, long sessionId,

byte[] sessionPasswd, boolean canBeReadOnly) { this.zooKeeper = zooKeeper; this.watcher = watcher; this.sessionId = sessionId;

this.sessionPasswd = sessionPasswd; this.sessionTimeout = sessionTimeout; this.hostProvider = hostProvider; this.chrootPath = chrootPath;

connectTimeout = sessionTimeout / hostProvider.size(); readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly;

sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread();

this.clientConfig = zooKeeper.getClientConfig(); initRequestTimeout();}

public void start() { sendThread.start(); eventThread.start();}

  2、客户端请求管理

  客户端访问服务端流程如下:

           

  由上图可知ClientCncx启动了两个线程:SendThread、EventThread,这两个线程⼀个处理对服务端的请求响应,⼀个处理监听事

件。

  这两个线程都是基于队列进⾏请求管理,outGoingQueue⽤于处理发送request请求的队列,PendingQueue⽤于存储已经发送等待服务响应的请求,这样当收到请求后就可以进⾏response处理,waitingEventsQueue⽤处临时存放需要被触发的对象,因此通过队列的应⽤就实现了ZooKeeper的⾼性能。

  因此客户端主要使⽤了这些技术:底层请求管理就是队列>线程进⾏队列处理>NIO默认通讯⽅式>synchronized锁(⽤在队列上)。  客户端和服务端同时使⽤到了Jute序列化组件以及⾃有的通讯协议,详情请查看:

四、ZooKeeper运维

  ⽇常在Linux下使⽤:echo zk命令|nc ip port 命令进⾏⽇常ZooKeeper运维,如:echo mntr |nc 192.168.0.31 2181。  Linux中nc命令是⼀个功能强⼤的⽹络⼯具,全称是netcat,在线安装:yum install -y nc。常见zk命令如下:

        

  当然也可以⾃⼰代码实现,进⾏界⾯运维。

因篇幅问题不能全部显示,请点此查看更多更全内容