From daf4efbf50b708c91f054297f9fd396d08f3b4c4 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Mon, 20 Jul 2020 23:21:13 +0800 Subject: [PATCH] Fix amazon sqs reject message bug --- .../AmazonSQSConsumerClient.cs | 50 ++++++++++++++++++- .../TopicNormalizer.cs | 2 +- .../Internal/IConsumerRegister.Default.cs | 6 +++ src/DotNetCore.CAP/Transport/MqLogType.cs | 6 ++- 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs index 2bd6ee4..75b933a 100644 --- a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs @@ -6,6 +6,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; +using System.Threading.Tasks; using Amazon.SimpleNotificationService; using Amazon.SimpleNotificationService.Model; using Amazon.SQS; @@ -103,12 +104,27 @@ namespace DotNetCore.CAP.AmazonSQS public void Commit(object sender) { - _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender); + try + { + _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender); + } + catch (InvalidIdFormatException ex) + { + InvalidIdFormatLog(ex.Message); + } } public void Reject(object sender) { - _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3000); + try + { + // Visible again in 3 seconds + _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3); + } + catch (MessageNotInflightException ex) + { + MessageNotInflightLog(ex.Message); + } } public void Dispose() @@ -162,5 +178,35 @@ namespace DotNetCore.CAP.AmazonSQS } } } + + #region private methods + + private Task InvalidIdFormatLog(string exceptionMessage) + { + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.InvalidIdFormat, + Reason = exceptionMessage + }; + + OnLog?.Invoke(null, logArgs); + + return Task.CompletedTask; + } + + private Task MessageNotInflightLog(string exceptionMessage) + { + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.MessageNotInflight, + Reason = exceptionMessage + }; + + OnLog?.Invoke(null, logArgs); + + return Task.CompletedTask; + } + + #endregion } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs index f08c481..622fc2e 100644 --- a/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs +++ b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs @@ -2,7 +2,7 @@ namespace DotNetCore.CAP.AmazonSQS { - public static class TopicNormalizer + internal static class TopicNormalizer { public static string NormalizeForAws(this string origin) { diff --git a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs index 7e32a68..65cb999 100644 --- a/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs +++ b/src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs @@ -271,6 +271,12 @@ namespace DotNetCore.CAP.Internal case MqLogType.ExceptionReceived: _logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason); break; + case MqLogType.InvalidIdFormat: + _logger.LogError("AmazonSQS subscriber delete inflight message failed, invalid id. --> " + logmsg.Reason); + break; + case MqLogType.MessageNotInflight: + _logger.LogError("AmazonSQS subscriber change message's visibility failed, message isn't in flight. --> " + logmsg.Reason); + break; default: throw new ArgumentOutOfRangeException(); } diff --git a/src/DotNetCore.CAP/Transport/MqLogType.cs b/src/DotNetCore.CAP/Transport/MqLogType.cs index 3412b3a..659e004 100644 --- a/src/DotNetCore.CAP/Transport/MqLogType.cs +++ b/src/DotNetCore.CAP/Transport/MqLogType.cs @@ -18,7 +18,11 @@ namespace DotNetCore.CAP.Transport ServerConnError, //AzureServiceBus - ExceptionReceived + ExceptionReceived, + + //Amazon SQS + InvalidIdFormat, + MessageNotInflight } public class LogMessageEventArgs : EventArgs