Browse Source

Repair when subscribe to different Group,can't receiving message bug.

master
Savorboard 7 years ago
parent
commit
75f5a81fbf
5 changed files with 86 additions and 57 deletions
  1. +18
    -4
      src/DotNetCore.CAP/CAP.Options.cs
  2. +18
    -22
      src/DotNetCore.CAP/IConsumerHandler.Default.cs
  3. +13
    -14
      src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
  4. +29
    -8
      src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
  5. +8
    -9
      src/DotNetCore.CAP/Job/IJob.CapJob.cs

+ 18
- 4
src/DotNetCore.CAP/CAP.Options.cs View File

@@ -1,16 +1,30 @@
using DotNetCore.CAP.Job;

namespace DotNetCore.CAP
namespace DotNetCore.CAP
{ {
/// <summary> /// <summary>
/// Represents all the options you can use to configure the system. /// Represents all the options you can use to configure the system.
/// </summary> /// </summary>
public class CapOptions public class CapOptions
{ {
/// <summary>
/// Default value for polling delay timeout, in seconds.
/// </summary>
public const int DefaultPollingDelay = 8;

/// <summary>
/// Default value for CAP job.
/// </summary>
public const string DefaultCronExp = "* * * * *";

public CapOptions()
{
CronExp = DefaultCronExp;
PollingDelay = DefaultPollingDelay;
}

/// <summary> /// <summary>
/// Corn expression for configuring retry cron job. Default is 1 min. /// Corn expression for configuring retry cron job. Default is 1 min.
/// </summary> /// </summary>
public string CronExp { get; set; } = Cron.Minutely();
public string CronExp { get; set; }


/// <summary> /// <summary>
/// Productor job polling delay time. Default is 8 sec. /// Productor job polling delay time. Default is 8 sec.


+ 18
- 22
src/DotNetCore.CAP/IConsumerHandler.Default.cs View File

@@ -23,7 +23,7 @@ namespace DotNetCore.CAP
private readonly CapOptions _options; private readonly CapOptions _options;
private readonly CancellationTokenSource _cts; private readonly CancellationTokenSource _cts;


public event EventHandler<CapMessage> MessageReceieved;
public event EventHandler<MessageContext> MessageReceieved;


private Task _compositeTask; private Task _compositeTask;
private bool _disposed; private bool _disposed;
@@ -46,15 +46,14 @@ namespace DotNetCore.CAP
_cts = new CancellationTokenSource(); _cts = new CancellationTokenSource();
} }


protected virtual void OnMessageReceieved(CapMessage message)
protected virtual void OnMessageReceieved(MessageContext message)
{ {
MessageReceieved?.Invoke(this, message); MessageReceieved?.Invoke(this, message);
} }


public void Start() public void Start()
{ {
var matchs = _selector.GetCandidatesMethods(_serviceProvider);
var groupingMatchs = matchs.GroupBy(x => x.Value.Attribute.Group);
var groupingMatchs = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);


foreach (var matchGroup in groupingMatchs) foreach (var matchGroup in groupingMatchs)
{ {
@@ -64,9 +63,9 @@ namespace DotNetCore.CAP
{ {
client.MessageReceieved += OnMessageReceieved; client.MessageReceieved += OnMessageReceieved;


foreach (var item in matchGroup)
foreach (var item in matchGroup.Value)
{ {
client.Subscribe(item.Key);
client.Subscribe(item.Attribute.Name);
} }


client.Listening(TimeSpan.FromSeconds(1)); client.Listening(TimeSpan.FromSeconds(1));
@@ -76,7 +75,7 @@ namespace DotNetCore.CAP
_compositeTask = Task.CompletedTask; _compositeTask = Task.CompletedTask;
} }


public virtual void OnMessageReceieved(object sender, MessageBase message)
public virtual void OnMessageReceieved(object sender, MessageContext message)
{ {
_logger.EnqueuingReceivedMessage(message.KeyName, message.Content); _logger.EnqueuingReceivedMessage(message.KeyName, message.Content);


@@ -88,27 +87,24 @@ namespace DotNetCore.CAP
var capMessage = new CapReceivedMessage(message) var capMessage = new CapReceivedMessage(message)
{ {
StatusName = StatusName.Enqueued, StatusName = StatusName.Enqueued,
Added = DateTime.Now
}; };
messageStore.StoreReceivedMessageAsync(capMessage).Wait(); messageStore.StoreReceivedMessageAsync(capMessage).Wait();

ConsumerExecutorDescriptor executeDescriptor = null;

try try
{ {
executeDescriptor = _selector.GetTopicExector(message.KeyName);

var consumerContext = new ConsumerContext(executeDescriptor, message);

var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);

invoker.InvokeAsync();

messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Succeeded).Wait();
var executeDescriptorGroup = _selector.GetTopicExector(message.KeyName);
if (executeDescriptorGroup.ContainsKey(message.Group))
{
messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Processing).Wait();
// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = executeDescriptorGroup[message.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, message);
_consumerInvokerFactory.CreateInvoker(consumerContext).InvokeAsync();
messageStore.ChangeReceivedMessageStateAsync(capMessage, StatusName.Succeeded).Wait();
}
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.ConsumerMethodExecutingFailed(executeDescriptor?.MethodInfo.Name, ex);
_logger.ConsumerMethodExecutingFailed($"Group:{message.Group}, Topic:{message.KeyName}", ex);
} }
} }
} }
@@ -126,7 +122,7 @@ namespace DotNetCore.CAP


try try
{ {
_compositeTask.Wait((int) TimeSpan.FromSeconds(60).TotalMilliseconds);
_compositeTask.Wait((int)TimeSpan.FromSeconds(60).TotalMilliseconds);
} }
catch (AggregateException ex) catch (AggregateException ex)
{ {


+ 13
- 14
src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs View File

@@ -9,11 +9,10 @@ namespace DotNetCore.CAP.Internal
{ {
public class DefaultConsumerInvoker : IConsumerInvoker public class DefaultConsumerInvoker : IConsumerInvoker
{ {
protected readonly ILogger Logger;
protected readonly IServiceProvider ServiceProvider;
protected readonly ConsumerContext ConsumerContext;

private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly IModelBinder _modelBinder; private readonly IModelBinder _modelBinder;
private readonly ConsumerContext _consumerContext;
private readonly ObjectMethodExecutor _executor; private readonly ObjectMethodExecutor _executor;


public DefaultConsumerInvoker(ILogger logger, public DefaultConsumerInvoker(ILogger logger,
@@ -22,24 +21,24 @@ namespace DotNetCore.CAP.Internal
ConsumerContext consumerContext) ConsumerContext consumerContext)
{ {
_modelBinder = modelBinder; _modelBinder = modelBinder;
_executor = ObjectMethodExecutor.Create(ConsumerContext.ConsumerDescriptor.MethodInfo,
ConsumerContext.ConsumerDescriptor.ImplTypeInfo);
_serviceProvider = serviceProvider;
_logger = logger ?? throw new ArgumentNullException(nameof(logger));


Logger = logger ?? throw new ArgumentNullException(nameof(logger));
ServiceProvider = serviceProvider;
ConsumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
_consumerContext = consumerContext ?? throw new ArgumentNullException(nameof(consumerContext));
_executor = ObjectMethodExecutor.Create(_consumerContext.ConsumerDescriptor.MethodInfo,
_consumerContext.ConsumerDescriptor.ImplTypeInfo);
} }


public Task InvokeAsync() public Task InvokeAsync()
{ {
using (Logger.BeginScope("consumer invoker begin"))
using (_logger.BeginScope("consumer invoker begin"))
{ {
Logger.LogDebug("Executing consumer Topic: {0}", ConsumerContext.ConsumerDescriptor.MethodInfo.Name);
_logger.LogDebug("Executing consumer Topic: {0}", _consumerContext.ConsumerDescriptor.MethodInfo.Name);


var obj = ActivatorUtilities.GetServiceOrCreateInstance(ServiceProvider,
ConsumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());
var obj = ActivatorUtilities.GetServiceOrCreateInstance(_serviceProvider,
_consumerContext.ConsumerDescriptor.ImplTypeInfo.AsType());


var value = ConsumerContext.DeliverMessage.Content;
var value = _consumerContext.DeliverMessage.Content;


if (_executor.MethodParameters.Length > 0) if (_executor.MethodParameters.Length > 0)
{ {


+ 29
- 8
src/DotNetCore.CAP/Internal/MethodMatcherCache.cs View File

@@ -1,5 +1,7 @@
using System; using System;
using System.Linq;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Collections.Generic;
using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Abstractions;


namespace DotNetCore.CAP.Internal namespace DotNetCore.CAP.Internal
@@ -8,35 +10,54 @@ namespace DotNetCore.CAP.Internal
{ {
private readonly IConsumerServiceSelector _selector; private readonly IConsumerServiceSelector _selector;


private ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> Entries { get; }

public MethodMatcherCache(IConsumerServiceSelector selector) public MethodMatcherCache(IConsumerServiceSelector selector)
{ {
_selector = selector; _selector = selector;
Entries = new ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>>();
} }


public ConcurrentDictionary<string, ConsumerExecutorDescriptor> GetCandidatesMethods(IServiceProvider provider)
/// <summary>
/// Get a dictionary of candidates.In the dictionary,
/// the Key is the CAPSubscribeAttribute Group, the Value for the current Group of candidates
/// </summary>
/// <param name="provider"><see cref="IServiceProvider"/></param>
public ConcurrentDictionary<string, IList<ConsumerExecutorDescriptor>> GetCandidatesMethodsOfGroupNameGrouped(IServiceProvider provider)
{ {
if (Entries.Count != 0) return Entries; if (Entries.Count != 0) return Entries;


var executorCollection = _selector.SelectCandidates(provider); var executorCollection = _selector.SelectCandidates(provider);


foreach (var item in executorCollection)
var groupedCandidates = executorCollection.GroupBy(x => x.Attribute.Group);

foreach (var item in groupedCandidates)
{ {
Entries.GetOrAdd(item.Attribute.Name, item);
Entries.TryAdd(item.Key, item.ToList());
} }

return Entries; return Entries;
} }


public ConsumerExecutorDescriptor GetTopicExector(string topicName)
/// <summary>
/// Get a dictionary of specify topic candidates.
/// The Key is Group name, the value is specify topic candidates.
/// </summary>
/// <param name="topicName">message topic name</param>
public IDictionary<string, IList<ConsumerExecutorDescriptor>> GetTopicExector(string topicName)
{ {
if (Entries == null) if (Entries == null)
{ {
throw new ArgumentNullException(nameof(Entries)); throw new ArgumentNullException(nameof(Entries));
} }


return Entries[topicName];
var dic = new Dictionary<string, IList<ConsumerExecutorDescriptor>>();
foreach (var item in Entries)
{
var topicCandidates = item.Value.Where(x => x.Attribute.Name == topicName);
dic.Add(item.Key, topicCandidates.ToList());
}
return dic;
} }

public ConcurrentDictionary<string, ConsumerExecutorDescriptor> Entries { get; } =
new ConcurrentDictionary<string, ConsumerExecutorDescriptor>();
} }
} }

+ 8
- 9
src/DotNetCore.CAP/Job/IJob.CapJob.cs View File

@@ -32,26 +32,25 @@ namespace DotNetCore.CAP.Job


public async Task ExecuteAsync() public async Task ExecuteAsync()
{ {
var matchs = _selector.GetCandidatesMethods(_serviceProvider);
var groupedCandidates = _selector.GetCandidatesMethodsOfGroupNameGrouped(_serviceProvider);
using (var scope = _serviceProvider.CreateScope()) using (var scope = _serviceProvider.CreateScope())
{ {
var provider = scope.ServiceProvider; var provider = scope.ServiceProvider;
var messageStore = provider.GetService<ICapMessageStore>();


var messageStore = provider.GetService<ICapMessageStore>();
var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted(); var nextReceivedMessage = await messageStore.GetNextReceivedMessageToBeExcuted();
if (nextReceivedMessage != null)
if (nextReceivedMessage != null && groupedCandidates.ContainsKey(nextReceivedMessage.Group))
{ {
try try
{ {
var executeDescriptor = matchs[nextReceivedMessage.KeyName];
var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage);
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);

await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing); await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Processing);

// If there are multiple consumers in the same group, we will take the first
var executeDescriptor = groupedCandidates[nextReceivedMessage.Group][0];
var consumerContext = new ConsumerContext(executeDescriptor, nextReceivedMessage.ToMessageContext());
var invoker = _consumerInvokerFactory.CreateInvoker(consumerContext);
await invoker.InvokeAsync(); await invoker.InvokeAsync();

await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded); await messageStore.ChangeReceivedMessageStateAsync(nextReceivedMessage, StatusName.Succeeded);

} }
catch (Exception ex) catch (Exception ex)
{ {


Loading…
Cancel
Save