|
|
@@ -1,4 +1,5 @@ |
|
|
|
using System; |
|
|
|
using System.Linq; |
|
|
|
using System.Threading.Tasks; |
|
|
|
using DotNetCore.CAP.Abstractions; |
|
|
|
using DotNetCore.CAP.Models; |
|
|
@@ -28,22 +29,18 @@ namespace DotNetCore.CAP.Internal |
|
|
|
|
|
|
|
public async Task<OperateResult> ExecuteAsync(CapReceivedMessage receivedMessage) |
|
|
|
{ |
|
|
|
try |
|
|
|
if (!_selector.TryGetTopicExector(receivedMessage.Name, receivedMessage.Group, |
|
|
|
out var executor)) |
|
|
|
{ |
|
|
|
var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name); |
|
|
|
|
|
|
|
if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group)) |
|
|
|
{ |
|
|
|
var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method."; |
|
|
|
throw new SubscriberNotFoundException(error); |
|
|
|
} |
|
|
|
|
|
|
|
// If there are multiple consumers in the same group, we will take the first |
|
|
|
var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0]; |
|
|
|
var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext()); |
|
|
|
var error = "this message can not be found subscriber. Message:" + receivedMessage; |
|
|
|
error += "\r\n see: https://github.com/dotnetcore/CAP/issues/63"; |
|
|
|
throw new SubscriberNotFoundException(error); |
|
|
|
} |
|
|
|
|
|
|
|
var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext()); |
|
|
|
try |
|
|
|
{ |
|
|
|
var ret = await Invoker.InvokeAsync(consumerContext); |
|
|
|
|
|
|
|
if (!string.IsNullOrEmpty(ret.CallbackName)) |
|
|
|
await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result); |
|
|
|
|
|
|
|