javascript – 如何交叉流(带背压)

前端之家收集整理的这篇文章主要介绍了javascript – 如何交叉流(带背压)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
假设我有两个可能的无限流:
  1. s1 = a..b..c..d..e...
  2. s2 = 1.2.3.4.5.6.7...

我想合并流,然后将合并的流与慢速异步操作(例如,来自Promise和flatMapConcat的培根)进行映射.

我可以把它们合并在一起

  1. me = a12b3.c45d6.7e...

然后映射

  1. s1 = a..b..c..d..e...
  2. s2 = 1.2.3.4.5.6.7...
  3. me = a12b3.c45d6.7e...
  4. mm = a..1..2..b..3..c..4..5..

从长远来看,你看到更厉害的s2流可以获得优势.这是不理想的行为.

合并行为不行,因为我想有某种背压有更多的交错,“公平”,“循环”合并.几个想要的行为的例子:

  1. s1 = a.....b..............c...
  2. s2 = ..1.2.3..................
  3. mm = a...1...b...2...3....c...
  4.  
  5. s1 = a.........b..........c...
  6. s2 = ..1.2.3..................
  7. mm = a...1...2...b...3....c...

认为这一点的一种方法是s1和s2将任务发送给只能处理一个任务的工作人员.使用merge和flatMapConcat,我会得到一个贪心的任务管理器,但是我想要更公平的一个.

我想找到一个简单而优雅的解决方案.对于任意数量的流可以很容易地概括为好:

  1. // roundRobinPromiseMap(streams: [Stream a],f: a -> Promise b): Stream b
  2. var mm = roundRobinPromiseMap([s1,s2],slowAsyncFunc);

使用RxJS或其他Rx库的解决方案也很好.

澄清

不是zipAsArray

我不想要

  1. function roundRobinPromiseMap(streams,f) {
  2. return Bacon.zipAsArray.apply(null,streams)
  3. .flatMap(Bacon.fromArray)
  4. .flatMapConcat(function (x) {
  5. return Bacon.fromPromise(f(x));
  6. });
  7. }

比较大理石图示例:

  1. s1 = a.....b..............c.......
  2. s2 = ..1.2.3......................
  3. mm = a...1...b...2...3....c....... // wanted
  4. zip = a...1...b...2........c...3... // zipAsArray based

是的,我会遇到缓冲问题

…但是我会直截了当的不公平一个:

  1. function greedyPromiseMap(streams,f) {
  2. Bacon.mergeAll(streams).flatMapConcat(function (x) {
  3. return Bacon.fromPromise(f(x));
  4. });
  5. }

大理石图

  1. s1 = a.........b..........c...
  2. s2 = ..1.2.3..................
  3. mm = a...1...2...b...3....c...
  4. merge = a...1...2...3...b....c...

解决方法

这是一个可以帮助的一大堆代码.

它将输入流转换为单个“值”事件流,然后将它们与“发送”事件(和“结束”事件合并)进行记帐.然后,使用状态机,它会从“值”事件中建立队列,并在“发送”事件上调度值.

本来我写了一个roundRobinThrottle,但我已经把它移到一个要点.

这是一个非常相似的roundRobinPromiseMap.要点中的代码已经过测试,但这不是.

  1. # roundRobinPromiseMap :: (a -> Promise b) -> [EventStream] -> EventStream
  2. roundRobinPromiseMap = (promiser,streams) ->
  3. # A bus to trigger new sends based on promise fulfillment
  4. promiseFulfilled = new Bacon.Bus()
  5.  
  6. # Merge the input streams into a single,keyed stream
  7. theStream = Bacon.mergeAll(streams.map((s,idx) ->
  8. s.map((val) -> {
  9. type: 'value'
  10. index: idx
  11. value: val
  12. })
  13. ))
  14. # Merge in 'end' events
  15. .merge(Bacon.mergeAll(streams.map((s) ->
  16. s.mapEnd(-> {
  17. type: 'end'
  18. })
  19. )))
  20. # Merge in 'send' events that fire when the promise is fulfilled.
  21. .merge(promiseFulfilled.map({ type: 'send' }))
  22. # Feed into a state machine that keeps queues and only creates
  23. # output events on 'send' input events.
  24. .withStateMachine(
  25. {
  26. queues: streams.map(-> [])
  27. toPush: 0
  28. ended: 0
  29. }
  30. handleState
  31.  
  32. )
  33. # Feed this output to the promiser
  34. theStream.onValue((value) ->
  35. Bacon.fromPromise(promiser(value)).onValue(->
  36. promiseFulfilled.push()
  37. ))
  38.  
  39. handleState = (state,baconEvent) ->
  40. outEvents = []
  41.  
  42. if baconEvent.hasValue()
  43. # Handle a round robin event of 'value','send',or 'end'
  44. outEvents = handleRoundRobinEvent(state,baconEvent.value())
  45. else
  46. outEvents = [baconEvent]
  47.  
  48. [state,outEvents]
  49.  
  50. handleRoundRobinEvent = (state,rrEvent) ->
  51. outEvents = []
  52.  
  53. # 'value' : push onto queue
  54. if rrEvent.type == 'value'
  55. state.queues[rrEvent.index].push(rrEvent.value)
  56. # 'send' : send the next value by round-robin selection
  57. else if rrEvent.type == 'send'
  58. # Here's a sentinel for empty queues
  59. noValue = {}
  60. nextValue = noValue
  61. triedQueues = 0
  62.  
  63. while nextValue == noValue && triedQueues < state.queues.length
  64. if state.queues[state.toPush].length > 0
  65. nextValue = state.queues[state.toPush].shift()
  66. state.toPush = (state.toPush + 1) % state.queues.length
  67. triedQueues++
  68. if nextValue != noValue
  69. outEvents.push(new Bacon.Next(nextValue))
  70. # 'end': Keep track of ended streams
  71. else if rrEvent.type == 'end'
  72. state.ended++
  73.  
  74. # End the round-robin stream if all inputs have ended
  75. if roundRobinEnded(state)
  76. outEvents.push(new Bacon.End())
  77.  
  78. outEvents
  79.  
  80. roundRobinEnded = (state) ->
  81. emptyQueues = allEmpty(state.queues)
  82. emptyQueues && state.ended == state.queues.length
  83.  
  84. allEmpty = (arrays) ->
  85. for a in arrays
  86. return false if a.length > 0
  87. return true

猜你在找的JavaScript相关文章