You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

133 lines
4.8 KiB

  1. using System;
  2. using System.Threading.Tasks;
  3. using Dapper;
  4. using DotNetCore.CAP.Infrastructure;
  5. using DotNetCore.CAP.Models;
  6. using Xunit;
  7. namespace DotNetCore.CAP.PostgreSql.Test
  8. {
  9. [Collection("postgresql")]
  10. public class PostgreSqlStorageConnectionTest : DatabaseTestHost
  11. {
  12. private PostgreSqlStorageConnection _storage;
  13. public PostgreSqlStorageConnectionTest()
  14. {
  15. var options = GetService<PostgreSqlOptions>();
  16. var capOptions = GetService<CapOptions>();
  17. _storage = new PostgreSqlStorageConnection(options,capOptions);
  18. }
  19. [Fact]
  20. public async Task GetPublishedMessageAsync_Test()
  21. {
  22. var sql = @"INSERT INTO ""cap"".""published""(""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
  23. var publishMessage = new CapPublishedMessage
  24. {
  25. Name = "PostgreSqlStorageConnectionTest",
  26. Content = "",
  27. StatusName = StatusName.Scheduled
  28. };
  29. var insertedId = default(int);
  30. using (var connection = ConnectionUtil.CreateConnection())
  31. {
  32. insertedId = connection.QueryFirst<int>(sql, publishMessage);
  33. }
  34. var message = await _storage.GetPublishedMessageAsync(insertedId);
  35. Assert.NotNull(message);
  36. Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
  37. Assert.Equal(StatusName.Scheduled, message.StatusName);
  38. }
  39. [Fact]
  40. public async Task FetchNextMessageAsync_Test()
  41. {
  42. var sql = @"INSERT INTO ""cap"".""queue""(""MessageId"",""MessageType"") VALUES(@MessageId,@MessageType);";
  43. var queue = new CapQueue
  44. {
  45. MessageId = 3333,
  46. MessageType = MessageType.Publish
  47. };
  48. using (var connection = ConnectionUtil.CreateConnection())
  49. {
  50. connection.Execute(sql, queue);
  51. }
  52. var fetchedMessage = await _storage.FetchNextMessageAsync();
  53. fetchedMessage.Dispose();
  54. Assert.NotNull(fetchedMessage);
  55. Assert.Equal(MessageType.Publish, fetchedMessage.MessageType);
  56. Assert.Equal(3333, fetchedMessage.MessageId);
  57. }
  58. [Fact]
  59. public async Task StoreReceivedMessageAsync_Test()
  60. {
  61. var receivedMessage = new CapReceivedMessage
  62. {
  63. Name = "PostgreSqlStorageConnectionTest",
  64. Content = "",
  65. Group = "mygroup",
  66. StatusName = StatusName.Scheduled
  67. };
  68. Exception exception = null;
  69. try
  70. {
  71. await _storage.StoreReceivedMessageAsync(receivedMessage);
  72. }
  73. catch (Exception ex)
  74. {
  75. exception = ex;
  76. }
  77. Assert.Null(exception);
  78. }
  79. [Fact]
  80. public async Task GetReceivedMessageAsync_Test()
  81. {
  82. var sql = $@"
  83. INSERT INTO ""cap"".""received""(""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"")
  84. VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName) RETURNING ""Id"";";
  85. var receivedMessage = new CapReceivedMessage
  86. {
  87. Name = "PostgreSqlStorageConnectionTest",
  88. Content = "",
  89. Group = "mygroup",
  90. StatusName = StatusName.Scheduled
  91. };
  92. var insertedId = default(int);
  93. using (var connection = ConnectionUtil.CreateConnection())
  94. {
  95. insertedId = connection.QueryFirst<int>(sql, receivedMessage);
  96. }
  97. var message = await _storage.GetReceivedMessageAsync(insertedId);
  98. Assert.NotNull(message);
  99. Assert.Equal(StatusName.Scheduled, message.StatusName);
  100. Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
  101. Assert.Equal("mygroup", message.Group);
  102. }
  103. [Fact]
  104. public async Task GetNextReceviedMessageToBeEnqueuedAsync_Test()
  105. {
  106. var receivedMessage = new CapReceivedMessage
  107. {
  108. Name = "PostgreSqlStorageConnectionTest",
  109. Content = "",
  110. Group = "mygroup",
  111. StatusName = StatusName.Scheduled
  112. };
  113. await _storage.StoreReceivedMessageAsync(receivedMessage);
  114. var message = await _storage.GetNextReceivedMessageToBeEnqueuedAsync();
  115. Assert.NotNull(message);
  116. Assert.Equal(StatusName.Scheduled, message.StatusName);
  117. Assert.Equal("PostgreSqlStorageConnectionTest", message.Name);
  118. Assert.Equal("mygroup", message.Group);
  119. }
  120. }
  121. }