A look at FreshCollector in Storm's WindowTridentProcessor
Published: 2019-06-24


This article looks at the FreshCollector used by Storm's WindowTridentProcessor.

Example

```java
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
        .partitionBy(new Fields("user"))
        .window(windowConfig, windowsStoreFactory, new Fields("user", "score"), new UserCountAggregator(), new Fields("aggData"))
        .parallelismHint(1)
        .each(new Fields("aggData"), new PrintEachFunc(), new Fields());
```
  • In this example, the window operation is followed by an each operation.

WindowTridentProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/windowing/WindowTridentProcessor.java

```java
public class WindowTridentProcessor implements TridentProcessor {

    private FreshCollector collector;

    //......

    public void prepare(Map stormConf, TopologyContext context, TridentContext tridentContext) {
        this.topologyContext = context;
        List<TridentTuple.Factory> parents = tridentContext.getParentTupleFactories();
        if (parents.size() != 1) {
            throw new RuntimeException("Aggregation related operation can only have one parent");
        }

        Long maxTuplesCacheSize = getWindowTuplesCacheSize(stormConf);

        this.tridentContext = tridentContext;
        collector = new FreshCollector(tridentContext);
        projection = new TridentTupleView.ProjectionFactory(parents.get(0), inputFields);

        windowStore = windowStoreFactory.create(stormConf);
        windowTaskId = windowId + WindowsStore.KEY_SEPARATOR + topologyContext.getThisTaskId() + WindowsStore.KEY_SEPARATOR;
        windowTriggerInprocessId = getWindowTriggerInprocessIdPrefix(windowTaskId);

        tridentWindowManager = storeTuplesInStore
                ? new StoreBasedTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator,
                        tridentContext.getDelegateCollector(), maxTuplesCacheSize, inputFields)
                : new InMemoryTridentWindowManager(windowConfig, windowTaskId, windowStore, aggregator,
                        tridentContext.getDelegateCollector());

        tridentWindowManager.prepare();
    }

    public void finishBatch(ProcessorContext processorContext) {

        Object batchId = processorContext.batchId;
        Object batchTxnId = getBatchTxnId(batchId);

        LOG.debug("Received finishBatch of : [{}] ", batchId);
        // get all the tuples in a batch and add it to trident-window-manager
        List<TridentTuple> tuples = (List<TridentTuple>) processorContext.state[tridentContext.getStateIndex()];
        tridentWindowManager.addTuplesBatch(batchId, tuples);

        List<Integer> pendingTriggerIds = null;
        List<String> triggerKeys = new ArrayList<>();
        Iterable<Object> triggerValues = null;

        if (retriedAttempt(batchId)) {
            pendingTriggerIds = (List<Integer>) windowStore.get(inprocessTriggerKey(batchTxnId));
            if (pendingTriggerIds != null) {
                for (Integer pendingTriggerId : pendingTriggerIds) {
                    triggerKeys.add(triggerKey(pendingTriggerId));
                }
                triggerValues = windowStore.get(triggerKeys);
            }
        }

        // if there are no trigger values in earlier attempts or this is a new batch, emit pending triggers.
        if (triggerValues == null) {
            pendingTriggerIds = new ArrayList<>();
            Queue<StoreBasedTridentWindowManager.TriggerResult> pendingTriggers = tridentWindowManager.getPendingTriggers();
            LOG.debug("pending triggers at batch: [{}] and triggers.size: [{}] ", batchId, pendingTriggers.size());
            try {
                Iterator<StoreBasedTridentWindowManager.TriggerResult> pendingTriggersIter = pendingTriggers.iterator();
                List<Object> values = new ArrayList<>();
                StoreBasedTridentWindowManager.TriggerResult triggerResult = null;
                while (pendingTriggersIter.hasNext()) {
                    triggerResult = pendingTriggersIter.next();
                    for (List aggregatedResult : triggerResult.result) {
                        String triggerKey = triggerKey(triggerResult.id);
                        triggerKeys.add(triggerKey);
                        values.add(aggregatedResult);
                        pendingTriggerIds.add(triggerResult.id);
                    }
                    pendingTriggersIter.remove();
                }
                triggerValues = values;
            } finally {
                // store inprocess triggers of a batch in store for batch retries for any failures
                if (!pendingTriggerIds.isEmpty()) {
                    windowStore.put(inprocessTriggerKey(batchTxnId), pendingTriggerIds);
                }
            }
        }

        collector.setContext(processorContext);
        int i = 0;
        for (Object resultValue : triggerValues) {
            collector.emit(new ConsList(new TriggerInfo(windowTaskId, pendingTriggerIds.get(i++)), (List) resultValue));
        }
        collector.setContext(null);
    }
}
```
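  • WindowTridentProcessor creates the FreshCollector in prepare.
  • In finishBatch, it calls FreshCollector.emit to pass on the window's aggregate results.
  • The data is passed as a ConsList, which is an AbstractList implementation made up of a first element of type Object followed by an _elems list of type List<Object>. Here the first element is a TriggerInfo and _elems is the aggregated result.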
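The following minimal plain-Java sketch, with no Storm dependency, shows how pending triggers are drained and what shape the ConsList takes. SimpleConsList, TriggerResultModel, FinishBatchSketch and the "window-1:" prefix (a stand-in for TriggerInfo) are hypothetical names. The window store is modeled by a HashMap keyed by batchTxnId, and the retry branch that reads the saved ids back is left out.

```java
import java.util.AbstractList;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical, simplified stand-in for Storm's ConsList: a read-only list that
// places `first` in front of an existing list without copying that list.
final class SimpleConsList extends AbstractList<Object> {
    private final Object first;
    private final List<Object> elems;

    SimpleConsList(Object first, List<Object> elems) {
        this.first = first;
        this.elems = elems;
    }

    @Override
    public Object get(int i) {
        return i == 0 ? first : elems.get(i - 1);
    }

    @Override
    public int size() {
        return elems.size() + 1;
    }
}

// Hypothetical model of a fired trigger: an id plus the aggregated rows.
final class TriggerResultModel {
    final int id;
    final List<List<Object>> result;

    TriggerResultModel(int id, List<List<Object>> result) {
        this.id = id;
        this.result = result;
    }
}

final class FinishBatchSketch {
    // Drains all pending triggers, records their ids for this batch (so a retry
    // could replay them), and prepends trigger info to every aggregated row.
    static List<List<Object>> drain(Queue<TriggerResultModel> pending,
                                    Map<Object, List<Integer>> inProcess,
                                    Object batchTxnId) {
        List<Integer> ids = new ArrayList<>();
        List<List<Object>> rows = new ArrayList<>();
        Iterator<TriggerResultModel> it = pending.iterator();
        while (it.hasNext()) {
            TriggerResultModel tr = it.next();
            for (List<Object> row : tr.result) {
                rows.add(row);
                ids.add(tr.id);
            }
            it.remove(); // a drained trigger is not picked up again by a later batch
        }
        if (!ids.isEmpty()) {
            inProcess.put(batchTxnId, ids);
        }
        List<List<Object>> emitted = new ArrayList<>();
        for (int i = 0; i < rows.size(); i++) {
            // "window-1:" + id stands in for TriggerInfo(windowTaskId, triggerId)
            emitted.add(new SimpleConsList("window-1:" + ids.get(i), rows.get(i)));
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<TriggerResultModel> pending = new ArrayDeque<>();
        pending.add(new TriggerResultModel(7, Arrays.asList(
                Arrays.<Object>asList("alice", 3),
                Arrays.<Object>asList("bob", 5))));
        Map<Object, List<Integer>> inProcess = new HashMap<>();
        List<List<Object>> out = drain(pending, inProcess, 1L);
        System.out.println(out);                                 // [[window-1:7, alice, 3], [window-1:7, bob, 5]]
        System.out.println(pending.isEmpty() + " " + inProcess); // true {1=[7, 7]}
    }
}
```

The ConsList only wraps the aggregated row, so the trigger info is prepended without copying the row's values.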

FreshCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/FreshCollector.java

```java
public class FreshCollector implements TridentCollector {
    FreshOutputFactory _factory;
    TridentContext _triContext;
    ProcessorContext context;

    public FreshCollector(TridentContext context) {
        _triContext = context;
        _factory = new FreshOutputFactory(context.getSelfOutputFields());
    }

    public void setContext(ProcessorContext pc) {
        this.context = pc;
    }

    @Override
    public void emit(List values) {
        TridentTuple toEmit = _factory.create(values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    }

    public Factory getOutputFactory() {
        return _factory;
    }
}
```
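  • The FreshCollector constructor builds a FreshOutputFactory from the context's selfOutputFields. The first field is always _task_info, followed by the functionFields the user declared in the window method.
  • emit first uses the FreshOutputFactory to build a TridentTupleView from the values according to the output fields. It then iterates over the TupleReceivers and calls each one's execute method with that TridentTupleView.
  • The TupleReceivers here include ProjectedProcessor and PartitionPersistProcessor.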
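The emit-then-fan-out loop can be modeled without Storm as follows. This is a simplified sketch: Receiver and FreshCollectorSketch are hypothetical names, and a HashMap keyed by field name stands in for the TridentTupleView that FreshOutputFactory actually builds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Storm's TupleReceiver.
interface Receiver {
    void execute(String streamId, Map<String, Object> tuple);
}

// Sketch of the FreshCollector pattern: wrap raw values into a tuple keyed by
// the output fields, then hand that same tuple to every downstream receiver.
final class FreshCollectorSketch {
    private final List<String> outputFields;
    private final List<Receiver> receivers;
    private final String outStreamId;

    FreshCollectorSketch(List<String> outputFields, List<Receiver> receivers, String outStreamId) {
        this.outputFields = outputFields;
        this.receivers = receivers;
        this.outStreamId = outStreamId;
    }

    void emit(List<Object> values) {
        // Simplification: a map instead of a pointer-based tuple view.
        Map<String, Object> tuple = new HashMap<>();
        for (int i = 0; i < outputFields.size(); i++) {
            tuple.put(outputFields.get(i), values.get(i));
        }
        for (Receiver r : receivers) {
            r.execute(outStreamId, tuple);
        }
    }

    public static void main(String[] args) {
        List<Map<String, Object>> seen = new ArrayList<>();
        Receiver recorder = (stream, t) -> seen.add(t);
        FreshCollectorSketch c = new FreshCollectorSketch(
                Arrays.asList("_task_info", "aggData"),
                Arrays.asList(recorder, recorder), "s1");
        c.emit(Arrays.<Object>asList("trigger-1", 42));
        System.out.println(seen.size());                // 2
        System.out.println(seen.get(0).get("aggData")); // 42
    }
}
```

When the receiver list is empty, emit builds the tuple and returns without forwarding it anywhere.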

TridentTupleView.FreshOutputFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java

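```java
public static class FreshOutputFactory  implements Factory {
    Map<String, ValuePointer> _fieldIndex;
    ValuePointer[] _index;

    public FreshOutputFactory(Fields selfFields) {
        _fieldIndex = new HashMap<>();
        for(int i=0; i<selfFields.size(); i++) {
            String field = selfFields.get(i);
            _fieldIndex.put(field, new ValuePointer(0, i, field));
        }
        _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
    }

    public TridentTuple create(List<Object> selfVals) {
        return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
    }

    @Override
    public Map<String, ValuePointer> getFieldIndex() {
        return _fieldIndex;
    }

    @Override
    public int numDelegates() {
        return 1;
    }

    @Override
    public List<String> getOutputFields() {
        return indexToFieldsList(_index);
    }
}
```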
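  • FreshOutputFactory is a static nested class of TridentTupleView. Its constructor mainly computes _index and _fieldIndex.
  • _fieldIndex is a map. Each key is a field name, and each value is a ValuePointer recording the delegateIndex (always 0 here), the index, and the field. The first field is _task_info with index 0, and the following fields are the functionFields the user declared in the window method.
  • create builds a TridentTupleView. The constructor's first argument is an IPersistentVector, the second is _index, and the third is _fieldIndex.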
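The pointer layout that FreshOutputFactory produces can be sketched in plain Java. Pointer, TupleViewSketch and FreshFactorySketch are hypothetical simplifications of ValuePointer, TridentTupleView and FreshOutputFactory. A one-element list replaces PersistentVector.EMPTY.cons(selfVals), and the ordered _index array is omitted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified ValuePointer: which delegate list holds the value, and where.
final class Pointer {
    final int delegateIndex;
    final int index;
    final String field;

    Pointer(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }
}

// Hypothetical, simplified TridentTupleView: values live in delegate lists
// and are resolved through the field-name -> pointer map.
final class TupleViewSketch {
    final List<List<Object>> delegates;
    final Map<String, Pointer> fieldIndex;

    TupleViewSketch(List<List<Object>> delegates, Map<String, Pointer> fieldIndex) {
        this.delegates = delegates;
        this.fieldIndex = fieldIndex;
    }

    Object getValueByField(String field) {
        Pointer p = fieldIndex.get(field);
        return delegates.get(p.delegateIndex).get(p.index);
    }
}

// Sketch of FreshOutputFactory: every field points into delegate 0 at its own position.
final class FreshFactorySketch {
    final Map<String, Pointer> fieldIndex = new HashMap<>();

    FreshFactorySketch(List<String> selfFields) {
        for (int i = 0; i < selfFields.size(); i++) {
            String f = selfFields.get(i);
            fieldIndex.put(f, new Pointer(0, i, f));
        }
    }

    TupleViewSketch create(List<Object> selfVals) {
        // A single delegate holding the emitted values, playing the role of the cons'd vector.
        return new TupleViewSketch(Collections.singletonList(selfVals), fieldIndex);
    }

    public static void main(String[] args) {
        FreshFactorySketch f = new FreshFactorySketch(Arrays.asList("_task_info", "aggData"));
        TupleViewSketch t = f.create(Arrays.<Object>asList("trigger-1", 42));
        System.out.println(t.getValueByField("aggData"));          // 42
        System.out.println(f.fieldIndex.get("_task_info").index); // 0
    }
}
```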

ValuePointer

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/ValuePointer.java

```java
public class ValuePointer {
    public static Map<String, ValuePointer> buildFieldIndex(ValuePointer[] pointers) {
        Map<String, ValuePointer> ret = new HashMap<String, ValuePointer>();
        for(ValuePointer ptr: pointers) {
            ret.put(ptr.field, ptr);
        }
        return ret;
    }

    public static ValuePointer[] buildIndex(Fields fieldsOrder, Map<String, ValuePointer> pointers) {
        if(fieldsOrder.size()!=pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        ValuePointer[] ret = new ValuePointer[pointers.size()];
        for(int i=0; i<fieldsOrder.size(); i++) {
            ret[i] = pointers.get(fieldsOrder.get(i));
        }
        return ret;
    }

    //......
}
```
  • buildIndex returns a ValuePointer array ordered by fieldsOrder. For FreshOutputFactory, fieldsOrder is selfOutputFields.
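This standalone version of the two ValuePointer helpers shows the ordering and the length check in isolation. ValuePointerSketch is a hypothetical class, and List<String> replaces Storm's Fields. The loop bodies follow the buildFieldIndex and buildIndex logic described above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified ValuePointer with the two static helpers.
final class ValuePointerSketch {
    final int delegateIndex;
    final int index;
    final String field;

    ValuePointerSketch(int delegateIndex, int index, String field) {
        this.delegateIndex = delegateIndex;
        this.index = index;
        this.field = field;
    }

    // Map -> array: position i holds the pointer for fieldsOrder.get(i).
    static ValuePointerSketch[] buildIndex(List<String> fieldsOrder, Map<String, ValuePointerSketch> pointers) {
        if (fieldsOrder.size() != pointers.size()) {
            throw new IllegalArgumentException("Fields order must be same length as pointers map");
        }
        ValuePointerSketch[] ret = new ValuePointerSketch[pointers.size()];
        for (int i = 0; i < fieldsOrder.size(); i++) {
            ret[i] = pointers.get(fieldsOrder.get(i));
        }
        return ret;
    }

    // Array -> map: the inverse direction, keyed by field name.
    static Map<String, ValuePointerSketch> buildFieldIndex(ValuePointerSketch[] pointers) {
        Map<String, ValuePointerSketch> ret = new HashMap<>();
        for (ValuePointerSketch p : pointers) {
            ret.put(p.field, p);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, ValuePointerSketch> byName = new HashMap<>();
        byName.put("aggData", new ValuePointerSketch(0, 1, "aggData"));
        byName.put("_task_info", new ValuePointerSketch(0, 0, "_task_info"));
        ValuePointerSketch[] idx = buildIndex(Arrays.asList("_task_info", "aggData"), byName);
        System.out.println(idx[0].field + ", " + idx[1].field); // _task_info, aggData
    }
}
```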

ProjectedProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/ProjectedProcessor.java

```java
public class ProjectedProcessor implements TridentProcessor {
    Fields _projectFields;
    ProjectionFactory _factory;
    TridentContext _context;

    public ProjectedProcessor(Fields projectFields) {
        _projectFields = projectFields;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        if(tridentContext.getParentTupleFactories().size()!=1) {
            throw new RuntimeException("Projection processor can only have one parent");
        }
        _context = tridentContext;
        _factory = new ProjectionFactory(tridentContext.getParentTupleFactories().get(0), _projectFields);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        TridentTuple toEmit = _factory.create(tuple);
        for(TupleReceiver r: _context.getReceivers()) {
            r.execute(processorContext, _context.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _factory;
    }
}
```
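  • In prepare, ProjectedProcessor creates a ProjectionFactory whose _projectFields are the functionFields declared in the window method. It takes the parent's first Factory via tridentContext.getParentTupleFactories().get(0). Because the tuples come from FreshCollector, that factory is a TridentTupleView.FreshOutputFactory.
  • In execute, it first calls ProjectionFactory.create to project fields out of the TridentTupleView. toEmit is the TridentTupleView re-projected according to the functionFields declared in the window method.
  • execute then calls execute on each of _context.getReceivers(), passing toEmit along. These receivers are the processors for whatever operations follow the window, such as EachProcessor.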
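The ProjectedProcessor step (project, then forward to every receiver) can be sketched like this. TupleSink and ProjectingStage are hypothetical names. This sketch copies values into a new LinkedHashMap, whereas Storm's ProjectionFactory reuses the parent's delegates and copies nothing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Storm's TupleReceiver.
interface TupleSink {
    void execute(String streamId, Map<String, Object> tuple);
}

// Sketch of ProjectedProcessor.execute: keep only the projected fields,
// then forward the result to every registered receiver.
final class ProjectingStage implements TupleSink {
    private final List<String> projectFields;
    private final List<TupleSink> receivers = new ArrayList<>();

    ProjectingStage(List<String> projectFields) {
        this.projectFields = projectFields;
    }

    ProjectingStage addReceiver(TupleSink r) {
        receivers.add(r);
        return this;
    }

    @Override
    public void execute(String streamId, Map<String, Object> tuple) {
        Map<String, Object> toEmit = new LinkedHashMap<>(); // copies values (a simplification)
        for (String f : projectFields) {
            toEmit.put(f, tuple.get(f));
        }
        for (TupleSink r : receivers) {
            r.execute("projected", toEmit);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> fromWindow = new LinkedHashMap<>();
        fromWindow.put("_task_info", "trigger-1");
        fromWindow.put("aggData", 42);
        ProjectingStage stage = new ProjectingStage(Arrays.asList("aggData"))
                .addReceiver((s, t) -> System.out.println(s + " " + t));
        stage.execute("window", fromWindow); // prints: projected {aggData=42}
    }
}
```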

TridentTupleView.ProjectionFactory

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/tuple/TridentTupleView.java

```java
public static class ProjectionFactory implements Factory {
    Map<String, ValuePointer> _fieldIndex;
    ValuePointer[] _index;
    Factory _parent;

    public ProjectionFactory(Factory parent, Fields projectFields) {
        _parent = parent;
        if(projectFields==null) projectFields = new Fields();
        Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
        _fieldIndex = new HashMap<>();
        for(String f: projectFields) {
            _fieldIndex.put(f, parentFieldIndex.get(f));
        }
        _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
    }

    public TridentTuple create(TridentTuple parent) {
        if(_index.length==0) return EMPTY_TUPLE;
        else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);
    }

    @Override
    public Map<String, ValuePointer> getFieldIndex() {
        return _fieldIndex;
    }

    @Override
    public int numDelegates() {
        return _parent.numDelegates();
    }

    @Override
    public List<String> getOutputFields() {
        return indexToFieldsList(_index);
    }
}
```
  • ProjectionFactory is a static nested class of TridentTupleView. Its constructor builds _index and _fieldIndex from projectFields, so create can build a TridentTupleView that exposes only the requested fields.
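The key property of ProjectionFactory is that a projected view shares the parent's delegates and swaps only the field pointers. The minimal sketch below uses Ptr, View and ProjectionSketch as hypothetical simplifications. An isEmpty check on the field map plays the role of the _index.length==0 test.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified ValuePointer.
final class Ptr {
    final int delegateIndex;
    final int index;

    Ptr(int delegateIndex, int index) {
        this.delegateIndex = delegateIndex;
        this.index = index;
    }
}

// Hypothetical, simplified TridentTupleView.
final class View {
    final List<List<Object>> delegates;
    final Map<String, Ptr> fieldIndex;

    View(List<List<Object>> delegates, Map<String, Ptr> fieldIndex) {
        this.delegates = delegates;
        this.fieldIndex = fieldIndex;
    }

    Object get(String field) {
        Ptr p = fieldIndex.get(field);
        return delegates.get(p.delegateIndex).get(p.index);
    }
}

// Sketch of ProjectionFactory: pick the parent's pointers for the projected
// fields and reuse the parent's delegates, so no values are copied.
final class ProjectionSketch {
    static final View EMPTY = new View(Collections.<List<Object>>emptyList(), Collections.<String, Ptr>emptyMap());
    final Map<String, Ptr> fieldIndex = new HashMap<>();

    ProjectionSketch(Map<String, Ptr> parentFieldIndex, List<String> projectFields) {
        for (String f : projectFields) {
            fieldIndex.put(f, parentFieldIndex.get(f));
        }
    }

    View create(View parent) {
        if (fieldIndex.isEmpty()) {
            return EMPTY; // nothing projected: share a single empty view
        }
        return new View(parent.delegates, fieldIndex);
    }

    public static void main(String[] args) {
        Map<String, Ptr> parentIdx = new HashMap<>();
        parentIdx.put("_task_info", new Ptr(0, 0));
        parentIdx.put("aggData", new Ptr(0, 1));
        View parent = new View(Collections.singletonList(Arrays.<Object>asList("trigger-1", 42)), parentIdx);
        View projected = new ProjectionSketch(parentIdx, Arrays.asList("aggData")).create(parent);
        System.out.println(projected.get("aggData"));                       // 42
        System.out.println(projected.delegates == parent.delegates);        // true
        System.out.println(projected.fieldIndex.containsKey("_task_info")); // false
    }
}
```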

EachProcessor

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/EachProcessor.java

```java
public class EachProcessor implements TridentProcessor {
    Function _function;
    TridentContext _context;
    AppendCollector _collector;
    Fields _inputFields;
    ProjectionFactory _projection;

    public EachProcessor(Fields inputFields, Function function) {
        _function = function;
        _inputFields = inputFields;
    }

    @Override
    public void prepare(Map conf, TopologyContext context, TridentContext tridentContext) {
        List<Factory> parents = tridentContext.getParentTupleFactories();
        if(parents.size()!=1) {
            throw new RuntimeException("Each operation can only have one parent");
        }
        _context = tridentContext;
        _collector = new AppendCollector(tridentContext);
        _projection = new ProjectionFactory(parents.get(0), _inputFields);
        _function.prepare(conf, new TridentOperationContext(context, _projection));
    }

    @Override
    public void cleanup() {
        _function.cleanup();
    }

    @Override
    public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
        _collector.setContext(processorContext, tuple);
        _function.execute(_projection.create(tuple), _collector);
    }

    @Override
    public void startBatch(ProcessorContext processorContext) {
    }

    @Override
    public void finishBatch(ProcessorContext processorContext) {
    }

    @Override
    public Factory getOutputFactory() {
        return _collector.getOutputFactory();
    }
}
```
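  • EachProcessor.execute first sets the processorContext (together with the current tuple) on _collector, then calls _function.execute.
  • It calls _projection.create(tuple) to project the fields, based on the inputFields specified for _function.
  • The collector passed to _function is an AppendCollector.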
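EachProcessor.execute can be modeled in three steps: bind the current tuple to the collector, project the input fields, and call the function. Emitter, EachFunction, RecordingAppendCollector and EachStage are hypothetical stand-ins for TridentCollector, Function, AppendCollector and EachProcessor. The collector here only records the parent tuple's values plus the emitted values as a flat row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Trident's TridentCollector and Function.
interface Emitter {
    void emit(List<Object> values);
}

interface EachFunction {
    void execute(Map<String, Object> input, Emitter collector);
}

// Hypothetical collector: remembers the tuple being processed and appends emitted values to it.
final class RecordingAppendCollector implements Emitter {
    final List<List<Object>> rows = new ArrayList<>();
    private Map<String, Object> current;

    void setContext(Map<String, Object> tuple) {
        this.current = tuple;
    }

    @Override
    public void emit(List<Object> values) {
        List<Object> row = new ArrayList<>(current.values()); // parent tuple's values first
        row.addAll(values);                                   // then the function's output
        rows.add(row);
    }
}

// Sketch of EachProcessor.execute: set the collector context, project the
// input fields, and run the user function with the shared collector.
final class EachStage {
    private final List<String> inputFields;
    private final EachFunction function;
    final RecordingAppendCollector collector = new RecordingAppendCollector();

    EachStage(List<String> inputFields, EachFunction function) {
        this.inputFields = inputFields;
        this.function = function;
    }

    void execute(Map<String, Object> tuple) {
        collector.setContext(tuple);
        Map<String, Object> projected = new LinkedHashMap<>();
        for (String f : inputFields) {
            projected.put(f, tuple.get(f));
        }
        function.execute(projected, collector);
    }

    public static void main(String[] args) {
        EachStage each = new EachStage(Arrays.asList("aggData"), (input, c) -> {
            System.out.println("input: " + input); // input: {aggData=42}
            c.emit(Arrays.<Object>asList("seen"));
        });
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("_task_info", "trigger-1");
        tuple.put("aggData", 42);
        each.execute(tuple);
        System.out.println(each.collector.rows);   // [[trigger-1, 42, seen]]
    }
}
```

The function only sees its declared input fields, while the collector still has the full tuple, which is what lets it append new values to the original rather than to the projection.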

AppendCollector

storm-core-1.2.2-sources.jar!/org/apache/storm/trident/planner/processor/AppendCollector.java

```java
public class AppendCollector implements TridentCollector {
    OperationOutputFactory _factory;
    TridentContext _triContext;
    TridentTuple tuple;
    ProcessorContext context;

    public AppendCollector(TridentContext context) {
        _triContext = context;
        _factory = new OperationOutputFactory(context.getParentTupleFactories().get(0), context.getSelfOutputFields());
    }

    public void setContext(ProcessorContext pc, TridentTuple t) {
        this.context = pc;
        this.tuple = t;
    }

    @Override
    public void emit(List values) {
        TridentTuple toEmit = _factory.create((TridentTupleView) tuple, values);
        for(TupleReceiver r: _triContext.getReceivers()) {
            r.execute(context, _triContext.getOutStreamId(), toEmit);
        }
    }

    @Override
    public void reportError(Throwable t) {
        _triContext.getDelegateCollector().reportError(t);
    }

    public Factory getOutputFactory() {
        return _factory;
    }
}
```
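  • AppendCollector creates an OperationOutputFactory in its constructor. Its emit method likewise uses that factory to build the output tuple with the operation's output fields, then calls execute on each of _triContext.getReceivers(). If nothing follows the each operation, _triContext.getReceivers() is empty.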
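This sketch shows one way the appended tuple can be laid out. It assumes an OperationOutputFactory-style scheme: the parent's delegates are kept, the function's output becomes one extra delegate, and the new fields point at delegate index parent.numDelegates(). Slot, AppendView and AppendFactorySketch are hypothetical names, and the duplicate-field check is this sketch's own guard.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified ValuePointer: delegate list plus position inside it.
final class Slot {
    final int delegateIndex;
    final int index;

    Slot(int delegateIndex, int index) {
        this.delegateIndex = delegateIndex;
        this.index = index;
    }
}

// Hypothetical, simplified TridentTupleView.
final class AppendView {
    final List<List<Object>> delegates;
    final Map<String, Slot> fieldIndex;

    AppendView(List<List<Object>> delegates, Map<String, Slot> fieldIndex) {
        this.delegates = delegates;
        this.fieldIndex = fieldIndex;
    }

    Object get(String field) {
        Slot s = fieldIndex.get(field);
        return delegates.get(s.delegateIndex).get(s.index);
    }
}

// Assumed layout: parent fields keep their slots, and new fields point into
// one extra delegate appended after the parent's delegates.
final class AppendFactorySketch {
    final Map<String, Slot> fieldIndex;

    AppendFactorySketch(Map<String, Slot> parentFieldIndex, int parentNumDelegates, List<String> selfFields) {
        fieldIndex = new HashMap<>(parentFieldIndex);
        for (int i = 0; i < selfFields.size(); i++) {
            String f = selfFields.get(i);
            if (fieldIndex.containsKey(f)) { // this sketch's own guard against name clashes
                throw new IllegalArgumentException("field already exists: " + f);
            }
            fieldIndex.put(f, new Slot(parentNumDelegates, i));
        }
    }

    AppendView create(AppendView parent, List<Object> selfVals) {
        // Copy the delegate list (not the values) so the parent view is left untouched.
        List<List<Object>> delegates = new ArrayList<>(parent.delegates);
        delegates.add(selfVals);
        return new AppendView(delegates, fieldIndex);
    }

    public static void main(String[] args) {
        Map<String, Slot> parentIdx = new HashMap<>();
        parentIdx.put("_task_info", new Slot(0, 0));
        parentIdx.put("aggData", new Slot(0, 1));
        AppendView parent = new AppendView(
                Collections.singletonList(Arrays.<Object>asList("trigger-1", 42)), parentIdx);
        AppendFactorySketch factory = new AppendFactorySketch(parentIdx, 1, Arrays.asList("doubled"));
        AppendView out = factory.create(parent, Arrays.<Object>asList(84));
        System.out.println(out.get("aggData") + " " + out.get("doubled")); // 42 84
        System.out.println(out.delegates.size());                         // 2
    }
}
```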

Summary

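  • WindowTridentProcessor uses a FreshCollector. In finishBatch, it takes the pendingTriggers produced by the window from the TridentWindowManager, removing each one from pendingTriggers as it goes. These triggers hold the data the window has accumulated. It then emits that data through FreshCollector: the first value is always the TriggerInfo, followed by the values the window accumulated and emitted.
  • FreshCollector.emit first uses TridentTupleView.FreshOutputFactory to build a TridentTupleView from selfOutputFields. The first field is always _task_info, followed by the functionFields the user declared in the window method. It then calls execute on each of _triContext.getReceivers().
  • One of the downstream receivers is a ProjectedProcessor, which re-projects the TridentTupleView according to the functionFields declared in the window method. Its execute method works much like FreshCollector.emit: it first projects the needed fields into a TridentTupleView, then calls execute on each of _context.getReceivers() (for example EachProcessor.execute).
  • EachProcessor uses an AppendCollector. Its emit method also resembles FreshCollector.emit: it first builds a TridentTupleView from the fields, then calls execute on each of _triContext.getReceivers().
  • FreshCollector.emit, ProjectedProcessor.execute and AppendCollector.emit therefore follow the same pattern. Each first uses a Factory to build a TridentTupleView with the needed fields, then calls execute on each registered receiver. When a TridentContext has no receivers, the tuple stops propagating.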
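The shared pattern from the summary fits in a few lines: build the output with a factory, call execute on every receiver, and stop when there are none. Node is a hypothetical class. The "fresh" node simply passes through an already-built tuple, and maps stand in for TridentTupleView. This models the window → projection → each chain from the example, not Storm's actual classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical node: a "factory" step that builds the outgoing tuple, plus a
// list of receivers. A node with no receivers ends the chain.
final class Node {
    final String name;
    final UnaryOperator<Map<String, Object>> factory;
    final List<Node> receivers = new ArrayList<>();

    Node(String name, UnaryOperator<Map<String, Object>> factory) {
        this.name = name;
        this.factory = factory;
    }

    Node to(Node next) {
        receivers.add(next);
        return next;
    }

    void execute(Map<String, Object> tuple, List<String> trace) {
        Map<String, Object> toEmit = factory.apply(tuple);
        trace.add(name + " " + toEmit);
        for (Node r : receivers) {
            r.execute(toEmit, trace);
        }
    }

    public static void main(String[] args) {
        Node fresh = new Node("fresh", t -> t);
        Node project = new Node("project", t -> {
            Map<String, Object> m = new LinkedHashMap<>();
            m.put("aggData", t.get("aggData"));
            return m;
        });
        Node each = new Node("each", t -> {
            Map<String, Object> m = new LinkedHashMap<>(t);
            m.put("printed", true);
            return m;
        });
        fresh.to(project).to(each); // each has no receivers, so propagation stops there

        Map<String, Object> start = new LinkedHashMap<>();
        start.put("_task_info", "trigger-1");
        start.put("aggData", 42);
        List<String> trace = new ArrayList<>();
        fresh.execute(start, trace);
        trace.forEach(System.out::println);
        // fresh {_task_info=trigger-1, aggData=42}
        // project {aggData=42}
        // each {aggData=42, printed=true}
    }
}
```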


Reposted from: http://hzldo.baihongyu.com/
