diff --git a/src/DotNetCore.CAP.MySql/CapPublisher.cs b/src/DotNetCore.CAP.MySql/CapPublisher.cs index 82476b3..6b8476a 100644 --- a/src/DotNetCore.CAP.MySql/CapPublisher.cs +++ b/src/DotNetCore.CAP.MySql/CapPublisher.cs @@ -2,34 +2,27 @@ using System.Data; using System.Threading.Tasks; using Dapper; -using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Models; -using DotNetCore.CAP.Processor; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.MySql { - public class CapPublisher : ICapPublisher + public class CapPublisher : CapPublisherBase { private readonly ILogger _logger; private readonly MySqlOptions _options; private readonly DbContext _dbContext; - protected bool IsCapOpenedTrans { get; set; } - - protected bool IsUsingEF { get; } - - protected IServiceProvider ServiceProvider { get; } - public CapPublisher(IServiceProvider provider, ILogger logger, MySqlOptions options) { ServiceProvider = provider; - _logger = logger; _options = options; + _logger = logger; if (_options.DbContextType != null) { @@ -38,166 +31,41 @@ namespace DotNetCore.CAP.MySql } } - public void Publish(string name, T contentObj) - { - CheckIsUsingEF(name); - - var content = Serialize(contentObj); - - PublishCore(name, content); - } - - public Task PublishAsync(string name, T contentObj) - { - CheckIsUsingEF(name); - - var content = Serialize(contentObj); - - return PublishCoreAsync(name, content); - } - - public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) - { - CheckIsAdoNet(name); - - PrepareConnection(dbConnection, ref dbTransaction); - - var content = Serialize(contentObj); - - PublishWithTrans(name, content, dbConnection, dbTransaction); - } - - public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) - { - CheckIsAdoNet(name); - - PrepareConnection(dbConnection, ref dbTransaction); - - var content = Serialize(contentObj); - - return PublishWithTransAsync(name, content, dbConnection, dbTransaction); - } - - #region private methods - - private string Serialize(T obj) - { - string content = string.Empty; - if (Helper.IsComplexType(typeof(T))) - { - content = Helper.ToJson(obj); - } - else - { - content = obj?.ToString(); - } - return content; - } - - private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction) + protected override void PrepareConnectionForEF() { - if (dbConnection == null) - throw new ArgumentNullException(nameof(dbConnection)); - - if (dbConnection.State != ConnectionState.Open) - dbConnection.Open(); - - if (dbTransaction == null) - { - IsCapOpenedTrans = true; - dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); - } - } - - private void CheckIsUsingEF(string name) - { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) - throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); - } - - private void CheckIsAdoNet(string name) - { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (IsUsingEF) - throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); - } - - private async Task PublishCoreAsync(string name, string content) - { - var connection = _dbContext.Database.GetDbConnection(); - var transaction = _dbContext.Database.CurrentTransaction; - if (transaction == null) - { - IsCapOpenedTrans = true; - transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); - } - var dbTransaction = transaction.GetDbTransaction(); - await PublishWithTransAsync(name, content, connection, dbTransaction); - } - - private void PublishCore(string name, string content) - { - var connection = _dbContext.Database.GetDbConnection(); + DbConnection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; if (transaction == null) { IsCapOpenedTrans = true; transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); } - var dbTransaction = transaction.GetDbTransaction(); - PublishWithTrans(name, content, connection, dbTransaction); + DbTranasaction = transaction.GetDbTransaction(); } - private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) { - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); - - _logger.LogInformation("Message has been persisted in the database. name:" + name); - - if (IsCapOpenedTrans) - { - dbTransaction.Commit(); - dbTransaction.Dispose(); - dbConnection.Dispose(); - } + dbConnection.Execute(PrepareSql(), message, dbTransaction); - PublishQueuer.PulseEvent.Set(); + _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); } - private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) { - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); - - _logger.LogInformation("Message has been persisted in the database. name:" + name); - - if (IsCapOpenedTrans) - { - dbTransaction.Commit(); - dbTransaction.Dispose(); - dbConnection.Dispose(); - } - PublishQueuer.PulseEvent.Set(); + await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); + + _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); } + #region private methods + private string PrepareSql() { return $"INSERT INTO `{_options.TableNamePrefix}.published` (`Name`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`)VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; } + + #endregion private methods } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs b/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs index ae2ccfd..f141b1e 100644 --- a/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs +++ b/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs @@ -33,42 +33,28 @@ namespace DotNetCore.CAP.PostgreSql protected override void PrepareConnectionForEF() { - _dbConnection = _dbContext.Database.GetDbConnection(); + DbConnection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; if (transaction == null) { IsCapOpenedTrans = true; transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); } - _dbTranasaction = transaction.GetDbTransaction(); + DbTranasaction = transaction.GetDbTransaction(); } protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) { dbConnection.Execute(PrepareSql(), message, dbTransaction); - _logger.LogDebug("Message has been persisted in the database. name:" + message.ToString()); - - if (IsCapOpenedTrans) - { - dbTransaction.Commit(); - dbTransaction.Dispose(); - dbConnection.Dispose(); - } + _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); } protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) { await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); - _logger.LogDebug("Message has been persisted in the database. name:" + message.ToString()); - - if (IsCapOpenedTrans) - { - dbTransaction.Commit(); - dbTransaction.Dispose(); - dbConnection.Dispose(); - } + _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); } private string PrepareSql() diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 674b7f7..3acddb2 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -2,27 +2,20 @@ using System.Data; using System.Threading.Tasks; using Dapper; -using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Models; -using DotNetCore.CAP.Processor; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.SqlServer { - public class CapPublisher : ICapPublisher + public class CapPublisher : CapPublisherBase { private readonly ILogger _logger; private readonly SqlServerOptions _options; private readonly DbContext _dbContext; - protected bool IsCapOpenedTrans { get; set; } - - protected bool IsUsingEF { get; } - - protected IServiceProvider ServiceProvider { get; } - public CapPublisher(IServiceProvider provider, ILogger logger, SqlServerOptions options) @@ -38,164 +31,40 @@ namespace DotNetCore.CAP.SqlServer } } - public void Publish(string name, T contentObj) - { - CheckIsUsingEF(name); - - var content = Serialize(contentObj); - - PublishCore(name, content); - } - - public Task PublishAsync(string name, T contentObj) - { - CheckIsUsingEF(name); - - var content = Serialize(contentObj); - - return PublishCoreAsync(name, content); - } - - public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) - { - CheckIsAdoNet(name); - PrepareConnection(dbConnection, ref dbTransaction); - - var content = Serialize(contentObj); - - PublishWithTrans(name, content, dbConnection, dbTransaction); - } - - public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) - { - CheckIsAdoNet(name); - PrepareConnection(dbConnection, ref dbTransaction); - - var content = Serialize(contentObj); - - return PublishWithTransAsync(name, content, dbConnection, dbTransaction); - } - - #region private methods - - private string Serialize(T obj) - { - string content = string.Empty; - if (Helper.IsComplexType(typeof(T))) - { - content = Helper.ToJson(obj); - } - else - { - content = obj.ToString(); - } - return content; - } - - private void PrepareConnection(IDbConnection dbConnection, ref IDbTransaction dbTransaction) - { - if (dbConnection == null) - throw new ArgumentNullException(nameof(dbConnection)); - - if (dbConnection.State != ConnectionState.Open) - dbConnection.Open(); - - if (dbTransaction == null) - { - IsCapOpenedTrans = true; - dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); - } - } - - private void CheckIsUsingEF(string name) + protected override void PrepareConnectionForEF() { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) - throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); - } - - private void CheckIsAdoNet(string name) - { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (IsUsingEF) - throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); - } - - private async Task PublishCoreAsync(string name, string content) - { - var connection = _dbContext.Database.GetDbConnection(); - var transaction = _dbContext.Database.CurrentTransaction; - if (transaction == null) - { - IsCapOpenedTrans = true; - transaction = await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); - } - var dbTransaction = transaction.GetDbTransaction(); - await PublishWithTransAsync(name, content, connection, dbTransaction); - } - - private void PublishCore(string name, string content) - { - var connection = _dbContext.Database.GetDbConnection(); + DbConnection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; if (transaction == null) { IsCapOpenedTrans = true; transaction = _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); } - var dbTransaction = transaction.GetDbTransaction(); - PublishWithTrans(name, content, connection, dbTransaction); + DbTranasaction = transaction.GetDbTransaction(); } - private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + protected override void Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) { - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); - - _logger.LogInformation("Message has been persisted in the database. name:" + name); - - if (IsCapOpenedTrans) - { - dbTransaction.Commit(); - dbTransaction.Dispose(); - dbConnection.Dispose(); - } + dbConnection.Execute(PrepareSql(), message, dbTransaction); - PublishQueuer.PulseEvent.Set(); + _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); } - private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) { - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); + await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); - _logger.LogInformation("Message has been persisted in the database. name:" + name); - - if (IsCapOpenedTrans) - { - dbTransaction.Commit(); - dbTransaction.Dispose(); - dbConnection.Dispose(); - } - PublishQueuer.PulseEvent.Set(); + _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); } + #region private methods + private string PrepareSql() { return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; } + #endregion private methods } } \ No newline at end of file