假设我有两个可能的无限流:
- s1 = a..b..c..d..e...
- s2 = 1.2.3.4.5.6.7...
我想合并流,然后将合并的流与慢速异步操作(例如,来自Promise和flatMapConcat的培根)进行映射.
我可以把它们合并在一起
- me = a12b3.c45d6.7e...
然后映射
- s1 = a..b..c..d..e...
- s2 = 1.2.3.4.5.6.7...
- me = a12b3.c45d6.7e...
- mm = a..1..2..b..3..c..4..5..
从长远来看,你看到更厉害的s2流可以获得优势.这是不理想的行为.
合并行为不行,因为我想有某种背压有更多的交错,“公平”,“循环”合并.几个想要的行为的例子:
- s1 = a.....b..............c...
- s2 = ..1.2.3..................
- mm = a...1...b...2...3....c...
- s1 = a.........b..........c...
- s2 = ..1.2.3..................
- mm = a...1...2...b...3....c...
认为这一点的一种方法是s1和s2将任务发送给只能处理一个任务的工作人员.使用merge和flatMapConcat,我会得到一个贪心的任务管理器,但是我想要更公平的一个.
我想找到一个简单而优雅的解决方案.对于任意数量的流可以很容易地概括为好:
- // roundRobinPromiseMap(streams: [Stream a],f: a -> Promise b): Stream b
- var mm = roundRobinPromiseMap([s1,s2],slowAsyncFunc);
使用RxJS或其他Rx库的解决方案也很好.
澄清
不是zipAsArray
我不想要
- function roundRobinPromiseMap(streams,f) {
- return Bacon.zipAsArray.apply(null,streams)
- .flatMap(Bacon.fromArray)
- .flatMapConcat(function (x) {
- return Bacon.fromPromise(f(x));
- });
- }
比较大理石图示例:
- s1 = a.....b..............c.......
- s2 = ..1.2.3......................
- mm = a...1...b...2...3....c....... // wanted
- zip = a...1...b...2........c...3... // zipAsArray based
是的,我会遇到缓冲问题
…但是我会直截了当的不公平一个:
- function greedyPromiseMap(streams,f) {
- Bacon.mergeAll(streams).flatMapConcat(function (x) {
- return Bacon.fromPromise(f(x));
- });
- }
大理石图
- s1 = a.........b..........c...
- s2 = ..1.2.3..................
- mm = a...1...2...b...3....c...
- merge = a...1...2...3...b....c...
解决方法
这是一个可以帮助的一大堆代码.
它将输入流转换为单个“值”事件流,然后将它们与“发送”事件(和“结束”事件合并)进行记帐.然后,使用状态机,它会从“值”事件中建立队列,并在“发送”事件上调度值.
本来我写了一个roundRobinThrottle,但我已经把它移到一个要点.
这是一个非常相似的roundRobinPromiseMap.要点中的代码已经过测试,但这不是.
- # roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream
- roundRobinPromiseMap = (promiser,streams) ->
- # A bus to trigger new sends based on promise fulfillment
- promiseFulfilled = new Bacon.Bus()
- # Merge the input streams into a single,keyed stream
- theStream = Bacon.mergeAll(streams.map((s,idx) ->
- s.map((val) -> {
- type: 'value'
- index: idx
- value: val
- })
- ))
- # Merge in 'end' events
- .merge(Bacon.mergeAll(streams.map((s) ->
- s.mapEnd(-> {
- type: 'end'
- })
- )))
- # Merge in 'send' events that fire when the promise is fulfilled.
- .merge(promiseFulfilled.map({ type: 'send' }))
- # Feed into a state machine that keeps queues and only creates
- # output events on 'send' input events.
- .withStateMachine(
- {
- queues: streams.map(-> [])
- toPush: 0
- ended: 0
- }
- handleState
- )
- # Feed this output to the promiser
- theStream.onValue((value) ->
- Bacon.fromPromise(promiser(value)).onValue(->
- promiseFulfilled.push()
- ))
- handleState = (state,baconEvent) ->
- outEvents = []
- if baconEvent.hasValue()
- # Handle a round robin event of 'value','send',or 'end'
- outEvents = handleRoundRobinEvent(state,baconEvent.value())
- else
- outEvents = [baconEvent]
- [state,outEvents]
- handleRoundRobinEvent = (state,rrEvent) ->
- outEvents = []
- # 'value' : push onto queue
- if rrEvent.type == 'value'
- state.queues[rrEvent.index].push(rrEvent.value)
- # 'send' : send the next value by round-robin selection
- else if rrEvent.type == 'send'
- # Here's a sentinel for empty queues
- noValue = {}
- nextValue = noValue
- triedQueues = 0
- while nextValue == noValue && triedQueues < state.queues.length
- if state.queues[state.toPush].length > 0
- nextValue = state.queues[state.toPush].shift()
- state.toPush = (state.toPush + 1) % state.queues.length
- triedQueues++
- if nextValue != noValue
- outEvents.push(new Bacon.Next(nextValue))
- # 'end': Keep track of ended streams
- else if rrEvent.type == 'end'
- state.ended++
- # End the round-robin stream if all inputs have ended
- if roundRobinEnded(state)
- outEvents.push(new Bacon.End())
- outEvents
- roundRobinEnded = (state) ->
- emptyQueues = allEmpty(state.queues)
- emptyQueues && state.ended == state.queues.length
- allEmpty = (arrays) ->
- for a in arrays
- return false if a.length > 0
- return true