Przeglądaj źródła

Fix store received message statename bug.

master
yangxiaodong 7 lat temu
rodzic
commit
7ae7180b9e
3 zmienionych plików z 39 dodań i 11 usunięć
  1. +2
    -1
      src/DotNetCore.CAP/ICapProducerService.Default.cs
  2. +10
    -5
      src/DotNetCore.CAP/IConsumerHandler.Default.cs
  3. +27
    -5
      src/DotNetCore.CAP/LoggerExtensions.cs

+ 2
- 1
src/DotNetCore.CAP/ICapProducerService.Default.cs Wyświetl plik

@@ -48,11 +48,12 @@ namespace DotNetCore.CAP
Content = content
};

message.StateName = StateName.Enqueued;
await _store.StoreSentMessageAsync(message);

WaitHandleEx.PulseEvent.Set();

_logger.EnqueuingMessage(topic, content);
_logger.EnqueuingSentMessage(topic, content);
}
}
}

+ 10
- 5
src/DotNetCore.CAP/IConsumerHandler.Default.cs Wyświetl plik

@@ -84,15 +84,20 @@ namespace DotNetCore.CAP

public virtual void OnMessageReceieved(object sender, MessageBase message)
{
var capMessage = new CapReceivedMessage(message);

_logger.LogInformation("message receieved message topic name: " + capMessage.Id);
_logger.EnqueuingReceivedMessage(message.KeyName, message.Content);

var capMessage = new CapReceivedMessage(message)
{
StateName = StateName.Enqueued,
Added = DateTime.Now
};
_messageStore.StoreReceivedMessageAsync(capMessage).Wait();

ConsumerExecutorDescriptor executeDescriptor = null;

try
{
var executeDescriptor = _selector.GetTopicExector(message.KeyName);
executeDescriptor = _selector.GetTopicExector(message.KeyName);

var consumerContext = new ConsumerContext(executeDescriptor, message);

@@ -104,7 +109,7 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
_logger.LogError("exception raised when excute method : " + ex.Message);
_logger.ConsumerMethodExecutingFailed(executeDescriptor.MethodInfo.Name, ex);
}
}



+ 27
- 5
src/DotNetCore.CAP/LoggerExtensions.cs Wyświetl plik

@@ -17,7 +17,9 @@ namespace DotNetCore.CAP
private static Action<ILogger, string, double, Exception> _cronJobExecuted;
private static Action<ILogger, string, Exception> _cronJobFailed;

private static Action<ILogger, string, string, Exception> _enqueuingMessage;
private static Action<ILogger, string, string, Exception> _enqueuingSentMessage;
private static Action<ILogger, string, string, Exception> _enqueuingReceivdeMessage;
private static Action<ILogger, string, Exception> _executingConsumerMethod;

static LoggerExtensions()
{
@@ -56,15 +58,35 @@ namespace DotNetCore.CAP
4,
"Cron job '{jobName}' failed to execute.");

_enqueuingMessage = LoggerMessage.Define<string, string>(
_enqueuingSentMessage = LoggerMessage.Define<string, string>(
LogLevel.Debug,
2,
"Enqueuing a topic to the store. NameKey: {NameKey}. Content: {Content}");
"Enqueuing a topic to the sent message store. NameKey: {NameKey}. Content: {Content}");

_enqueuingReceivdeMessage = LoggerMessage.Define<string, string>(
LogLevel.Debug,
2,
"Enqueuing a topic to the received message store. NameKey: {NameKey}. Content: {Content}");

_executingConsumerMethod = LoggerMessage.Define<string>(
LogLevel.Error,
5,
"Consumer method '{methodName}' failed to execute.");
}

public static void ConsumerMethodExecutingFailed(this ILogger logger, string methodName, Exception ex)
{
_executingConsumerMethod(logger, methodName, ex);
}

public static void EnqueuingReceivedMessage(this ILogger logger, string nameKey, string content)
{
_enqueuingReceivdeMessage(logger, nameKey, content, null);
}

public static void EnqueuingMessage(this ILogger logger, string nameKey, string content)
public static void EnqueuingSentMessage(this ILogger logger, string nameKey, string content)
{
_enqueuingMessage(logger, nameKey, content, null);
_enqueuingSentMessage(logger, nameKey, content, null);
}

public static void ServerStarting(this ILogger logger, int machineProcessorCount, int processorCount)


Ładowanie…
Anuluj
Zapisz