Gossip协议概述
Cassandra集群中的节点没有主次之分,它们通过一种叫做Gossip的协议进行通信。通过Gossip协议,它们可以知道集群中有哪些节点,以及这些节点的状态如何?每一条Gossip消息上都有一个版本号,节点可以对接收到的消息进行版本比对,从而得知哪些消息是我需要更新的,哪些消息是我有而别人没有的,然后互相倾诉吐槽,确保二者得到的信息相同,这很像现实生活中的八卦(摆龙门阵)一样,一传十,十传百,最后尽人皆知。
在Cassandra启动时,会启动Gossip服务,Gossip服务启动后会启动一个任务GossipTask,这个任务会周期性地与其他节点进行通信。GossipTask是位于org.apache.cassandra.gms.Gossip类下的一个内部类,其run方法如下:
- publicvoidrun(){
- MessagingService.instance().waitUntilListening();
- /*Updatethelocalheartbeatcounter.*/
- endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
- finalList<GossipDigest>gDigests=newArrayList<GossipDigest>();
- Gossiper.instance.makeRandomGossipDigest(gDigests);
- if(gDigests.size()>0){
- GossipDigestSyndigestSynMessage=newGossipDigestSyn(DatabaseDescriptor.getClusterName(),
- DatabaseDescriptor.getPartitionerName(),
- gDigests);
- MessageOut<GossipDigestSyn>message=newMessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
- digestSynMessage,
- GossipDigestSyn.serializer);
- /*Gossiptosomerandomlivemember*/
- booleangossipedToSeed=doGossipToLiveMember(message);
- /*Gossiptosomeunreachablememberwithsomeprobabilitytocheckifheisbackup*/
- doGossipToUnreachableMember(message);
- if(!gossipedToSeed||liveEndpoints.size()<seeds.size())
- doGossipToSeed(message);
- doStatusCheck();
- }
- }
1、GossipTask在Gossip启动后并不会立即运行,阻塞在listenGate这个条件变量上,当Gossip服务调用listen时才开始运行;
2、首先更新本节点的心跳版本号,然后构造需要发送给其他节点的消息gDigests;
4、检查节点状态。
有关种子节点和节点状态后面再分析,本节只关注Gossip协议本身。
GossipTask用于向其他节点发送Gossip信息,Cassandra还提供了SocketThread这样一个线程来负责接收消息,接收消息的代码在org.apache.cassandra.net.IncomingTcpConnection类中。不管是发送还是接收Gossip消息,都是调用org.apache.cassandra.net.MessagingService的sendOneWay方法实现的。
一次Gossip通信分为三个阶段,如图所示:
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN,newGossipDigestSynVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK,newGossipDigestAckVerbHandler());
- MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2,newGossipDigestAck2VerbHandler());
下面详细分析每个阶段。