25'ten fazla konu seçemezsiniz. Konular bir harf veya rakamla başlamalı, kısa çizgiler ('-') içerebilir ve en fazla 35 karakter uzunluğunda olabilir.
 
 
 

132 satır
4.8 KiB

  1. using System;
  2. using System.Threading.Tasks;
  3. using Dapper;
  4. using DotNetCore.CAP.Infrastructure;
  5. using DotNetCore.CAP.Models;
  6. using Xunit;
  7. namespace DotNetCore.CAP.PostgreSql.Test
  8. {
  9. [Collection("postgresql")]
  10. public class PostgreSqlStorageConnectionTest : DatabaseTestHost
  11. {
  12. private PostgreSqlStorageConnection _storage;
  13. public PostgreSqlStorageConnectionTest()
  14. {
  15. var options = GetService<PostgreSqlOptions>();
  16. _storage = new PostgreSqlStorageConnection(options);
  17. }
  18. [Fact]
  19. public async Task GetPublishedMessageAsync_Test()
  20. {
  21. var sql = @"INSERT INTO ""cap"".""published""(""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
  22. var publishMessage = new CapPublishedMessage
  23. {
  24. Name = "PostgreSqlStorageConnectionTest",
  25. Content = "",
  26. StatusName = StatusName.Scheduled
  27. };
  28. var insertedId = default(int);
  29. using (var connection = ConnectionUtil.CreateConnection())
  30. {
  31. insertedId = connection.QueryFirst<int>(sql, publishMessage);
  32. }
  33. var message = await _storage.GetPublishedMessageAsync(insertedId);
  34. Assert.NotNull(message);
  35. Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
  36. Assert.Equal(StatusName.Scheduled, message.StatusName);
  37. }
  38. [Fact]
  39. public async Task FetchNextMessageAsync_Test()
  40. {
  41. var sql = @"INSERT INTO ""cap"".""queue""(""MessageId"",""MessageType"") VALUES(@MessageId,@MessageType);";
  42. var queue = new CapQueue
  43. {
  44. MessageId = 3333,
  45. MessageType = MessageType.Publish
  46. };
  47. using (var connection = ConnectionUtil.CreateConnection())
  48. {
  49. connection.Execute(sql, queue);
  50. }
  51. var fetchedMessage = await _storage.FetchNextMessageAsync();
  52. fetchedMessage.Dispose();
  53. Assert.NotNull(fetchedMessage);
  54. Assert.Equal(MessageType.Publish, fetchedMessage.MessageType);
  55. Assert.Equal(3333, fetchedMessage.MessageId);
  56. }
  57. [Fact]
  58. public async Task StoreReceivedMessageAsync_Test()
  59. {
  60. var receivedMessage = new CapReceivedMessage
  61. {
  62. Name = "PostgreSqlStorageConnectionTest",
  63. Content = "",
  64. Group = "mygroup",
  65. StatusName = StatusName.Scheduled
  66. };
  67. Exception exception = null;
  68. try
  69. {
  70. await _storage.StoreReceivedMessageAsync(receivedMessage);
  71. }
  72. catch (Exception ex)
  73. {
  74. exception = ex;
  75. }
  76. Assert.Null(exception);
  77. }
  78. [Fact]
  79. public async Task GetReceivedMessageAsync_Test()
  80. {
  81. var sql = $@"
  82. INSERT INTO ""cap"".""received""(""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
  83. VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
  84. var receivedMessage = new CapReceivedMessage
  85. {
  86. Name = "PostgreSqlStorageConnectionTest",
  87. Content = "",
  88. Group = "mygroup",
  89. StatusName = StatusName.Scheduled
  90. };
  91. var insertedId = default(int);
  92. using (var connection = ConnectionUtil.CreateConnection())
  93. {
  94. insertedId = connection.QueryFirst<int>(sql, receivedMessage);
  95. }
  96. var message = await _storage.GetReceivedMessageAsync(insertedId);
  97. Assert.NotNull(message);
  98. Assert.Equal(StatusName.Scheduled, message.StatusName);
  99. Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
  100. Assert.Equal("mygroup", message.Group);
  101. }
  102. [Fact]
  103. public async Task GetNextReceviedMessageToBeEnqueuedAsync_Test()
  104. {
  105. var receivedMessage = new CapReceivedMessage
  106. {
  107. Name = "PostgreSqlStorageConnectionTest",
  108. Content = "",
  109. Group = "mygroup",
  110. StatusName = StatusName.Scheduled
  111. };
  112. await _storage.StoreReceivedMessageAsync(receivedMessage);
  113. var message = await _storage.GetNextReceviedMessageToBeEnqueuedAsync();
  114. Assert.NotNull(message);
  115. Assert.Equal(StatusName.Scheduled, message.StatusName);
  116. Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
  117. Assert.Equal("mygroup", message.Group);
  118. }
  119. }
  120. }