假设我有两个可能的无限流:
s1 = a..b..c..d..e... s2 = 1.2.3.4.5.6.7...
我想合并流,然后将合并的流与慢速异步操作(例如,来自Promise和flatMapConcat的培根)进行映射.
我可以把它们合并在一起
me = a12b3.c45d6.7e...
然后映射
s1 = a..b..c..d..e... s2 = 1.2.3.4.5.6.7... me = a12b3.c45d6.7e... mm = a..1..2..b..3..c..4..5..
从长远来看,你看到更厉害的s2流可以获得优势.这是不理想的行为.
合并行为不行,因为我想有某种背压有更多的交错,“公平”,“循环”合并.几个想要的行为的例子:
s1 = a.....b..............c... s2 = ..1.2.3.................. mm = a...1...b...2...3....c... s1 = a.........b..........c... s2 = ..1.2.3.................. mm = a...1...2...b...3....c...
认为这一点的一种方法是s1和s2将任务发送给只能处理一个任务的工作人员.使用merge和flatMapConcat,我会得到一个贪心的任务管理器,但是我想要更公平的一个.
我想找到一个简单而优雅的解决方案.对于任意数量的流可以很容易地概括为好:
// roundRobinPromiseMap(streams: [Stream a],f: a -> Promise b): Stream b var mm = roundRobinPromiseMap([s1,s2],slowAsyncFunc);
使用RxJS或其他Rx库的解决方案也很好.
澄清
不是zipAsArray
我不想要
function roundRobinPromiseMap(streams,f) { return Bacon.zipAsArray.apply(null,streams) .flatMap(Bacon.fromArray) .flatMapConcat(function (x) { return Bacon.fromPromise(f(x)); }); }
比较大理石图示例:
s1 = a.....b..............c....... s2 = ..1.2.3...................... mm = a...1...b...2...3....c....... // wanted zip = a...1...b...2........c...3... // zipAsArray based
是的,我会遇到缓冲问题
…但是我会直截了当的不公平一个:
function greedyPromiseMap(streams,f) { Bacon.mergeAll(streams).flatMapConcat(function (x) { return Bacon.fromPromise(f(x)); }); }
大理石图
s1 = a.........b..........c... s2 = ..1.2.3.................. mm = a...1...2...b...3....c... merge = a...1...2...3...b....c...
解决方法
这是一个可以帮助的一大堆代码.
它将输入流转换为单个“值”事件流,然后将它们与“发送”事件(和“结束”事件合并)进行记帐.然后,使用状态机,它会从“值”事件中建立队列,并在“发送”事件上调度值.
本来我写了一个roundRobinThrottle,但我已经把它移到一个要点.
这是一个非常相似的roundRobinPromiseMap.要点中的代码已经过测试,但这不是.
# roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream roundRobinPromiseMap = (promiser,streams) -> # A bus to trigger new sends based on promise fulfillment promiseFulfilled = new Bacon.Bus() # Merge the input streams into a single,keyed stream theStream = Bacon.mergeAll(streams.map((s,idx) -> s.map((val) -> { type: 'value' index: idx value: val }) )) # Merge in 'end' events .merge(Bacon.mergeAll(streams.map((s) -> s.mapEnd(-> { type: 'end' }) ))) # Merge in 'send' events that fire when the promise is fulfilled. .merge(promiseFulfilled.map({ type: 'send' })) # Feed into a state machine that keeps queues and only creates # output events on 'send' input events. .withStateMachine( { queues: streams.map(-> []) toPush: 0 ended: 0 } handleState ) # Feed this output to the promiser theStream.onValue((value) -> Bacon.fromPromise(promiser(value)).onValue(-> promiseFulfilled.push() )) handleState = (state,baconEvent) -> outEvents = [] if baconEvent.hasValue() # Handle a round robin event of 'value','send',or 'end' outEvents = handleRoundRobinEvent(state,baconEvent.value()) else outEvents = [baconEvent] [state,outEvents] handleRoundRobinEvent = (state,rrEvent) -> outEvents = [] # 'value' : push onto queue if rrEvent.type == 'value' state.queues[rrEvent.index].push(rrEvent.value) # 'send' : send the next value by round-robin selection else if rrEvent.type == 'send' # Here's a sentinel for empty queues noValue = {} nextValue = noValue triedQueues = 0 while nextValue == noValue && triedQueues < state.queues.length if state.queues[state.toPush].length > 0 nextValue = state.queues[state.toPush].shift() state.toPush = (state.toPush + 1) % state.queues.length triedQueues++ if nextValue != noValue outEvents.push(new Bacon.Next(nextValue)) # 'end': Keep track of ended streams else if rrEvent.type == 'end' state.ended++ # End the round-robin stream if all inputs have ended if roundRobinEnded(state) outEvents.push(new Bacon.End()) outEvents roundRobinEnded = (state) -> emptyQueues = allEmpty(state.queues) emptyQueues && state.ended == state.queues.length allEmpty = (arrays) -> for a in arrays return false if a.length > 0 return true