From ac0fc62a8d4df8ad7db48ab69886e963f319eafd Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Tue, 15 Aug 2017 18:33:45 +0800 Subject: [PATCH] add feature of #22. --- src/DotNetCore.CAP.SqlServer/CapPublisher.cs | 11 +- .../Abstractions/CapPublisherBase.cs | 102 +++++++++--------- src/DotNetCore.CAP/ICallbackPublisher.cs | 12 +++ src/DotNetCore.CAP/ICapPublisher.cs | 12 ++- .../Internal/IConsumerInvoker.Default.cs | 30 ++++-- .../Internal/IModelBinder.ComplexType.cs | 7 +- src/DotNetCore.CAP/Models/Message.cs | 26 +++++ 7 files changed, 135 insertions(+), 65 deletions(-) create mode 100644 src/DotNetCore.CAP/ICallbackPublisher.cs create mode 100644 src/DotNetCore.CAP/Models/Message.cs diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs index 500a82a..7a0da8f 100644 --- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs +++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs @@ -11,7 +11,7 @@ using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.SqlServer { - public class CapPublisher : CapPublisherBase + public class CapPublisher : CapPublisherBase, ICallbackPublisher { private readonly ILogger _logger; private readonly SqlServerOptions _options; @@ -61,6 +61,14 @@ namespace DotNetCore.CAP.SqlServer _logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); } + public Task PublishAsync(string name, object contentObj) + { + using (var conn = new SqlConnection(_options.ConnectionString)) + { + return conn.ExecuteAsync(PrepareSql(), contentObj); + } + } + #region private methods private string PrepareSql() @@ -68,7 +76,6 @@ namespace DotNetCore.CAP.SqlServer return $"INSERT INTO {_options.Schema}.[Published] ([Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName)"; } - #endregion private methods } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs index 93f0b6c..34aba5b 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs @@ -7,7 +7,7 @@ using DotNetCore.CAP.Processor; namespace DotNetCore.CAP.Abstractions { - public abstract class CapPublisherBase : ICapPublisher + public abstract class CapPublisherBase : ICapPublisher, IDisposable { protected IDbConnection DbConnection { get; set; } protected IDbTransaction DbTranasaction { get; set; } @@ -16,44 +16,46 @@ namespace DotNetCore.CAP.Abstractions protected bool IsUsingEF { get; set; } protected IServiceProvider ServiceProvider { get; set; } - public void Publish(string name, T contentObj) + public void Publish(string name, T contentObj, string callbackName = null) { CheckIsUsingEF(name); PrepareConnectionForEF(); - var content = Serialize(contentObj); + var content = Serialize(contentObj, callbackName); - PublishWithTrans(name, content, DbConnection, DbTranasaction); + PublishWithTrans(name, content); } - public Task PublishAsync(string name, T contentObj) + public Task PublishAsync(string name, T contentObj, string callbackName = null) { CheckIsUsingEF(name); PrepareConnectionForEF(); - var content = Serialize(contentObj); + var content = Serialize(contentObj, callbackName); - return PublishWithTransAsync(name, content, DbConnection, DbTranasaction); + return PublishWithTransAsync(name, content); } - public void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + public void Publish(string name, T contentObj, IDbConnection dbConnection, + string callbackName = null, IDbTransaction dbTransaction = null) { CheckIsAdoNet(name); - PrepareConnectionForAdo(dbConnection, ref dbTransaction); + PrepareConnectionForAdo(dbConnection, dbTransaction); - var content = Serialize(contentObj); + var content = Serialize(contentObj, callbackName); - PublishWithTrans(name, content, dbConnection, dbTransaction); + PublishWithTrans(name, content); } - public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null) + public Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, + string callbackName = null, IDbTransaction dbTransaction = null) { CheckIsAdoNet(name); - PrepareConnectionForAdo(dbConnection, ref dbTransaction); + PrepareConnectionForAdo(dbConnection, dbTransaction); - var content = Serialize(contentObj); + var content = Serialize(contentObj, callbackName); - return PublishWithTransAsync(name, content, dbConnection, dbTransaction); + return PublishWithTransAsync(name, content); } protected abstract void PrepareConnectionForEF(); @@ -64,35 +66,29 @@ namespace DotNetCore.CAP.Abstractions #region private methods - private string Serialize(T obj) + private string Serialize(T obj, string callbackName = null) { - string content = string.Empty; - if (Helper.IsComplexType(typeof(T))) + var message = new Message(obj) { - content = Helper.ToJson(obj); - } - else - { - content = obj.ToString(); - } - return content; + CallbackName = callbackName + }; + + return Helper.ToJson(message); } - private void PrepareConnectionForAdo(IDbConnection dbConnection, ref IDbTransaction dbTransaction) + private void PrepareConnectionForAdo(IDbConnection dbConnection, IDbTransaction dbTransaction) { - if (dbConnection == null) - throw new ArgumentNullException(nameof(dbConnection)); - - if (dbConnection.State != ConnectionState.Open) + DbConnection = dbConnection ?? throw new ArgumentNullException(nameof(dbConnection)); + if (DbConnection.State != ConnectionState.Open) { IsCapOpenedConn = true; - dbConnection.Open(); + DbConnection.Open(); } - - if (dbTransaction == null) + DbTranasaction = dbTransaction; + if (DbTranasaction == null) { IsCapOpenedTrans = true; - dbTransaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); + DbTranasaction = dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); } } @@ -111,7 +107,7 @@ namespace DotNetCore.CAP.Abstractions throw new InvalidOperationException("If you are using the EntityFramework, you do not need to use this overloaded."); } - private async Task PublishWithTransAsync(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + private async Task PublishWithTransAsync(string name, string content) { var message = new CapPublishedMessage { @@ -120,23 +116,14 @@ namespace DotNetCore.CAP.Abstractions StatusName = StatusName.Scheduled }; - await ExecuteAsync(dbConnection, dbTransaction, message); - - if (IsCapOpenedTrans) - { - dbTransaction.Commit(); - dbTransaction.Dispose(); - } + await ExecuteAsync(DbConnection, DbTranasaction, message); - if (IsCapOpenedConn) - { - dbConnection.Dispose(); - } + ClosedCap(); PublishQueuer.PulseEvent.Set(); } - private void PublishWithTrans(string name, string content, IDbConnection dbConnection, IDbTransaction dbTransaction) + private void PublishWithTrans(string name, string content) { var message = new CapPublishedMessage { @@ -145,19 +132,30 @@ namespace DotNetCore.CAP.Abstractions StatusName = StatusName.Scheduled }; - Execute(dbConnection, dbTransaction, message); + Execute(DbConnection, DbTranasaction, message); + ClosedCap(); + + PublishQueuer.PulseEvent.Set(); + } + + private void ClosedCap() + { if (IsCapOpenedTrans) { - dbTransaction.Commit(); - dbTransaction.Dispose(); + DbTranasaction.Commit(); + DbTranasaction.Dispose(); } if (IsCapOpenedConn) { - dbConnection.Dispose(); + DbConnection.Dispose(); } + } - PublishQueuer.PulseEvent.Set(); + public void Dispose() + { + DbTranasaction?.Dispose(); + DbConnection?.Dispose(); } #endregion private methods diff --git a/src/DotNetCore.CAP/ICallbackPublisher.cs b/src/DotNetCore.CAP/ICallbackPublisher.cs new file mode 100644 index 0000000..d0a0456 --- /dev/null +++ b/src/DotNetCore.CAP/ICallbackPublisher.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP +{ + public interface ICallbackPublisher + { + Task PublishAsync(string name, object obj); + } +} diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs index a3a7210..fc05bed 100644 --- a/src/DotNetCore.CAP/ICapPublisher.cs +++ b/src/DotNetCore.CAP/ICapPublisher.cs @@ -18,7 +18,8 @@ namespace DotNetCore.CAP /// The type of conetent object. /// the topic name or exchange router key. /// message body content, that will be serialized of json. - Task PublishAsync(string name, T contentObj); + /// callback subscriber name + Task PublishAsync(string name, T contentObj, string callbackName = null); /// /// (EntityFramework) Publish a object message. @@ -30,24 +31,27 @@ namespace DotNetCore.CAP /// The type of conetent object. /// the topic name or exchange router key. /// message body content, that will be serialized of json. - void Publish(string name, T contentObj); + /// callback subscriber name + void Publish(string name, T contentObj, string callbackName = null); /// /// (ado.net) Asynchronous publish a object message. /// /// the topic name or exchange router key. /// message body content, that will be serialized of json. + /// callback subscriber name /// the connection of /// the transaction of - Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + Task PublishAsync(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null); /// /// (ado.net) Publish a object message. /// /// the topic name or exchange router key. /// message body content, that will be serialized of json. + /// callback subscriber name /// the connection of /// the transaction of - void Publish(string name, T contentObj, IDbConnection dbConnection, IDbTransaction dbTransaction = null); + void Publish(string name, T contentObj, IDbConnection dbConnection, string callbackName = null, IDbTransaction dbTransaction = null); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs index 3009f73..d164a0f 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs @@ -1,6 +1,8 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -37,31 +39,47 @@ namespace DotNetCore.CAP.Internal var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider, _consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType()); - var value = _consumerContext.DeliverMessage.Content; + var jsonConent = _consumerContext.DeliverMessage.Content; + + var message = Helper.FromJson(jsonConent); + + object returnObj = null; if (_executor.MethodParameters.Length > 0) { var firstParameter = _executor.MethodParameters[0]; try { var binder = _modelBinderFactory.CreateBinder(firstParameter); - var result = await binder.BindModelAsync(value); + var result = await binder.BindModelAsync(message.Content.ToString()); if (result.IsSuccess) { - _executor.Execute(obj, result.Model); + returnObj = _executor.Execute(obj, result.Model); } else { - _logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + value); + _logger.LogWarning($"Parameters:{firstParameter.Name} bind failed! the content is:" + jsonConent); } } catch (FormatException ex) { - _logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, value, ex); + _logger.ModelBinderFormattingException(_executor.MethodInfo?.Name, firstParameter.Name, jsonConent, ex); } } else { - _executor.Execute(obj); + returnObj = _executor.Execute(obj); + } + + //TODO :refactor + if (returnObj != null && !string.IsNullOrEmpty(message.CallbackName)) + { + var publisher = _serviceProvider.GetRequiredService(); + var callbackMessage = new Message(returnObj) + { + Id = message.Id, + Timestamp = DateTime.Now + }; + await publisher.PublishAsync(message.CallbackName, callbackMessage); } } } diff --git a/src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs b/src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs index c25f7c6..f4362f2 100644 --- a/src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs +++ b/src/DotNetCore.CAP/Internal/IModelBinder.ComplexType.cs @@ -3,6 +3,7 @@ using System.Reflection; using System.Threading.Tasks; using DotNetCore.CAP.Abstractions.ModelBinding; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; namespace DotNetCore.CAP.Internal { @@ -20,7 +21,11 @@ namespace DotNetCore.CAP.Internal try { var type = _parameterInfo.ParameterType; - var value = Helper.FromJson(content, type); + + var message = Helper.FromJson(content); + + var value = Helper.FromJson(message.Content.ToString(), type); + return Task.FromResult(ModelBindingResult.Success(value)); } catch (Exception) diff --git a/src/DotNetCore.CAP/Models/Message.cs b/src/DotNetCore.CAP/Models/Message.cs new file mode 100644 index 0000000..d739886 --- /dev/null +++ b/src/DotNetCore.CAP/Models/Message.cs @@ -0,0 +1,26 @@ +using System; + +namespace DotNetCore.CAP.Models +{ + public class Message + { + public string Id { get; set; } + + public DateTime Timestamp { get; set; } + + public object Content { get; set; } + + public string CallbackName { get; set; } + + public Message() + { + Id = ObjectId.GenerateNewStringId(); + Timestamp = DateTime.Now; + } + + public Message(object content) : this() + { + Content = content; + } + } +}