.Net下的MSMQ的同步异步调用_.Net教程
推荐:用.net动态创建类的实例用.net动态创建类的实例 看了网上很多关于DotNet动态创建类的实例的文章,我这里想总结一下,其实方法很简单,就是用“Activator.CreateInstance”。但是这个方法需要待创建的类的
一、MSMQ简介
MSMQ(微软消息队列)是Windows操作系统中消息应用程序的基础,是用于创建分布式、松散连接的消息通讯应用程序的开发工具。消息队列
和电子邮件有着很多相似处,他们都包含多个属性,用于保存消息,消息类型中都指出发送者和接收者的地址;然而他们的用处却有着很大的
区别:消息队列的发送者和接收者是应用程序,而电子邮件的发送者和接收者通常是人。如同电子邮件一样,消息队列的发送和接收也不需要
发送者和接收者同时在场,可以存储在消息队列或是邮件服务器中。
二、消息队列的安装
默认情况下安装操作系统是不安装消息队列的,你可以在控制面板中找到添加/删除程序,然后选择添加/删除Windows组件一项,然后选择应
用程序服务器,双击它进入详细资料中选择消息队列一项进行安装,如图:
三、消息队列类型
消息对列分为3类:
公共队列
MachineName\QueueName
能被别的机器所访问,如果你的多个项目中用到消息队列,那么你可以把队列定义为公共队列
专用队列
MachineName\Private$\QueueName
只针对于本机的程序才可以调用的队列,有些情况下为了安全起见定义为私有队列。
日志队列
MachineName\QueueName\Journal$
四、消息队列的创建
MessageQueue Mq=new MessageQueue(“.\\private$\\Mymq”);
通过Path属性引用消息队列的代码也十分简单:
MessageQueue Mq=new MessageQueue();
Mq.Path=”.\\private$\\Mymq”;
使用 Create 方法可以在计算机上创建队列:
System.Messaging.MessageQueue.Create(@".\private$\Mymq");
这里注意由于在C#中要记住用反斜杠将“\”转义。
由于消息对列所放置的地方经常改变,所以建议消息队列路径不要写死,建议放在配置文件中。
五、消息的发送
消息的发送可以分为简单消息和复杂消息,简单消息类型就是常用的数据类型,例如整型、字符串等数据;复杂消息的数据类型通常对应于系统中的复杂数据类型,例如结构,对象等等。
Mq.Send("Hello!");
在这里建议你可以事先定义一个对象类,然后发送这个对象类的实例对象,这样以后无论在增加什么发送信息,只需在对象类中增加相应的属性即可。
六、消息的接收和阅读
(1)同步接收消息
接收消息的代码很简单:
Mq.Receive();
Mq.Receive(TimeSpan timeout); //设定超时时间
Mq.ReceiveById(ID);
Mq.Peek();
通过Receive方法接收消息同时永久性地从队列中删除消息;
通过Peek方法从队列中取出消息而不从队列中移除该消息。
如果知道消息的标识符(ID),还可以通过ReceiveById方法和PeekById方法完成相应的操作。
(2)异步接受消息
利用委托机制:MessQueue.ReceiveCompleted =new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
(3)消息阅读
在应用程序能够阅读的消息和消息队列中的消息格式不同,应用程序发送出去的消息经过序列化以后才发送给了消息队列
而在接受端必须反序列化,利用下面的代码可以实现:
public void mq_ReceiveCompleted(object sender, System.Messaging.ReceiveCompletedEventArgs e)
{
System.Messaging.Message m = MessQueue.EndReceive(e.AsyncResult);
m.Formatter = new System.Messaging.XmlMessageFormatter(new string[]{"System.String,mscorlib"});
Console.WriteLine("Message: " (string)m.Body);
MessQueue.BeginReceive() ;
}
反序列化还有另一种写法:m.Formatter = new XmlMessageFormatter ( new Type [] { typeof (string) } );
七、由于消息队列的代码有些是固定不便的,所以把这些代码封装成一个类方便以后使用:
以下为引用的内容: 5 |
UserObject的代码
以下为引用的内容: 1using System; 2 3namespace Goody9807 4{ 5 /**//// <summary> 6 /// 用与在MQ上传输数据的对象 7 /// </summary> 8 public class UserObject 9 { 10 public UserObject() 11 { 12 // 13 // TODO: Add constructor logic here 14 // 15 } 16 17 private long _curUserID; 18 public long curUserID 19 { 20 get 21 { 22 return _curUserID; 23 } 24 set 25 { 26 _curUserID=value; 27 } 28 } 29 30 private string _curUserName=""; 31 public string curUserName 32 { 33 get 34 { 35 return _curUserName; 36 } 37 set 38 { 39 _curUserName=value; 40 } 41 } 42 43 private string _curEmail=""; 44 public string curEmail 45 { 46 get 47 { 48 return _curEmail; 49 } 50 set 51 { 52 _curEmail=value; 53 } 54 } 55 56 57 private long _oppUserID; 58 public long oppUserID 59 { 60 get 61 { 62 return _oppUserID; 63 } 64 set 65 { 66 _oppUserID=value; 67 } 68 } 69 70 private string _oppUserName=""; 71 public string oppUserName 72 { 73 get 74 { 75 return _oppUserName; 76 } 77 set 78 { 79 _oppUserName=value; 80 } 81 } 82 83 private string _oppEmail=""; 84 public string oppEmail 85 { 86 get 87 { 88 return _oppEmail; 89 } 90 set 91 { 92 _oppEmail=value; 93 } 94 } 95 96 private string _subject =""; 97 public string subject 98 { 99 get 100 { 101 return _subject; 102 } 103 set 104 { 105 _subject=value; 106 } 107 } 108 109 private string _body=""; 110 public string body 111 { 112 get 113 { 114 return _body; 115 } 116 set 117 { 118 _body=value; 119 } 120 } 121 } 122} 123 |
另一个同事写的封装类
以下为引用的内容: 1using System; 2 3using System.Threading; 4 5using System.Messaging; 6 7 8 9namespace Wapdm.SmsApp 10 11{ 12 13 /**//// <summary> 14 15 /// <para> 16 17 /// A Logger implementation that writes messages to a message queue. 18 19 /// The default event formatter used is an instance of XMLEventFormatter 20 21 /// </para> 22 23 /// </summary> 24 25 public sealed class MsgQueue 26 27 { 28 29 30 31 private const string BLANK_STRING = ""; 32 33 private const string PERIOD = @".\private$"; //"."; 34 35 private const string ELLIPSIS = ""; 36 37 38 39 private string serverAddress; 40 41 private string queueName; 42 43 private string queuePath; 44 45 46 47 private bool IsContextEnabled; 48 49 50 51 private MessageQueue queue; 52 53 54 55 private object queueMonitor = new object(); 56 57 58 59 private MsgQueue() {} 60 61 62 63 public static MsgQueue mq = null; 64 65 public static WaitHandle[] waitHandleArray = new WaitHandle[Util.MAX_WORKER_THREADS]; 66 67 68 69 public MsgQueue(string _serverAddress, string _queueName, string _summaryPattern) 70 71 { 72 73 if ((_serverAddress == null) || (_queueName == null) || (_summaryPattern == null)) 74 75 { 76 77 throw new ArgumentNullException(); 78 79 } 80 81 ServerAddress = _serverAddress; 82 83 QueueName = _queueName; 84 85 IsContextEnabled = true; 86 87 } 88 89 90 91 public MsgQueue(string _serverAddress, string _queueName) 92 93 { 94 95 if ((_serverAddress == null) || (_queueName == null)) 96 97 { 98 99 throw new ArgumentNullException(); 100 101 } 102 103 ServerAddress = _serverAddress; 104 105 QueueName = _queueName; 106 107 IsContextEnabled = true; 108 109 } 110 111 112 113 public MsgQueue(string _queueName) 114 115 { 116 117 if (_queueName == null) 118 119 { 120 121 throw new ArgumentNullException(); 122 123 } 124 125 serverAddress = PERIOD; 126 127 QueueName = _queueName; 128 129 IsContextEnabled = true; 130 131 if ( IsContextEnabled == false ) 132 133 throw new ArgumentNullException(); 134 135 } 136 137 138 139 public string ServerAddress 140 141 { 142 143 get 144 145 { 146 147 return serverAddress; 148 149 } 150 151 set 152 153 { 154 155 if (value == null) 156 157 { 158 159 value = PERIOD; 160 161 } 162 163 value = value.Trim(); 164 165 if (value.Equals(BLANK_STRING)) 166 167 { 168 169 throw new ArgumentException("Invalid value (must contain non-whitespace characters)"); 170 171 } 172 173 lock (queueMonitor) 174 175 { 176 177 serverAddress = value; 178 179 queuePath = serverAddress '\\' queueName; 180 181 InitializeQueue(); 182 183 } 184 185 } 186 187 } 188 189 190 191 public string QueueName 192 193 { 194 195 get 196 197 { 198 199 return queueName; 200 201 } 202 203 set 204 205 { 206 207 if (value == null) 208 209 { 210 211 throw new ArgumentNullException(); 212 213 } 214 215 value = value.Trim(); 216 217 if (value.Equals(BLANK_STRING)) 218 219 { 220 221 throw new ArgumentException("Invalid value (must contain non-whitespace characters)"); 222 223 } 224 225 lock (queueMonitor) 226 227 { 228 229 queueName = value; 230 231 queuePath = serverAddress '\\' queueName; 232 233 InitializeQueue(); 234 235 } 236 237 } 238 239 } 240 241 242 243 private void InitializeQueue() 244 245 { 246 247 lock (queueMonitor) 248 249 { 250 251 if (queue != null) 252 253 { 254 255 try { queue.Close(); } 256 257 catch {} 258 259 queue = null; 260 261 } 262 263 264 265 try 266 267 { 268 269 if(!MessageQueue.Exists(queuePath)) 270 271 MessageQueue.Create(queuePath); 272 273 } 274 275 catch {} 276 277 try 278 279 { 280 281 queue = new MessageQueue(queuePath); 282 283 queue.SetPermissions("EveryOne",MessageQueueAccessRights.FullControl); 284 285 queue.Formatter = new XmlMessageFormatter(new Type[] {typeof(MoMsg)}); 286 287 } 288 289 catch (Exception e) 290 291 { 292 293 try { queue.Close(); } 294 295 catch {} 296 297 queue = null; 298 299 throw new ApplicationException("Couldn't open queue at '" queuePath "': " e.GetType().FullName ": " e.Message); 300 301 } 302 303 304 305 } 306 307 } 308 309 310 311 private void AcquireResources() 312 313 { 314 315 InitializeQueue(); 316 317 } 318 319 320 321 public void ReleaseResources() 322 323 { 324 325 lock (queueMonitor) 326 327 { 328 329 if (queue != null) 330 331 { 332 333 try 334 335 { 336 337 queue.Close(); 338 339 } 340 341 catch {} 342 343 queue = null; 344 345 } 346 347 } 348 349 } 350 351 352 353 //阻塞方式 354 355 public MoMsg Read( ) 356 357 { 358 359 MoMsg _event = null; 360 361 lock (queueMonitor) 362 363 { 364 365 if (queue == null) 366 367 { 368 369 InitializeQueue(); 370 371 } 372 373 try 374 375 { 376 377 Message message = queue.Receive( new TimeSpan(0,0,1) );//等待10秒 378 379 _event = (MoMsg) (message.Body); 380 381 return _event; 382 383 } 384 385 catch (Exception ) 386 387 { 388 389 try { queue.Close(); } 390 391 catch {} 392 393 queue = null; 394 395 } 396 397 } 398 399 return null; 400 401 } 402 403 404 405 public void Write(MoMsg _event) 406 407 { 408 409 if (_event == null) 410 411 { 412 413 return; 414 415 } 416 417 lock (queueMonitor) 418 419 { 420 421 try 422 423 { 424 425 if (queue == null) 426 427 { 428 429 InitializeQueue(); 430 431 } 432 433 434 435 Message message = new Message(); 436 437 message.Priority = _event.Priority; 438 439 message.Recoverable = true; 440 441 message.Body = _event; //eventFormatter.Format(_event); 442 443 444 445 queue.Send(message); 446 447 } 448 449 catch (Exception e) 450 451 { 452 453 try { queue.Close(); } 454 455 catch {} 456 457 queue = null; 458 459 Util.Log.log("Couldn't write Message (" e.GetType().FullName ": " e.Message ")"); 460 461 } 462 463 } 464 465 } 466 467 468 469 public static bool statusTest() 470 471 { 472 473 bool reValue = false; 474 475 try 476 477 { 478 479 MessageEnumerator re = mq.queue.GetMessageEnumerator(); 480 481 bool rev = re.MoveNext(); 482 483 reValue = true; 484 485 } 486 487 catch 488 489 { 490 491 reValue = false; 492 493 } 494 495 496 497 return reValue; 498 499 } 500 501 502 503 public static void startListen() 504 505 { 506 507 mq = new MsgQueue(Util.MqName); 508 509 510 511 mq.queue.ReceiveCompleted =new ReceiveCompletedEventHandler(queue_ReceiveCompleted); 512 513 514 515 //异步方式,并发 516 517 for(int i=0; i<Util.MAX_WORKER_THREADS; i ) 518 519 { 520 521 // Begin asynchronous operations. 522 523 waitHandleArray[i] = 524 525 mq.queue.BeginReceive().AsyncWaitHandle; 526 527 } 528 529 530 531 return; 532 533 } 534 535 536 537 public static void stopListen() 538 539 { 540 541 542 543 for(int i=0;i<waitHandleArray.Length;i ) 544 545 { 546 547 try 548 549 { 550 551 waitHandleArray[i].Close(); 552 553 } 554 555 catch 556 557 { 558 559 //忽略错误 560 561 } 562 563 } 564 565 566 567 try 568 569 { 570 571 // Specify to wait for all operations to return. 572 573 WaitHandle.WaitAll(waitHandleArray,1000,false); 574 575 } 576 577 catch 578 579 { 580 581 //忽略错误 582 583 } 584 585 } 586 587 588 589 private static void queue_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e) 590 591 { 592 593 // Connect to the queue. 594 595 MessageQueue mqq = (MessageQueue)sender; 596 597 598 599 // End the asynchronous Receive operation. 600 601 Message m = mqq.EndReceive(e.AsyncResult); 602 603 604 605 Util.ProcessMo((MoMsg)(m.Body)); 606 607 608 609 if(Util.isRunning) 610 611 { 612 613 // Restart the asynchronous Receive operation. 614 615 mqq.BeginReceive(); 616 617 } 618 619 620 621 return; 622 623 } 624 625 } 626 627} |
分享:ASP.NET 2.0 里输出文本格式流在用 ASP.NET 编程时,打开一个页面一般是通过指定超链接地址,调用指定的页面文件(.html、.aspx)等方法。 但是,如果即将打开的页面文件的内容是在程序中动态生成,或者是从数据库的表里取出
- asp.net如何得到GRIDVIEW中某行某列值的方法
- .net SMTP发送Email实例(可带附件)
- js实现广告漂浮效果的小例子
- asp.net Repeater 数据绑定的具体实现
- Asp.Net 无刷新文件上传并显示进度条的实现方法及思路
- Asp.net获取客户端IP常见代码存在的伪造IP问题探讨
- VS2010 水晶报表的使用方法
- ASP.NET中操作SQL数据库(连接字符串的配置及获取)
- asp.net页面传值测试实例代码
- DataGridView - DataGridViewCheckBoxCell的使用介绍
- asp.net中javascript的引用(直接引入和间接引入)
- 三层+存储过程实现分页示例代码
- 相关链接:
- 教程说明:
.Net教程-.Net下的MSMQ的同步异步调用。