java – 在一个简单的聚合风暴拓扑中进行分组

前端之家收集整理的这篇文章主要介绍了java – 在一个简单的聚合风暴拓扑中进行分组前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在尝试编写一个执行以下操作的拓扑:

>订阅一个twitter Feed(基于关键字)
>一个聚合螺栓,用于聚合收集中的一些tweets(例如N),并将它们发送给打印机螺栓
>一个简单的螺栓,将集合一次打印到控制台.

在现实中,我想对收藏进行一些更多的处理.

我在本地测试,看起来像是在工作.但是,我不知道我是否正确设置了螺栓上的分组,并且在部署在实际的风暴集群上时可以正常工作.如果有人可以帮助您查看此拓扑并提出任何错误,更改或改进,我将不胜感激.

谢谢.

这是我的拓扑结构.

  1. builder.setSpout("spout",new TwitterFilterSpout("pittsburgh"));
  2. builder.setBolt("sampleaggregate",new SampleAggregatorBolt())
  3. .shuffleGrouping("spout");
  4. builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");

聚合螺栓

  1. public class SampleAggregatorBolt implements IRichBolt {
  2.  
  3. protected OutputCollector collector;
  4. protected Tuple currentTuple;
  5. protected Logger log;
  6. /**
  7. * Holds the messages in the bolt till you are ready to send them out
  8. */
  9. protected List<Status> statusCache;
  10.  
  11. @Override
  12. public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
  13. this.collector = collector;
  14.  
  15. log = Logger.getLogger(getClass().getName());
  16. statusCache = new ArrayList<Status>();
  17. }
  18.  
  19. @Override
  20. public void execute(Tuple tuple) {
  21. currentTuple = tuple;
  22.  
  23. Status currentStatus = null;
  24. try {
  25. currentStatus = (Status) tuple.getValue(0);
  26. } catch (ClassCastException e) {
  27. }
  28. if (currentStatus != null) {
  29.  
  30. //add it to the status cache
  31. statusCache.add(currentStatus);
  32. collector.ack(tuple);
  33.  
  34.  
  35. //check the size of the status cache and pass it to the next stage if you have enough messages to emit
  36. if (statusCache.size() > 10) {
  37. collector.emit(new Values(statusCache));
  38. }
  39.  
  40. }
  41. }
  42.  
  43. @Override
  44. public void cleanup() {
  45.  
  46.  
  47. }
  48.  
  49. @Override
  50. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  51. declarer.declare(new Fields("tweets"));
  52.  
  53. }
  54.  
  55. @Override
  56. public Map<String,Object> getComponentConfiguration() {
  57. return null; //To change body of implemented methods use File | Settings | File Templates.
  58. }
  59.  
  60.  
  61. protected void setupNonSerializableAttributes() {
  62.  
  63. }
  64.  
  65. }

打印机螺栓

  1. public class PrinterBolt extends BaseBasicBolt {
  2.  
  3. @Override
  4. public void execute(Tuple tuple,BasicOutputCollector collector) {
  5. System.out.println(tuple.size() + " " + tuple);
  6. }
  7.  
  8. @Override
  9. public void declareOutputFields(OutputFieldsDeclarer ofd) {
  10. }
  11.  
  12. }

解决方法

从我可以看到它看起来不错.魔鬼的细节虽然如此.我不知道你的聚合器螺栓是什么,但是如果对传递给它的值做出任何假设,那么你应该考虑一个适当的字段分组.当您使用默认的并行性提示1时,这可能不会有太大的差异,但是如果您决定使用多个聚合螺栓实例进行缩放隐式逻辑假设,则可能需要非随机分组.

猜你在找的Java相关文章