如果我有一个 Node.js 流(例如 process.stdin,或者 fs.createReadStream 返回的流),如何使用 RxJS 5 将它转换为 RxJS 的 Observable?
解决方法
供有同样需求的人参考:按照 Mark 的建议,我把 rx-node 的 fromStream 实现改写成了适用于 RxJS 5 的版本。
import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52

/**
 * Wraps an event-emitting stream (e.g. `process.stdin` or the result of
 * `fs.createReadStream`) in an RxJS 5 Observable.
 *
 * The stream is paused as soon as this function is called and is resumed
 * inside the subscribe callback, so that listeners are attached before the
 * stream is asked to flow again.
 *
 * The returned Observable is passed through `.share()`; see the RxJS `share`
 * documentation for how the underlying subscription is shared between
 * subscribers.
 *
 * @param {Object} stream - Stream exposing `pause`, `resume`, `addListener`
 *   and `removeListener`.
 * @param {string} [finishEventName='end'] - Event that completes the Observable.
 * @param {string} [dataEventName='data'] - Event whose payload is emitted via `next`.
 * @returns {Observable} Observable that emits each `dataEventName` payload,
 *   errors on the stream's `'error'` event and completes on `finishEventName`.
 */
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  // Hold the stream until a subscriber has attached its listeners.
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    // Listeners are in place; let the stream flow.
    stream.resume();

    // Teardown: detach exactly the listeners registered above.
    // NOTE(review): the stream itself is neither paused nor destroyed here —
    // confirm whether callers expect it to keep flowing after unsubscription.
    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}