04 Trident聚合

前端之家收集整理的这篇文章主要介绍了04 Trident聚合,前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
•聚合是流计算的常见场景。Trident提供了两种聚合方式:

1:Aggregate

2:partitionAggregate

•Trident提供了三种聚合器来实现聚合任务(如sum):

1:CombinerAggregator

2:ReducerAggregator

3: Aggregator

这3个聚合器一般是配合groupBy()来做分区聚合。

比如:做如下聚合,统计每个手机号的费用总和:

SELECT tel,SUM(money) FROM u GROUP BY tel



-----------------配套视频----------------------------------------
http://pan.baidu.com/s/1kT5kecn
------------------配套代码----------------------------------------------------------------------------------------------------------------------

package storm.test.trident;


import java.util.Map;


import com.sun.corba.se.spi.orb.StringPair;


import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.operation.Aggregator;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Filter;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.spout.IBatchSpout;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.tuple.TridentTuple;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;


/**
 * Local demo of a grouped Trident aggregation, the streaming counterpart of
 * {@code SELECT tel,SUM(money) FROM u GROUP BY tel}.
 */
public class _04Aggregate {

    /**
     * Creates the fixed test spout. Each tuple carries a single field "tels"
     * whose value is a "phone-number amount" string; 3 is passed as the
     * spout's batch-size argument.
     *
     * @return the spout feeding the demo topology
     */
    private static IBatchSpout getBatch() {
        Fields rawField = new Fields("tels");
        return new FixedBatchSpout(rawField, 3,
                new Values("189111 4"),
                new Values("135111 9"),
                new Values("189111 8"),
                new Values("158111 3"),
                new Values("158111 3"));
    }

    /**
     * Wires spout -> KeyValueFunction -> groupBy(tel) -> SumAgg and submits the
     * resulting topology to an in-process cluster.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        IBatchSpout source = getBatch();
        TridentTopology topology = new TridentTopology();

        // KeyValueFunction is not defined in this listing; presumably it splits
        // the raw "tels" string into the "tel" and "money" fields -- confirm.
        Stream parsed = topology.newStream("_01Stream", source)
                .each(new Fields("tels"), new KeyValueFunction(), new Fields("tel", "money"));

        // Group by phone number, then sum "money" per group within each partition.
        parsed.groupBy(new Fields("tel"))
                .partitionAggregate(new Fields("money", "tel"), new SumAgg(), new Fields("sum"));

        // Compile the Trident definition down to a plain Storm topology.
        StormTopology built = topology.build();

        Config conf = new Config();
        new LocalCluster().submitTopology("_02Filter", conf, built);
    }
}






/**
 * Per-group accumulator used by {@link SumAgg}.
 *
 * <p>The phone number is stored here, next to the running total, so that each
 * group reports its own key when the batch completes.
 */
class SumState {
    /** Running total of the "money" values aggregated into this state. */
    int state = 0;

    /** Value of the "tel" field of the tuples aggregated into this state. */
    String tel = "";
}


/**
 * Trident aggregator that sums the "money" field of the tuples it receives and
 * emits the total as a single value.
 *
 * <p>With {@code groupBy(...).partitionAggregate(...)} one aggregator instance
 * serves every group of a batch, and {@code complete} is invoked per group only
 * after the batch's tuples have been aggregated. Keeping the phone number in an
 * instance field therefore made every group log the number of the last tuple
 * processed; it now lives in the per-group {@link SumState}.
 */
class SumAgg implements Aggregator<SumState> {

    /**
     * Creates a fresh, zeroed accumulator.
     *
     * @param batchId   id of the batch being aggregated (unused)
     * @param collector collector for emitting tuples (unused here)
     * @return a new empty {@link SumState}
     */
    @Override
    public SumState init(Object batchId, TridentCollector collector) {
        return new SumState();
    }

    /**
     * Folds one tuple into the accumulator.
     *
     * @param val       accumulator to update
     * @param tuple     input tuple; must expose string fields "tel" and "money"
     * @param collector collector for emitting tuples (unused here)
     * @throws NumberFormatException if "money" is not a parsable integer
     */
    @Override
    public void aggregate(SumState val, TridentTuple tuple,
            TridentCollector collector) {
        // Remember the key on the state itself, not on the shared aggregator.
        val.tel = tuple.getStringByField("tel");
        val.state += Integer.parseInt(tuple.getStringByField("money"));
    }

    /**
     * Logs the accumulated total to stderr and emits it as the single output value.
     *
     * @param val       accumulator holding the final total
     * @param collector collector that receives the total
     */
    @Override
    public void complete(SumState val, TridentCollector collector) {
        System.err.println(val.tel + ",聚合结果:" + val.state);
        collector.emit(new Values(val.state));
    }

    /** No resources to set up. */
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // intentionally empty
    }

    /** No resources to release. */
    @Override
    public void cleanup() {
        // intentionally empty
    }
}
-----------------配套视频----------------------------------------
http://pan.baidu.com/s/1kT5kecn#path=%252Fstorm%25E5%2588%2586%25E4%25BA%25AB%25E8%25A7%2586%25E9%25A2%2591
原文链接:/javaschema/284854.html

猜你在找的设计模式相关文章