Browse Source

Try to fix the consumer reconnection bug. (#966)

master
Savorboard 3 years ago
parent
commit
2f63b707fb
3 changed files with 14 additions and 14 deletions
  1. +12
    -3
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  2. +0
    -11
      src/DotNetCore.CAP/Internal/LoggerExtensions.cs
  3. +2
    -0
      src/DotNetCore.CAP/Processor/IProcessor.TransportCheck.cs

+ 12
- 3
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs View File

@@ -75,9 +75,18 @@ namespace DotNetCore.CAP.Internal
foreach (var matchGroup in groupingMatches)
{
ICollection<string> topics;
using (var client = _consumerClientFactory.Create(matchGroup.Key))
try
{
topics = client.FetchTopics(matchGroup.Value.Select(x => x.TopicName));
using (var client = _consumerClientFactory.Create(matchGroup.Key))
{
topics = client.FetchTopics(matchGroup.Value.Select(x => x.TopicName));
}
}
catch (BrokerConnectionException e)
{
_isHealthy = false;
_logger.LogError(e, e.Message);
return;
}

for (int i = 0; i < _options.ConsumerThreadCount; i++)
@@ -122,7 +131,7 @@ namespace DotNetCore.CAP.Internal
if (!IsHealthy() || force)
{
Pulse();
_cts = new CancellationTokenSource();
_isHealthy = true;



+ 0
- 11
src/DotNetCore.CAP/Internal/LoggerExtensions.cs View File

@@ -35,11 +35,6 @@ namespace DotNetCore.CAP.Internal
logger.LogWarning($"The {retries}th retrying send a message failed. message id: {messageId} ");
}

public static void MessageHasBeenSent(this ILogger logger, string name, string content)
{
logger.LogDebug($"Message published. name: {name}, content:{content}.");
}

public static void MessageReceived(this ILogger logger, string messageId, string name)
{
logger.LogDebug($"Received message. id:{messageId}, name: {name}");
@@ -74,11 +69,5 @@ namespace DotNetCore.CAP.Internal
{
logger.LogWarning(ex, $"Expected an OperationCanceledException, but found '{ex.Message}'.");
}

public static void ModelBinderFormattingException(this ILogger logger, string methodName, string parameterName,
string content, Exception ex)
{
logger.LogError(ex, $"When call subscribe method, a parameter format conversion exception occurs. MethodName:'{methodName}' ParameterName:'{parameterName}' Content:'{content}'.");
}
}
}

+ 2
- 0
src/DotNetCore.CAP/Processor/IProcessor.TransportCheck.cs View File

@@ -28,6 +28,8 @@ namespace DotNetCore.CAP.Processor
throw new ArgumentNullException(nameof(context));
}

context.ThrowIfStopping();

_logger.LogDebug("Transport connection checking...");

if (!_register.IsHealthy())


Loading…
Cancel
Save