You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

MQFactory.cs 3.5 KiB

2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. using EasyNetQ;
  2. using EasyNetQ.Topology;
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Configuration;
  6. using System.Linq.Expressions;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. namespace HKLib.RabbitMQ.Config
  10. {
  /// <summary>
  /// Static access point to a single shared EasyNetQ <see cref="IBus"/>, with helpers
  /// for publishing messages and for subscribing a handler to an exchange/queue pair.
  /// </summary>
  public class MQFactory
  {
    /// <summary>Guards creation and disposal of <see cref="Bus"/>.</summary>
    private static readonly object BusLock = new object();

    /// <summary>The shared bus; null until first use and again after <see cref="DisposeBus"/>.</summary>
    private volatile static IBus Bus = null;

    /// <summary>
    /// Returns the shared bus, creating it from <c>Configer.MqAddress</c> on first use.
    /// </summary>
    /// <returns>The shared <see cref="IBus"/> instance.</returns>
    public static IBus CreateMQ()
    {
      IBus bus = Bus;
      if (bus == null)
      {
        // Re-check under the lock so that concurrent first callers cannot each
        // create (and then leak) their own bus.
        lock (BusLock)
        {
          bus = Bus;
          if (bus == null)
          {
            bus = RabbitHutch.CreateBus(Configer.MqAddress);
            Bus = bus;
          }
        }
      }
      return bus;
    }

    /// <summary>
    /// Disposes the shared bus, if one exists, and clears the cached reference so that
    /// the next call to <see cref="CreateMQ"/> builds a fresh bus instead of handing
    /// out the disposed one.
    /// </summary>
    public static void DisposeBus()
    {
      lock (BusLock)
      {
        Bus?.Dispose();
        Bus = null;
      }
    }

    /// <summary>
    /// Sends a message synchronously.
    /// </summary>
    /// <typeparam name="T">Payload type carried by <paramref name="Param"/>.</typeparam>
    /// <param name="Param">The message to send.</param>
    /// <returns>
    /// <c>true</c> when the send completed without throwing; <c>false</c> when any
    /// exception was raised (the exception itself is not surfaced to the caller).
    /// </returns>
    public static bool SendMQ<T>(PushEntity<T> Param)
    {
      try
      {
        // Use the instance returned by CreateMQ rather than re-reading the static
        // field, which could have been cleared by DisposeBus in the meantime.
        new PushManage().SendMQ(Param, CreateMQ());
        return true;
      }
      catch (Exception)
      {
        return false;
      }
    }

    /// <summary>
    /// Sends a message asynchronously (the recommended way to send).
    /// </summary>
    /// <typeparam name="T">Payload type carried by <paramref name="Param"/>.</typeparam>
    /// <param name="Param">The message to send.</param>
    /// <returns>A task that completes when the underlying send has completed.</returns>
    public static async Task SendMQAsync<T>(PushEntity<T> Param)
    {
      await new PushManage().SendMQAsync(Param, CreateMQ());
    }

    /// <summary>
    /// Subscribes to messages: declares the exchange and queue described by
    /// <paramref name="Args"/>, binds them, and starts consuming. Each message body is
    /// decoded as UTF-8 and passed to <c>AcceptMQ&lt;T&gt;</c> on a new
    /// <typeparamref name="TAccept"/> instance.
    /// </summary>
    /// <typeparam name="TAccept">Handler type; a new instance is created per message.</typeparam>
    /// <typeparam name="T">Type argument forwarded to <c>AcceptMQ&lt;T&gt;</c>.</typeparam>
    /// <param name="Args">
    /// Exchange name, queue name, route name and send type. When it is null or its
    /// exchange name is null or empty, the method returns without subscribing.
    /// </param>
    /// <remarks>
    /// This method does not throw. A failure while setting up the subscription is
    /// written to <see cref="System.Diagnostics.Trace"/> and the method returns.
    /// </remarks>
    public static void Subscriber<TAccept, T>(AcceptEntity Args) where TAccept : IAccept, new()
    {
      try
      {
        if (Args == null || string.IsNullOrEmpty(Args.ExchangeName))
        {
          return;
        }

        IBus bus = CreateMQ();

        // Map the send type to the exchange type to declare.
        string exchangeType;
        if (Args.SendType == MQEnum.Sub)
        {
          exchangeType = ExchangeType.Fanout;
        }
        else if (Args.SendType == MQEnum.Push)
        {
          exchangeType = ExchangeType.Direct;
        }
        else if (Args.SendType == MQEnum.Top)
        {
          exchangeType = ExchangeType.Topic;
        }
        else
        {
          // Fail with a clear message instead of binding a null exchange.
          throw new ArgumentOutOfRangeException(nameof(Args), "Unsupported SendType: " + Args.SendType);
        }

        IExchange exchange = bus.Advanced.ExchangeDeclare(Args.ExchangeName, exchangeType);
        IQueue queue = bus.Advanced.QueueDeclare(Args.QueeName);
        bus.Advanced.Bind(exchange, queue, Args.RouteName);

        bus.Advanced.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
        {
          // Handle the message. Everything here is local to this task, so concurrent
          // deliveries cannot observe each other's message. An exception thrown by
          // the handler propagates through the returned task with its original
          // stack trace.
          var message = Encoding.UTF8.GetString(body);
          IAccept handler = new TAccept();
          handler.AcceptMQ<T>(message);
        }));
      }
      catch (Exception ex)
      {
        // Keep the method non-throwing for existing callers, but leave a trace of
        // the failure rather than discarding it silently.
        System.Diagnostics.Trace.TraceError("MQFactory.Subscriber failed: " + ex);
      }
    }
  }
  112. }