所以现在我在我的应用程序中尝试实现这一点.在开始之前,我有一些事情想要解决.我看到IObservable与IEnumerable相反,但是,我看不到我特定实例中的任何地方可以并入我的应用程序.
目前,我大量使用事件,很多,我可以看到“管道”开始变得难以管理.我会认为,IObservable可以帮助我在这里.
考虑以下设计,这是我的应用程序中的I / O周围的包装(FYI,我通常需要处理字符串):
我有一个基础接口叫做IDataIO:
public interface IDataIO { event OnDataReceived; event OnTimeout: event OnTransmit; }
现在,我目前有三个实现这个接口的类,这些类中的每一个都以某种方式利用Async方法调用,引入了一些类型的多线程处理:
public class SerialIO : IDataIO; public class UdpIO : IDataIO; public class TcpIO : IDataIO;
每个这些类的一个实例被包装到我的最后一个类中,称为IO(它也实现了IDataIO – 遵守我的策略模式):
public class IO : IDataIO { public SerialIO Serial; public UdpIO Udp; public TcpIO Tcp; }
我已经使用策略模式来封装这三个类,以便在运行时在不同的IDataIO实例之间切换时,最终用户“不可见”.你可以想象,这在后台导致了相当多的“事件管道”.
那么,在这种情况下,我该如何利用“推”通知?而不是订阅事件(DataReceived等),我想简单地将数据推送给任何有兴趣的人.我有点不确定在哪里开始.我还在尝试玩主题的想法/泛型类,以及这个(ReplaySubject / AsynSubject / BehavIoUrSubject)的各种形式.有人可以告诉我这个(可能参考我的设计)吗?或者这根本不是IObservable的理想选择吗?
PS.随意纠正我的任何误会:)
解决方法
// Using a subject is probably the easiest way to push data to an Observable // It wraps up both IObservable and IObserver so you almost never use IObserver directly private readonly Subject<byte> subject = new Subject<byte>(); private void OnPort_DataReceived(object sender,EventArgs e) { // This pushes the data to the IObserver,which is probably just a wrapper // around your subscribe delegate is you're using the Rx extensions this.subject.OnNext(port.Data); // pseudo code }
public IObservable<byte> DataObservable { get { return this.subject; } // Or this.subject.AsObservable(); }
您可以使用IObservable< T>替换IDataIO上的DataReceived事件.并且使每个策略类以他们需要的任何方式处理他们的数据并推送到主题< T>.
另一方面,任何订阅Observable的人都可以像事件一样处理它(只需使用Action< byte []>)),或者您可以使用Select,Where,Buffer执行一些非常有用的工作等等
private IDataIO dataIo = new ... private void SubscribeToData() { dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes); } private void On16Bytes(IList<byte> bytes) { // do stuff }
ReplaySubject / ConnectableObservables是很棒的,当你知道你的订阅者将迟到聚会,但仍然需要赶上所有的事件.源缓存其所有的东西,并为每个用户重播所有内容.只有你可以说这是你实际需要的行为(但要小心,因为它将缓存所有内存使用量明显增加).
当我学习Rx时,我发现Rx的http://leecampbell.blogspot.co.uk/博客系列对于了解理论非常有帮助(这个帖子现在有点过时了,API已经改变了,所以要注意)