Rx并行框架(1)
Rx全称是Reactive Extensions for .NET,是微软的一个并行扩展框架,目前已支持.Net4.0/.Net3.5/SL3/SL4/XNA/WP7/JS等,最新版本是2010年平安夜的1.0.2838.104版本
[下载地址],更多信息可以在这里找到: http://msdn.microsoft.com/en-us/devlabs/ee794896。
Rx还在不断完善,API更新比较频繁,下文提到的Rx均指目前Net4.0平台的最新版本1.0.2838.104。
================================================================
鉴于国内目前似乎没有相关的中文教程,所以简单介绍下Rx:
Rx将定时器/UI事件/函数调用等都看成是一种事件流,事件分为普通事件、异常事件和结束事件(当然,像UI事件或无限的定时器是没有结束事件的)。
IObservable<T>: 事件源,基于推送方式的事件提供者源,当有新事件时将主动调用事件处理程序(可以是同步也可以是异步,取决于调度器)
public interface IObservable<T>
{
IDisposable Subscribe(IObserver<T> observer);
}
IObserver<T>: 事件处理程序,负责处理事件源的三种消息,和TPL是对应的。
public interface IObserver<TValue, TResult>
{
TResult OnCompleted();
TResult OnError(Exception exception);
TResult OnNext(TValue value);
}
(PS.Net4.0已经在核心类库mscorlib中包含了IObservable<T>接口,所以在.Net4.0的Rx里是没有System.Observable程序集的,这点大概是Rx在.Net4.0和.Net3.5的最大区别)
================================================================
好了,废话少说给具体例子,还是经典的HelloWorld吧,嘿嘿~
static void Main(string[] args)
{
Console.WriteLine("Start test");
Test1();
Console.WriteLine("Press any key to Exit");
Console.ReadKey(false);
}
static void Test1()
{
IObservable<string> seq = Observable.Start(() => "Hello World");
Console.WriteLine(seq.GetType().ToString());
seq.Subscribe(Console.WriteLine);
}
运行结果:
Start test
System.Collections.Generic.AnonymousObservable`1[System.String]
Press any key to Exit
Hello World
说明:Observable.Start定义了一个事件流,该事件流是一个直接返回"Hello World"的函数调用,从输出结果可以看出定义事件流后并不会立即执行。
也可以使用TPL实现同样的效果:
static void Test1UseTPL()
{
var task = new Task<string>(() => "Hello World");
Console.WriteLine(task.GetType().ToString());
task.ContinueWith(t => Console.WriteLine(t.Result));
task.Start();
}
运行结果:
Start test
System.Threading.Tasks.Task`1[System.String]
Press any key to Exit
Hello World
================================================================
上面的代码有点过于简单了,下面给个较为详细的例子
static void Test2()
{
Console.WriteLine("Main thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
IObservable<string> seq = Observable.Start(() =>
{
Console.WriteLine("Execute thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
Console.WriteLine("IsBackground: "
+ Thread.CurrentThread.IsBackground.ToString());
Console.WriteLine("IsThreadPoolThread: "
+ Thread.CurrentThread.IsThreadPoolThread.ToString());
Console.WriteLine("Wait 3 seconds...");
Thread.Sleep(1000);
Console.WriteLine("Wait 2 seconds...");
Thread.Sleep(1000);
Console.WriteLine("Wait 1 seconds...");
Thread.Sleep(1000);
return "Hello World";
});
Console.WriteLine(seq.GetType().ToString());
seq.Subscribe(Console.WriteLine);
}
运行结果:
Start test
Main thread id: 10
System.Collections.Generic.AnonymousObservable`1[System.String]
Execute thread id: 6
IsBackground: True
IsThreadPoolThread: True
Wait 3 seconds...
Press any key to Exit
Wait 2 seconds...
Wait 1 seconds...
Hello World
说明:可以看到,调用事件处理程序的是线程池里的一个后台线程,和主线程是并行运行的(注意看Press any key to Exit是在Wait 3 seconds和Wait 2 seconds之间的)
当然,用TPL照样可以做到同样的效果(运行结果就不贴了,有兴趣自己运行下):
static void Test2UseTPL()
{
Console.WriteLine("Main thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
var task = new Task<string>(() =>
{
Console.WriteLine("Execute thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
Console.WriteLine("IsBackground: "
+ Thread.CurrentThread.IsBackground.ToString());
Console.WriteLine("IsThreadPoolThread: "
+ Thread.CurrentThread.IsThreadPoolThread.ToString());
Console.WriteLine("Wait 3 seconds...");
Thread.Sleep(1000);
Console.WriteLine("Wait 2 seconds...");
Thread.Sleep(1000);
Console.WriteLine("Wait 1 seconds...");
Thread.Sleep(1000);
return "Hello World";
});
Console.WriteLine(task.GetType().ToString());
task.ContinueWith(t => Console.WriteLine(t.Result));
task.Start();
}
================================================================
老实说,上面两个例子有点无聊,不就是异步调用吗,大费周章的。下面给个稍微像样点的:
static void Test3()
{
Console.WriteLine("Main thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
var rnd = new Random();
var seq = Observable.ForkJoin(
Observable.Start(() =>
{
Thread.Sleep(rnd.Next(1000));
Console.WriteLine("create message1 from thread id: {0}",
Thread.CurrentThread.ManagedThreadId); return "Hello";
}),
Observable.Start(() =>
{
Thread.Sleep(rnd.Next(1000));
Console.WriteLine("create message2 from thread id: {0}",
Thread.CurrentThread.ManagedThreadId); return "World";
}),
Observable.Start(() =>
{
Thread.Sleep(rnd.Next(1000));
Console.WriteLine("create message3 from thread id: {0}",
Thread.CurrentThread.ManagedThreadId); return "!";
})
).Finally(() => Console.WriteLine("ok"));
Console.WriteLine(seq.GetType().ToString());
Console.WriteLine(string.Join("", seq.First()));
}
运行结果:
Start test
Main thread id: 9
System.Collections.Generic.AnonymousObservable`1[System.String[]]
create message1 from thread id: 10
create message3 from thread id: 10
create message2 from thread id: 11
ok
HelloWorld!
Press any key to Exit
说明:Observable.ForkJoin可以将多个事件源的最终结果合并。
================================================================
对Rx的介绍暂时到这里,有空继续...
发表评论
你好,我用nuget下载的Rx_Experimental,按照你的第一个方法写了。但是Console.WriteLine(task.GetType().ToString());得出的结果是System.Reactive.AnonymousObservable~1[System.String]。是Dll引用错误吗?我用的dll是System.Reactive.dll 版本1.1.11111.0