我是RxJS的新手,想知道是否有人可以帮助我.
我想从请求流(有效载荷数据)创建一个同步的响应流(最好带有相应的请求).
我基本上希望逐个发送请求,每个请求等待最后一个请求.
var requestStream,responseStream;
requestStream = Rx.Observable.from(['a','b','c','d','e']);
responseStream = requestStream.flatMap(
sendRequest,(val,response)=>{ return {val,response}; }
);
responseStream.subscribe(
item=>{
console.log(item);
},err => {
console.err(err);
},()=>{
console.log('Done');
}
);
function sendRequest(val) {
return new Promise((resolve,reject)=>{
setTimeout(()=>{resolve('result for '+val);},1000);
});
};
以下工作在某种程度上,但不使用流作为请求数据(jsbin).
var data,responseStream;
data = ['a','e'];
responseStream = Rx.Observable.create(observer=>{
var sendNext = function(){
var val = data.shift();
if (!val) {
observer.onCompleted();
return;
}
sendRequest(val).then(response=>{
observer.onNext({val,response});
sendNext();
});
};
sendNext();
});
responseStream.subscribe(
item=>{
console.log(item);
},reject)=>{
setTimeout(()=>{resolve('response for '+val);},Math.random() * 2500 + 500);
});
};
谢谢!
编辑:
只是为了澄清,这就是我想要实现的目标:
“发送A,当您收到A的响应时,发送B,当您收到B的响应时,发送C等……”
使用concatMap和defer,如user3743222所建议的,似乎是这样做的(jsbin):
responseStream = requestStream.concatMap(
(val)=>{
return Rx.Observable.defer(()=>{
return sendRequest(val);
});
},response}; }
);