From b1bd53bf1c925fa1306d5408fef325ed19f99d8d Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 24 Oct 2019 18:29:07 +0800 Subject: [PATCH] Code refactor --- .../Abstractions/CapPublisher.cs | 2 +- src/DotNetCore.CAP/CAP.Builder.cs | 20 --- .../CAP.ServiceCollectionExtensions.cs | 13 +- src/DotNetCore.CAP/CAP.SubscribeAttribute.cs | 7 + src/DotNetCore.CAP/Cap.Header.cs | 13 ++ .../ConsumerExecutorDescriptor.cs | 13 ++ src/DotNetCore.CAP/DotNetCore.CAP.csproj | 14 +- .../IConsumerRegister.Default.cs | 25 +++- .../IConsumerServiceSelector.Default.cs | 16 ++- src/DotNetCore.CAP/IDispatcher.cs | 3 +- .../ISubscribeExecutor.Default.cs | 44 ++++--- src/DotNetCore.CAP/ISubscriberExecutor.cs | 2 + src/DotNetCore.CAP/Infrastructure/Helper.cs | 121 ------------------ .../Internal/ConsumerInvokerFactory.cs | 1 + .../Internal/IConsumerInvoker.Default.cs | 61 +++------ src/DotNetCore.CAP/Messages/Headers.cs | 3 - .../Messages/TransportMessage.cs | 5 + .../Processor/IDispatcher.Default.cs | 10 +- .../Serialization/ISerializer.JsonUtf8.cs | 25 ++++ .../Serialization/ISerializer.cs | 6 +- .../Serialization/StringSerializer.cs | 17 +-- .../SubscriberNotFoundException.cs | 9 -- 22 files changed, 159 insertions(+), 271 deletions(-) create mode 100644 src/DotNetCore.CAP/Cap.Header.cs create mode 100644 src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisher.cs b/src/DotNetCore.CAP/Abstractions/CapPublisher.cs index 1e018f6..25c9927 100644 --- a/src/DotNetCore.CAP/Abstractions/CapPublisher.cs +++ b/src/DotNetCore.CAP/Abstractions/CapPublisher.cs @@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Abstractions optionHeaders.Add(Headers.CorrelationSequence, 0.ToString()); } optionHeaders.Add(Headers.MessageName, name); - optionHeaders.Add(Headers.Type, typeof(T).ToString()); + optionHeaders.Add(Headers.Type, typeof(T).AssemblyQualifiedName); optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString()); var message = new Message(optionHeaders, value); diff --git a/src/DotNetCore.CAP/CAP.Builder.cs b/src/DotNetCore.CAP/CAP.Builder.cs index 8b9889f..c737b8a 100644 --- a/src/DotNetCore.CAP/CAP.Builder.cs +++ b/src/DotNetCore.CAP/CAP.Builder.cs @@ -53,26 +53,6 @@ namespace DotNetCore.CAP return AddScoped(typeof(ICapPublisher), typeof(T)); } - /// - /// Add a custom content serializer - /// - /// The type of the service. - public CapBuilder AddContentSerializer() - where T : class, IContentSerializer - { - return AddSingleton(typeof(IContentSerializer), typeof(T)); - } - - /// - /// Add a custom message wapper - /// - /// The type of the service. - public CapBuilder AddMessagePacker() - where T : class, IMessagePacker - { - return AddSingleton(typeof(IMessagePacker), typeof(T)); - } - /// /// Adds a scoped service of the type specified in serviceType with an implementation /// diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index 29e5e18..2275439 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -38,19 +38,13 @@ namespace Microsoft.Extensions.DependencyInjection services.TryAddSingleton(); - //Serializer and model binder - services.TryAddSingleton(); - services.TryAddSingleton(); services.TryAddSingleton(); - services.TryAddSingleton(); - - //services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); - //Processors services.TryAddSingleton(); + //Processors services.TryAddEnumerable(ServiceDescriptor.Singleton()); services.TryAddEnumerable(ServiceDescriptor.Singleton()); @@ -63,7 +57,7 @@ namespace Microsoft.Extensions.DependencyInjection services.TryAddSingleton(); services.TryAddSingleton(); - services.TryAddSingleton(); + services.TryAddSingleton(); // Warning: IPublishMessageSender need to inject at extension project. services.TryAddSingleton(); @@ -77,8 +71,7 @@ namespace Microsoft.Extensions.DependencyInjection } services.Configure(setupAction); - //Startup and Hosted - //services.AddTransient(); + //Startup and Hosted services.AddHostedService(); return new CapBuilder(services); diff --git a/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs b/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs index bd3e1e5..fda43e2 100644 --- a/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs +++ b/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; using DotNetCore.CAP.Abstractions; // ReSharper disable once CheckNamespace @@ -22,4 +23,10 @@ namespace DotNetCore.CAP return Name; } } + + [AttributeUsage(AttributeTargets.Parameter)] + public class FromCapAttribute : Attribute + { + + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Cap.Header.cs b/src/DotNetCore.CAP/Cap.Header.cs new file mode 100644 index 0000000..657f5cc --- /dev/null +++ b/src/DotNetCore.CAP/Cap.Header.cs @@ -0,0 +1,13 @@ +using System.Collections.Generic; +using System.Collections.ObjectModel; + +namespace DotNetCore.CAP +{ + public class CapHeader : ReadOnlyDictionary + { + public CapHeader(IDictionary dictionary) : base(dictionary) + { + + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs b/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs index 8b33a55..6a7795a 100644 --- a/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs +++ b/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs @@ -1,6 +1,8 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; +using System.Collections.Generic; using System.Reflection; using DotNetCore.CAP.Abstractions; @@ -18,5 +20,16 @@ namespace DotNetCore.CAP public TypeInfo ImplTypeInfo { get; set; } public TopicAttribute Attribute { get; set; } + + public IList Parameters { get; set; } + } + + public class ParameterDescriptor + { + public string Name { get; set; } + + public Type ParameterType { get; set; } + + public bool IsFromCap { get; set; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index 61a59bd..b3667f3 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -10,13 +10,11 @@ - - - - - - - - + + + + + + \ No newline at end of file diff --git a/src/DotNetCore.CAP/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/IConsumerRegister.Default.cs index da3e363..ecfb31c 100644 --- a/src/DotNetCore.CAP/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/IConsumerRegister.Default.cs @@ -148,20 +148,31 @@ namespace DotNetCore.CAP private void RegisterMessageProcessor(IConsumerClient client) { - client.OnMessageReceived += async (sender, messageContext) => + client.OnMessageReceived += async (sender, transportMessage) => { _cts.Token.ThrowIfCancellationRequested(); Guid? operationId = null; try { - operationId = TracingBefore(messageContext); + operationId = TracingBefore(transportMessage); var startTime = DateTimeOffset.UtcNow; var stopwatch = Stopwatch.StartNew(); - var message = await _serializer.DeserializeAsync(messageContext); + var name = transportMessage.GetName(); + var group = transportMessage.GetGroup(); - var mediumMessage = await _storage.StoreMessageAsync(message.GetName(), message.GetGroup(), message); + if (!_selector.TryGetTopicExecutor(name, group, out var executor)) + { + var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63"; + throw new SubscriberNotFoundException(error); + } + + var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType; + + var message = await _serializer.DeserializeAsync(transportMessage, type); + + var mediumMessage = await _storage.StoreMessageAsync(name, group, message); client.Commit(); @@ -170,17 +181,17 @@ namespace DotNetCore.CAP TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed); } - _dispatcher.EnqueueToExecute(mediumMessage); + _dispatcher.EnqueueToExecute(mediumMessage, executor); } catch (Exception e) { - _logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", messageContext); + _logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", transportMessage); client.Reject(); if (operationId != null) { - TracingError(operationId.Value, messageContext, e); + TracingError(operationId.Value, transportMessage, e); } } }; diff --git a/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs index f9a030d..4039bf6 100644 --- a/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs +++ b/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs @@ -134,7 +134,15 @@ namespace DotNetCore.CAP attr.Group = attr.Group + "." + _capOptions.Version; } - yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo); + var parameters = method.GetParameters() + .Select(parameter => new ParameterDescriptor + { + Name = parameter.Name, + ParameterType = parameter.ParameterType, + IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any() + }).ToList(); + + yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters); } } } @@ -143,14 +151,16 @@ namespace DotNetCore.CAP TopicAttribute attr, MethodInfo methodInfo, TypeInfo implType, - TypeInfo serviceTypeInfo) + TypeInfo serviceTypeInfo, + IList parameters) { var descriptor = new ConsumerExecutorDescriptor { Attribute = attr, MethodInfo = methodInfo, ImplTypeInfo = implType, - ServiceTypeInfo = serviceTypeInfo + ServiceTypeInfo = serviceTypeInfo, + Parameters = parameters }; return descriptor; diff --git a/src/DotNetCore.CAP/IDispatcher.cs b/src/DotNetCore.CAP/IDispatcher.cs index c56a582..bc47c02 100644 --- a/src/DotNetCore.CAP/IDispatcher.cs +++ b/src/DotNetCore.CAP/IDispatcher.cs @@ -1,7 +1,6 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using DotNetCore.CAP.Messages; using DotNetCore.CAP.Persistence; namespace DotNetCore.CAP @@ -10,6 +9,6 @@ namespace DotNetCore.CAP { void EnqueueToPublish(MediumMessage message); - void EnqueueToExecute(MediumMessage message); + void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs index 4e6c0cc..8b2625e 100644 --- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs +++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs @@ -24,7 +24,6 @@ namespace DotNetCore.CAP private readonly ILogger _logger; private readonly IServiceProvider _provider; private readonly CapOptions _options; - private readonly MethodMatcherCache _selector; // diagnostics listener // ReSharper disable once InconsistentNaming @@ -34,28 +33,40 @@ namespace DotNetCore.CAP public DefaultSubscriberExecutor( ILogger logger, IOptions options, - IServiceProvider provider, - MethodMatcherCache selector) + IServiceProvider provider) { - _selector = selector; - _provider = provider; _logger = logger; _options = options.Value; - + _dataStorage = _provider.GetService(); Invoker = _provider.GetService().CreateInvoker(); } private IConsumerInvoker Invoker { get; } - public async Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken) + public Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken) + { + var selector = _provider.GetService(); + if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor)) + { + var error = $"Message (Name:{message.Origin.GetName()},Group:{message.Origin.GetGroup()}) can not be found subscriber." + + $"{Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63"; + _logger.LogError(error); + + return Task.FromResult(OperateResult.Failed(new SubscriberNotFoundException(error))); + } + + return ExecuteAsync(message, executor, cancellationToken); + } + + public async Task ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken) { bool retry; OperateResult result; do { - var executedResult = await ExecuteWithoutRetryAsync(message, cancellationToken); + var executedResult = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken); result = executedResult.Item2; if (result == OperateResult.Success) { @@ -67,7 +78,7 @@ namespace DotNetCore.CAP return result; } - private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(MediumMessage message, CancellationToken cancellationToken) + private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken) { if (message == null) { @@ -80,7 +91,7 @@ namespace DotNetCore.CAP { var sp = Stopwatch.StartNew(); - await InvokeConsumerMethodAsync(message, cancellationToken); + await InvokeConsumerMethodAsync(message, descriptor, cancellationToken); sp.Stop(); @@ -157,22 +168,13 @@ namespace DotNetCore.CAP // message.Content = Helper.AddExceptionProperty(message.Content, exception); //} - private async Task InvokeConsumerMethodAsync(MediumMessage message, CancellationToken cancellationToken) + private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken) { - if (!_selector.TryGetTopicExecutor( - message.Origin.GetName(), - message.Origin.GetGroup(), - out var executor)) - { - var error = $"Message can not be found subscriber. {message} \r\n see: https://github.com/dotnetcore/CAP/issues/63"; - throw new SubscriberNotFoundException(error); - } - var startTime = DateTimeOffset.UtcNow; var stopwatch = Stopwatch.StartNew(); var operationId = Guid.Empty; - var consumerContext = new ConsumerContext(executor, message.Origin); + var consumerContext = new ConsumerContext(descriptor, message.Origin); try { diff --git a/src/DotNetCore.CAP/ISubscriberExecutor.cs b/src/DotNetCore.CAP/ISubscriberExecutor.cs index 559bee0..b60d5f0 100644 --- a/src/DotNetCore.CAP/ISubscriberExecutor.cs +++ b/src/DotNetCore.CAP/ISubscriberExecutor.cs @@ -13,5 +13,7 @@ namespace DotNetCore.CAP public interface ISubscriberExecutor { Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken = default); + + Task ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Infrastructure/Helper.cs b/src/DotNetCore.CAP/Infrastructure/Helper.cs index 1fe4e39..4acc33f 100644 --- a/src/DotNetCore.CAP/Infrastructure/Helper.cs +++ b/src/DotNetCore.CAP/Infrastructure/Helper.cs @@ -2,63 +2,13 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Collections.Generic; using System.ComponentModel; using System.Reflection; -using DotNetCore.CAP.Diagnostics; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; namespace DotNetCore.CAP.Infrastructure { public static class Helper { - private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Local); - private static JsonSerializerSettings _serializerSettings; - - public static void SetSerializerSettings(JsonSerializerSettings setting) - { - _serializerSettings = setting; - } - - public static string ToJson(object value) - { - return value != null - ? JsonConvert.SerializeObject(value, _serializerSettings) - : null; - } - - public static T FromJson(string value) - { - return value != null - ? JsonConvert.DeserializeObject(value, _serializerSettings) - : default(T); - } - - public static object FromJson(string value, Type type) - { - if (type == null) - { - throw new ArgumentNullException(nameof(type)); - } - - return value != null - ? JsonConvert.DeserializeObject(value, type, _serializerSettings) - : null; - } - - public static long ToTimestamp(DateTime value) - { - var elapsedTime = value - Epoch; - return (long)elapsedTime.TotalSeconds; - } - - - public static DateTime FromTimestamp(long value) - { - return Epoch.AddSeconds(value); - } - public static bool IsController(TypeInfo typeInfo) { if (!typeInfo.IsClass) @@ -85,40 +35,6 @@ namespace DotNetCore.CAP.Infrastructure return !CanConvertFromString(type); } - public static string AddExceptionProperty(string json, Exception exception) - { - var jObject = ToJObject(exception); - return AddJsonProperty(json, "ExceptionMessage", jObject); - } - - public static string AddTracingHeaderProperty(string json, TracingHeaders headers) - { - var jObject = ToJObject(headers); - return AddJsonProperty(json, nameof(TracingHeaders), jObject); - } - - public static bool TryExtractTracingHeaders(string json, out TracingHeaders headers, out string removedHeadersJson) - { - var jObj = JObject.Parse(json); - var jToken = jObj[nameof(TracingHeaders)]; - if (jToken != null) - { - headers = new TracingHeaders(); - foreach (var item in jToken.ToObject>()) - { - headers.Add(item.Key, item.Value); - } - - jObj.Remove(nameof(TracingHeaders)); - removedHeadersJson = jObj.ToString(); - return true; - } - - headers = null; - removedHeadersJson = null; - return false; - } - public static bool IsInnerIP(string ipAddress) { bool isInnerIp; @@ -138,7 +54,6 @@ namespace DotNetCore.CAP.Infrastructure isInnerIp = IsInner(ipNum, aBegin, aEnd) || IsInner(ipNum, bBegin, bEnd) || IsInner(ipNum, cBegin, cEnd); return isInnerIp; } - private static long GetIpNum(string ipAddress) { var ip = ipAddress.Split('.'); @@ -174,41 +89,5 @@ namespace DotNetCore.CAP.Infrastructure type == typeof(TimeSpan) || type == typeof(Uri); } - - private static JObject ToJObject(Exception exception) - { - return JObject.FromObject(new - { - exception.Source, - exception.Message, - InnerMessage = exception.InnerException?.Message - }); - } - - private static JObject ToJObject(TracingHeaders headers) - { - var jobj = new JObject(); - foreach (var keyValuePair in headers) - { - jobj[keyValuePair.Key] = keyValuePair.Value; - } - return jobj; - } - - private static string AddJsonProperty(string json, string propertyName, JObject propertyValue) - { - var jObj = JObject.Parse(json); - - if (jObj.TryGetValue(propertyName, out var _)) - { - jObj[propertyName] = propertyValue; - } - else - { - jObj.Add(new JProperty(propertyName, propertyValue)); - } - - return jObj.ToString(Formatting.None); - } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs index 1feeb6a..cb7247a 100644 --- a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs +++ b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs @@ -11,6 +11,7 @@ namespace DotNetCore.CAP.Internal { private readonly ILoggerFactory _loggerFactory; //private readonly IMessagePacker _messagePacker; + // //private readonly IModelBinderFactory _modelBinderFactory; private readonly IServiceProvider _serviceProvider; diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs index 62af0a7..577468d 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs @@ -59,65 +59,34 @@ namespace DotNetCore.CAP.Internal obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType); } - //var jsonContent = context.DeliverMessage.Content; - //var message = _messagePacker.UnPack(jsonContent); - var message = context.DeliverMessage; - object resultObj; - if (executor.MethodParameters.Length > 0) - { - resultObj = await ExecuteWithParameterAsync(executor, obj, message.Value); - } - else + var parameterDescriptors = context.ConsumerDescriptor.Parameters; + var executeParameters = new object[parameterDescriptors.Count]; + for (var i = 0; i < parameterDescriptors.Count; i++) { - resultObj = await ExecuteAsync(executor, obj); + if (parameterDescriptors[i].IsFromCap) + { + executeParameters[i] = new CapHeader(message.Headers); + } + else + { + executeParameters[i] = message.Value; + } } + var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters); return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName()); } } - private async Task ExecuteAsync(ObjectMethodExecutor executor, object @class) + private async Task ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter) { if (executor.IsMethodAsync) { - return await executor.ExecuteAsync(@class); + return await executor.ExecuteAsync(@class, parameter); } - return executor.Execute(@class); - } - - private async Task ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object parameter) - { - var firstParameter = executor.MethodParameters[0]; - try - { - if (executor.IsMethodAsync) - { - return await executor.ExecuteAsync(@class, parameter); - } - - return executor.Execute(@class, parameter); - //var binder = _modelBinderFactory.CreateBinder(firstParameter); - //var bindResult = await binder.BindModelAsync(parameter); - //if (bindResult.IsSuccess) - //{ - // if (executor.IsMethodAsync) - // { - // return await executor.ExecuteAsync(@class, bindResult.Model); - // } - - // return executor.Execute(@class, bindResult.Model); - //} - - //throw new MethodBindException( - // $"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameter} "); - } - catch (FormatException ex) - { - //_logger.ModelBinderFormattingException(executor.MethodInfo?.Name, firstParameter.Name, parameter, ex); - return null; - } + return executor.Execute(@class, parameter); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Messages/Headers.cs b/src/DotNetCore.CAP/Messages/Headers.cs index 8aad623..f953bd4 100644 --- a/src/DotNetCore.CAP/Messages/Headers.cs +++ b/src/DotNetCore.CAP/Messages/Headers.cs @@ -23,8 +23,5 @@ public const string Group = "cap-msg-group"; public const string SentTime = "cap-senttime"; - - public const string ContentType = "cap-content-type"; - } } diff --git a/src/DotNetCore.CAP/Messages/TransportMessage.cs b/src/DotNetCore.CAP/Messages/TransportMessage.cs index 93e32bd..7183bb4 100644 --- a/src/DotNetCore.CAP/Messages/TransportMessage.cs +++ b/src/DotNetCore.CAP/Messages/TransportMessage.cs @@ -28,5 +28,10 @@ namespace DotNetCore.CAP.Messages { return Headers.TryGetValue(Messages.Headers.MessageName, out var value) ? value : null; } + + public string GetGroup() + { + return Headers.TryGetValue(Messages.Headers.Group, out var value) ? value : null; + } } } diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 60b7f17..7f1fb8c 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -21,8 +21,8 @@ namespace DotNetCore.CAP.Processor private readonly BlockingCollection _publishedMessageQueue = new BlockingCollection(new ConcurrentQueue()); - private readonly BlockingCollection _receivedMessageQueue = - new BlockingCollection(new ConcurrentQueue()); + private readonly BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)> _receivedMessageQueue = + new BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)>(new ConcurrentQueue<(MediumMessage, ConsumerExecutorDescriptor)>()); public Dispatcher(ILogger logger, IMessageSender sender, @@ -41,9 +41,9 @@ namespace DotNetCore.CAP.Processor _publishedMessageQueue.Add(message); } - public void EnqueueToExecute(MediumMessage message) + public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor) { - _receivedMessageQueue.Add(message); + _receivedMessageQueue.Add((message, descriptor)); } public void Dispose() @@ -85,7 +85,7 @@ namespace DotNetCore.CAP.Processor { foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token)) { - _executor.ExecuteAsync(message, _cts.Token); + _executor.ExecuteAsync(message.Item1, message.Item2, _cts.Token); } } catch (OperationCanceledException) diff --git a/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs b/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs new file mode 100644 index 0000000..5362342 --- /dev/null +++ b/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs @@ -0,0 +1,25 @@ +using System; +using System.Threading.Tasks; +using DotNetCore.CAP.Messages; +using System.Text.Json; + +namespace DotNetCore.CAP.Serialization +{ + public class JsonUtf8Serializer : ISerializer + { + public Task SerializeAsync(Message message) + { + return Task.FromResult(new TransportMessage(message.Headers, JsonSerializer.SerializeToUtf8Bytes(message.Value))); + } + + public Task DeserializeAsync(TransportMessage transportMessage, Type valueType) + { + if (valueType == null) + { + return Task.FromResult(new Message(transportMessage.Headers, null)); + } + + return Task.FromResult(new Message(transportMessage.Headers, JsonSerializer.Deserialize(transportMessage.Body, valueType))); + } + } +} diff --git a/src/DotNetCore.CAP/Serialization/ISerializer.cs b/src/DotNetCore.CAP/Serialization/ISerializer.cs index 75d05d5..c38d064 100644 --- a/src/DotNetCore.CAP/Serialization/ISerializer.cs +++ b/src/DotNetCore.CAP/Serialization/ISerializer.cs @@ -1,5 +1,7 @@ -using System.Threading.Tasks; +using System; +using System.Threading.Tasks; using DotNetCore.CAP.Messages; +using JetBrains.Annotations; namespace DotNetCore.CAP.Serialization { @@ -13,6 +15,6 @@ namespace DotNetCore.CAP.Serialization /// /// Deserializes the given back into a /// - Task DeserializeAsync(TransportMessage transportMessage); + Task DeserializeAsync(TransportMessage transportMessage, [CanBeNull] Type valueType); } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Serialization/StringSerializer.cs b/src/DotNetCore.CAP/Serialization/StringSerializer.cs index 42ba12a..8e89111 100644 --- a/src/DotNetCore.CAP/Serialization/StringSerializer.cs +++ b/src/DotNetCore.CAP/Serialization/StringSerializer.cs @@ -1,7 +1,5 @@ -using System; -using System.Runtime.Serialization; -using DotNetCore.CAP.Messages; -using Newtonsoft.Json; +using DotNetCore.CAP.Messages; +using System.Text.Json; namespace DotNetCore.CAP.Serialization { @@ -9,19 +7,12 @@ namespace DotNetCore.CAP.Serialization { public static string Serialize(Message message) { - return JsonConvert.SerializeObject(message); + return JsonSerializer.Serialize(message); } public static Message DeSerialize(string json) { - try - { - return JsonConvert.DeserializeObject(json); - } - catch (Exception exception) - { - throw new SerializationException($"Could not deserialize JSON text '{json}'", exception); - } + return JsonSerializer.Deserialize(json); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/SubscriberNotFoundException.cs b/src/DotNetCore.CAP/SubscriberNotFoundException.cs index 6e5738f..b312159 100644 --- a/src/DotNetCore.CAP/SubscriberNotFoundException.cs +++ b/src/DotNetCore.CAP/SubscriberNotFoundException.cs @@ -7,17 +7,8 @@ namespace DotNetCore.CAP { public class SubscriberNotFoundException : Exception { - public SubscriberNotFoundException() - { - } - public SubscriberNotFoundException(string message) : base(message) { } - - public SubscriberNotFoundException(string message, Exception inner) : - base(message, inner) - { - } } } \ No newline at end of file