diff --git a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs
index 692548c..e770eb1 100644
--- a/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs
+++ b/src/DotNetCore.CAP.MySql/CAP.EFOptions.cs
@@ -23,6 +23,6 @@ namespace DotNetCore.CAP
///
/// Data version
///
- internal string Version { get; set; }
+ internal string Version { get; set; } = "v1";
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
index bac37d2..ea1f503 100644
--- a/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
+++ b/src/DotNetCore.CAP.MySql/CAP.MySqlOptions.cs
@@ -5,6 +5,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
+// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class MySqlOptions : EFOptions
@@ -28,14 +29,10 @@ namespace DotNetCore.CAP
{
if (options.DbContextType != null)
{
- using (var scope = _serviceScopeFactory.CreateScope())
- {
- var provider = scope.ServiceProvider;
- using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType))
- {
- options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
- }
- }
+ using var scope = _serviceScopeFactory.CreateScope();
+ var provider = scope.ServiceProvider;
+ using var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType);
+ options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
}
}
diff --git a/test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs b/test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs
index 66ffa33..90f7317 100644
--- a/test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs
+++ b/test/DotNetCore.CAP.MySql.Test/DatabaseTestHost.cs
@@ -1,5 +1,6 @@
using System.Threading;
using Dapper;
+using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP.MySql.Test
{
@@ -30,7 +31,7 @@ namespace DotNetCore.CAP.MySql.Test
{
using (CreateScope())
{
- var storage = GetService();
+ var storage = GetService();
var token = new CancellationTokenSource().Token;
CreateDatabase();
storage.InitializeAsync(token).GetAwaiter().GetResult();
diff --git a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
index 5a259c7..b78c682 100644
--- a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
+++ b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
@@ -1,7 +1,7 @@
- netcoreapp2.1
+ netcoreapp3.0
false
@@ -11,22 +11,11 @@
-
-
-
- all
- runtime; build; native; contentfiles; analyzers
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
\ No newline at end of file
diff --git a/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs
index 82cfdcd..dee1948 100644
--- a/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs
+++ b/test/DotNetCore.CAP.MySql.Test/MySqlStorageConnectionTest.cs
@@ -1,8 +1,8 @@
-using System;
+using System.Collections.Generic;
using System.Threading.Tasks;
-using Dapper;
-using DotNetCore.CAP.Infrastructure;
+using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
+using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.Options;
using Xunit;
@@ -11,86 +11,78 @@ namespace DotNetCore.CAP.MySql.Test
[Collection("MySql")]
public class MySqlStorageConnectionTest : DatabaseTestHost
{
- private MySqlStorageConnection _storage;
+ private readonly MySqlDataStorage _storage;
public MySqlStorageConnectionTest()
{
var options = GetService>();
var capOptions = GetService>();
- _storage = new MySqlStorageConnection(options, capOptions);
+ var initializer = GetService();
+ _storage = new MySqlDataStorage(options, capOptions, initializer);
}
[Fact]
- public async Task GetPublishedMessageAsync_Test()
+ public async Task StorageMessageTest()
{
- var sql = "INSERT INTO `cap.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
- var insertedId = SnowflakeId.Default().NextId();
- var publishMessage = new CapPublishedMessage
+ var msgId = SnowflakeId.Default().NextId().ToString();
+ var header = new Dictionary()
{
- Id = insertedId,
- Name = "MySqlStorageConnectionTest",
- Content = "",
- StatusName = StatusName.Scheduled
+ [Headers.MessageId] = msgId
};
- using (var connection = ConnectionUtil.CreateConnection())
- {
- await connection.ExecuteAsync(sql, publishMessage);
- }
- var message = await _storage.GetPublishedMessageAsync(insertedId);
- Assert.NotNull(message);
- Assert.Equal("MySqlStorageConnectionTest", message.Name);
- Assert.Equal(StatusName.Scheduled, message.StatusName);
+ var message = new Message(header, null);
+
+ var mdMessage = await _storage.StoreMessageAsync("test.name", message);
+ Assert.NotNull(mdMessage);
}
[Fact]
- public void StoreReceivedMessageAsync_Test()
+ public async Task StoreReceivedMessageTest()
{
- var receivedMessage = new CapReceivedMessage
+ var msgId = SnowflakeId.Default().NextId().ToString();
+ var header = new Dictionary()
{
- Id = SnowflakeId.Default().NextId(),
- Name = "MySqlStorageConnectionTest",
- Content = "",
- Group = "mygroup",
- StatusName = StatusName.Scheduled
+ [Headers.MessageId] = msgId
};
+ var message = new Message(header, null);
- Exception exception = null;
- try
- {
- _storage.StoreReceivedMessage(receivedMessage);
- }
- catch (Exception ex)
- {
- exception = ex;
- }
- Assert.Null(exception);
+ var mdMessage = await _storage.StoreReceivedMessageAsync("test.name", "test.group", message);
+ Assert.NotNull(mdMessage);
+ }
+
+ [Fact]
+ public async Task StoreReceivedExceptionMessageTest()
+ {
+ await _storage.StoreReceivedExceptionMessageAsync("test.name", "test.group", "");
}
[Fact]
- public async Task GetReceivedMessageAsync_Test()
+ public async Task ChangePublishStateTest()
{
- var sql = $@"
- INSERT INTO `cap.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)
- VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
- var insertedId = SnowflakeId.Default().NextId();
- var receivedMessage = new CapReceivedMessage
+ var msgId = SnowflakeId.Default().NextId().ToString();
+ var header = new Dictionary()
{
- Id = insertedId,
- Name = "MySqlStorageConnectionTest",
- Content = "",
- Group = "mygroup",
- StatusName = StatusName.Scheduled
+ [Headers.MessageId] = msgId
};
- using (var connection = ConnectionUtil.CreateConnection())
+ var message = new Message(header, null);
+
+ var mdMessage = await _storage.StoreMessageAsync("test.name", message);
+
+ await _storage.ChangePublishStateAsync(mdMessage, StatusName.Succeeded);
+ }
+
+ [Fact]
+ public async Task ChangeReceiveStateTest()
+ {
+ var msgId = SnowflakeId.Default().NextId().ToString();
+ var header = new Dictionary()
{
- await connection.ExecuteAsync(sql, receivedMessage);
- }
+ [Headers.MessageId] = msgId
+ };
+ var message = new Message(header, null);
+
+ var mdMessage = await _storage.StoreMessageAsync("test.name", message);
- var message = await _storage.GetReceivedMessageAsync(insertedId);
- Assert.NotNull(message);
- Assert.Equal(StatusName.Scheduled, message.StatusName);
- Assert.Equal("MySqlStorageConnectionTest", message.Name);
- Assert.Equal("mygroup", message.Group);
+ await _storage.ChangeReceiveStateAsync(mdMessage, StatusName.Succeeded);
}
}
}
\ No newline at end of file
diff --git a/test/DotNetCore.CAP.MySql.Test/TestHost.cs b/test/DotNetCore.CAP.MySql.Test/TestHost.cs
index 3549800..acb9fc0 100644
--- a/test/DotNetCore.CAP.MySql.Test/TestHost.cs
+++ b/test/DotNetCore.CAP.MySql.Test/TestHost.cs
@@ -1,4 +1,5 @@
using System;
+using DotNetCore.CAP.Persistence;
using Microsoft.Extensions.DependencyInjection;
namespace DotNetCore.CAP.MySql.Test
@@ -6,7 +7,7 @@ namespace DotNetCore.CAP.MySql.Test
public abstract class TestHost : IDisposable
{
protected IServiceCollection _services;
- protected string _connectionString;
+ protected string ConnectionString;
private IServiceProvider _provider;
private IServiceProvider _scopedProvider;
@@ -27,12 +28,14 @@ namespace DotNetCore.CAP.MySql.Test
services.AddOptions();
services.AddLogging();
- _connectionString = ConnectionUtil.GetConnectionString();
+ ConnectionString = ConnectionUtil.GetConnectionString();
services.AddOptions();
- services.Configure(x => x.ConnectionString = _connectionString);
-
- services.AddSingleton();
-
+ services.Configure(x =>
+ {
+ x.ConnectionString = ConnectionString;
+ });
+ services.AddSingleton();
+ services.AddSingleton();
_services = services;
}
@@ -72,10 +75,6 @@ namespace DotNetCore.CAP.MySql.Test
public T GetService() => Provider.GetService();
- public T Ensure(ref T service)
- where T : class
- => service ?? (service = GetService());
-
public virtual void Dispose()
{
(_provider as IDisposable)?.Dispose();
diff --git a/test/DotNetCore.CAP.Test/MessageExtensionTest.cs b/test/DotNetCore.CAP.Test/MessageExtensionTest.cs
new file mode 100644
index 0000000..f0fe846
--- /dev/null
+++ b/test/DotNetCore.CAP.Test/MessageExtensionTest.cs
@@ -0,0 +1,92 @@
+using System;
+using System.Collections.Generic;
+using DotNetCore.CAP.Messages;
+using Xunit;
+
+namespace DotNetCore.CAP.Test
+{
+ public class MessageExtensionTest
+ {
+ [Fact]
+ public void GetIdTest()
+ {
+ var msgId = Guid.NewGuid().ToString();
+ var header = new Dictionary()
+ {
+ [Headers.MessageId] = msgId
+ };
+ var message = new Message(header, null);
+
+ Assert.NotNull(message.GetId());
+ Assert.Equal(msgId,message.GetId());
+ }
+
+ [Fact]
+ public void GetNameTest()
+ {
+ var msgName = Guid.NewGuid().ToString();
+ var header = new Dictionary()
+ {
+ [Headers.MessageName] = msgName
+ };
+ var message = new Message(header, null);
+
+ Assert.NotNull(message.GetName());
+ Assert.Equal(msgName, message.GetName());
+ }
+
+ [Fact]
+ public void GetCallbackNameTest()
+ {
+ var callbackName = Guid.NewGuid().ToString();
+ var header = new Dictionary()
+ {
+ [Headers.CallbackName] = callbackName
+ };
+ var message = new Message(header, null);
+
+ Assert.NotNull(message.GetCallbackName());
+ Assert.Equal(callbackName, message.GetCallbackName());
+ }
+
+ [Fact]
+ public void GetGroupTest()
+ {
+ var group = Guid.NewGuid().ToString();
+ var header = new Dictionary()
+ {
+ [Headers.Group] = group
+ };
+ var message = new Message(header, null);
+
+ Assert.NotNull(message.GetGroup());
+ Assert.Equal(group, message.GetGroup());
+ }
+
+ [Fact]
+ public void GetCorrelationSequenceTest()
+ {
+ var seq = 1;
+ var header = new Dictionary()
+ {
+ [Headers.CorrelationSequence] = seq.ToString()
+ };
+ var message = new Message(header, null);
+
+ Assert.Equal(seq, message.GetCorrelationSequence());
+ }
+
+ [Fact]
+ public void HasExceptionTest()
+ {
+ var exception = "exception message";
+ var header = new Dictionary()
+ {
+ [Headers.Exception] = exception
+ };
+ var message = new Message(header, null);
+
+ Assert.True(message.HasException());
+ }
+ }
+}