diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index d585406..b029f69 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -2,262 +2,131 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Data; using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; -using Microsoft.Extensions.Logging; +using Microsoft.Extensions.DependencyInjection; namespace DotNetCore.CAP.Abstractions { - public abstract class CapPublisherBase : ICapPublisher, IDisposable + public abstract class CapPublisherBase : ICapPublisher { - private readonly IDispatcher _dispatcher; - protected readonly ILogger _logger; + private readonly CapTransactionBase _capTransaction; + private readonly IMessagePacker _msgPacker; + private readonly IContentSerializer _serializer; + + protected bool NotUseTransaction; - // diagnostics listener // ReSharper disable once InconsistentNaming protected static readonly DiagnosticListener s_diagnosticListener = new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); - protected CapPublisherBase(ILogger logger, IDispatcher dispatcher) - { - _logger = logger; - _dispatcher = dispatcher; - } - - protected IDbConnection DbConnection { get; set; } - protected IDbTransaction DbTransaction { get; set; } - protected bool IsCapOpenedTrans { get; set; } - protected bool IsCapOpenedConn { get; set; } - protected bool IsUsingEF { get; set; } - protected IServiceProvider ServiceProvider { get; set; } - - public void Publish(string name, T contentObj, string callbackName = null) + protected CapPublisherBase(IServiceProvider service) { - CheckIsUsingEF(name); - PrepareConnectionForEF(); - - PublishWithTrans(name, contentObj, callbackName); - } - - public Task PublishAsync(string name, T contentObj, string callbackName = null) - { - CheckIsUsingEF(name); - PrepareConnectionForEF(); - - return PublishWithTransAsync(name, contentObj, callbackName); - } - - public void Publish(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null) - { - CheckIsAdoNet(name); - PrepareConnectionForAdo(dbTransaction); - - PublishWithTrans(name, contentObj, callbackName); + ServiceProvider = service; + _capTransaction = service.GetRequiredService(); + _msgPacker = service.GetRequiredService(); + _serializer = service.GetRequiredService(); } - public Task PublishAsync(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null) - { - CheckIsAdoNet(name); - PrepareConnectionForAdo(dbTransaction); - - return PublishWithTransAsync(name, contentObj, callbackName); - } - - public virtual void PublishWithMongo(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) - { - throw new NotImplementedException("Work for MongoDB only."); - } - - public virtual Task PublishWithMongoAsync(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null) - { - throw new NotImplementedException("Work for MongoDB only."); - } - - protected void Enqueue(CapPublishedMessage message) - { - _dispatcher.EnqueueToPublish(message); - } - - protected abstract void PrepareConnectionForEF(); - - protected abstract int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message); + protected IServiceProvider ServiceProvider { get; } - protected abstract Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, - CapPublishedMessage message); + public ICapTransaction CapTransaction => _capTransaction; - protected virtual string Serialize(T obj, string callbackName = null) + public void Publish(string name, T contentObj, string callbackName = null) { - var packer = (IMessagePacker)ServiceProvider.GetService(typeof(IMessagePacker)); - string content; - if (obj != null) - { - if (Helper.IsComplexType(obj.GetType())) - { - var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer)); - content = serializer.Serialize(obj); - } - else - { - content = obj.ToString(); - } - } - else - { - content = string.Empty; - } - - var message = new CapMessageDto(content) + var message = new CapPublishedMessage { - CallbackName = callbackName + Name = name, + Content = Serialize(contentObj, callbackName), + StatusName = StatusName.Scheduled }; - return packer.Pack(message); - } - - #region private methods - private void PrepareConnectionForAdo(IDbTransaction dbTransaction) - { - DbTransaction = dbTransaction ?? throw new ArgumentNullException(nameof(dbTransaction)); - DbConnection = DbTransaction.Connection; - if (DbConnection.State != ConnectionState.Open) - { - IsCapOpenedConn = true; - DbConnection.Open(); - } + PublishAsyncInternal(message).GetAwaiter().GetResult(); } - private void CheckIsUsingEF(string name) + public async Task PublishAsync(string name, T contentObj, string callbackName = null, + CancellationToken cancellationToken = default(CancellationToken)) { - if (name == null) + var message = new CapPublishedMessage { - throw new ArgumentNullException(nameof(name)); - } + Name = name, + Content = Serialize(contentObj, callbackName), + StatusName = StatusName.Scheduled + }; - if (!IsUsingEF) - { - throw new InvalidOperationException( - "If you are using the EntityFramework, you need to configure the DbContextType first." + - " otherwise you need to use overloaded method with IDbTransaction."); - } + await PublishAsyncInternal(message); } - private void CheckIsAdoNet(string name) + protected async Task PublishAsyncInternal(CapPublishedMessage message) { - if (name == null) - { - throw new ArgumentNullException(nameof(name)); - } - - if (IsUsingEF) + if (CapTransaction.DbTransaction == null) { - throw new InvalidOperationException( - "If you are using the EntityFramework, you do not need to use this overloaded."); + NotUseTransaction = true; + CapTransaction.DbTransaction = GetDbTransaction(); } - } - private async Task PublishWithTransAsync(string name, T contentObj, string callbackName = null) - { Guid operationId = default(Guid); - var content = Serialize(contentObj, callbackName); - - var message = new CapPublishedMessage - { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; try { operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); - var id = await ExecuteAsync(DbConnection, DbTransaction, message); - - ClosedCap(); + message.Id = await ExecuteAsync(message, CapTransaction); - if (id > 0) + if (message.Id > 0) { - _logger.LogInformation($"message [{message}] has been persisted in the database."); + _capTransaction.AddToSent(message); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); + } - message.Id = id; - - Enqueue(message); + if (NotUseTransaction || CapTransaction.AutoCommit) + { + _capTransaction.Commit(); } } catch (Exception e) { - _logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name); s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); - Console.WriteLine(e); + throw; } - } - - private void PublishWithTrans(string name, T contentObj, string callbackName = null) - { - Guid operationId = default(Guid); - - var content = Serialize(contentObj, callbackName); - - var message = new CapPublishedMessage + finally { - Name = name, - Content = content, - StatusName = StatusName.Scheduled - }; - - try - { - Console.WriteLine("================22222222222222====================="); - operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); - - var id = Execute(DbConnection, DbTransaction, message); - Console.WriteLine("================777777777777777777777====================="); - ClosedCap(); - - if (id > 0) + if (NotUseTransaction || CapTransaction.AutoCommit) { - _logger.LogInformation($"message [{message}] has been persisted in the database."); - s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); - message.Id = id; - Enqueue(message); + _capTransaction.Dispose(); } } - catch (Exception e) - { - _logger.LogError(e, "An exception was occurred when publish message. message:" + name); - s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); - Console.WriteLine(e); - throw; - } } - private void ClosedCap() + protected abstract object GetDbTransaction(); + + protected abstract Task ExecuteAsync(CapPublishedMessage message, + ICapTransaction transaction, + CancellationToken cancel = default(CancellationToken)); + + protected virtual string Serialize(T obj, string callbackName = null) { - if (IsCapOpenedTrans) + string content; + if (obj != null) { - DbTransaction.Commit(); - DbTransaction.Dispose(); + content = Helper.IsComplexType(obj.GetType()) + ? _serializer.Serialize(obj) + : obj.ToString(); } - - if (IsCapOpenedConn) + else { - DbConnection.Dispose(); + content = string.Empty; } + var message = new CapMessageDto(content) + { + CallbackName = callbackName + }; + return _msgPacker.Pack(message); } - - public void Dispose() - { - DbTransaction?.Dispose(); - DbConnection?.Dispose(); - } - - #endregion private methods } } \ No newline at end of file