diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisher.cs b/src/DotNetCore.CAP/Abstractions/CapPublisher.cs
index 1e018f6..25c9927 100644
--- a/src/DotNetCore.CAP/Abstractions/CapPublisher.cs
+++ b/src/DotNetCore.CAP/Abstractions/CapPublisher.cs
@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Abstractions
optionHeaders.Add(Headers.CorrelationSequence, 0.ToString());
}
optionHeaders.Add(Headers.MessageName, name);
- optionHeaders.Add(Headers.Type, typeof(T).ToString());
+ optionHeaders.Add(Headers.Type, typeof(T).AssemblyQualifiedName);
optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
var message = new Message(optionHeaders, value);
diff --git a/src/DotNetCore.CAP/CAP.Builder.cs b/src/DotNetCore.CAP/CAP.Builder.cs
index 8b9889f..c737b8a 100644
--- a/src/DotNetCore.CAP/CAP.Builder.cs
+++ b/src/DotNetCore.CAP/CAP.Builder.cs
@@ -53,26 +53,6 @@ namespace DotNetCore.CAP
return AddScoped(typeof(ICapPublisher), typeof(T));
}
- ///
- /// Add a custom content serializer
- ///
- /// The type of the service.
- public CapBuilder AddContentSerializer()
- where T : class, IContentSerializer
- {
- return AddSingleton(typeof(IContentSerializer), typeof(T));
- }
-
- ///
- /// Add a custom message wapper
- ///
- /// The type of the service.
- public CapBuilder AddMessagePacker()
- where T : class, IMessagePacker
- {
- return AddSingleton(typeof(IMessagePacker), typeof(T));
- }
-
///
/// Adds a scoped service of the type specified in serviceType with an implementation
///
diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
index 29e5e18..2275439 100644
--- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
+++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
@@ -38,19 +38,13 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton();
- //Serializer and model binder
- services.TryAddSingleton();
- services.TryAddSingleton();
services.TryAddSingleton();
- services.TryAddSingleton();
-
- //services.TryAddSingleton();
services.TryAddSingleton();
services.TryAddSingleton();
- //Processors
services.TryAddSingleton();
+ //Processors
services.TryAddEnumerable(ServiceDescriptor.Singleton());
services.TryAddEnumerable(ServiceDescriptor.Singleton());
@@ -63,7 +57,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton();
services.TryAddSingleton();
- services.TryAddSingleton();
+ services.TryAddSingleton();
// Warning: IPublishMessageSender need to inject at extension project.
services.TryAddSingleton();
@@ -77,8 +71,7 @@ namespace Microsoft.Extensions.DependencyInjection
}
services.Configure(setupAction);
- //Startup and Hosted
- //services.AddTransient();
+ //Startup and Hosted
services.AddHostedService();
return new CapBuilder(services);
diff --git a/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs b/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs
index bd3e1e5..fda43e2 100644
--- a/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs
+++ b/src/DotNetCore.CAP/CAP.SubscribeAttribute.cs
@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
+using System;
using DotNetCore.CAP.Abstractions;
// ReSharper disable once CheckNamespace
@@ -22,4 +23,10 @@ namespace DotNetCore.CAP
return Name;
}
}
+
+ [AttributeUsage(AttributeTargets.Parameter)]
+ public class FromCapAttribute : Attribute
+ {
+
+ }
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Cap.Header.cs b/src/DotNetCore.CAP/Cap.Header.cs
new file mode 100644
index 0000000..657f5cc
--- /dev/null
+++ b/src/DotNetCore.CAP/Cap.Header.cs
@@ -0,0 +1,13 @@
+using System.Collections.Generic;
+using System.Collections.ObjectModel;
+
+namespace DotNetCore.CAP
+{
+ public class CapHeader : ReadOnlyDictionary
+ {
+ public CapHeader(IDictionary dictionary) : base(dictionary)
+ {
+
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs b/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs
index 8b33a55..6a7795a 100644
--- a/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs
+++ b/src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs
@@ -1,6 +1,8 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
+using System;
+using System.Collections.Generic;
using System.Reflection;
using DotNetCore.CAP.Abstractions;
@@ -18,5 +20,16 @@ namespace DotNetCore.CAP
public TypeInfo ImplTypeInfo { get; set; }
public TopicAttribute Attribute { get; set; }
+
+ public IList Parameters { get; set; }
+ }
+
+ public class ParameterDescriptor
+ {
+ public string Name { get; set; }
+
+ public Type ParameterType { get; set; }
+
+ public bool IsFromCap { get; set; }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
index 61a59bd..b3667f3 100644
--- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj
+++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
@@ -10,13 +10,11 @@
-
-
-
-
-
-
-
-
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/IConsumerRegister.Default.cs
index da3e363..ecfb31c 100644
--- a/src/DotNetCore.CAP/IConsumerRegister.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerRegister.Default.cs
@@ -148,20 +148,31 @@ namespace DotNetCore.CAP
private void RegisterMessageProcessor(IConsumerClient client)
{
- client.OnMessageReceived += async (sender, messageContext) =>
+ client.OnMessageReceived += async (sender, transportMessage) =>
{
_cts.Token.ThrowIfCancellationRequested();
Guid? operationId = null;
try
{
- operationId = TracingBefore(messageContext);
+ operationId = TracingBefore(transportMessage);
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
- var message = await _serializer.DeserializeAsync(messageContext);
+ var name = transportMessage.GetName();
+ var group = transportMessage.GetGroup();
- var mediumMessage = await _storage.StoreMessageAsync(message.GetName(), message.GetGroup(), message);
+ if (!_selector.TryGetTopicExecutor(name, group, out var executor))
+ {
+ var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
+ throw new SubscriberNotFoundException(error);
+ }
+
+ var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
+
+ var message = await _serializer.DeserializeAsync(transportMessage, type);
+
+ var mediumMessage = await _storage.StoreMessageAsync(name, group, message);
client.Commit();
@@ -170,17 +181,17 @@ namespace DotNetCore.CAP
TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed);
}
- _dispatcher.EnqueueToExecute(mediumMessage);
+ _dispatcher.EnqueueToExecute(mediumMessage, executor);
}
catch (Exception e)
{
- _logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", messageContext);
+ _logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", transportMessage);
client.Reject();
if (operationId != null)
{
- TracingError(operationId.Value, messageContext, e);
+ TracingError(operationId.Value, transportMessage, e);
}
}
};
diff --git a/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs
index f9a030d..4039bf6 100644
--- a/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs
@@ -134,7 +134,15 @@ namespace DotNetCore.CAP
attr.Group = attr.Group + "." + _capOptions.Version;
}
- yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo);
+ var parameters = method.GetParameters()
+ .Select(parameter => new ParameterDescriptor
+ {
+ Name = parameter.Name,
+ ParameterType = parameter.ParameterType,
+ IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any()
+ }).ToList();
+
+ yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters);
}
}
}
@@ -143,14 +151,16 @@ namespace DotNetCore.CAP
TopicAttribute attr,
MethodInfo methodInfo,
TypeInfo implType,
- TypeInfo serviceTypeInfo)
+ TypeInfo serviceTypeInfo,
+ IList parameters)
{
var descriptor = new ConsumerExecutorDescriptor
{
Attribute = attr,
MethodInfo = methodInfo,
ImplTypeInfo = implType,
- ServiceTypeInfo = serviceTypeInfo
+ ServiceTypeInfo = serviceTypeInfo,
+ Parameters = parameters
};
return descriptor;
diff --git a/src/DotNetCore.CAP/IDispatcher.cs b/src/DotNetCore.CAP/IDispatcher.cs
index c56a582..bc47c02 100644
--- a/src/DotNetCore.CAP/IDispatcher.cs
+++ b/src/DotNetCore.CAP/IDispatcher.cs
@@ -1,7 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
-using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;
namespace DotNetCore.CAP
@@ -10,6 +9,6 @@ namespace DotNetCore.CAP
{
void EnqueueToPublish(MediumMessage message);
- void EnqueueToExecute(MediumMessage message);
+ void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor);
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
index 4e6c0cc..8b2625e 100644
--- a/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
+++ b/src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
@@ -24,7 +24,6 @@ namespace DotNetCore.CAP
private readonly ILogger _logger;
private readonly IServiceProvider _provider;
private readonly CapOptions _options;
- private readonly MethodMatcherCache _selector;
// diagnostics listener
// ReSharper disable once InconsistentNaming
@@ -34,28 +33,40 @@ namespace DotNetCore.CAP
public DefaultSubscriberExecutor(
ILogger logger,
IOptions options,
- IServiceProvider provider,
- MethodMatcherCache selector)
+ IServiceProvider provider)
{
- _selector = selector;
-
_provider = provider;
_logger = logger;
_options = options.Value;
-
+
_dataStorage = _provider.GetService();
Invoker = _provider.GetService().CreateInvoker();
}
private IConsumerInvoker Invoker { get; }
- public async Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken)
+ public Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken)
+ {
+ var selector = _provider.GetService();
+ if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor))
+ {
+ var error = $"Message (Name:{message.Origin.GetName()},Group:{message.Origin.GetGroup()}) can not be found subscriber." +
+ $"{Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
+ _logger.LogError(error);
+
+ return Task.FromResult(OperateResult.Failed(new SubscriberNotFoundException(error)));
+ }
+
+ return ExecuteAsync(message, executor, cancellationToken);
+ }
+
+ public async Task ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
bool retry;
OperateResult result;
do
{
- var executedResult = await ExecuteWithoutRetryAsync(message, cancellationToken);
+ var executedResult = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
@@ -67,7 +78,7 @@ namespace DotNetCore.CAP
return result;
}
- private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(MediumMessage message, CancellationToken cancellationToken)
+ private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
if (message == null)
{
@@ -80,7 +91,7 @@ namespace DotNetCore.CAP
{
var sp = Stopwatch.StartNew();
- await InvokeConsumerMethodAsync(message, cancellationToken);
+ await InvokeConsumerMethodAsync(message, descriptor, cancellationToken);
sp.Stop();
@@ -157,22 +168,13 @@ namespace DotNetCore.CAP
// message.Content = Helper.AddExceptionProperty(message.Content, exception);
//}
- private async Task InvokeConsumerMethodAsync(MediumMessage message, CancellationToken cancellationToken)
+ private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
- if (!_selector.TryGetTopicExecutor(
- message.Origin.GetName(),
- message.Origin.GetGroup(),
- out var executor))
- {
- var error = $"Message can not be found subscriber. {message} \r\n see: https://github.com/dotnetcore/CAP/issues/63";
- throw new SubscriberNotFoundException(error);
- }
-
var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var operationId = Guid.Empty;
- var consumerContext = new ConsumerContext(executor, message.Origin);
+ var consumerContext = new ConsumerContext(descriptor, message.Origin);
try
{
diff --git a/src/DotNetCore.CAP/ISubscriberExecutor.cs b/src/DotNetCore.CAP/ISubscriberExecutor.cs
index 559bee0..b60d5f0 100644
--- a/src/DotNetCore.CAP/ISubscriberExecutor.cs
+++ b/src/DotNetCore.CAP/ISubscriberExecutor.cs
@@ -13,5 +13,7 @@ namespace DotNetCore.CAP
public interface ISubscriberExecutor
{
Task ExecuteAsync(MediumMessage message, CancellationToken cancellationToken = default);
+
+ Task ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken = default);
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Infrastructure/Helper.cs b/src/DotNetCore.CAP/Infrastructure/Helper.cs
index 1fe4e39..4acc33f 100644
--- a/src/DotNetCore.CAP/Infrastructure/Helper.cs
+++ b/src/DotNetCore.CAP/Infrastructure/Helper.cs
@@ -2,63 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
-using System.Collections.Generic;
using System.ComponentModel;
using System.Reflection;
-using DotNetCore.CAP.Diagnostics;
-using Newtonsoft.Json;
-using Newtonsoft.Json.Linq;
namespace DotNetCore.CAP.Infrastructure
{
public static class Helper
{
- private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Local);
- private static JsonSerializerSettings _serializerSettings;
-
- public static void SetSerializerSettings(JsonSerializerSettings setting)
- {
- _serializerSettings = setting;
- }
-
- public static string ToJson(object value)
- {
- return value != null
- ? JsonConvert.SerializeObject(value, _serializerSettings)
- : null;
- }
-
- public static T FromJson(string value)
- {
- return value != null
- ? JsonConvert.DeserializeObject(value, _serializerSettings)
- : default(T);
- }
-
- public static object FromJson(string value, Type type)
- {
- if (type == null)
- {
- throw new ArgumentNullException(nameof(type));
- }
-
- return value != null
- ? JsonConvert.DeserializeObject(value, type, _serializerSettings)
- : null;
- }
-
- public static long ToTimestamp(DateTime value)
- {
- var elapsedTime = value - Epoch;
- return (long)elapsedTime.TotalSeconds;
- }
-
-
- public static DateTime FromTimestamp(long value)
- {
- return Epoch.AddSeconds(value);
- }
-
public static bool IsController(TypeInfo typeInfo)
{
if (!typeInfo.IsClass)
@@ -85,40 +35,6 @@ namespace DotNetCore.CAP.Infrastructure
return !CanConvertFromString(type);
}
- public static string AddExceptionProperty(string json, Exception exception)
- {
- var jObject = ToJObject(exception);
- return AddJsonProperty(json, "ExceptionMessage", jObject);
- }
-
- public static string AddTracingHeaderProperty(string json, TracingHeaders headers)
- {
- var jObject = ToJObject(headers);
- return AddJsonProperty(json, nameof(TracingHeaders), jObject);
- }
-
- public static bool TryExtractTracingHeaders(string json, out TracingHeaders headers, out string removedHeadersJson)
- {
- var jObj = JObject.Parse(json);
- var jToken = jObj[nameof(TracingHeaders)];
- if (jToken != null)
- {
- headers = new TracingHeaders();
- foreach (var item in jToken.ToObject>())
- {
- headers.Add(item.Key, item.Value);
- }
-
- jObj.Remove(nameof(TracingHeaders));
- removedHeadersJson = jObj.ToString();
- return true;
- }
-
- headers = null;
- removedHeadersJson = null;
- return false;
- }
-
public static bool IsInnerIP(string ipAddress)
{
bool isInnerIp;
@@ -138,7 +54,6 @@ namespace DotNetCore.CAP.Infrastructure
isInnerIp = IsInner(ipNum, aBegin, aEnd) || IsInner(ipNum, bBegin, bEnd) || IsInner(ipNum, cBegin, cEnd);
return isInnerIp;
}
-
private static long GetIpNum(string ipAddress)
{
var ip = ipAddress.Split('.');
@@ -174,41 +89,5 @@ namespace DotNetCore.CAP.Infrastructure
type == typeof(TimeSpan) ||
type == typeof(Uri);
}
-
- private static JObject ToJObject(Exception exception)
- {
- return JObject.FromObject(new
- {
- exception.Source,
- exception.Message,
- InnerMessage = exception.InnerException?.Message
- });
- }
-
- private static JObject ToJObject(TracingHeaders headers)
- {
- var jobj = new JObject();
- foreach (var keyValuePair in headers)
- {
- jobj[keyValuePair.Key] = keyValuePair.Value;
- }
- return jobj;
- }
-
- private static string AddJsonProperty(string json, string propertyName, JObject propertyValue)
- {
- var jObj = JObject.Parse(json);
-
- if (jObj.TryGetValue(propertyName, out var _))
- {
- jObj[propertyName] = propertyValue;
- }
- else
- {
- jObj.Add(new JProperty(propertyName, propertyValue));
- }
-
- return jObj.ToString(Formatting.None);
- }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
index 1feeb6a..cb7247a 100644
--- a/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
+++ b/src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
@@ -11,6 +11,7 @@ namespace DotNetCore.CAP.Internal
{
private readonly ILoggerFactory _loggerFactory;
//private readonly IMessagePacker _messagePacker;
+ //
//private readonly IModelBinderFactory _modelBinderFactory;
private readonly IServiceProvider _serviceProvider;
diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
index 62af0a7..577468d 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
@@ -59,65 +59,34 @@ namespace DotNetCore.CAP.Internal
obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType);
}
- //var jsonContent = context.DeliverMessage.Content;
- //var message = _messagePacker.UnPack(jsonContent);
-
var message = context.DeliverMessage;
- object resultObj;
- if (executor.MethodParameters.Length > 0)
- {
- resultObj = await ExecuteWithParameterAsync(executor, obj, message.Value);
- }
- else
+ var parameterDescriptors = context.ConsumerDescriptor.Parameters;
+ var executeParameters = new object[parameterDescriptors.Count];
+ for (var i = 0; i < parameterDescriptors.Count; i++)
{
- resultObj = await ExecuteAsync(executor, obj);
+ if (parameterDescriptors[i].IsFromCap)
+ {
+ executeParameters[i] = new CapHeader(message.Headers);
+ }
+ else
+ {
+ executeParameters[i] = message.Value;
+ }
}
+ var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
}
}
- private async Task