@@ -23,6 +23,6 @@ namespace DotNetCore.CAP | |||||
/// <summary> | /// <summary> | ||||
/// Data version | /// Data version | ||||
/// </summary> | /// </summary> | ||||
internal string Version { get; set; } | |||||
internal string Version { get; set; } = "v1"; | |||||
} | } | ||||
} | } |
@@ -5,6 +5,7 @@ using Microsoft.EntityFrameworkCore; | |||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.Options; | using Microsoft.Extensions.Options; | ||||
// ReSharper disable once CheckNamespace | |||||
namespace DotNetCore.CAP | namespace DotNetCore.CAP | ||||
{ | { | ||||
public class MySqlOptions : EFOptions | public class MySqlOptions : EFOptions | ||||
@@ -28,14 +29,10 @@ namespace DotNetCore.CAP | |||||
{ | { | ||||
if (options.DbContextType != null) | if (options.DbContextType != null) | ||||
{ | { | ||||
using (var scope = _serviceScopeFactory.CreateScope()) | |||||
{ | |||||
var provider = scope.ServiceProvider; | |||||
using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType)) | |||||
{ | |||||
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; | |||||
} | |||||
} | |||||
using var scope = _serviceScopeFactory.CreateScope(); | |||||
var provider = scope.ServiceProvider; | |||||
using var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType); | |||||
options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,5 +1,6 @@ | |||||
using System.Threading; | using System.Threading; | ||||
using Dapper; | using Dapper; | ||||
using DotNetCore.CAP.Persistence; | |||||
namespace DotNetCore.CAP.MySql.Test | namespace DotNetCore.CAP.MySql.Test | ||||
{ | { | ||||
@@ -30,7 +31,7 @@ namespace DotNetCore.CAP.MySql.Test | |||||
{ | { | ||||
using (CreateScope()) | using (CreateScope()) | ||||
{ | { | ||||
var storage = GetService<MySqlStorage>(); | |||||
var storage = GetService<IStorageInitializer>(); | |||||
var token = new CancellationTokenSource().Token; | var token = new CancellationTokenSource().Token; | ||||
CreateDatabase(); | CreateDatabase(); | ||||
storage.InitializeAsync(token).GetAwaiter().GetResult(); | storage.InitializeAsync(token).GetAwaiter().GetResult(); | ||||
@@ -1,7 +1,7 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||||
<TargetFramework>netcoreapp3.0</TargetFramework> | |||||
<IsPackable>false</IsPackable> | <IsPackable>false</IsPackable> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
@@ -11,22 +11,11 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Dapper" Version="1.60.6" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1"> | |||||
<PrivateAssets>all</PrivateAssets> | |||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets> | |||||
</PackageReference> | |||||
<PackageReference Include="xunit" Version="2.4.1" /> | |||||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" /> | |||||
<PackageReference Include="Moq" Version="4.10.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" /> | |||||
<PackageReference Include="xunit" Version="2.4.0" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" /> | |||||
<PackageReference Include="coverlet.collector" Version="1.0.1" /> | |||||
<PackageReference Include="Moq" Version="4.10.1" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
</Project> | </Project> |
@@ -1,8 +1,8 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using Dapper; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Internal; | |||||
using DotNetCore.CAP.Messages; | using DotNetCore.CAP.Messages; | ||||
using DotNetCore.CAP.Persistence; | |||||
using Microsoft.Extensions.Options; | using Microsoft.Extensions.Options; | ||||
using Xunit; | using Xunit; | ||||
@@ -11,86 +11,78 @@ namespace DotNetCore.CAP.MySql.Test | |||||
[Collection("MySql")] | [Collection("MySql")] | ||||
public class MySqlStorageConnectionTest : DatabaseTestHost | public class MySqlStorageConnectionTest : DatabaseTestHost | ||||
{ | { | ||||
private MySqlStorageConnection _storage; | |||||
private readonly MySqlDataStorage _storage; | |||||
public MySqlStorageConnectionTest() | public MySqlStorageConnectionTest() | ||||
{ | { | ||||
var options = GetService<IOptions<MySqlOptions>>(); | var options = GetService<IOptions<MySqlOptions>>(); | ||||
var capOptions = GetService<IOptions<CapOptions>>(); | var capOptions = GetService<IOptions<CapOptions>>(); | ||||
_storage = new MySqlStorageConnection(options, capOptions); | |||||
var initializer = GetService<IStorageInitializer>(); | |||||
_storage = new MySqlDataStorage(options, capOptions, initializer); | |||||
} | } | ||||
[Fact] | [Fact] | ||||
public async Task GetPublishedMessageAsync_Test() | |||||
public async Task StorageMessageTest() | |||||
{ | { | ||||
var sql = "INSERT INTO `cap.published`(`Id`,`Version`,`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'v1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||||
var insertedId = SnowflakeId.Default().NextId(); | |||||
var publishMessage = new CapPublishedMessage | |||||
var msgId = SnowflakeId.Default().NextId().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | { | ||||
Id = insertedId, | |||||
Name = "MySqlStorageConnectionTest", | |||||
Content = "", | |||||
StatusName = StatusName.Scheduled | |||||
[Headers.MessageId] = msgId | |||||
}; | }; | ||||
using (var connection = ConnectionUtil.CreateConnection()) | |||||
{ | |||||
await connection.ExecuteAsync(sql, publishMessage); | |||||
} | |||||
var message = await _storage.GetPublishedMessageAsync(insertedId); | |||||
Assert.NotNull(message); | |||||
Assert.Equal("MySqlStorageConnectionTest", message.Name); | |||||
Assert.Equal(StatusName.Scheduled, message.StatusName); | |||||
var message = new Message(header, null); | |||||
var mdMessage = await _storage.StoreMessageAsync("test.name", message); | |||||
Assert.NotNull(mdMessage); | |||||
} | } | ||||
[Fact] | [Fact] | ||||
public void StoreReceivedMessageAsync_Test() | |||||
public async Task StoreReceivedMessageTest() | |||||
{ | { | ||||
var receivedMessage = new CapReceivedMessage | |||||
var msgId = SnowflakeId.Default().NextId().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | { | ||||
Id = SnowflakeId.Default().NextId(), | |||||
Name = "MySqlStorageConnectionTest", | |||||
Content = "", | |||||
Group = "mygroup", | |||||
StatusName = StatusName.Scheduled | |||||
[Headers.MessageId] = msgId | |||||
}; | }; | ||||
var message = new Message(header, null); | |||||
Exception exception = null; | |||||
try | |||||
{ | |||||
_storage.StoreReceivedMessage(receivedMessage); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
exception = ex; | |||||
} | |||||
Assert.Null(exception); | |||||
var mdMessage = await _storage.StoreReceivedMessageAsync("test.name", "test.group", message); | |||||
Assert.NotNull(mdMessage); | |||||
} | |||||
[Fact] | |||||
public async Task StoreReceivedExceptionMessageTest() | |||||
{ | |||||
await _storage.StoreReceivedExceptionMessageAsync("test.name", "test.group", ""); | |||||
} | } | ||||
[Fact] | [Fact] | ||||
public async Task GetReceivedMessageAsync_Test() | |||||
public async Task ChangePublishStateTest() | |||||
{ | { | ||||
var sql = $@" | |||||
INSERT INTO `cap.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) | |||||
VALUES(@Id,'v1',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||||
var insertedId = SnowflakeId.Default().NextId(); | |||||
var receivedMessage = new CapReceivedMessage | |||||
var msgId = SnowflakeId.Default().NextId().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | { | ||||
Id = insertedId, | |||||
Name = "MySqlStorageConnectionTest", | |||||
Content = "", | |||||
Group = "mygroup", | |||||
StatusName = StatusName.Scheduled | |||||
[Headers.MessageId] = msgId | |||||
}; | }; | ||||
using (var connection = ConnectionUtil.CreateConnection()) | |||||
var message = new Message(header, null); | |||||
var mdMessage = await _storage.StoreMessageAsync("test.name", message); | |||||
await _storage.ChangePublishStateAsync(mdMessage, StatusName.Succeeded); | |||||
} | |||||
[Fact] | |||||
public async Task ChangeReceiveStateTest() | |||||
{ | |||||
var msgId = SnowflakeId.Default().NextId().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | { | ||||
await connection.ExecuteAsync(sql, receivedMessage); | |||||
} | |||||
[Headers.MessageId] = msgId | |||||
}; | |||||
var message = new Message(header, null); | |||||
var mdMessage = await _storage.StoreMessageAsync("test.name", message); | |||||
var message = await _storage.GetReceivedMessageAsync(insertedId); | |||||
Assert.NotNull(message); | |||||
Assert.Equal(StatusName.Scheduled, message.StatusName); | |||||
Assert.Equal("MySqlStorageConnectionTest", message.Name); | |||||
Assert.Equal("mygroup", message.Group); | |||||
await _storage.ChangeReceiveStateAsync(mdMessage, StatusName.Succeeded); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,4 +1,5 @@ | |||||
using System; | using System; | ||||
using DotNetCore.CAP.Persistence; | |||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
namespace DotNetCore.CAP.MySql.Test | namespace DotNetCore.CAP.MySql.Test | ||||
@@ -6,7 +7,7 @@ namespace DotNetCore.CAP.MySql.Test | |||||
public abstract class TestHost : IDisposable | public abstract class TestHost : IDisposable | ||||
{ | { | ||||
protected IServiceCollection _services; | protected IServiceCollection _services; | ||||
protected string _connectionString; | |||||
protected string ConnectionString; | |||||
private IServiceProvider _provider; | private IServiceProvider _provider; | ||||
private IServiceProvider _scopedProvider; | private IServiceProvider _scopedProvider; | ||||
@@ -27,12 +28,14 @@ namespace DotNetCore.CAP.MySql.Test | |||||
services.AddOptions(); | services.AddOptions(); | ||||
services.AddLogging(); | services.AddLogging(); | ||||
_connectionString = ConnectionUtil.GetConnectionString(); | |||||
ConnectionString = ConnectionUtil.GetConnectionString(); | |||||
services.AddOptions<CapOptions>(); | services.AddOptions<CapOptions>(); | ||||
services.Configure<MySqlOptions>(x => x.ConnectionString = _connectionString); | |||||
services.AddSingleton<MySqlStorage>(); | |||||
services.Configure<MySqlOptions>(x => | |||||
{ | |||||
x.ConnectionString = ConnectionString; | |||||
}); | |||||
services.AddSingleton<MySqlDataStorage>(); | |||||
services.AddSingleton<IStorageInitializer,MySqlStorageInitializer>(); | |||||
_services = services; | _services = services; | ||||
} | } | ||||
@@ -72,10 +75,6 @@ namespace DotNetCore.CAP.MySql.Test | |||||
public T GetService<T>() => Provider.GetService<T>(); | public T GetService<T>() => Provider.GetService<T>(); | ||||
public T Ensure<T>(ref T service) | |||||
where T : class | |||||
=> service ?? (service = GetService<T>()); | |||||
public virtual void Dispose() | public virtual void Dispose() | ||||
{ | { | ||||
(_provider as IDisposable)?.Dispose(); | (_provider as IDisposable)?.Dispose(); | ||||
@@ -0,0 +1,92 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using DotNetCore.CAP.Messages; | |||||
using Xunit; | |||||
namespace DotNetCore.CAP.Test | |||||
{ | |||||
public class MessageExtensionTest | |||||
{ | |||||
[Fact] | |||||
public void GetIdTest() | |||||
{ | |||||
var msgId = Guid.NewGuid().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.MessageId] = msgId | |||||
}; | |||||
var message = new Message(header, null); | |||||
Assert.NotNull(message.GetId()); | |||||
Assert.Equal(msgId,message.GetId()); | |||||
} | |||||
[Fact] | |||||
public void GetNameTest() | |||||
{ | |||||
var msgName = Guid.NewGuid().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.MessageName] = msgName | |||||
}; | |||||
var message = new Message(header, null); | |||||
Assert.NotNull(message.GetName()); | |||||
Assert.Equal(msgName, message.GetName()); | |||||
} | |||||
[Fact] | |||||
public void GetCallbackNameTest() | |||||
{ | |||||
var callbackName = Guid.NewGuid().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.CallbackName] = callbackName | |||||
}; | |||||
var message = new Message(header, null); | |||||
Assert.NotNull(message.GetCallbackName()); | |||||
Assert.Equal(callbackName, message.GetCallbackName()); | |||||
} | |||||
[Fact] | |||||
public void GetGroupTest() | |||||
{ | |||||
var group = Guid.NewGuid().ToString(); | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.Group] = group | |||||
}; | |||||
var message = new Message(header, null); | |||||
Assert.NotNull(message.GetGroup()); | |||||
Assert.Equal(group, message.GetGroup()); | |||||
} | |||||
[Fact] | |||||
public void GetCorrelationSequenceTest() | |||||
{ | |||||
var seq = 1; | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.CorrelationSequence] = seq.ToString() | |||||
}; | |||||
var message = new Message(header, null); | |||||
Assert.Equal(seq, message.GetCorrelationSequence()); | |||||
} | |||||
[Fact] | |||||
public void HasExceptionTest() | |||||
{ | |||||
var exception = "exception message"; | |||||
var header = new Dictionary<string, string>() | |||||
{ | |||||
[Headers.Exception] = exception | |||||
}; | |||||
var message = new Message(header, null); | |||||
Assert.True(message.HasException()); | |||||
} | |||||
} | |||||
} |