Browse Source

rename.

master
yangxiaodong 7 years ago
parent
commit
e9c4e926f6
1 changed files with 9 additions and 9 deletions
  1. +9
    -9
      src/Cap.Consistency/Consumer/ConsumerHandler.cs

+ 9
- 9
src/Cap.Consistency/Consumer/ConsumerHandler.cs View File

@@ -6,7 +6,6 @@ using System.Threading.Tasks;
using Cap.Consistency.Abstractions;
using Cap.Consistency.Infrastructure;
using Cap.Consistency.Internal;
using Cap.Consistency.Store;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

@@ -22,7 +21,7 @@ namespace Cap.Consistency.Consumer

private readonly MethodMatcherCache _selector;
private readonly ConsistencyOptions _options;
private readonly ConsistencyMessageManager _messageManager;
private readonly IConsistencyMessageStore _messageStore;
private readonly CancellationTokenSource _cts;

public event EventHandler<ConsistencyMessage> MessageReceieved;
@@ -36,7 +35,7 @@ namespace Cap.Consistency.Consumer
IConsumerInvokerFactory consumerInvokerFactory,
IConsumerClientFactory consumerClientFactory,
ILoggerFactory loggerFactory,
ConsistencyMessageManager messageManager,
IConsistencyMessageStore messageStore,
MethodMatcherCache selector,
IOptions<ConsistencyOptions> options) {
_selector = selector;
@@ -46,7 +45,7 @@ namespace Cap.Consistency.Consumer
_consumerInvokerFactory = consumerInvokerFactory;
_consumerClientFactory = consumerClientFactory;
_options = options.Value;
_messageManager = messageManager;
_messageStore = messageStore;
_cts = new CancellationTokenSource();
}

@@ -72,20 +71,21 @@ namespace Cap.Consistency.Consumer

client.Listening(TimeSpan.FromSeconds(1));
}
}, TaskCreationOptions.LongRunning);
}, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Current);
}
_compositeTask = Task.CompletedTask;
}

public virtual void OnMessageReceieved(object sender, DeliverMessage message) {
var consistencyMessage = new ConsistencyMessage() {
Id = message.MessageKey,
Payload = Encoding.UTF8.GetString(message.Body)
Topic = message.MessageKey,
Payload = "Reveived:" + Encoding.UTF8.GetString(message.Body),
Status = MessageStatus.Received
};

_logger.LogInformation("message receieved message topic name: " + consistencyMessage.Id);

_messageManager.CreateAsync(consistencyMessage).Wait();
_messageStore.CreateAsync(consistencyMessage, _cts.Token).Wait();

try {
var executeDescriptor = _selector.GetTopicExector(message.MessageKey);
@@ -96,7 +96,7 @@ namespace Cap.Consistency.Consumer

invoker.InvokeAsync();

_messageManager.UpdateAsync(consistencyMessage).Wait();
_messageStore.UpdateAsync(consistencyMessage, _cts.Token).Wait();
}
catch (Exception ex) {
_logger.LogError("exception raised when excute method : " + ex.Message);


Loading…
Cancel
Save