假设您有一个包含100个URL的列表,并且您想要下载它们,解析响应并通过IObservable推送结果:
public IObservable<ImageSource> GetImages(IEnumerable<string> urls) { return urls .ToObservable() .Select(async url => { var bytes = await this.DownloadImage(url); var image = await this.ParseImage(bytes); return image; }); }
我有一些问题.
一个是同时敲击服务器100个请求是不礼貌的 – 理想情况下,你会在给定时刻限制为6个请求.但是,如果我添加一个Buffer调用,由于Select中的异步lambda,所有内容仍然会同时触发.
此外,结果将以与URL的输入序列不同的顺序返回,这是不好的,因为图像是将在UI上显示的动画的一部分.
我已经尝试了各种各样的东西,我有一个有效的解决方案,但感觉很复杂:
public IObservable<ImageSource> GetImages(IEnumerable<string> urls) { var semaphore = new SemaphoreSlim(6); return Observable.Create<ImageSource>(async observable => { var tasks = urls .Select(async url => { await semaphore.WaitAsync(); var bytes = await this.DownloadImage(url); var image = await this.ParseImage(url); }) .ToList(); foreach (var task in tasks) { observable.OnNext(await task); } observable.OnCompleted(); }); }
它工作,但现在我正在做Observable.Create而不仅仅是IObservable.Select,我必须弄乱信号量.此外,在UI上运行的其他动画在运行时停止(它们基本上只是DispatcherTimer实例),所以我认为我一定做错了.
解决方法
尝试一下:
urls.ToObservable() .Select(url => Observable.FromAsync(async () => { var bytes = await this.DownloadImage(url); var image = await this.ParseImage(bytes); return image; })) .Merge(6 /*at a time*/);
我们在这里做什么?
对于每个URL,我们创建一个Cold Observable(即一个根本不会做任何事情,直到某人调用Subscribe). FromAsync返回一个Observable,当您订阅它时,它会运行您提供的异步块.因此,我们选择将URL作为一个对象来完成我们的工作,但前提是我们稍后会问它.
然后,我们的结果是IObservable< IObservable< Image>> – 未来结果流.我们想要将该流展平为一个结果流,因此我们使用Merge(int).合并操作符将一次订阅n个项目,当他们回来时,我们将订阅更多.即使url list非常大,Merge正在缓冲的项目也只是一个URL和一个Func对象(即要做什么的描述),所以相对较小.