.NET分布式事务处理总结【下】 - 包含MSMQ的分布式事务处理
.NET直接提供对MSMQ的访问支持,只需要添加System.Messaging程序集引用即可方便地操作MSMQ。MSMQ支持两种事务处理模式:内部事务处理以及基于MS-DTC的分布式事务处理。
MSMQ的内部事务处理
MSMQ的内部事务处理是指,仅采用MSMQ本身提供的事务处理机制完成事务处理。比如,假设有一系列的消息需要发布到MSMQ,那么,就可以启动一个内部事务,确保这些消息的发布过程是一个原子操作。要使用MSMQ的内部事务处理机制,在创建消息队列的时候,就需要勾选“事务性”选项,如下图所示:
首先,需要创建一个MessageQueueTransaction的对象,并使用Begin调用以启动MSMQ的内部事务处理。然后,在MessageQueue的Send方法中,使用Send(object, MessageQueueTransaction)的重载函数发送消息,将创建好的MessageQueueTransaction对象作为第二个参数传递给Send方法;在完成所有消息的发送之后,使用MessageQueueTransaction对象的Commit方法提交事务。如果在发送消息的过程中遇到问题,则使用MessageQueueTransaction对象的Abort调用回滚事务。请参见下面的示例代码:
- 
using (MessageQueue messageQueue = new MessageQueue(@".\private$\TPCDemoQueue",
 - 
false, false, QueueAccessMode.SendAndReceive))
 - 
{ - 
MessageQueueTransaction trans = new MessageQueueTransaction();
 - 
try - 
{ - 
trans.Begin();
 - 
for (int i = 0; i < 5; i++)
 - 
{ - 
messageQueue.Send(new Message(i), trans);
 - 
}
 - 
trans.Commit();
 - 
}
 - 
catch - 
{ - 
trans.Abort();
 - 
}
 - 
messageQueue.Close();
 - 
}
 
注意:如果你的消息队列在创建的时候没有设置“事务性”选项,那么,在完成消息队列的创建以后,你将无法修改该选项。更糟糕的是,在非事务性队列上执行上面的代码,则无法将消息发布到消息队列上,框架本身也不会提示任何错误信息,指示消息并未发布成功。
在分布式事务处理中使用MSMQ
在分布式事务处理的上下文中(比如,.NET 2.0+的TransactionScope中),上面所提到的MessageQueueTransaction将毫无用处,也就是说,MessageQueueTransaction与分布式事务处理毫无关系。你所要做的是,用正常的方式初始化一个MessageQueue的实例,然后,调用Send方法发布消息,在发布消息的时候,通过设置MessageQueueTransactionType的值来告诉MessageQueue,目前正处于一个分布式事务的上下文中。于是,你需要使用Send/Receive的重载方法:Send(object, MessageQueueTransactionType)以及Receive(MessageQueueTransactionType)。如下:
- 
using (TransactionScope transaction = new TransactionScope())
 - 
{ - 
Message inputMsg = inputQueue.Receive(MessageQueueTransactionType.Automatic);
 - 
// do some work - 
transaction.Complete(); - 
}
 
注意:对于一些生命周期相对较长的事务处理,比如,假设你的用例是这样的:你首先需要从一个消息队列中获得消息,然后更新你的数据库记录,那么你的代码可能会是这样的:
- 
using (TransactionScope transaction = new TransactionScope())
 - 
{ - 
using (MessageQueue someQueue = new MessageQueue("<queue connection>"))
 - 
{ - 
Message msg = someQueue.Receive(); - 
// do something else - 
} - 
transaction.Complete();
 - 
}
 
这样做其实是不对的!因为Receive方法是一种同步调用,如果消息队列中根本没有任何内容,那么Receive调用就会被阻塞,直到消息队列中出现新的消息。这就意味着你的分布式事务一直都是处于开启的状态,而且可能由于等待时间过长而导致超时,最终导致一个MessageQueueException。
正确的做法是,在MessageQueue上使用BeginPeek调用(注意:不是BeginReceive方法,因为BeginReceive方法并不是用来处理事务性队列的),然后订阅PeekComplete事件,在事件处理过程中,再使用TransactionScope以及Receive等方法实现消息的获取。例如:
- 
MessageQueue inputQueue = new MessageQueue("<queue connection>");
 - 
inputQueue.PeekCompleted += (s, e) =>
 - 
{ - 
using (TransactionScope transaction = new TransactionScope())
 - 
{ - 
Message inputMsg = inputQueue.Receive(MessageQueueTransactionType.Automatic);
 - 
// do some work - 
transaction.Complete(); - 
}
 - 
inputQueue.BeginPeek();
 - 
};
 - 
inputQueue.BeginPeek();
 
最后再提醒一下,就是如果你所要做的事情仅限于与MSMQ打交道,那么只要使用MSMQ的内部事务处理机制就可以了,毕竟使用分布式事务处理会涉及到MS-DTC,从而造成过大的系统开销,影响性能。
