Rx全称是Reactive Extensions for .NET,是微软的一个并行扩展框架,目前已支持.Net4.0/.Net3.5/SL3/SL4/XNA/WP7/JS等,最新版本是2010年平安夜的1.0.2838.104版本

[下载地址],更多信息可以在这里找到: http://msdn.microsoft.com/en-us/devlabs/ee794896

Rx还在不断完善,API更新比较频繁,下文提到的Rx均指目前Net4.0平台的最新版本1.0.2838.104。

================================================================

鉴于国内目前似乎没有相关的中文教程,所以简单介绍下Rx:

Rx将定时器/UI事件/函数调用等都看成是一种事件流,事件分为普通事件、异常事件和结束事件(当然,像UI事件或无限的定时器是没有结束事件的)。

IObservable<T>: 事件源,基于推送方式的事件提供者源,当有新事件时将主动调用事件处理程序(可以是同步也可以是异步,取决于调度器)

public interface IObservable<T>
{
IDisposable Subscribe(IObserver
<T> observer);
}

IObserver<T>: 事件处理程序,负责处理事件源的三种消息,和TPL是对应的。

public interface IObserver<TValue, TResult>
{
TResult OnCompleted();
TResult OnError(Exception exception);
TResult OnNext(TValue value);
}

(PS.Net4.0已经在核心类库mscorlib中包含了IObservable<T>接口,所以在.Net4.0的Rx里是没有System.Observable程序集的,这点大概是Rx在.Net4.0和.Net3.5的最大区别)

================================================================

好了,废话少说给具体例子,还是经典的HelloWorld吧,嘿嘿~

static void Main(string[] args)
{
Console.WriteLine(
"Start test");
Test1();
Console.WriteLine(
"Press any key to Exit");
Console.ReadKey(
false);
}
static void Test1()
{
IObservable
<string> seq = Observable.Start(() => "Hello World");
Console.WriteLine(seq.GetType().ToString());
seq.Subscribe(Console.WriteLine);
}

运行结果:

Start test
System.Collections.Generic.AnonymousObservable`
1[System.String]
Press any key to Exit
Hello World

说明:Observable.Start定义了一个事件流,该事件流是一个直接返回"Hello World"的函数调用,从输出结果可以看出定义事件流后并不会立即执行。

也可以使用TPL实现同样的效果:

static void Test1UseTPL()
{
var task
= new Task<string>(() => "Hello World");
Console.WriteLine(task.GetType().ToString());
task.ContinueWith(t
=> Console.WriteLine(t.Result));
task.Start();
}

运行结果:

Start test
System.Threading.Tasks.Task`
1[System.String]
Press any key to Exit
Hello World

================================================================

上面的代码有点过于简单了,下面给个较为详细的例子

static void Test2()
{
Console.WriteLine(
"Main thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
IObservable
<string> seq = Observable.Start(() =>
{
Console.WriteLine(
"Execute thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
Console.WriteLine(
"IsBackground: "
+ Thread.CurrentThread.IsBackground.ToString());
Console.WriteLine(
"IsThreadPoolThread: "
+ Thread.CurrentThread.IsThreadPoolThread.ToString());
Console.WriteLine(
"Wait 3 seconds...");
Thread.Sleep(
1000);
Console.WriteLine(
"Wait 2 seconds...");
Thread.Sleep(
1000);
Console.WriteLine(
"Wait 1 seconds...");
Thread.Sleep(
1000);
return "Hello World";
});
Console.WriteLine(seq.GetType().ToString());
seq.Subscribe(Console.WriteLine);
}

运行结果:

Start test
Main thread id:
10
System.Collections.Generic.AnonymousObservable`
1[System.String]
Execute thread id:
6
IsBackground: True
IsThreadPoolThread: True
Wait
3 seconds...
Press any key to Exit
Wait
2 seconds...
Wait
1 seconds...
Hello World

说明:可以看到,调用事件处理程序的是线程池里的一个后台线程,和主线程是并行运行的(注意看Press any key to Exit是在Wait 3 seconds和Wait 2 seconds之间的)

当然,用TPL照样可以做到同样的效果(运行结果就不贴了,有兴趣自己运行下):

static void Test2UseTPL()
{
Console.WriteLine(
"Main thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
var task
= new Task<string>(() =>
{
Console.WriteLine(
"Execute thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
Console.WriteLine(
"IsBackground: "
+ Thread.CurrentThread.IsBackground.ToString());
Console.WriteLine(
"IsThreadPoolThread: "
+ Thread.CurrentThread.IsThreadPoolThread.ToString());
Console.WriteLine(
"Wait 3 seconds...");
Thread.Sleep(
1000);
Console.WriteLine(
"Wait 2 seconds...");
Thread.Sleep(
1000);
Console.WriteLine(
"Wait 1 seconds...");
Thread.Sleep(
1000);
return "Hello World";
});
Console.WriteLine(task.GetType().ToString());
task.ContinueWith(t
=> Console.WriteLine(t.Result));
task.Start();
}

================================================================

老实说,上面两个例子有点无聊,不就是异步调用吗,大费周章的。下面给个稍微像样点的:

static void Test3()
{
Console.WriteLine(
"Main thread id: "
+ Thread.CurrentThread.ManagedThreadId.ToString());
var rnd
= new Random();
var seq
= Observable.ForkJoin(
Observable.Start(()
=>
{
Thread.Sleep(rnd.Next(
1000));
Console.WriteLine(
"create message1 from thread id: {0}",
Thread.CurrentThread.ManagedThreadId);
return "Hello";
}),
Observable.Start(()
=>
{
Thread.Sleep(rnd.Next(
1000));
Console.WriteLine(
"create message2 from thread id: {0}",
Thread.CurrentThread.ManagedThreadId);
return "World";
}),
Observable.Start(()
=>
{
Thread.Sleep(rnd.Next(
1000));
Console.WriteLine(
"create message3 from thread id: {0}",
Thread.CurrentThread.ManagedThreadId);
return "!";
})
).Finally(()
=> Console.WriteLine("ok"));
Console.WriteLine(seq.GetType().ToString());
Console.WriteLine(
string.Join("", seq.First()));
}

运行结果:

Start test
Main thread id:
9
System.Collections.Generic.AnonymousObservable`
1[System.String[]]
create message1 from thread id:
10
create message3 from thread id:
10
create message2 from thread id:
11
ok
HelloWorld
!
Press any key to Exit

说明:Observable.ForkJoin可以将多个事件源的最终结果合并。

================================================================

对Rx的介绍暂时到这里,有空继续...

作者: neutra 发表于 2011-01-10 01:59 原文链接

推荐.NET配套的通用数据层ORM框架:CYQ.Data 通用数据层框架