我正在尝试编写一个执行以下操作的拓扑:
>订阅一个twitter Feed(基于关键字)
>一个聚合螺栓,用于聚合收集中的一些tweets(例如N),并将它们发送给打印机螺栓
>一个简单的螺栓,将集合一次打印到控制台.
在现实中,我想对收藏进行一些更多的处理.
我在本地测试,看起来像是在工作.但是,我不知道我是否正确设置了螺栓上的分组,并且在部署在实际的风暴集群上时可以正常工作.如果有人可以帮助您查看此拓扑并提出任何错误,更改或改进,我将不胜感激.
谢谢.
这是我的拓扑结构.
- builder.setSpout("spout",new TwitterFilterSpout("pittsburgh"));
- builder.setBolt("sampleaggregate",new SampleAggregatorBolt())
- .shuffleGrouping("spout");
- builder.setBolt("printaggreator",new PrinterBolt()).shuffleGrouping("sampleaggregate");
聚合螺栓
- public class SampleAggregatorBolt implements IRichBolt {
- protected OutputCollector collector;
- protected Tuple currentTuple;
- protected Logger log;
- /**
- * Holds the messages in the bolt till you are ready to send them out
- */
- protected List<Status> statusCache;
- @Override
- public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) {
- this.collector = collector;
- log = Logger.getLogger(getClass().getName());
- statusCache = new ArrayList<Status>();
- }
- @Override
- public void execute(Tuple tuple) {
- currentTuple = tuple;
- Status currentStatus = null;
- try {
- currentStatus = (Status) tuple.getValue(0);
- } catch (ClassCastException e) {
- }
- if (currentStatus != null) {
- //add it to the status cache
- statusCache.add(currentStatus);
- collector.ack(tuple);
- //check the size of the status cache and pass it to the next stage if you have enough messages to emit
- if (statusCache.size() > 10) {
- collector.emit(new Values(statusCache));
- }
- }
- }
- @Override
- public void cleanup() {
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tweets"));
- }
- @Override
- public Map<String,Object> getComponentConfiguration() {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
- protected void setupNonSerializableAttributes() {
- }
- }
打印机螺栓
- public class PrinterBolt extends BaseBasicBolt {
- @Override
- public void execute(Tuple tuple,BasicOutputCollector collector) {
- System.out.println(tuple.size() + " " + tuple);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer ofd) {
- }
- }