Cassandra源码学习:Gossip协议


Gossip协议概述

Cassandra集群中的节点没有主次之分,它们通过一种叫做Gossip的协议进行通信。通过Gossip协议,它们可以知道集群中有哪些节点,以及这些节点的状态如何?每一条Gossip消息上都有一个版本号,节点可以对接收到的消息进行版本比对,从而得知哪些消息是我需要更新的,哪些消息是我有而别人没有的,然后互相倾诉吐槽,确保二者得到的信息相同,这很像现实生活中的八卦(摆龙门阵)一样,一传十,十传百,最后尽人皆知。
在Cassandra启动时,会启动Gossip服务,Gossip服务启动后会启动一个任务GossipTask,这个任务会周期性地与其他节点进行通信。GossipTask是位于org.apache.cassandra.gms.Gossip类下的一个内部类,其run方法如下:
  1. publicvoidrun(){
  2. MessagingService.instance().waitUntilListening();
  3. /*Updatethelocalheartbeatcounter.*/
  4. endpointStateMap.get(FBUtilities.getBroadcastAddress()).getHeartBeatState().updateHeartBeat();
  5. finalList<GossipDigest>gDigests=newArrayList<GossipDigest>();
  6. Gossiper.instance.makeRandomGossipDigest(gDigests);
  7. if(gDigests.size()>0){
  8. GossipDigestSyndigestSynMessage=newGossipDigestSyn(DatabaseDescriptor.getClusterName(),
  9. DatabaseDescriptor.getPartitionerName(),
  10. gDigests);
  11. MessageOut<GossipDigestSyn>message=newMessageOut<GossipDigestSyn>(MessagingService.Verb.GOSSIP_DIGEST_SYN,
  12. digestSynMessage,
  13. GossipDigestSyn.serializer);
  14. /*Gossiptosomerandomlivemember*/
  15. booleangossipedToSeed=doGossipToLiveMember(message);
  16. /*Gossiptosomeunreachablememberwithsomeprobabilitytocheckifheisbackup*/
  17. doGossipToUnreachableMember(message);
  18. if(!gossipedToSeed||liveEndpoints.size()<seeds.size())
  19. doGossipToSeed(message);
  20. doStatusCheck();
  21. }
  22. }
主要做了几下几件事:
1、GossipTask在Gossip启动后并不会立即运行,阻塞在listenGate这个条件变量上,当Gossip服务调用listen时才开始运行;
2、首先更新本节点的心跳版本号,然后构造需要发送给其他节点的消息gDigests;
3、从存活节点中随机选择一个节点发送、从失效节点中随机选取一个发送。如果当前存活节点数小于种子数,向其中一个种子节点发消息;
4、检查节点状态。
有关种子节点和节点状态后面再分析,本节只关注Gossip协议本身。
GossipTask用于向其他节点发送Gossip信息,Cassandra还提供了SocketThread这样一个线程来负责接收消息,接收消息的代码在org.apache.cassandra.net.IncomingTcpConnection类中。不管是发送还是接收Gossip消息,都是调用org.apache.cassandra.net.MessagingService的sendOneWay方法实现的。
一次Gossip通信分为三个阶段,如图所示:

每个阶段对应一个消息类,这三个消息类都有对应的处理器,处理器的注册是在org.apache.cassandra.service.StorageService类中完成的,参考如下代码
    MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_SYN,newGossipDigestSynVerbHandler());
  1. MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK,newGossipDigestAckVerbHandler());
  2. MessagingService.instance().registerVerbHandlers(MessagingService.Verb.GOSSIP_DIGEST_ACK2,newGossipDigestAck2VerbHandler());
可见这三种消息分别对应三个消息类型GOSSIP_DIGEST_SYN、GOSSIP_DIGEST_ACK、GOSSIP_DIGEST_ACK2.
下面详细分析每个阶段。

GossipDigestSynMessage

GossipTask的run方法中发送了GOSSIP_DIGEST_SYN类型的消息(GossipDigestSynMessage),这种消息交给其对应的处理器GossipDigestAckVerbHandler处理,具体处理过程在doVerb()方法中,核心代码如下:
    publicvoiddoVerb(MessageIn<GossipDigestSyn>message,intid){
  1. //...check
  2. List<GossipDigest>gDigestList=gDigestMessage.getGossipDigests();
  3. doSort(gDigestList);
  4. List<GossipDigest>deltaGossipDigestList=newArrayList<GossipDigest>();
  5. Map<InetAddress,EndpointState>deltaEpStateMap=newHashMap<InetAddress,EndpointState>();
  6. Gossiper.instance.examineGossiper(gDigestList,deltaGossipDigestList,deltaEpStateMap);
  7. logger.trace("sending{}digestsand{}deltas",deltaGossipDigestList.size(),deltaEpStateMap.size());
  8. MessageOut<GossipDigestAck>gDigestAckMessage=newMessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK,226); color:inherit; line-height:18px"> newGossipDigestAck(deltaGossipDigestList,deltaEpStateMap),85); line-height:18px"> GossipDigestAck.serializer);
  9. Gossiper.instance.checkSeedContact(from);
  10. MessagingService.instance().sendOneWay(gDigestAckMessage,from);
  11. }
1、对接受到的消息排序:先按generation排序,如果generation相同,按maxVersion与本地版本差排序;
2、examineGossiper对比接收到的信息与本节点的差异,本节点需要进一步获取的消息由deltaGossipDigestList保存,本节点需要告诉from节点的信息由deltaEpStateMap保存;
3、利用deltaGossipDigestList和deltaEpStateMap封装成GossipDigestAckMessage消息,发送给from节点。

GossipDigestAckMessage

同GossipDigestSynMessage消息一样,GOSSIP_DIGEST_ACK类型的消息由处理器GossipDigestAckVerbHandler处理,也是doVerb()方法,实际上这三个处理器有共同的实现接口IVerbHandler。核心代码如下:
    publicvoiddoVerb(MessageIn<GossipDigestAck>message,85); line-height:18px"> List<GossipDigest>gDigestList=gDigestAckMessage.getGossipDigestList();
  1. if(epStateMap.size()>0)
  2. {
  3. Gossiper.instance.notifyFailureDetector(epStateMap);
  4. Gossiper.instance.applyStateLocally(epStateMap);
  5. for(GossipDigestgDigest:gDigestList)
  6. {
  7. InetAddressaddr=gDigest.getEndpoint();
  8. EndpointStatelocalEpStatePtr=Gossiper.instance.getStateForVersionBiggerThan(addr,gDigest.getMaxVersion());
  9. if(localEpStatePtr!=null)
  10. deltaEpStateMap.put(addr,localEpStatePtr);
  11. }
  12. MessageOut<GossipDigestAck2>gDigestAck2Message=newMessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2,226); color:inherit; line-height:18px"> newGossipDigestAck2(deltaEpStateMap),85); line-height:18px"> GossipDigestAck2.serializer);
  13. MessagingService.instance().sendOneWay(gDigestAck2Message,from);
  14. }
1、epStateMap 是from节点想要告诉它的消息,调用applyStateLocally方法进行更新;
2、gDigestList是from节点想要进一步更新的消息,调用getStateForVersionBiggerThan方法拿到from节点可以更新的消息;
3、将2中的消息封装成GOSSIP_DIGEST_ACK2消息发送给from结点。

GossipDigestAck2Message

由GossipDigestAck2VerbHandler处理器处理,这里就不贴代码,接收到from接口发过来的消息,进行本地更新即可。

下节学习cassandra的机架感应

参考资料:
http://ddg.jaist.ac.jp/pub/HDY+04.pdf
http://www.datastax.com/documentation/cassandra/2.0/cassandra/architecture/architectureIntro_c.html

相关文章

一、引言 学习redis 也有一段时间了,该接触的也差不多了。后来有一天,以前的同事问我,如何向redis中...
一、引言 上一篇文章,我介绍了如何在Linux系统上安装和配置MongoDB,其实都不是很难,不需要安装和编译...
一、介绍 Redis客户端使用RESP(Redis的序列化协议)协议与Redis的服务器端进行通信。 虽然该协议是专门...
一、引言 redis学了一段时间了,基本的东西都没问题了。从今天开始讲写一些redis和lua脚本的相关的东西...
一、介绍 今天继续redis-cli使用的介绍,上一篇文章写了一部分,写到第9个小节,今天就来完成第二部分。...
一、引言 上一篇文章我们已经介绍了MongoDB数据库的查询操作,但是并没有介绍全,随着自己的学习的深入...