•聚合是流计算的常见场景。Trident提供了两种聚合方式
1:Aggregate
2:partitionAggregate
•Trident提供了三种聚合器接口来实现聚合任务(例如求和sum):
1:CombinerAggregator
2:ReducerAggregator
3: Aggregator
这3个聚合器一般配合groupBy()来做分区聚合。
比如做如下聚合:求每个手机号的费用总和
SELECT tel,SUM(money) FROM u GROUP BY tel
-----------------配套视频----------------------------------------
http://pan.baidu.com/s/1kT5kecn
------------------配套代码----------------------------------------------------------------------------------------------------------------------
package storm.test.trident;
import java.util.Map;
import com.sun.corba.se.spi.orb.StringPair;
import storm.trident.Stream;
import storm.trident.TridentTopology;
import storm.trident.operation.Aggregator;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.Filter;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.TridentOperationContext;
import storm.trident.spout.IBatchSpout;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.tuple.TridentTuple;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
/**
 * Demo topology that totals the "money" value per phone number, mirroring
 * {@code SELECT tel, SUM(money) FROM u GROUP BY tel}.
 */
public class _04Aggregate {

    /**
     * Creates the fixed test spout.
     *
     * <p>Every tuple has a single "tels" field whose value is a
     * "&lt;phone&gt; &lt;amount&gt;" string; the spout is configured with a
     * maximum batch size of 3.
     *
     * @return the spout feeding the demo topology
     */
    private static IBatchSpout getBatch() {
        return new FixedBatchSpout(
                new Fields("tels"),
                3,
                new Values("189111 4"),
                new Values("135111 9"),
                new Values("189111 8"),
                new Values("158111 3"),
                new Values("158111 3"));
    }

    /**
     * Wires the spout into a Trident topology and runs it on an in-process
     * local cluster.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        IBatchSpout spout = getBatch();
        TridentTopology topology = new TridentTopology();
        Stream source = topology.newStream("_01Stream", spout);

        Fields rawField = new Fields("tels");
        Fields parsedFields = new Fields("tel", "money");
        Fields groupKey = new Fields("tel");
        Fields aggregatorInput = new Fields("money", "tel");
        Fields aggregatorOutput = new Fields("sum");

        // KeyValueFunction is defined elsewhere; it is expected to turn the raw
        // "tels" string into separate "tel" and "money" fields.
        source.each(rawField, new KeyValueFunction(), parsedFields)
                .groupBy(groupKey)
                .partitionAggregate(aggregatorInput, new SumAgg(), aggregatorOutput);

        // Convert the Trident definition into a plain Storm topology.
        StormTopology built = topology.build();
        Config conf = new Config();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("_02Filter", conf, built);
    }
}
/**
 * Mutable accumulator for one aggregation run (one group within one batch).
 */
class SumState {
    /** Running total of the "money" values seen so far. */
    int state = 0;
    /** Phone number of the most recent tuple folded into this state. */
    String tel = "";
}

/**
 * Aggregator that sums the integer "money" field of the tuples it receives and
 * emits the total as a single value.
 *
 * <p>The phone number used for logging is stored in the per-run
 * {@link SumState} rather than in an instance field. When the aggregator is
 * used after {@code groupBy}, one aggregator instance handles every group of a
 * batch, so an instance field would be overwritten by each tuple and
 * {@link #complete} would log the last tuple's phone number for all groups.
 */
class SumAgg implements Aggregator<SumState> {

    /**
     * Folds one tuple into the running state.
     *
     * @param val       state returned by {@link #init} for this run
     * @param tuple     input tuple; must expose string fields "tel" and "money"
     * @param collector unused; results are emitted only in {@link #complete}
     * @throws NumberFormatException if "money" is not a valid integer
     */
    @Override
    public void aggregate(SumState val, TridentTuple tuple,
            TridentCollector collector) {
        val.tel = tuple.getStringByField("tel");
        val.state += Integer.parseInt(tuple.getStringByField("money"));
    }

    /**
     * Logs the total for this run to stderr and emits it as a one-field tuple.
     *
     * @param val       the accumulated state
     * @param collector receives the total
     */
    @Override
    public void complete(SumState val, TridentCollector collector) {
        System.err.println(val.tel + ",聚合结果:" + val.state);
        collector.emit(new Values(val.state));
    }

    /**
     * Creates a fresh, zeroed state for a new aggregation run.
     *
     * @param batchId   unused
     * @param collector unused
     * @return a new {@link SumState}
     */
    @Override
    public SumState init(Object batchId, TridentCollector collector) {
        return new SumState();
    }

    /** No resources to release. */
    @Override
    public void cleanup() {
        // Intentionally empty.
    }

    /** No setup required. */
    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        // Intentionally empty.
    }
}
-----------------配套视频----------------------------------------
http://pan.baidu.com/s/1kT5kecn#path=%252Fstorm%25E5%2588%2586%25E4%25BA%25AB%25E8%25A7%2586%25E9%25A2%2591