You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

118 lines
3.6 KiB

  1. using EasyNetQ;
  2. using EasyNetQ.Topology;
  3. using HKLib.DataBus.数据缓存;
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Configuration;
  7. using System.Linq.Expressions;
  8. using System.Text;
  9. using System.Threading.Tasks;
  10. namespace HKLib.RabbitMQ.Config
  11. {
  /// <summary>
  /// Static entry point that owns one shared EasyNetQ <see cref="IBus"/> and offers
  /// helpers to publish messages and to subscribe a handler to an exchange.
  /// </summary>
  public class MQFactory
  {
      /// <summary>Guards creation and disposal of the shared bus.</summary>
      private static readonly object SyncRoot = new object();

      /// <summary>The shared bus; null until first use and again after <see cref="DisposeBus"/>.</summary>
      private volatile static IBus Bus = null;

      /// <summary>
      /// Returns the shared bus, creating it from <c>Configer.MqAddress</c> on first use.
      /// </summary>
      /// <returns>The shared <see cref="IBus"/> instance.</returns>
      public static IBus CreateMQ()
      {
          IBus current = Bus;
          if (current != null)
          {
              return current;
          }

          // Re-check under the lock so that concurrent first callers create only one bus.
          lock (SyncRoot)
          {
              if (Bus == null)
              {
                  Bus = RabbitHutch.CreateBus(Configer.MqAddress);
              }
              return Bus;
          }
      }

      /// <summary>
      /// Disposes the shared bus, if any, and clears it so that the next call to
      /// <see cref="CreateMQ"/> builds a fresh one instead of handing out a disposed instance.
      /// </summary>
      public static void DisposeBus()
      {
          lock (SyncRoot)
          {
              Bus?.Dispose();
              Bus = null;
          }
      }

      /// <summary>
      /// Publishes a message synchronously.
      /// </summary>
      /// <typeparam name="T">Payload type carried by <paramref name="Param"/>.</typeparam>
      /// <param name="Param">The message to push.</param>
      /// <returns><c>true</c> when the push completed without throwing; otherwise <c>false</c>.</returns>
      public static bool SendMQ<T>(PushEntity<T> Param)
      {
          try
          {
              new PushManage().SendMQ(Param, CreateMQ());
              return true;
          }
          catch (Exception ex)
          {
              // The bool contract hides the cause from the caller, so leave a trace of it.
              System.Diagnostics.Trace.TraceError("MQFactory.SendMQ failed: " + ex);
              return false;
          }
      }

      /// <summary>
      /// Publishes a message asynchronously (the recommended way to send).
      /// </summary>
      /// <typeparam name="T">Payload type carried by <paramref name="Param"/>.</typeparam>
      /// <param name="Param">The message to push.</param>
      /// <returns>A task that completes when the push has finished.</returns>
      public static async Task SendMQAsync<T>(PushEntity<T> Param)
      {
          await new PushManage().SendMQAsync(Param, CreateMQ());
      }

      /// <summary>
      /// Subscribes to messages: declares the exchange named by <paramref name="Args"/>
      /// (fanout for <c>MQEnum.Sub</c>, direct for <c>MQEnum.Push</c>, topic for <c>MQEnum.Top</c>),
      /// declares the queue, binds it with the route name and starts consuming. Each message
      /// body is decoded as UTF-8 and passed to <c>AcceptMQ&lt;T&gt;</c> on a new
      /// <typeparamref name="TAccept"/> instance.
      /// </summary>
      /// <remarks>
      /// This method does not throw: it returns without subscribing when the exchange name is
      /// missing, and any failure during setup is written to <see cref="System.Diagnostics.Trace"/>.
      /// </remarks>
      /// <typeparam name="TAccept">Handler type instantiated once per received message.</typeparam>
      /// <typeparam name="T">Type argument forwarded to <c>AcceptMQ&lt;T&gt;</c>.</typeparam>
      /// <param name="Args">Exchange, queue, route and send-mode settings for the subscription.</param>
      public static void Subscriber<TAccept, T>(AcceptEntity Args) where TAccept : IAccept, new()
      {
          try
          {
              if (Args == null || string.IsNullOrEmpty(Args.ExchangeName))
              {
                  return;
              }

              // Use one local reference so a concurrent DisposeBus cannot null it mid-setup.
              IBus bus = CreateMQ();

              IExchange exchange = null;
              if (Args.SendType == MQEnum.Sub)
              {
                  exchange = bus.Advanced.ExchangeDeclare(Args.ExchangeName, ExchangeType.Fanout);
              }
              else if (Args.SendType == MQEnum.Push)
              {
                  exchange = bus.Advanced.ExchangeDeclare(Args.ExchangeName, ExchangeType.Direct);
              }
              else if (Args.SendType == MQEnum.Top)
              {
                  exchange = bus.Advanced.ExchangeDeclare(Args.ExchangeName, ExchangeType.Topic);
              }

              if (exchange == null)
              {
                  // No exchange was declared for this send type, so there is nothing to bind to.
                  System.Diagnostics.Trace.TraceError(
                      "MQFactory.Subscriber: unsupported send type '" + Args.SendType + "', nothing subscribed.");
                  return;
              }

              IQueue queue = bus.Advanced.QueueDeclare(Args.QueeName);
              bus.Advanced.Bind(exchange, queue, Args.RouteName);
              bus.Advanced.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
              {
                  var message = Encoding.UTF8.GetString(body);

                  // Handle the message on a fresh handler. Everything here is local to this
                  // task, so concurrently running handlers cannot pick up each other's message.
                  // An exception thrown by the handler faults the task returned to the consumer.
                  IAccept handler = new TAccept();
                  handler.AcceptMQ<T>(message);
              }));
          }
          catch (Exception ex)
          {
              // Keep the non-throwing contract, but do not hide the failure completely.
              System.Diagnostics.Trace.TraceError("MQFactory.Subscriber failed: " + ex);
          }
      }
  }
  113. }