@@ -0,0 +1,47 @@ | |||||
using System; | |||||
using Npgsql; | |||||
namespace DotNetCore.CAP.PostgreSql.Test | |||||
{ | |||||
public static class ConnectionUtil | |||||
{ | |||||
private const string DatabaseVariable = "Cap_PostgreSql_DatabaseName"; | |||||
private const string ConnectionStringTemplateVariable = "Cap_PostgreSql_ConnectionStringTemplate"; | |||||
private const string MasterDatabaseName = "master"; | |||||
private const string DefaultDatabaseName = @"DotNetCore.CAP.PostgreSql.Test"; | |||||
private const string DefaultConnectionStringTemplate = | |||||
@"Server=192.168.2.206;Initial Catalog={0};User Id=sa;Password=123123;MultipleActiveResultSets=True"; | |||||
public static string GetDatabaseName() | |||||
{ | |||||
return Environment.GetEnvironmentVariable(DatabaseVariable) ?? DefaultDatabaseName; | |||||
} | |||||
public static string GetMasterConnectionString() | |||||
{ | |||||
return string.Format(GetConnectionStringTemplate(), MasterDatabaseName); | |||||
} | |||||
public static string GetConnectionString() | |||||
{ | |||||
return string.Format(GetConnectionStringTemplate(), GetDatabaseName()); | |||||
} | |||||
private static string GetConnectionStringTemplate() | |||||
{ | |||||
return | |||||
Environment.GetEnvironmentVariable(ConnectionStringTemplateVariable) ?? | |||||
DefaultConnectionStringTemplate; | |||||
} | |||||
public static NpgsqlConnection CreateConnection(string connectionString = null) | |||||
{ | |||||
connectionString = connectionString ?? GetConnectionString(); | |||||
var connection = new NpgsqlConnection(connectionString); | |||||
connection.Open(); | |||||
return connection; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,79 @@ | |||||
using System.Data; | |||||
using System.Data.SqlClient; | |||||
using System.Threading; | |||||
using Dapper; | |||||
using Microsoft.EntityFrameworkCore; | |||||
namespace DotNetCore.CAP.PostgreSql.Test | |||||
{ | |||||
public abstract class DatabaseTestHost : TestHost | |||||
{ | |||||
private static bool _sqlObjectInstalled; | |||||
public static object _lock = new object(); | |||||
protected override void PostBuildServices() | |||||
{ | |||||
base.PostBuildServices(); | |||||
lock (_lock) | |||||
{ | |||||
if (!_sqlObjectInstalled) | |||||
{ | |||||
InitializeDatabase(); | |||||
} | |||||
} | |||||
} | |||||
public override void Dispose() | |||||
{ | |||||
DeleteAllData(); | |||||
base.Dispose(); | |||||
} | |||||
private void InitializeDatabase() | |||||
{ | |||||
using (CreateScope()) | |||||
{ | |||||
var storage = GetService<PostgreSqlStorage>(); | |||||
var token = new CancellationTokenSource().Token; | |||||
CreateDatabase(); | |||||
storage.InitializeAsync(token).GetAwaiter().GetResult(); | |||||
_sqlObjectInstalled = true; | |||||
} | |||||
} | |||||
private void CreateDatabase() | |||||
{ | |||||
var masterConn = ConnectionUtil.GetMasterConnectionString(); | |||||
var databaseName = ConnectionUtil.GetDatabaseName(); | |||||
using (var connection = ConnectionUtil.CreateConnection(masterConn)) | |||||
{ | |||||
connection.Execute($@" | |||||
IF NOT EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}') | |||||
CREATE DATABASE [{databaseName}];"); | |||||
} | |||||
} | |||||
private void DeleteAllData() | |||||
{ | |||||
var conn = ConnectionUtil.GetConnectionString(); | |||||
using (var connection = new SqlConnection(conn)) | |||||
{ | |||||
var commands = new[] { | |||||
"DISABLE TRIGGER ALL ON ?", | |||||
"ALTER TABLE ? NOCHECK CONSTRAINT ALL", | |||||
"DELETE FROM ?", | |||||
"ALTER TABLE ? CHECK CONSTRAINT ALL", | |||||
"ENABLE TRIGGER ALL ON ?" | |||||
}; | |||||
foreach (var command in commands) | |||||
{ | |||||
connection.Execute( | |||||
"sp_MSforeachtable", | |||||
new { command1 = command }, | |||||
commandType: CommandType.StoredProcedure); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,22 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netcoreapp2.0</TargetFramework> | |||||
<IsPackable>false</IsPackable> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Dapper" Version="1.50.2" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.3.0" /> | |||||
<PackageReference Include="Npgsql" Version="3.2.5" /> | |||||
<PackageReference Include="xunit" Version="2.2.0" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.2.0" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj" /> | |||||
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,134 @@ | |||||
using System; | |||||
using System.Threading.Tasks; | |||||
using Dapper; | |||||
using DotNetCore.CAP.Infrastructure; | |||||
using DotNetCore.CAP.Models; | |||||
using Xunit; | |||||
namespace DotNetCore.CAP.PostgreSql.Test | |||||
{ | |||||
[Collection("postgresql")] | |||||
public class PostgreSqlStorageConnectionTest : DatabaseTestHost | |||||
{ | |||||
private PostgreSqlStorageConnection _storage; | |||||
public PostgreSqlStorageConnectionTest() | |||||
{ | |||||
var options = GetService<PostgreSqlOptions>(); | |||||
_storage = new PostgreSqlStorageConnection(options); | |||||
} | |||||
[Fact] | |||||
public async Task GetPublishedMessageAsync_Test() | |||||
{ | |||||
var sql = @"INSERT INTO ""Cap"".""Published""(""Name"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);SELECT @@IDENTITY;"; | |||||
var publishMessage = new CapPublishedMessage | |||||
{ | |||||
Name = "PostgreSqlStorageConnectionTest", | |||||
Content = "", | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
var insertedId = default(int); | |||||
using (var connection = ConnectionUtil.CreateConnection()) | |||||
{ | |||||
insertedId = connection.QueryFirst<int>(sql, publishMessage); | |||||
} | |||||
var message = await _storage.GetPublishedMessageAsync(insertedId); | |||||
Assert.NotNull(message); | |||||
Assert.Equal("PostgreSqlStorageConnectionTest", message.Name); | |||||
Assert.Equal(StatusName.Scheduled, message.StatusName); | |||||
} | |||||
[Fact] | |||||
public async Task FetchNextMessageAsync_Test() | |||||
{ | |||||
var sql = @"INSERT INTO ""Cap"".""Queue""(""MessageId"",""MessageType"") VALUES(@MessageId,@MessageType);"; | |||||
var queue = new CapQueue | |||||
{ | |||||
MessageId = 3333, | |||||
MessageType = MessageType.Publish | |||||
}; | |||||
using (var connection = ConnectionUtil.CreateConnection()) | |||||
{ | |||||
connection.Execute(sql, queue); | |||||
} | |||||
var fetchedMessage = await _storage.FetchNextMessageAsync(); | |||||
fetchedMessage.Dispose(); | |||||
Assert.NotNull(fetchedMessage); | |||||
Assert.Equal(MessageType.Publish, fetchedMessage.MessageType); | |||||
Assert.Equal(3333, fetchedMessage.MessageId); | |||||
} | |||||
[Fact] | |||||
public async Task StoreReceivedMessageAsync_Test() | |||||
{ | |||||
var receivedMessage = new CapReceivedMessage | |||||
{ | |||||
Name = "PostgreSqlStorageConnectionTest", | |||||
Content = "", | |||||
Group = "mygroup", | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
Exception exception = null; | |||||
try | |||||
{ | |||||
await _storage.StoreReceivedMessageAsync(receivedMessage); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
exception = ex; | |||||
} | |||||
Assert.Null(exception); | |||||
} | |||||
[Fact] | |||||
public async Task GetReceivedMessageAsync_Test() | |||||
{ | |||||
var sql = $@" | |||||
INSERT INTO ""Cap"".""Received""(""Name"",""Group"",""Content"",""Retries"",""Added"",""ExpiresAt"",""StatusName"") OUTPUT INSERTED.Id | |||||
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||||
var receivedMessage = new CapReceivedMessage | |||||
{ | |||||
Name = "PostgreSqlStorageConnectionTest", | |||||
Content = "", | |||||
Group = "mygroup", | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
var insertedId = default(int); | |||||
using (var connection = ConnectionUtil.CreateConnection()) | |||||
{ | |||||
insertedId = connection.QueryFirst<int>(sql, receivedMessage); | |||||
} | |||||
var message = await _storage.GetReceivedMessageAsync(insertedId); | |||||
Assert.NotNull(message); | |||||
Assert.Equal(StatusName.Scheduled, message.StatusName); | |||||
Assert.Equal("PostgreSqlStorageConnectionTest", message.Name); | |||||
Assert.Equal("mygroup", message.Group); | |||||
} | |||||
[Fact] | |||||
public async Task GetNextReceviedMessageToBeEnqueuedAsync_Test() | |||||
{ | |||||
var receivedMessage = new CapReceivedMessage | |||||
{ | |||||
Name = "PostgreSqlStorageConnectionTest", | |||||
Content = "", | |||||
Group = "mygroup", | |||||
StatusName = StatusName.Scheduled | |||||
}; | |||||
await _storage.StoreReceivedMessageAsync(receivedMessage); | |||||
var message = await _storage.GetNextReceviedMessageToBeEnqueuedAsync(); | |||||
Assert.NotNull(message); | |||||
Assert.Equal(StatusName.Scheduled, message.StatusName); | |||||
Assert.Equal("PostgreSqlStorageConnectionTest", message.Name); | |||||
Assert.Equal("mygroup", message.Group); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,70 @@ | |||||
using Xunit; | |||||
using Dapper; | |||||
namespace DotNetCore.CAP.PostgreSql.Test | |||||
{ | |||||
//[Collection("postgresql")] | |||||
public class SqlServerStorageTest : DatabaseTestHost | |||||
{ | |||||
private readonly string _dbName; | |||||
private readonly string _masterDbConnectionString; | |||||
public SqlServerStorageTest() | |||||
{ | |||||
_dbName = ConnectionUtil.GetDatabaseName(); | |||||
_masterDbConnectionString = ConnectionUtil.GetMasterConnectionString(); | |||||
} | |||||
//[Fact] | |||||
public void Database_IsExists() | |||||
{ | |||||
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) | |||||
{ | |||||
var databaseName = ConnectionUtil.GetDatabaseName(); | |||||
var sql = $@"SELECT SCHEMA_NAME FROM SCHEMATA WHERE SCHEMA_NAME = '{databaseName}'"; | |||||
var result = connection.QueryFirstOrDefault<string>(sql); | |||||
Assert.NotNull(result); | |||||
Assert.True(databaseName.Equals(result, System.StringComparison.CurrentCultureIgnoreCase)); | |||||
} | |||||
} | |||||
//[Fact] | |||||
public void DatabaseTable_Published_IsExists() | |||||
{ | |||||
var tableName = "cap.published"; | |||||
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) | |||||
{ | |||||
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'"; | |||||
var result = connection.QueryFirstOrDefault<string>(sql); | |||||
Assert.NotNull(result); | |||||
Assert.Equal(tableName, result); | |||||
} | |||||
} | |||||
//[Fact] | |||||
public void DatabaseTable_Queue_IsExists() | |||||
{ | |||||
var tableName = "cap.queue"; | |||||
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) | |||||
{ | |||||
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'"; | |||||
var result = connection.QueryFirstOrDefault<string>(sql); | |||||
Assert.NotNull(result); | |||||
Assert.Equal(tableName, result); | |||||
} | |||||
} | |||||
//[Fact] | |||||
public void DatabaseTable_Received_IsExists() | |||||
{ | |||||
var tableName = "cap.received"; | |||||
using (var connection = ConnectionUtil.CreateConnection(_masterDbConnectionString)) | |||||
{ | |||||
var sql = $"SELECT TABLE_NAME FROM `TABLES` WHERE TABLE_SCHEMA='{_dbName}' AND TABLE_NAME = '{tableName}'"; | |||||
var result = connection.QueryFirstOrDefault<string>(sql); | |||||
Assert.NotNull(result); | |||||
Assert.Equal(tableName, result); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,97 @@ | |||||
using System; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
namespace DotNetCore.CAP.PostgreSql.Test | |||||
{ | |||||
public abstract class TestHost : IDisposable | |||||
{ | |||||
protected IServiceCollection _services; | |||||
protected string _connectionString; | |||||
private IServiceProvider _provider; | |||||
private IServiceProvider _scopedProvider; | |||||
public TestHost() | |||||
{ | |||||
CreateServiceCollection(); | |||||
PreBuildServices(); | |||||
BuildServices(); | |||||
PostBuildServices(); | |||||
} | |||||
protected IServiceProvider Provider => _scopedProvider ?? _provider; | |||||
private void CreateServiceCollection() | |||||
{ | |||||
var services = new ServiceCollection(); | |||||
services.AddOptions(); | |||||
services.AddLogging(); | |||||
_connectionString = ConnectionUtil.GetConnectionString(); | |||||
services.AddSingleton(new PostgreSqlOptions { ConnectionString = _connectionString }); | |||||
services.AddSingleton<PostgreSqlStorage>(); | |||||
_services = services; | |||||
} | |||||
protected virtual void PreBuildServices() | |||||
{ | |||||
} | |||||
private void BuildServices() | |||||
{ | |||||
_provider = _services.BuildServiceProvider(); | |||||
} | |||||
protected virtual void PostBuildServices() | |||||
{ | |||||
} | |||||
public IDisposable CreateScope() | |||||
{ | |||||
var scope = CreateScope(_provider); | |||||
var loc = scope.ServiceProvider; | |||||
_scopedProvider = loc; | |||||
return new DelegateDisposable(() => | |||||
{ | |||||
if (_scopedProvider == loc) | |||||
{ | |||||
_scopedProvider = null; | |||||
} | |||||
scope.Dispose(); | |||||
}); | |||||
} | |||||
public IServiceScope CreateScope(IServiceProvider provider) | |||||
{ | |||||
var scope = provider.GetService<IServiceScopeFactory>().CreateScope(); | |||||
return scope; | |||||
} | |||||
public T GetService<T>() => Provider.GetService<T>(); | |||||
public T Ensure<T>(ref T service) | |||||
where T : class | |||||
=> service ?? (service = GetService<T>()); | |||||
public virtual void Dispose() | |||||
{ | |||||
(_provider as IDisposable)?.Dispose(); | |||||
} | |||||
private class DelegateDisposable : IDisposable | |||||
{ | |||||
private Action _dispose; | |||||
public DelegateDisposable(Action dispose) | |||||
{ | |||||
_dispose = dispose; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
_dispose(); | |||||
} | |||||
} | |||||
} | |||||
} |