Browse Source

Merge pull request #28 from dotnetcore/postgre-sql

Supported PostgreSql
master
Savorboard 7 years ago
committed by GitHub
parent
commit
c579667a33
20 changed files with 848 additions and 298 deletions
  1. +11
    -1
      CAP.sln
  2. +1
    -1
      appveyor.yml
  3. +2
    -2
      build/version.props
  4. +17
    -149
      src/DotNetCore.CAP.MySql/CapPublisher.cs
  5. +18
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs
  6. +49
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs
  7. +51
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs
  8. +11
    -0
      src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
  9. +65
    -0
      src/DotNetCore.CAP.PostgreSql/CapPublisher.cs
  10. +26
    -0
      src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
  11. +11
    -0
      src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs
  12. +61
    -0
      src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs
  13. +74
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs
  14. +67
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs
  15. +137
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
  16. +71
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs
  17. +14
    -145
      src/DotNetCore.CAP.SqlServer/CapPublisher.cs
  18. +152
    -0
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
  19. +5
    -0
      src/DotNetCore.CAP/Models/CapPublishedMessage.cs
  20. +5
    -0
      src/DotNetCore.CAP/Models/CapReceivedMessage.cs

+ 11
- 1
CAP.sln View File

@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26430.16
VisualStudioVersion = 15.0.26730.0
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject
@@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.MySql", "sa
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Kafka.SqlServer", "samples\Sample.Kafka.SqlServer\Sample.Kafka.SqlServer.csproj", "{AF17B956-B79E-48B7-9B5B-EB15A386B112}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.PostgreSql", "src\DotNetCore.CAP.PostgreSql\DotNetCore.CAP.PostgreSql.csproj", "{82C403AB-ED68-4084-9A1D-11334F9F08F9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -110,6 +112,10 @@ Global
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AF17B956-B79E-48B7-9B5B-EB15A386B112}.Release|Any CPU.Build.0 = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{82C403AB-ED68-4084-9A1D-11334F9F08F9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -126,5 +132,9 @@ Global
{80A84F62-1558-427B-BA74-B47AA8A665B5} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{9F3F9BFE-7B6A-4A7A-A6E6-8B517D611873} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{AF17B956-B79E-48B7-9B5B-EB15A386B112} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{82C403AB-ED68-4084-9A1D-11334F9F08F9} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
EndGlobalSection
EndGlobal

+ 1
- 1
appveyor.yml View File

@@ -1,5 +1,5 @@
version: '{build}'
os: Visual Studio 2015
os: Visual Studio 2017 Preview
environment:
BUILDING_ON_PLATFORM: win
BuildEnvironment: appveyor


+ 2
- 2
build/version.props View File

@@ -1,8 +1,8 @@
<Project>
<PropertyGroup>
<VersionMajor>1</VersionMajor>
<VersionMinor>1</VersionMinor>
<VersionPatch>1</VersionPatch>
<VersionMinor>2</VersionMinor>
<VersionPatch>0</VersionPatch>
<VersionQuality></VersionQuality>
<VersionPrefix>$(VersionMajor).$(VersionMinor).$(VersionPatch)</VersionPrefix>
</PropertyGroup>


+ 17
- 149
src/DotNetCore.CAP.MySql/CapPublisher.cs View File

@@ -2,34 +2,27 @@
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.MySql
{
public class CapPublisher : ICapPublisher
public class CapPublisher : CapPublisherBase
{
private readonly ILogger _logger;
private readonly MySqlOptions _options;
private readonly DbContext _dbContext;

protected bool IsCapOpenedTrans { get; set; }

protected bool IsUsingEF { get; }

protected IServiceProvider ServiceProvider { get; }

public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
MySqlOptions options)
{
ServiceProvider = provider;
_logger = logger;
_options = options;
_logger = logger;

if (_options.DbContextType != null)
{
@@ -38,166 +31,41 @@ namespace DotNetCore.CAP.MySql
}
}

public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Serialize(contentObj);

PublishCore(name, content);
}

public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Serialize(contentObj);

return PublishCoreAsync(name, content);
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

PrepareConnection(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);

PrepareConnection(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

#region private methods

private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj?.ToString();
}
return content;
}

private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
protected override void PrepareConnectionForEF()
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();

if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}

private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}

private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}

private async Task PublishCoreAsync(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}

private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
DbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
DbTranasaction = transaction.GetDbTransaction();
}

private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);

_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
dbConnection.Execute(PrepareSql(), message, dbTransaction);

PublishQueuer.PulseEvent.Set();
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}

private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);

_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}

#region private methods

private string PrepareSql()
{
return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}



#endregion private methods
}
}

+ 18
- 0
src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs View File

@@ -0,0 +1,18 @@
using System;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class EFOptions
{
public const string DefaultSchema = "cap";

/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema"/>.
/// </summary>
public string Schema { get; set; } = DefaultSchema;

public Type DbContextType { get; set; }
}
}

+ 49
- 0
src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs View File

@@ -0,0 +1,49 @@
using System;
using DotNetCore.CAP;
using Microsoft.EntityFrameworkCore;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UsePostgreSql(this CapOptions options, string connectionString)
{
return options.UsePostgreSql(opt =>
{
opt.ConnectionString = connectionString;
});
}

public static CapOptions UsePostgreSql(this CapOptions options, Action<PostgreSqlOptions> configure)
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));

return options;
}

public static CapOptions UseEntityFramework<TContext>(this CapOptions options)
where TContext : DbContext
{
return options.UseEntityFramework<TContext>(opt =>
{
opt.DbContextType = typeof(TContext);
});
}

public static CapOptions UseEntityFramework<TContext>(this CapOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));

var efOptions = new EFOptions { DbContextType = typeof(TContext) };
configure(efOptions);

options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure));

return options;
}
}
}

+ 51
- 0
src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs View File

@@ -0,0 +1,51 @@
using System;
using DotNetCore.CAP.Processor;
using DotNetCore.CAP.PostgreSql;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal class PostgreSqlCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<PostgreSqlOptions> _configure;

public PostgreSqlCapOptionsExtension(Action<PostgreSqlOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<IStorage, PostgreSqlStorage>();
services.AddScoped<IStorageConnection, PostgreSqlStorageConnection>();
services.AddScoped<ICapPublisher, CapPublisher>();
services.AddTransient<IAdditionalProcessor, DefaultAdditionalProcessor>();

var postgreSqlOptions = new PostgreSqlOptions();
_configure(postgreSqlOptions);

if (postgreSqlOptions.DbContextType != null)
{
var provider = TempBuildService(services);
var dbContextObj = provider.GetService(postgreSqlOptions.DbContextType);
var dbContext = (DbContext)dbContextObj;
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
}
services.AddSingleton(postgreSqlOptions);
}

#if NETSTANDARD1_6
private IServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#else
private ServiceProvider TempBuildService(IServiceCollection services)
{
return services.BuildServiceProvider();
}
#endif
}
}

+ 11
- 0
src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs View File

@@ -0,0 +1,11 @@
// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
public class PostgreSqlOptions : EFOptions
{
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
}
}

+ 65
- 0
src/DotNetCore.CAP.PostgreSql/CapPublisher.cs View File

@@ -0,0 +1,65 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Abstractions;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.PostgreSql
{
public class CapPublisher : CapPublisherBase
{
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;
private readonly DbContext _dbContext;

public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
PostgreSqlOptions options)
{
ServiceProvider = provider;
_options = options;
_logger = logger;

if (_options.DbContextType != null)
{
IsUsingEF = true;
_dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType);
}
}

protected override void PrepareConnectionForEF()
{
DbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
DbTranasaction = transaction.GetDbTransaction();
}

protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);

_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}

protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);

_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}

private string PrepareSql()
{
return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}
}
}

+ 26
- 0
src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj View File

@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\build\common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard1.6;netstandard2.0;</TargetFrameworks>
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName>
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(TargetFramework)|$(Platform)'=='Debug|netstandard1.6|AnyCPU'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="1.1.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="1.1.2" />
<PackageReference Include="Npgsql" Version="3.2.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 11
- 0
src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs View File

@@ -0,0 +1,11 @@
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.PostgreSql
{
internal class FetchedMessage
{
public int MessageId { get; set; }

public MessageType MessageType { get; set; }
}
}

+ 61
- 0
src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs View File

@@ -0,0 +1,61 @@
using System;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Processor;
using Microsoft.Extensions.Logging;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
internal class DefaultAdditionalProcessor : IAdditionalProcessor
{
private readonly IServiceProvider _provider;
private readonly ILogger _logger;
private readonly PostgreSqlOptions _options;

private const int MaxBatch = 1000;
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2);

private static readonly string[] Tables =
{
"published","received"
};

public DefaultAdditionalProcessor(
IServiceProvider provider,
ILogger<DefaultAdditionalProcessor> logger,
PostgreSqlOptions sqlServerOptions)
{
_logger = logger;
_provider = provider;
_options = sqlServerOptions;
}

public async Task ProcessAsync(ProcessingContext context)
{
_logger.LogDebug("Collecting expired entities.");

foreach (var table in Tables)
{
var removedCount = 0;
do
{
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE \"ExpiresAt\" < @now AND \"Id\" IN (SELECT \"Id\" FROM \"{_options.Schema}\".\"{table}\" LIMIT @count);",
new { now = DateTime.Now, count = MaxBatch });
}

if (removedCount != 0)
{
await context.WaitAsync(_delay);
context.ThrowIfStopping();
}
} while (removedCount != 0);
}

await context.WaitAsync(_waitingInterval);
}
}
}

+ 74
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs View File

@@ -0,0 +1,74 @@
using System;
using System.Data;
using System.Threading;
using Dapper;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlFetchedMessage : IFetchedMessage
{
private readonly IDbConnection _connection;
private readonly IDbTransaction _transaction;
private readonly Timer _timer;
private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
private readonly object _lockObject = new object();

public PostgreSqlFetchedMessage(int messageId,
MessageType type,
IDbConnection connection,
IDbTransaction transaction)
{
MessageId = messageId;
MessageType = type;
_connection = connection;
_transaction = transaction;
_timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
}

public int MessageId { get; }

public MessageType MessageType { get; }

public void RemoveFromQueue()
{
lock (_lockObject)
{
_transaction.Commit();
}
}

public void Requeue()
{
lock (_lockObject)
{
_transaction.Rollback();
}
}

public void Dispose()
{
lock (_lockObject)
{
_timer?.Dispose();
_transaction.Dispose();
_connection.Dispose();
}
}

private void ExecuteKeepAliveQuery(object obj)
{
lock (_lockObject)
{
try
{
_connection?.Execute("SELECT 1", _transaction);
}
catch
{
// ignored
}
}
}
}
}

+ 67
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs View File

@@ -0,0 +1,67 @@
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorage : IStorage
{
private readonly PostgreSqlOptions _options;
private readonly ILogger _logger;

public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger, PostgreSqlOptions options)
{
_options = options;
_logger = logger;
}

public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;

var sql = CreateDbTablesScript(_options.Schema);

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
_logger.LogDebug("Ensuring all create database tables script are applied.");
}

protected virtual string CreateDbTablesScript(string schema)
{
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";

CREATE TABLE IF NOT EXISTS ""{schema}"".""queue""(
""MessageId"" int NOT NULL ,
""MessageType"" int NOT NULL
);

CREATE TABLE IF NOT EXISTS ""{schema}"".""received""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Group"" VARCHAR(200) NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);

CREATE TABLE IF NOT EXISTS ""{schema}"".""published""(
""Id"" SERIAL PRIMARY KEY NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(50) NOT NULL
);";
return batchSql;
}
}
}

+ 137
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs View File

@@ -0,0 +1,137 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageConnection : IStorageConnection
{
private readonly PostgreSqlOptions _options;

public PostgreSqlStorageConnection(PostgreSqlOptions options)
{
_options = options;
}

public PostgreSqlOptions Options => _options;

public IStorageTransaction CreateTransaction()
{
return new PostgreSqlStorageTransaction(this);
}

public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"Id\"={id}";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}

public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"DELETE FROM ""{_options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{_options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql);
}

public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
}

public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
}

// CapReceviedMessage

public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"INSERT INTO \"{_options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
}
}

public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"Id\"={id}";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}

public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
}

public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
}

public void Dispose()
{
}

private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new NpgsqlConnection(_options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage = null;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
}
catch (NpgsqlException)
{
transaction.Dispose();
throw;
}

if (fetchedMessage == null)
{
transaction.Rollback();
transaction.Dispose();
connection.Dispose();
return null;
}

return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}
}
}

+ 71
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs View File

@@ -0,0 +1,71 @@
using System;
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Models;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageTransaction : IStorageTransaction, IDisposable
{
private readonly string _schema;

private readonly IDbTransaction _dbTransaction;
private readonly IDbConnection _dbConnection;

public PostgreSqlStorageTransaction(PostgreSqlStorageConnection connection)
{
var options = connection.Options;
_schema = options.Schema;

_dbConnection = new NpgsqlConnection(options.ConnectionString);
_dbConnection.Open();
_dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}

public void UpdateMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $@"UPDATE ""{_schema}"".""published"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}

public void UpdateMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $@"UPDATE ""{_schema}"".""received"" SET ""Retries""=@Retries,""ExpiresAt""=@ExpiresAt,""StatusName""=@StatusName WHERE ""Id""=@Id;";
_dbConnection.Execute(sql, message, _dbTransaction);
}

public void EnqueueMessage(CapPublishedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction);
}

public void EnqueueMessage(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $@"INSERT INTO ""{_schema}"".""queue"" values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction);
}

public Task CommitAsync()
{
_dbTransaction.Commit();
return Task.CompletedTask;
}

public void Dispose()
{
_dbTransaction.Dispose();
_dbConnection.Dispose();
}
}
}

+ 14
- 145
src/DotNetCore.CAP.SqlServer/CapPublisher.cs View File

@@ -2,27 +2,20 @@
using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Abstractions;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.SqlServer
{
public class CapPublisher : ICapPublisher
public class CapPublisher : CapPublisherBase
{
private readonly ILogger _logger;
private readonly SqlServerOptions _options;
private readonly DbContext _dbContext;

protected bool IsCapOpenedTrans { get; set; }

protected bool IsUsingEF { get; }

protected IServiceProvider ServiceProvider { get; }

public CapPublisher(IServiceProvider provider,
ILogger<CapPublisher> logger,
SqlServerOptions options)
@@ -38,164 +31,40 @@ namespace DotNetCore.CAP.SqlServer
}
}

public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Serialize(contentObj);

PublishCore(name, content);
}

public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);

var content = Serialize(contentObj);

return PublishCoreAsync(name, content);
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnection(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

#region private methods

private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj.ToString();
}
return content;
}

private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();

if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}

private void CheckIsUsingEF(string name)
protected override void PrepareConnectionForEF()
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}

private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}

private async Task PublishCoreAsync(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
await PublishWithTransAsync(name, content, connection, dbTransaction);
}

private void PublishCore(string name, string content)
{
var connection = _dbContext.Database.GetDbConnection();
DbConnection = _dbContext.Database.GetDbConnection();
var transaction = _dbContext.Database.CurrentTransaction;
if (transaction == null)
{
IsCapOpenedTrans = true;
transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted);
}
var dbTransaction = transaction.GetDbTransaction();
PublishWithTrans(name, content, connection, dbTransaction);
DbTranasaction = transaction.GetDbTransaction();
}

private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction);

_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
dbConnection.Execute(PrepareSql(), message, dbTransaction);

PublishQueuer.PulseEvent.Set();
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}

private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};
var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction);
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);

_logger.LogInformation("Message has been persisted in the database. name:" + name);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString());
}

#region private methods

private string PrepareSql()
{
return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)";
}


#endregion private methods
}
}

+ 152
- 0
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs View File

@@ -0,0 +1,152 @@
using System;
using System.Data;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor;

namespace DotNetCore.CAP.Abstractions
{
public abstract class CapPublisherBase : ICapPublisher
{
protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTranasaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
protected bool IsUsingEF { get; set; }
protected IServiceProvider ServiceProvider { get; set; }
public void Publish<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();

var content = Serialize(contentObj);

PublishWithTrans(name, content, DbConnection, DbTranasaction);
}

public Task PublishAsync<T>(string name, T contentObj)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();

var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, DbConnection, DbTranasaction);
}

public void Publish<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

PublishWithTrans(name, content, dbConnection, dbTransaction);
}

public Task PublishAsync<T>(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbConnection, ref dbTransaction);

var content = Serialize(contentObj);

return PublishWithTransAsync(name, content, dbConnection, dbTransaction);
}

protected abstract void PrepareConnectionForEF();

protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);

protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message);

#region private methods

private string Serialize<T>(T obj)
{
string content = string.Empty;
if (Helper.IsComplexType(typeof(T)))
{
content = Helper.ToJson(obj);
}
else
{
content = obj.ToString();
}
return content;
}

private void PrepareConnectionForAdo(IDbConnection dbConnection, ref IDbTransaction dbTransaction)
{
if (dbConnection == null)
throw new ArgumentNullException(nameof(dbConnection));

if (dbConnection.State != ConnectionState.Open)
dbConnection.Open();

if (dbTransaction == null)
{
IsCapOpenedTrans = true;
dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted);
}
}

private void CheckIsUsingEF(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (!IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbConnection and IDbTransaction.");
}

private void CheckIsAdoNet(string name)
{
if (name == null) throw new ArgumentNullException(nameof(name));
if (IsUsingEF)
throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded.");
}

private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

await ExecuteAsync(dbConnection, dbTransaction, message);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}

private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction)
{
var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

Execute(dbConnection, dbTransaction, message);

if (IsCapOpenedTrans)
{
dbTransaction.Commit();
dbTransaction.Dispose();
dbConnection.Dispose();
}
PublishQueuer.PulseEvent.Set();
}

#endregion private methods
}
}

+ 5
- 0
src/DotNetCore.CAP/Models/CapPublishedMessage.cs View File

@@ -34,5 +34,10 @@ namespace DotNetCore.CAP.Models
public int Retries { get; set; }

public string StatusName { get; set; }

public override string ToString()
{
return "name:" + Name + ", content:" + Content;
}
}
}

+ 5
- 0
src/DotNetCore.CAP/Models/CapReceivedMessage.cs View File

@@ -47,5 +47,10 @@ namespace DotNetCore.CAP.Models
Content = Content
};
}

public override string ToString()
{
return "name:" + Name + ", content:" + Content;
}
}
}

Loading…
Cancel
Save