diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 6889c27..5f84093 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -7,11 +7,13 @@ using DotNetCore.CAP.Models; using DotNetCore.CAP.Processor; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.SqlServer { public class CapPublisher : ICapPublisher { + private readonly ILogger _logger; private readonly SqlServerOptions _options; private readonly DbContext _dbContext; @@ -21,9 +23,12 @@ namespace DotNetCore.CAP.SqlServer protected IServiceProvider ServiceProvider { get; } - public CapPublisher(IServiceProvider provider, SqlServerOptions options) + public CapPublisher(IServiceProvider provider, + ILogger logger, + SqlServerOptions options) { ServiceProvider = provider; + _logger = logger; _options = options; if (_options.DbContextType != null) @@ -33,57 +38,152 @@ namespace DotNetCore.CAP.SqlServer } } + public void Publish(string name, string content) + { + CheckIsUsingEF(name); + + PublishCore(name, content); + } + public Task PublishAsync(string name, string content) { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + CheckIsUsingEF(name); - return Publish(name, content); + return PublishCoreAsync(name, content); + } + + public void Publish(string name, T contentObj) + { + CheckIsUsingEF(name); + + var content = Helper.ToJson(contentObj); + + PublishCore(name, content); } public Task PublishAsync(string name, T contentObj) { - if (name == null) throw new ArgumentNullException(nameof(name)); - if (!IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + CheckIsUsingEF(name); var content = Helper.ToJson(contentObj); - return Publish(name, content); + + return PublishCoreAsync(name, content); } - public Task PublishAsync(string name, string content, IDbConnection dbConnection) + public void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) { - if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); - if (name == null) throw new ArgumentNullException(nameof(name)); - if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); - var dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); IsCapOpenedTrans = true; - return PublishWithTrans(name, content, dbConnection, dbTransaction); + + PublishWithTrans(name, content, dbConnection, dbTransaction); } - public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + public Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + IsCapOpenedTrans = true; + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + PublishWithTrans(name, content, dbConnection, dbTransaction); + } + + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + { + CheckIsAdoNet(name); + + if (dbConnection == null) + throw new ArgumentNullException(nameof(dbConnection)); + + var content = Helper.ToJson(contentObj); + + dbTransaction = dbTransaction ?? dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + + return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + } + + #region private methods + + private void CheckIsUsingEF(string name) { - if (IsUsingEF) throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); if (name == null) throw new ArgumentNullException(nameof(name)); - if (dbConnection == null) throw new ArgumentNullException(nameof(dbConnection)); - if (dbTransaction == null) throw new ArgumentNullException(nameof(dbTransaction)); + if (!IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you need to configure the DbContextType first." + + " otherwise you need to use overloaded method with IDbConnection and IDbTransaction."); + } - return PublishWithTrans(name, content, dbConnection, dbTransaction); + private void CheckIsAdoNet(string name) + { + if (name == null) throw new ArgumentNullException(nameof(name)); + if (IsUsingEF) + throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); } - private async Task Publish(string name, string content) + private async Task PublishCoreAsync(string name, string content) { var connection = _dbContext.Database.GetDbConnection(); var transaction = _dbContext.Database.CurrentTransaction; IsCapOpenedTrans = transaction == null; transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); var dbTransaction = transaction.GetDbTransaction(); - await PublishWithTrans(name, content, connection, dbTransaction); + await PublishWithTransAsync(name, content, connection, dbTransaction); + } + + private void PublishCore(string name, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + IsCapOpenedTrans = transaction == null; + transaction = transaction ?? _dbContext.Database.BeginTransaction(IsolationLevel.ReadCommitted); + var dbTransaction = transaction.GetDbTransaction(); + PublishWithTrans(name, content, connection, dbTransaction); + } + + private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + { + var message = new CapPublishedMessage + { + Name = name, + Content = content, + StatusName = StatusName.Scheduled + }; + await dbConnection.ExecuteAsync(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); + + if (IsCapOpenedTrans) + { + dbTransaction.Commit(); + dbTransaction.Dispose(); + dbConnection.Dispose(); + } + + PublishQueuer.PulseEvent.Set(); } - protected virtual async Task PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) { var message = new CapPublishedMessage { @@ -91,15 +191,24 @@ namespace DotNetCore.CAP.SqlServer Content = content, StatusName = StatusName.Scheduled }; + var count = dbConnection.Execute(PrepareSql(), message, transaction: dbTransaction); + + _logger.LogInformation("Message has been persisted in the database. name:" + name); - var sql = $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; - await dbConnection.ExecuteAsync(sql, message, transaction: dbTransaction); if (IsCapOpenedTrans) { dbTransaction.Commit(); + dbTransaction.Dispose(); dbConnection.Dispose(); } PublishQueuer.PulseEvent.Set(); } + + private string PrepareSql() + { + return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; + } + + #endregion private methods } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index 95f28b0..a45061c 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -9,7 +9,7 @@ namespace DotNetCore.CAP public interface ICapPublisher { /// - /// Publish a string message to specified topic. + /// (EntityFramework) Asynchronous publish a message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. @@ -20,7 +20,18 @@ namespace DotNetCore.CAP Task PublishAsync(string name, string content); /// - /// Publis a object message to specified topic. + /// (EntityFramework) Publish a message. + /// + /// If you are using the EntityFramework, you need to configure the DbContextType first. + /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// + /// + /// the topic name or exchange router key. + /// message body content. + void Publish(string name, string content); + + /// + /// (EntityFramework) Asynchronous publish a object message. /// /// If you are using the EntityFramework, you need to configure the DbContextType first. /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. @@ -28,24 +39,55 @@ namespace DotNetCore.CAP /// /// The type of conetent object. /// the topic name or exchange router key. - /// object instance that will be serialized of json. + /// message body content, that will be serialized of json. Task PublishAsync(string name, T contentObj); /// - /// Publish a string message to specified topic with transacton. + /// (EntityFramework) Publish a object message. + /// + /// If you are using the EntityFramework, you need to configure the DbContextType first. + /// otherwise you need to use overloaded method with IDbConnection and IDbTransaction. + /// /// + /// The type of conetent object. /// the topic name or exchange router key. - /// message body content. - /// the dbConnection of - Task PublishAsync(string name, string content, IDbConnection dbConnection); + /// message body content, that will be serialized of json. + void Publish(string name, T contentObj); /// - /// Publish a string message to specified topic with transacton. + /// (ado.net) Asynchronous publish a message. + /// + /// the topic name or exchange router key. + /// message body content + /// the connection of + /// the transaction of + Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Publish a message. /// /// the topic name or exchange router key. /// message body content. /// the connection of /// the transaction of - Task PublishAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction); + void Publish(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Asynchronous publish a object message. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// the connection of + /// the transaction of + Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + + /// + /// (ado.net) Publish a object message. + /// + /// the topic name or exchange router key. + /// message body content, that will be serialized of json. + /// the connection of + /// the transaction of + void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); } } \ No newline at end of file