From 1cd8d6ff40b12ba50145f0105e11572006fb208e Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Tue, 8 Aug 2017 18:18:19 +0800 Subject: [PATCH] support PostgreSQL. --- build/version.props | 4 +- .../CAP.EFOptions.cs | 18 +++ .../CAP.Options.Extensions.cs | 49 ++++++ .../CAP.PostgreSqlCapOptionsExtension.cs | 52 +++++++ .../CAP.PostgreSqlOptions.cs | 11 ++ src/DotNetCore.CAP.PostgreSql/CapPublisher.cs | 78 ++++++++++ .../DotNetCore.CAP.PostgreSql.csproj | 26 ++++ .../FetchedMessage.cs | 11 ++ .../IAdditionalProcessor.Default.cs | 61 ++++++++ .../PostgreSqlFetchedMessage.cs | 74 +++++++++ .../PostgreSqlStorage.cs | 67 ++++++++ .../PostgreSqlStorageConnection.cs | 140 +++++++++++++++++ .../PostgreSqlStorageTransaction.cs | 71 +++++++++ .../Abstractions/CapPublisherBase.cs | 143 ++++++++++++++++++ .../Models/CapPublishedMessage.cs | 5 + .../Models/CapReceivedMessage.cs | 5 + 16 files changed, 813 insertions(+), 2 deletions(-) create mode 100644 src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/CapPublisher.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj create mode 100644 src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs create mode 100644 src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs create mode 100644 src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs diff --git a/build/version.props b/build/version.props index 2480b9f..a5d60c8 100644 --- a/build/version.props +++ b/build/version.props @@ -1,8 +1,8 @@ 1 - 1 - 1 + 2 + 0 $(VersionMajor).$(VersionMinor).$(VersionPatch) diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs new file mode 100644 index 0000000..7daa1e9 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs @@ -0,0 +1,18 @@ +using System; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class EFOptions + { + public const string DefaultSchema = "cap"; + + /// + /// Gets or sets the schema to use when creating database objects. + /// Default is . + /// + public string Schema { get; set; } = DefaultSchema; + + public Type DbContextType { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs new file mode 100644 index 0000000..1505f02 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/CAP.Options.Extensions.cs @@ -0,0 +1,49 @@ +using System; +using DotNetCore.CAP; +using Microsoft.EntityFrameworkCore; + +// ReSharper disable once CheckNamespace +namespace Microsoft.Extensions.DependencyInjection +{ + public static class CapOptionsExtensions + { + public static CapOptions UsePostgreSql(this CapOptions options, string connectionString) + { + return options.UsePostgreSql(opt => + { + opt.ConnectionString = connectionString; + }); + } + + public static CapOptions UsePostgreSql(this CapOptions options, Action configure) + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure)); + + return options; + } + + public static CapOptions UseEntityFramework(this CapOptions options) + where TContext : DbContext + { + return options.UseEntityFramework(opt => + { + opt.DbContextType = typeof(TContext); + }); + } + + public static CapOptions UseEntityFramework(this CapOptions options, Action configure) + where TContext : DbContext + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + + var efOptions = new EFOptions { DbContextType = typeof(TContext) }; + configure(efOptions); + + options.RegisterExtension(new PostgreSqlCapOptionsExtension(configure)); + + return options; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs new file mode 100644 index 0000000..1801ea8 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlCapOptionsExtension.cs @@ -0,0 +1,52 @@ +using System; +using DotNetCore.CAP.Processor; +using DotNetCore.CAP.PostgreSql; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; + +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + internal class PostgreSqlCapOptionsExtension : ICapOptionsExtension + { + private readonly Action _configure; + + public PostgreSqlCapOptionsExtension(Action configure) + { + _configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddScoped(); + services.AddScoped(); + services.AddTransient(); + + var postgreSqlOptions = new PostgreSqlOptions(); + _configure(postgreSqlOptions); + + if (postgreSqlOptions.DbContextType != null) + { + var provider = TempBuildService(services); + var dbContextObj = provider.GetService(postgreSqlOptions.DbContextType); + var dbContext = (DbContext)dbContextObj; + postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; + } + services.AddSingleton(postgreSqlOptions); + } + +#if NETSTANDARD1_6 + private IServiceProvider TempBuildService(IServiceCollection services) + { + return services.BuildServiceProvider(); + } +#else + private ServiceProvider TempBuildService(IServiceCollection services) + { + return services.BuildServiceProvider(); + } +#endif + + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs new file mode 100644 index 0000000..9a95464 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs @@ -0,0 +1,11 @@ +// ReSharper disable once CheckNamespace +namespace DotNetCore.CAP +{ + public class PostgreSqlOptions : EFOptions + { + /// + /// Gets or sets the database's connection string that will be used to store database entities. + /// + public string ConnectionString { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs b/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs new file mode 100644 index 0000000..b8edf5d --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs @@ -0,0 +1,78 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Models; +using DotNetCore.CAP.Abstractions; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Logging; + +namespace DotNetCore.CAP.PostgreSql +{ + public class CapPublisher : CapPublisherBase + { + private readonly ILogger _logger; + private readonly PostgreSqlOptions _options; + private readonly DbContext _dbContext; + + public CapPublisher(IServiceProvider provider, + ILogger logger, + PostgreSqlOptions options) + { + _options = options; + _logger = logger; + + if (_options.DbContextType != null) + { + IsUsingEF = true; + _dbContext = (DbContext)ServiceProvider.GetService(_options.DbContextType); + } + } + + protected override void PrepareConnectionForEF() + { + _dbConnection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + if (transaction == null) + { + IsCapOpenedTrans = true; + transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + } + _dbTranasaction = transaction.GetDbTransaction(); + } + + protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) + { + dbConnection.Execute(PrepareSql(), message, dbTransaction); + + _logger.LogDebug("Message has been persisted in the database. name:" + message.ToString()); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + } + + protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) + { + await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); + + _logger.LogDebug("Message has been persisted in the database. name:" + message.ToString()); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + } + + private string PrepareSql() + { + return $"INSERT INTO \"{_options.Schema}\".\"published\" (\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj new file mode 100644 index 0000000..a49961e --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj @@ -0,0 +1,26 @@ + + + + + + netstandard1.6;netstandard2.0; + DotNetCore.CAP.PostgreSql + $(PackageTags);PostgreSQL + + + + TRACE;DEBUG + + + + + + + + + + + + + + diff --git a/src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs b/src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs new file mode 100644 index 0000000..4abd0f3 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/FetchedMessage.cs @@ -0,0 +1,11 @@ +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.PostgreSql +{ + internal class FetchedMessage + { + public int MessageId { get; set; } + + public MessageType MessageType { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs b/src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs new file mode 100644 index 0000000..ed8786a --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/IAdditionalProcessor.Default.cs @@ -0,0 +1,61 @@ +using System; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Processor; +using Microsoft.Extensions.Logging; +using Npgsql; + +namespace DotNetCore.CAP.PostgreSql +{ + public class DefaultAdditionalProcessor : IAdditionalProcessor + { + private readonly IServiceProvider _provider; + private readonly ILogger _logger; + private readonly PostgreSqlOptions _options; + + private const int MaxBatch = 1000; + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + private readonly TimeSpan _waitingInterval = TimeSpan.FromHours(2); + + private static readonly string[] Tables = + { + "published","received" + }; + + public DefaultAdditionalProcessor( + IServiceProvider provider, + ILogger logger, + PostgreSqlOptions sqlServerOptions) + { + _logger = logger; + _provider = provider; + _options = sqlServerOptions; + } + + public async Task ProcessAsync(ProcessingContext context) + { + _logger.LogDebug("Collecting expired entities."); + + foreach (var table in Tables) + { + var removedCount = 0; + do + { + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + removedCount = await connection.ExecuteAsync($"DELETE FROM \"{_options.Schema}\".\"{table}\" WHERE ExpiresAt < @now LIMIT @count;", + new { now = DateTime.Now, count = MaxBatch }); + } + + if (removedCount != 0) + { + await context.WaitAsync(_delay); + context.ThrowIfStopping(); + } + } while (removedCount != 0); + } + + await context.WaitAsync(_waitingInterval); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs new file mode 100644 index 0000000..7189595 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlFetchedMessage.cs @@ -0,0 +1,74 @@ +using System; +using System.Data; +using System.Threading; +using Dapper; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.PostgreSql +{ + public class PostgreSqlFetchedMessage : IFetchedMessage + { + private readonly IDbConnection _connection; + private readonly IDbTransaction _transaction; + private readonly Timer _timer; + private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1); + private readonly object _lockObject = new object(); + + public PostgreSqlFetchedMessage(int messageId, + MessageType type, + IDbConnection connection, + IDbTransaction transaction) + { + MessageId = messageId; + MessageType = type; + _connection = connection; + _transaction = transaction; + _timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval); + } + + public int MessageId { get; } + + public MessageType MessageType { get; } + + public void RemoveFromQueue() + { + lock (_lockObject) + { + _transaction.Commit(); + } + } + + public void Requeue() + { + lock (_lockObject) + { + _transaction.Rollback(); + } + } + + public void Dispose() + { + lock (_lockObject) + { + _timer?.Dispose(); + _transaction.Dispose(); + _connection.Dispose(); + } + } + + private void ExecuteKeepAliveQuery(object obj) + { + lock (_lockObject) + { + try + { + _connection?.Execute("SELECT 1", _transaction); + } + catch + { + // ignored + } + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs new file mode 100644 index 0000000..2704780 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs @@ -0,0 +1,67 @@ +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Logging; +using Npgsql; + +namespace DotNetCore.CAP.PostgreSql +{ + public class PostgreSqlStorage : IStorage + { + private readonly PostgreSqlOptions _options; + private readonly ILogger _logger; + + public PostgreSqlStorage(ILogger logger, PostgreSqlOptions options) + { + _options = options; + _logger = logger; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + + var sql = CreateDbTablesScript(_options.Schema); + + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + await connection.ExecuteAsync(sql); + } + _logger.LogDebug("Ensuring all create database tables script are applied."); + } + + protected virtual string CreateDbTablesScript(string schema) + { + var batchSql = $@" +CREATE SCHEMA IF NOT EXISTS ""{schema}""; + +CREATE TABLE IF NOT EXISTS ""{schema}"".""queue""( + ""MessageId"" int NOT NULL , + ""MessageType"" int NOT NULL +); + +CREATE TABLE IF NOT EXISTS ""{schema}"".""received""( + ""Id"" SERIAL PRIMARY KEY NOT NULL, + ""Name"" VARCHAR(200) NOT NULL, + ""Group"" VARCHAR(200) NULL, + ""Content"" TEXT NULL, + ""Retries"" INT NOT NULL, + ""Added"" TIMESTAMP NOT NULL, + ""ExpiresAt"" TIMESTAMP NULL, + ""StatusName"" VARCHAR(50) NOT NULL +); + +CREATE TABLE IF NOT EXISTS ""{schema}"".""published""( + ""Id"" SERIAL PRIMARY KEY NOT NULL, + ""Name"" VARCHAR(200) NOT NULL, + ""Content"" TEXT NULL, + ""Retries"" INT NOT NULL, + ""Added"" TIMESTAMP NOT NULL, + ""ExpiresAt"" TIMESTAMP NULL, + ""StatusName"" VARCHAR(50) NOT NULL +);"; + return batchSql; + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs new file mode 100644 index 0000000..8e3be8d --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs @@ -0,0 +1,140 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using Npgsql; + +namespace DotNetCore.CAP.PostgreSql +{ + public class PostgreSqlStorageConnection : IStorageConnection + { + private readonly PostgreSqlOptions _options; + + public PostgreSqlStorageConnection(PostgreSqlOptions options) + { + _options = options; + } + + public PostgreSqlOptions Options => _options; + + public IStorageTransaction CreateTransaction() + { + return new PostgreSqlStorageTransaction(this); + } + + public async Task GetPublishedMessageAsync(int id) + { + var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"Id\"={id}"; + + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public Task FetchNextMessageAsync() + { + var sql = $@" +SELECT ""MessageId"",""MessageType"" FROM ""{_options.Schema}"".""queue"" LIMIT 1 FOR UPDATE; +DELETE FROM ""{_options.Schema}"".""queue"" LIMIT 1;"; + + return FetchNextMessageCoreAsync(sql); + } + + public async Task GetNextPublishedMessageToBeEnqueuedAsync() + { + var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;"; + + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task> GetFailedPublishedMessages() + { + var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}'"; + + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + + // CapReceviedMessage + + public async Task StoreReceivedMessageAsync(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"INSERT INTO \"{_options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + await connection.ExecuteAsync(sql, message); + } + } + + public async Task GetReceivedMessageAsync(int id) + { + var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"Id\"={id}"; + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task GetNextReceviedMessageToBeEnqueuedAsync() + { + var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;"; + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + public async Task> GetFailedReceviedMessages() + { + var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}'"; + using (var connection = new NpgsqlConnection(_options.ConnectionString)) + { + return await connection.QueryAsync(sql); + } + } + + public void Dispose() + { + } + + private async Task FetchNextMessageCoreAsync(string sql, object args = null) + { + //here don't use `using` to dispose + var connection = new NpgsqlConnection(_options.ConnectionString); + await connection.OpenAsync(); + var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); + FetchedMessage fetchedMessage = null; + try + { + fetchedMessage = await connection.QueryFirstOrDefaultAsync(sql, args, transaction); + } + catch (NpgsqlException) + { + transaction.Dispose(); + throw; + } + + if (fetchedMessage == null) + { + transaction.Rollback(); + transaction.Dispose(); + connection.Dispose(); + return null; + } + + return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs new file mode 100644 index 0000000..cc085f4 --- /dev/null +++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageTransaction.cs @@ -0,0 +1,71 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Models; +using Npgsql; + +namespace DotNetCore.CAP.PostgreSql +{ + public class PostgreSqlStorageTransaction : IStorageTransaction, IDisposable + { + private readonly string _schema; + + private readonly IDbTransaction _dbTransaction; + private readonly IDbConnection _dbConnection; + + public PostgreSqlStorageTransaction(PostgreSqlStorageConnection connection) + { + var options = connection.Options; + _schema = options.Schema; + + _dbConnection = new NpgsqlConnection(options.ConnectionString); + _dbConnection.Open(); + _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + } + + public void UpdateMessage(CapPublishedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); + } + + public void UpdateMessage(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; + _dbConnection.Execute(sql, message, _dbTransaction); + } + + public void EnqueueMessage(CapPublishedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);"; + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Publish }, _dbTransaction); + } + + public void EnqueueMessage(CapReceivedMessage message) + { + if (message == null) throw new ArgumentNullException(nameof(message)); + + var sql = $"INSERT INTO [{_schema}].[Queue] values(@MessageId,@MessageType);"; + _dbConnection.Execute(sql, new CapQueue { MessageId = message.Id, MessageType = MessageType.Subscribe }, _dbTransaction); + } + + public Task CommitAsync() + { + _dbTransaction.Commit(); + return Task.CompletedTask; + } + + public void Dispose() + { + _dbTransaction.Dispose(); + _dbConnection.Dispose(); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs new file mode 100644 index 0000000..dd961a1 --- /dev/null +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -0,0 +1,143 @@ +using System; +using System.Data; +using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using DotNetCore.CAP.Processor; + +namespace DotNetCore.CAP.Abstractions +{ + public abstract class CapPublisherBase : ICapPublisher + { + protected IDbConnection _dbConnection; + protected IDbTransaction _dbTranasaction; + + protected bool IsCapOpenedTrans { get; set; } + + protected bool IsUsingEF { get; set; } + + protected IServiceProvider ServiceProvider { get; } + + public void Publish(string name, T contentObj) + { + CheckIsUsingEF(name); + PrepareConnectionForEF(); + + var content = Serialize(contentObj); + + PublishWithTrans(name, content, _dbConnection, _dbTranasaction); + } + + public Task PublishAsync(string name, T contentObj) + { + CheckIsUsingEF(name); + PrepareConnectionForEF(); + + var content = Serialize(contentObj); + + return PublishWithTransAsync(name, content, _dbConnection, _dbTranasaction); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + PrepareConnectionForAdo(dbConnection, ref dbTransaction); + + var content = Serialize(contentObj); + + PublishWithTrans(name, content, dbConnection, dbTransaction); + } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + PrepareConnectionForAdo(dbConnection, ref dbTransaction); + + var content = Serialize(contentObj); + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + protected abstract void PrepareConnectionForEF(); + + protected abstract void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message); + + protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message); + + #region private methods + + private string Serialize(T obj) + { + string content = string.Empty; + if (Helper.IsComplexType(typeof(T))) + { + content = Helper.ToJson(obj); + } + else + { + content = obj.ToString(); + } + return content; + } + + private void PrepareConnectionForAdo(IDbConnection dbConnection, ref IDbTransaction dbTransaction) + { + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + if (dbConnection.State != ConnectionState.Open) + dbConnection.Open(); + + if (dbTransaction == null) + { + IsCapOpenedTrans = true; + dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + } + } + + private void CheckIsUsingEF(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (!IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + } + + private void CheckIsAdoNet(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); + } + + private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + + await ExecuteAsync(dbConnection, dbTransaction, message); + + PublishQueuer.PulseEvent.Set(); + } + + private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + + Execute(dbConnection, dbTransaction, message); + + PublishQueuer.PulseEvent.Set(); + } + + #endregion private methods + } +} diff --git a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs index c23f24d..e02fcb2 100644 --- a/src/DotNetCore.CAP/Models/CapPublishedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapPublishedMessage.cs @@ -34,5 +34,10 @@ namespace DotNetCore.CAP.Models public int Retries { get; set; } public string StatusName { get; set; } + + public override string ToString() + { + return "name:" + Name + ", content:" + Content; + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs index 069e4d0..8efe24b 100644 --- a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs @@ -47,5 +47,10 @@ namespace DotNetCore.CAP.Models Content = Content }; } + + public override string ToString() + { + return "name:" + Name + ", content:" + Content; + } } } \ No newline at end of file