c# – 创建一个使用async / await的IObservable以原始顺序返回已完成的任务

前端之家收集整理的这篇文章主要介绍了c# – 创建一个使用async / await的IObservable以原始顺序返回已完成的任务前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
假设您有一个包含100个URL的列表,并且您想要下载它们,解析响应并通过IObservable推送结果:
public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    return urls
        .ToObservable()
        .Select(async url =>
        {
            var bytes = await this.DownloadImage(url);
            var image = await this.ParseImage(bytes);
            return image;
        });
}

我有一些问题.

一个是同时敲击服务器100个请求是不礼貌的 – 理想情况下,你会在给定时刻限制为6个请求.但是,如果我添加一个Buffer调用,由于Select中的异步lambda,所有内容仍然会同时触发.

此外,结果将以与URL的输入序列不同的顺序返回,这是不好的,因为图像是将在UI上显示的动画的一部分.

我已经尝试了各种各样的东西,我有一个有效的解决方案,但感觉很复杂:

public IObservable<ImageSource> GetImages(IEnumerable<string> urls)
{
    var semaphore = new SemaphoreSlim(6);

    return Observable.Create<ImageSource>(async observable =>
    {
        var tasks = urls
            .Select(async url =>
            {
                await semaphore.WaitAsync();
                var bytes = await this.DownloadImage(url);
                var image = await this.ParseImage(url);
            })
            .ToList();

        foreach (var task in tasks)
        {
            observable.OnNext(await task);
        }

        observable.OnCompleted();
    });
}

它工作,但现在我正在做Observable.Create而不仅仅是IObservable.Select,我必须弄乱信号量.此外,在UI上运行的其他动画在运行时停止(它们基本上只是DispatcherTimer实例),所以我认为我一定做错了.

解决方法

尝试一下:
urls.ToObservable()
    .Select(url => Observable.FromAsync(async () => {
        var bytes = await this.DownloadImage(url);
        var image = await this.ParseImage(bytes);
        return image;        
    }))
    .Merge(6 /*at a time*/);

我们在这里做什么?

对于每个URL,我们创建一个Cold Observable(即一个根本不会做任何事情,直到某人调用Subscribe). FromAsync返回一个Observable,当您订阅它时,它会运行您提供的异步块.因此,我们选择将URL作为一个对象来完成我们的工作,但前提是我们稍后会问它.

然后,我们的结果是IObservable< IObservable< Image>> – 未来结果流.我们想要将该流展平为一个结果流,因此我们使用Merge(int).合并操作符将一次订阅n个项目,当他们回来时,我们将订阅更多.即使url list非常大,Merge正在缓冲的项目也只是一个URL和一个Func对象(即要做什么的描述),所以相对较小.

原文链接:/csharp/92625.html

猜你在找的C#相关文章