Ви не можете вибрати більше 25 тем. Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.
 
 
 

137 рядків
3.9 KiB

  1. using System;
  2. using System.Diagnostics;
  3. using System.Threading.Tasks;
  4. using DotNetCore.CAP;
  5. using Microsoft.AspNetCore.Authorization;
  6. using Microsoft.AspNetCore.Mvc;
  7. using Newtonsoft.Json;
  8. namespace Sample.Kafka.SqlServer.Controllers
  9. {
  /// <summary>
  /// Sample message payload published and received by <c>ValuesController</c>.
  /// </summary>
  public class Person
  {
      /// <summary>Identifier; serialized under the JSON name <c>id</c>.</summary>
      [JsonProperty("id")]
      public string Id { get; set; }

      /// <summary>Display name; serialized under the JSON name <c>uname</c>.</summary>
      [JsonProperty("uname")]
      public string Name { get; set; }

      /// <summary>Optional nested payload; may be <c>null</c>.</summary>
      public HAHA Haha { get; set; }

      /// <summary>
      /// Renders the payload as <c>Name:&lt;name&gt;;Id:&lt;id&gt;;Haha:&lt;nested&gt;</c>.
      /// A null <see cref="Haha"/> (or null <see cref="Name"/>/<see cref="Id"/>) renders as an empty string.
      /// </summary>
      /// <returns>A single-line, semicolon-separated description of this instance.</returns>
      public override string ToString()
      {
          // Fields are separated by ';' consistently (the separator before "Haha:" used to be missing,
          // which glued the Id to the next field name).
          return $"Name:{Name};Id:{Id};Haha:{Haha}";
      }
  }
  /// <summary>
  /// Nested sample payload carried in <c>Person.Haha</c>.
  /// </summary>
  public class HAHA
  {
      /// <summary>Identifier; serialized under the JSON name <c>id</c>.</summary>
      [JsonProperty("id")]
      public string Id { get; set; }

      /// <summary>Display name; serialized under the JSON name <c>uname</c>.</summary>
      [JsonProperty("uname")]
      public string Name { get; set; }

      /// <summary>
      /// Renders the payload as <c>Name:&lt;name&gt;;Id:&lt;id&gt;</c>; null members render as empty strings.
      /// </summary>
      /// <returns>A single-line description of this instance.</returns>
      public override string ToString() => $"Name:{Name};Id:{Id}";
  }
  /// <summary>
  /// Sample controller that publishes messages through CAP and hosts CAP subscriber methods.
  /// </summary>
  /// <remarks>
  /// Every <c>[CapSubscribe]</c> method is also marked <c>[NonAction]</c>: they are public methods on an
  /// MVC controller and would otherwise be exposed as HTTP actions under <c>api/[controller]</c>.
  /// </remarks>
  [Route("api/[controller]")]
  public class ValuesController : Controller, ICapSubscribe
  {
      private readonly ICapPublisher _capBus;
      private readonly AppDbContext _dbContext;

      /// <summary>Creates the controller with the CAP publisher and the application DbContext.</summary>
      /// <param name="producer">CAP publisher used to send messages.</param>
      /// <param name="dbContext">Application DbContext; supplied by the caller and not disposed by this class.</param>
      public ValuesController(ICapPublisher producer, AppDbContext dbContext)
      {
          _capBus = producer;
          _dbContext = dbContext;
      }

      /// <summary>
      /// Publishes a sample <see cref="Person"/> to <c>wl.yxd.test</c>, naming
      /// <c>wl.yxd.test.callback</c> as the callback topic.
      /// </summary>
      /// <returns>An empty 200 OK result.</returns>
      [Route("~/publish")]
      public IActionResult PublishMessage()
      {
          var p = new Person
          {
              Id = Guid.NewGuid().ToString(),
              Name = "杨晓东",
              Haha = new HAHA
              {
                  Id = Guid.NewGuid().ToString(),
                  Name = "1-1杨晓东",
              }
          };

          _capBus.Publish("wl.yxd.test", p, "wl.yxd.test.callback");

          // Alternative kept for manual testing: publishing to "wl.cj.test" targets KafkaTestReceived below.
          //_capBus.Publish("wl.cj.test", p);

          return Ok();
      }

      /// <summary>Subscriber for <c>wl.yxd.test.callback</c>; writes the received payload to the console.</summary>
      /// <param name="p">The received payload.</param>
      [NonAction]
      [CapSubscribe("wl.yxd.test.callback")]
      public void KafkaTestCallback(Person p)
      {
          Console.WriteLine("回调内容:" + p);
      }

      /// <summary>
      /// Subscriber for <c>wl.cj.test</c>; logs the payload to the console and debug output.
      /// </summary>
      /// <param name="person">The received payload.</param>
      /// <returns>A fixed string; presumably forwarded by CAP as the callback message — confirm against the CAP version in use.</returns>
      [NonAction]
      [CapSubscribe("wl.cj.test")]
      public string KafkaTestReceived(Person person)
      {
          Console.WriteLine(person);
          Debug.WriteLine(person);
          return "this is callback message";
      }

      /// <summary>
      /// Opens a database transaction, publishes an empty-string message to
      /// <c>sample.rabbitmq.mysql</c>, and commits.
      /// </summary>
      /// <returns>An empty 200 OK result.</returns>
      [Route("~/publishWithTrans")]
      public async Task<IActionResult> PublishMessageWithTransaction()
      {
          // NOTE(review): assumes the CAP publisher enlists in the DbContext's current transaction — confirm
          // for the CAP version in use.
          using (var trans = await _dbContext.Database.BeginTransactionAsync())
          {
              await _capBus.PublishAsync("sample.rabbitmq.mysql", "");
              trans.Commit();
          }

          return Ok();
      }

      /// <summary>
      /// Subscriber for <c>sample.rabbitmq.mysql33333</c> in group <c>Test.Group</c>; logs the payload.
      /// </summary>
      /// <param name="person">The received payload; a null payload is logged as an empty suffix.</param>
      [NonAction]
      [CapSubscribe("sample.rabbitmq.mysql33333", Group = "Test.Group")]
      public void KafkaTest22(Person person)
      {
          // The injected DbContext is deliberately NOT disposed here: this class did not create it,
          // so its lifetime belongs to whoever supplied it.
          // String concatenation is null-safe, unlike an explicit person.ToString().
          Console.WriteLine("[sample.kafka.sqlserver] message received " + person);
          Debug.WriteLine("[sample.kafka.sqlserver] message received " + person);
      }

      /// <summary>
      /// Subscriber for <c>sample.rabbitmq.mysql22222</c>; logs the received timestamp and returns it unchanged.
      /// </summary>
      /// <param name="time">The received timestamp.</param>
      /// <returns>A task producing the same <paramref name="time"/> value.</returns>
      [NonAction]
      [CapSubscribe("sample.rabbitmq.mysql22222")]
      public async Task<DateTime> KafkaTest33(DateTime time)
      {
          Console.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
          Debug.WriteLine("[sample.kafka.sqlserver] message received " + time.ToString());
          return await Task.FromResult(time);
      }

      /// <summary>
      /// Parameterless subscriber bound to both <c>sample.kafka.sqlserver3</c> and
      /// <c>sample.kafka.sqlserver4</c>; logs that a message arrived.
      /// </summary>
      [NonAction]
      [CapSubscribe("sample.kafka.sqlserver3")]
      [CapSubscribe("sample.kafka.sqlserver4")]
      public void KafkaTest()
      {
          Console.WriteLine("[sample.kafka.sqlserver] message received");
          Debug.WriteLine("[sample.kafka.sqlserver] message received");
      }
  }
  112. }