Browse Source

Code refactor

master
Savorboard 5 years ago
parent
commit
b1bd53bf1c
22 changed files with 159 additions and 271 deletions
  1. +1
    -1
      src/DotNetCore.CAP/Abstractions/CapPublisher.cs
  2. +0
    -20
      src/DotNetCore.CAP/CAP.Builder.cs
  3. +3
    -10
      src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs
  4. +7
    -0
      src/DotNetCore.CAP/CAP.SubscribeAttribute.cs
  5. +13
    -0
      src/DotNetCore.CAP/Cap.Header.cs
  6. +13
    -0
      src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs
  7. +6
    -8
      src/DotNetCore.CAP/DotNetCore.CAP.csproj
  8. +18
    -7
      src/DotNetCore.CAP/IConsumerRegister.Default.cs
  9. +13
    -3
      src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs
  10. +1
    -2
      src/DotNetCore.CAP/IDispatcher.cs
  11. +23
    -21
      src/DotNetCore.CAP/ISubscribeExecutor.Default.cs
  12. +2
    -0
      src/DotNetCore.CAP/ISubscriberExecutor.cs
  13. +0
    -121
      src/DotNetCore.CAP/Infrastructure/Helper.cs
  14. +1
    -0
      src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
  15. +15
    -46
      src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
  16. +0
    -3
      src/DotNetCore.CAP/Messages/Headers.cs
  17. +5
    -0
      src/DotNetCore.CAP/Messages/TransportMessage.cs
  18. +5
    -5
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
  19. +25
    -0
      src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs
  20. +4
    -2
      src/DotNetCore.CAP/Serialization/ISerializer.cs
  21. +4
    -13
      src/DotNetCore.CAP/Serialization/StringSerializer.cs
  22. +0
    -9
      src/DotNetCore.CAP/SubscriberNotFoundException.cs

+ 1
- 1
src/DotNetCore.CAP/Abstractions/CapPublisher.cs View File

@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.Abstractions
optionHeaders.Add(Headers.CorrelationSequence, 0.ToString());
}
optionHeaders.Add(Headers.MessageName, name);
optionHeaders.Add(Headers.Type, typeof(T).ToString());
optionHeaders.Add(Headers.Type, typeof(T).AssemblyQualifiedName);
optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString());

var message = new Message(optionHeaders, value);


+ 0
- 20
src/DotNetCore.CAP/CAP.Builder.cs View File

@@ -53,26 +53,6 @@ namespace DotNetCore.CAP
return AddScoped(typeof(ICapPublisher), typeof(T));
}

/// <summary>
/// Add a custom content serializer
/// </summary>
/// <typeparam name="T">The type of the service.</typeparam>
public CapBuilder AddContentSerializer<T>()
where T : class, IContentSerializer
{
return AddSingleton(typeof(IContentSerializer), typeof(T));
}

/// <summary>
/// Add a custom message wapper
/// </summary>
/// <typeparam name="T">The type of the service.</typeparam>
public CapBuilder AddMessagePacker<T>()
where T : class, IMessagePacker
{
return AddSingleton(typeof(IMessagePacker), typeof(T));
}

/// <summary>
/// Adds a scoped service of the type specified in serviceType with an implementation
/// </summary>


+ 3
- 10
src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs View File

@@ -38,19 +38,13 @@ namespace Microsoft.Extensions.DependencyInjection

services.TryAddSingleton<ICapPublisher, CapPublisher>();

//Serializer and model binder
services.TryAddSingleton<IContentSerializer, JsonContentSerializer>();
services.TryAddSingleton<IMessagePacker, DefaultMessagePacker>();
services.TryAddSingleton<IConsumerServiceSelector, DefaultConsumerServiceSelector>();
services.TryAddSingleton<IModelBinderFactory, ModelBinderFactory>();

//services.TryAddSingleton<ICallbackMessageSender, CallbackMessageSender>();
services.TryAddSingleton<IConsumerInvokerFactory, ConsumerInvokerFactory>();
services.TryAddSingleton<MethodMatcherCache>();

//Processors
services.TryAddSingleton<IConsumerRegister, ConsumerRegister>();

//Processors
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, CapProcessingServer>());
services.TryAddEnumerable(ServiceDescriptor.Singleton<IProcessingServer, ConsumerRegister>());

@@ -63,7 +57,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.TryAddSingleton<IMessageSender, MessageSender>();
services.TryAddSingleton<IDispatcher, Dispatcher>();

services.TryAddSingleton<ISerializer, MemorySerializer>();
services.TryAddSingleton<ISerializer, JsonUtf8Serializer>();

// Warning: IPublishMessageSender need to inject at extension project.
services.TryAddSingleton<ISubscriberExecutor, DefaultSubscriberExecutor>();
@@ -77,8 +71,7 @@ namespace Microsoft.Extensions.DependencyInjection
}
services.Configure(setupAction);

//Startup and Hosted
//services.AddTransient<IStartupFilter, CapStartupFilter>();
//Startup and Hosted
services.AddHostedService<DefaultBootstrapper>();

return new CapBuilder(services);


+ 7
- 0
src/DotNetCore.CAP/CAP.SubscribeAttribute.cs View File

@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Abstractions;

// ReSharper disable once CheckNamespace
@@ -22,4 +23,10 @@ namespace DotNetCore.CAP
return Name;
}
}

[AttributeUsage(AttributeTargets.Parameter)]
public class FromCapAttribute : Attribute
{
}
}

+ 13
- 0
src/DotNetCore.CAP/Cap.Header.cs View File

@@ -0,0 +1,13 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;

namespace DotNetCore.CAP
{
public class CapHeader : ReadOnlyDictionary<string, string>
{
public CapHeader(IDictionary<string, string> dictionary) : base(dictionary)
{

}
}
}

+ 13
- 0
src/DotNetCore.CAP/ConsumerExecutorDescriptor.cs View File

@@ -1,6 +1,8 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Reflection;
using DotNetCore.CAP.Abstractions;

@@ -18,5 +20,16 @@ namespace DotNetCore.CAP
public TypeInfo ImplTypeInfo { get; set; }

public TopicAttribute Attribute { get; set; }

public IList<ParameterDescriptor> Parameters { get; set; }
}

public class ParameterDescriptor
{
public string Name { get; set; }

public Type ParameterType { get; set; }

public bool IsFromCap { get; set; }
}
}

+ 6
- 8
src/DotNetCore.CAP/DotNetCore.CAP.csproj View File

@@ -10,13 +10,11 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Consul" Version="0.7.2.6" />
<PackageReference Include="Microsoft.AspNetCore.Hosting.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.5.1" />
</ItemGroup>
<ItemGroup>
<Folder Include="Persistence\InMemory\" />
<PackageReference Include="JetBrains.Annotations" Version="2019.1.3" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.0.0" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="4.6.0" />
<PackageReference Include="System.Text.Json" Version="4.6.0" />
</ItemGroup>
</Project>

+ 18
- 7
src/DotNetCore.CAP/IConsumerRegister.Default.cs View File

@@ -148,20 +148,31 @@ namespace DotNetCore.CAP

private void RegisterMessageProcessor(IConsumerClient client)
{
client.OnMessageReceived += async (sender, messageContext) =>
client.OnMessageReceived += async (sender, transportMessage) =>
{
_cts.Token.ThrowIfCancellationRequested();
Guid? operationId = null;
try
{
operationId = TracingBefore(messageContext);
operationId = TracingBefore(transportMessage);

var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();

var message = await _serializer.DeserializeAsync(messageContext);
var name = transportMessage.GetName();
var group = transportMessage.GetGroup();

var mediumMessage = await _storage.StoreMessageAsync(message.GetName(), message.GetGroup(), message);
if (!_selector.TryGetTopicExecutor(name, group, out var executor))
{
var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error);
}

var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;

var message = await _serializer.DeserializeAsync(transportMessage, type);

var mediumMessage = await _storage.StoreMessageAsync(name, group, message);

client.Commit();

@@ -170,17 +181,17 @@ namespace DotNetCore.CAP
TracingAfter(operationId.Value, message, startTime, stopwatch.Elapsed);
}

_dispatcher.EnqueueToExecute(mediumMessage);
_dispatcher.EnqueueToExecute(mediumMessage, executor);
}
catch (Exception e)
{
_logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", messageContext);
_logger.LogError(e, "An exception occurred when store received message. Message:'{0}'.", transportMessage);

client.Reject();

if (operationId != null)
{
TracingError(operationId.Value, messageContext, e);
TracingError(operationId.Value, transportMessage, e);
}
}
};


+ 13
- 3
src/DotNetCore.CAP/IConsumerServiceSelector.Default.cs View File

@@ -134,7 +134,15 @@ namespace DotNetCore.CAP
attr.Group = attr.Group + "." + _capOptions.Version;
}
yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo);
var parameters = method.GetParameters()
.Select(parameter => new ParameterDescriptor
{
Name = parameter.Name,
ParameterType = parameter.ParameterType,
IsFromCap = parameter.GetCustomAttributes(typeof(FromCapAttribute)).Any()
}).ToList();
yield return InitDescriptor(attr, method, typeInfo, serviceTypeInfo, parameters);
}
}
}
@@ -143,14 +151,16 @@ namespace DotNetCore.CAP
TopicAttribute attr,
MethodInfo methodInfo,
TypeInfo implType,
TypeInfo serviceTypeInfo)
TypeInfo serviceTypeInfo,
IList<ParameterDescriptor> parameters)
{
var descriptor = new ConsumerExecutorDescriptor
{
Attribute = attr,
MethodInfo = methodInfo,
ImplTypeInfo = implType,
ServiceTypeInfo = serviceTypeInfo
ServiceTypeInfo = serviceTypeInfo,
Parameters = parameters
};
return descriptor;


+ 1
- 2
src/DotNetCore.CAP/IDispatcher.cs View File

@@ -1,7 +1,6 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Persistence;

namespace DotNetCore.CAP
@@ -10,6 +9,6 @@ namespace DotNetCore.CAP
{
void EnqueueToPublish(MediumMessage message);

void EnqueueToExecute(MediumMessage message);
void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor);
}
}

+ 23
- 21
src/DotNetCore.CAP/ISubscribeExecutor.Default.cs View File

@@ -24,7 +24,6 @@ namespace DotNetCore.CAP
private readonly ILogger _logger;
private readonly IServiceProvider _provider;
private readonly CapOptions _options;
private readonly MethodMatcherCache _selector;

// diagnostics listener
// ReSharper disable once InconsistentNaming
@@ -34,28 +33,40 @@ namespace DotNetCore.CAP
public DefaultSubscriberExecutor(
ILogger<DefaultSubscriberExecutor> logger,
IOptions<CapOptions> options,
IServiceProvider provider,
MethodMatcherCache selector)
IServiceProvider provider)
{
_selector = selector;

_provider = provider;
_logger = logger;
_options = options.Value;
_dataStorage = _provider.GetService<IDataStorage>();
Invoker = _provider.GetService<IConsumerInvokerFactory>().CreateInvoker();
}

private IConsumerInvoker Invoker { get; }

public async Task<OperateResult> ExecuteAsync(MediumMessage message, CancellationToken cancellationToken)
public Task<OperateResult> ExecuteAsync(MediumMessage message, CancellationToken cancellationToken)
{
var selector = _provider.GetService<MethodMatcherCache>();
if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor))
{
var error = $"Message (Name:{message.Origin.GetName()},Group:{message.Origin.GetGroup()}) can not be found subscriber." +
$"{Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
_logger.LogError(error);

return Task.FromResult(OperateResult.Failed(new SubscriberNotFoundException(error)));
}

return ExecuteAsync(message, executor, cancellationToken);
}

public async Task<OperateResult> ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
bool retry;
OperateResult result;
do
{
var executedResult = await ExecuteWithoutRetryAsync(message, cancellationToken);
var executedResult = await ExecuteWithoutRetryAsync(message, descriptor, cancellationToken);
result = executedResult.Item2;
if (result == OperateResult.Success)
{
@@ -67,7 +78,7 @@ namespace DotNetCore.CAP
return result;
}

private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(MediumMessage message, CancellationToken cancellationToken)
private async Task<(bool, OperateResult)> ExecuteWithoutRetryAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
if (message == null)
{
@@ -80,7 +91,7 @@ namespace DotNetCore.CAP
{
var sp = Stopwatch.StartNew();

await InvokeConsumerMethodAsync(message, cancellationToken);
await InvokeConsumerMethodAsync(message, descriptor, cancellationToken);

sp.Stop();

@@ -157,22 +168,13 @@ namespace DotNetCore.CAP
// message.Content = Helper.AddExceptionProperty(message.Content, exception);
//}

private async Task InvokeConsumerMethodAsync(MediumMessage message, CancellationToken cancellationToken)
private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken)
{
if (!_selector.TryGetTopicExecutor(
message.Origin.GetName(),
message.Origin.GetGroup(),
out var executor))
{
var error = $"Message can not be found subscriber. {message} \r\n see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error);
}

var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();
var operationId = Guid.Empty;

var consumerContext = new ConsumerContext(executor, message.Origin);
var consumerContext = new ConsumerContext(descriptor, message.Origin);

try
{


+ 2
- 0
src/DotNetCore.CAP/ISubscriberExecutor.cs View File

@@ -13,5 +13,7 @@ namespace DotNetCore.CAP
public interface ISubscriberExecutor
{
Task<OperateResult> ExecuteAsync(MediumMessage message, CancellationToken cancellationToken = default);

Task<OperateResult> ExecuteAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor, CancellationToken cancellationToken = default);
}
}

+ 0
- 121
src/DotNetCore.CAP/Infrastructure/Helper.cs View File

@@ -2,63 +2,13 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Reflection;
using DotNetCore.CAP.Diagnostics;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace DotNetCore.CAP.Infrastructure
{
public static class Helper
{
private static readonly DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Local);
private static JsonSerializerSettings _serializerSettings;

public static void SetSerializerSettings(JsonSerializerSettings setting)
{
_serializerSettings = setting;
}

public static string ToJson(object value)
{
return value != null
? JsonConvert.SerializeObject(value, _serializerSettings)
: null;
}

public static T FromJson<T>(string value)
{
return value != null
? JsonConvert.DeserializeObject<T>(value, _serializerSettings)
: default(T);
}

public static object FromJson(string value, Type type)
{
if (type == null)
{
throw new ArgumentNullException(nameof(type));
}

return value != null
? JsonConvert.DeserializeObject(value, type, _serializerSettings)
: null;
}

public static long ToTimestamp(DateTime value)
{
var elapsedTime = value - Epoch;
return (long)elapsedTime.TotalSeconds;
}


public static DateTime FromTimestamp(long value)
{
return Epoch.AddSeconds(value);
}

public static bool IsController(TypeInfo typeInfo)
{
if (!typeInfo.IsClass)
@@ -85,40 +35,6 @@ namespace DotNetCore.CAP.Infrastructure
return !CanConvertFromString(type);
}

public static string AddExceptionProperty(string json, Exception exception)
{
var jObject = ToJObject(exception);
return AddJsonProperty(json, "ExceptionMessage", jObject);
}

public static string AddTracingHeaderProperty(string json, TracingHeaders headers)
{
var jObject = ToJObject(headers);
return AddJsonProperty(json, nameof(TracingHeaders), jObject);
}

public static bool TryExtractTracingHeaders(string json, out TracingHeaders headers, out string removedHeadersJson)
{
var jObj = JObject.Parse(json);
var jToken = jObj[nameof(TracingHeaders)];
if (jToken != null)
{
headers = new TracingHeaders();
foreach (var item in jToken.ToObject<Dictionary<string,string>>())
{
headers.Add(item.Key, item.Value);
}

jObj.Remove(nameof(TracingHeaders));
removedHeadersJson = jObj.ToString();
return true;
}

headers = null;
removedHeadersJson = null;
return false;
}

public static bool IsInnerIP(string ipAddress)
{
bool isInnerIp;
@@ -138,7 +54,6 @@ namespace DotNetCore.CAP.Infrastructure
isInnerIp = IsInner(ipNum, aBegin, aEnd) || IsInner(ipNum, bBegin, bEnd) || IsInner(ipNum, cBegin, cEnd);
return isInnerIp;
}

private static long GetIpNum(string ipAddress)
{
var ip = ipAddress.Split('.');
@@ -174,41 +89,5 @@ namespace DotNetCore.CAP.Infrastructure
type == typeof(TimeSpan) ||
type == typeof(Uri);
}

private static JObject ToJObject(Exception exception)
{
return JObject.FromObject(new
{
exception.Source,
exception.Message,
InnerMessage = exception.InnerException?.Message
});
}

private static JObject ToJObject(TracingHeaders headers)
{
var jobj = new JObject();
foreach (var keyValuePair in headers)
{
jobj[keyValuePair.Key] = keyValuePair.Value;
}
return jobj;
}

private static string AddJsonProperty(string json, string propertyName, JObject propertyValue)
{
var jObj = JObject.Parse(json);

if (jObj.TryGetValue(propertyName, out var _))
{
jObj[propertyName] = propertyValue;
}
else
{
jObj.Add(new JProperty(propertyName, propertyValue));
}

return jObj.ToString(Formatting.None);
}
}
}

+ 1
- 0
src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs View File

@@ -11,6 +11,7 @@ namespace DotNetCore.CAP.Internal
{
private readonly ILoggerFactory _loggerFactory;
//private readonly IMessagePacker _messagePacker;
//
//private readonly IModelBinderFactory _modelBinderFactory;
private readonly IServiceProvider _serviceProvider;



+ 15
- 46
src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs View File

@@ -59,65 +59,34 @@ namespace DotNetCore.CAP.Internal
obj = ActivatorUtilities.GetServiceOrCreateInstance(provider, implType);
}
//var jsonContent = context.DeliverMessage.Content;
//var message = _messagePacker.UnPack(jsonContent);
var message = context.DeliverMessage;
object resultObj;
if (executor.MethodParameters.Length > 0)
{
resultObj = await ExecuteWithParameterAsync(executor, obj, message.Value);
}
else
var parameterDescriptors = context.ConsumerDescriptor.Parameters;
var executeParameters = new object[parameterDescriptors.Count];
for (var i = 0; i < parameterDescriptors.Count; i++)
{
resultObj = await ExecuteAsync(executor, obj);
if (parameterDescriptors[i].IsFromCap)
{
executeParameters[i] = new CapHeader(message.Headers);
}
else
{
executeParameters[i] = message.Value;
}
}
var resultObj = await ExecuteWithParameterAsync(executor, obj, executeParameters);
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
}
}
private async Task<object> ExecuteAsync(ObjectMethodExecutor executor, object @class)
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter)
{
if (executor.IsMethodAsync)
{
return await executor.ExecuteAsync(@class);
return await executor.ExecuteAsync(@class, parameter);
}
return executor.Execute(@class);
}
private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object parameter)
{
var firstParameter = executor.MethodParameters[0];
try
{
if (executor.IsMethodAsync)
{
return await executor.ExecuteAsync(@class, parameter);
}
return executor.Execute(@class, parameter);
//var binder = _modelBinderFactory.CreateBinder(firstParameter);
//var bindResult = await binder.BindModelAsync(parameter);
//if (bindResult.IsSuccess)
//{
// if (executor.IsMethodAsync)
// {
// return await executor.ExecuteAsync(@class, bindResult.Model);
// }
// return executor.Execute(@class, bindResult.Model);
//}
//throw new MethodBindException(
// $"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameter} ");
}
catch (FormatException ex)
{
//_logger.ModelBinderFormattingException(executor.MethodInfo?.Name, firstParameter.Name, parameter, ex);
return null;
}
return executor.Execute(@class, parameter);
}
}
}

+ 0
- 3
src/DotNetCore.CAP/Messages/Headers.cs View File

@@ -23,8 +23,5 @@
public const string Group = "cap-msg-group";

public const string SentTime = "cap-senttime";

public const string ContentType = "cap-content-type";

}
}

+ 5
- 0
src/DotNetCore.CAP/Messages/TransportMessage.cs View File

@@ -28,5 +28,10 @@ namespace DotNetCore.CAP.Messages
{
return Headers.TryGetValue(Messages.Headers.MessageName, out var value) ? value : null;
}

public string GetGroup()
{
return Headers.TryGetValue(Messages.Headers.Group, out var value) ? value : null;
}
}
}

+ 5
- 5
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs View File

@@ -21,8 +21,8 @@ namespace DotNetCore.CAP.Processor
private readonly BlockingCollection<MediumMessage> _publishedMessageQueue =
new BlockingCollection<MediumMessage>(new ConcurrentQueue<MediumMessage>());

private readonly BlockingCollection<MediumMessage> _receivedMessageQueue =
new BlockingCollection<MediumMessage>(new ConcurrentQueue<MediumMessage>());
private readonly BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)> _receivedMessageQueue =
new BlockingCollection<(MediumMessage, ConsumerExecutorDescriptor)>(new ConcurrentQueue<(MediumMessage, ConsumerExecutorDescriptor)>());

public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender,
@@ -41,9 +41,9 @@ namespace DotNetCore.CAP.Processor
_publishedMessageQueue.Add(message);
}

public void EnqueueToExecute(MediumMessage message)
public void EnqueueToExecute(MediumMessage message, ConsumerExecutorDescriptor descriptor)
{
_receivedMessageQueue.Add(message);
_receivedMessageQueue.Add((message, descriptor));
}

public void Dispose()
@@ -85,7 +85,7 @@ namespace DotNetCore.CAP.Processor
{
foreach (var message in _receivedMessageQueue.GetConsumingEnumerable(_cts.Token))
{
_executor.ExecuteAsync(message, _cts.Token);
_executor.ExecuteAsync(message.Item1, message.Item2, _cts.Token);
}
}
catch (OperationCanceledException)


+ 25
- 0
src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs View File

@@ -0,0 +1,25 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using System.Text.Json;

namespace DotNetCore.CAP.Serialization
{
public class JsonUtf8Serializer : ISerializer
{
public Task<TransportMessage> SerializeAsync(Message message)
{
return Task.FromResult(new TransportMessage(message.Headers, JsonSerializer.SerializeToUtf8Bytes(message.Value)));
}

public Task<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType)
{
if (valueType == null)
{
return Task.FromResult(new Message(transportMessage.Headers, null));
}

return Task.FromResult(new Message(transportMessage.Headers, JsonSerializer.Deserialize(transportMessage.Body, valueType)));
}
}
}

+ 4
- 2
src/DotNetCore.CAP/Serialization/ISerializer.cs View File

@@ -1,5 +1,7 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Serialization
{
@@ -13,6 +15,6 @@ namespace DotNetCore.CAP.Serialization
/// <summary>
/// Deserializes the given <see cref="TransportMessage"/> back into a <see cref="Message"/>
/// </summary>
Task<Message> DeserializeAsync(TransportMessage transportMessage);
Task<Message> DeserializeAsync(TransportMessage transportMessage, [CanBeNull] Type valueType);
}
}

+ 4
- 13
src/DotNetCore.CAP/Serialization/StringSerializer.cs View File

@@ -1,7 +1,5 @@
using System;
using System.Runtime.Serialization;
using DotNetCore.CAP.Messages;
using Newtonsoft.Json;
using DotNetCore.CAP.Messages;
using System.Text.Json;

namespace DotNetCore.CAP.Serialization
{
@@ -9,19 +7,12 @@ namespace DotNetCore.CAP.Serialization
{
public static string Serialize(Message message)
{
return JsonConvert.SerializeObject(message);
return JsonSerializer.Serialize(message);
}

public static Message DeSerialize(string json)
{
try
{
return JsonConvert.DeserializeObject<Message>(json);
}
catch (Exception exception)
{
throw new SerializationException($"Could not deserialize JSON text '{json}'", exception);
}
return JsonSerializer.Deserialize<Message>(json);
}
}
}

+ 0
- 9
src/DotNetCore.CAP/SubscriberNotFoundException.cs View File

@@ -7,17 +7,8 @@ namespace DotNetCore.CAP
{
public class SubscriberNotFoundException : Exception
{
public SubscriberNotFoundException()
{
}

public SubscriberNotFoundException(string message) : base(message)
{
}

public SubscriberNotFoundException(string message, Exception inner) :
base(message, inner)
{
}
}
}

Loading…
Cancel
Save