@@ -209,7 +209,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
/// </summary> | |||
/// <param name="arn">Source ARN</param> | |||
/// <returns>Group prefix or null if group not present</returns> | |||
private static string GetArnGroupPrefix(string arn) | |||
private static string? GetArnGroupPrefix(string arn) | |||
{ | |||
const char separator = '-'; | |||
if (string.IsNullOrEmpty(arn) || !arn.Contains(separator)) | |||
@@ -235,7 +235,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
/// </summary> | |||
/// <param name="arn">Source ARN</param> | |||
/// <returns>Group name or null if group not present</returns> | |||
private static string GetGroupName(string arn) | |||
private static string? GetGroupName(string arn) | |||
{ | |||
const char separator = ':'; | |||
if (string.IsNullOrEmpty(arn) || !arn.Contains(separator)) | |||
@@ -27,8 +27,8 @@ namespace DotNetCore.CAP.AmazonSQS | |||
private readonly string _groupId; | |||
private readonly AmazonSQSOptions _amazonSQSOptions; | |||
private IAmazonSimpleNotificationService _snsClient; | |||
private IAmazonSQS _sqsClient; | |||
private IAmazonSimpleNotificationService? _snsClient; | |||
private IAmazonSQS? _sqsClient; | |||
private string _queueUrl = string.Empty; | |||
public AmazonSQSConsumerClient(string groupId, IOptions<AmazonSQSOptions> options) | |||
@@ -37,9 +37,9 @@ namespace DotNetCore.CAP.AmazonSQS | |||
_amazonSQSOptions = options.Value; | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<TransportMessage>? OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public event EventHandler<LogMessageEventArgs>? OnLog; | |||
public BrokerAddress BrokerAddress => new BrokerAddress("AmazonSQS", _queueUrl); | |||
@@ -57,7 +57,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
var createTopicRequest = new CreateTopicRequest(topic.NormalizeForAws()); | |||
var createTopicResponse = _snsClient.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult(); | |||
var createTopicResponse = _snsClient!.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult(); | |||
topicArns.Add(createTopicResponse.TopicArn); | |||
} | |||
@@ -92,13 +92,13 @@ namespace DotNetCore.CAP.AmazonSQS | |||
while (true) | |||
{ | |||
var response = _sqsClient.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult(); | |||
var response = _sqsClient!.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult(); | |||
if (response.Messages.Count == 1) | |||
{ | |||
var messageObj = JsonSerializer.Deserialize<SQSReceivedMessage>(response.Messages[0].Body); | |||
var header = messageObj.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value); | |||
var header = messageObj!.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value); | |||
var body = messageObj.Message; | |||
var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null); | |||
@@ -119,7 +119,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
try | |||
{ | |||
_ = _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender).GetAwaiter().GetResult(); | |||
_ = _sqsClient!.DeleteMessageAsync(_queueUrl, (string)sender).GetAwaiter().GetResult(); | |||
} | |||
catch (InvalidIdFormatException ex) | |||
{ | |||
@@ -127,12 +127,12 @@ namespace DotNetCore.CAP.AmazonSQS | |||
} | |||
} | |||
public void Reject(object sender) | |||
public void Reject(object? sender) | |||
{ | |||
try | |||
{ | |||
// Visible again in 3 seconds | |||
_ = _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3).GetAwaiter().GetResult(); | |||
_ = _sqsClient!.ChangeMessageVisibilityAsync(_queueUrl, (string)sender!, 3).GetAwaiter().GetResult(); | |||
} | |||
catch (MessageNotInflightException ex) | |||
{ | |||
@@ -237,7 +237,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
Connect(initSNS: false, initSQS: true); | |||
var queueAttributes = await _sqsClient.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var queueAttributes = await _sqsClient!.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var sqsQueueArn = queueAttributes["QueueArn"]; | |||
@@ -263,12 +263,12 @@ namespace DotNetCore.CAP.AmazonSQS | |||
private async Task SubscribeToTopics(IEnumerable<string> topics) | |||
{ | |||
var queueAttributes = await _sqsClient.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var queueAttributes = await _sqsClient!.GetAttributesAsync(_queueUrl).ConfigureAwait(false); | |||
var sqsQueueArn = queueAttributes["QueueArn"]; | |||
foreach (var topicArn in topics) | |||
{ | |||
await _snsClient.SubscribeAsync(new SubscribeRequest | |||
await _snsClient!.SubscribeAsync(new SubscribeRequest | |||
{ | |||
TopicArn = topicArn, | |||
Protocol = "sqs", | |||
@@ -10,19 +10,19 @@ namespace DotNetCore.CAP | |||
// ReSharper disable once InconsistentNaming | |||
public class AmazonSQSOptions | |||
{ | |||
public RegionEndpoint Region { get; set; } | |||
public RegionEndpoint Region { get; set; } = default!; | |||
public AWSCredentials Credentials { get; set; } | |||
public AWSCredentials? Credentials { get; set; } | |||
/// <summary> | |||
/// Overrides Service Url deduced from AWS Region. To use in local development environments like localstack. | |||
/// </summary> | |||
public string SNSServiceUrl { get; set; } | |||
public string? SNSServiceUrl { get; set; } | |||
/// <summary> | |||
/// Overrides Service Url deduced from AWS Region. To use in local development environments like localstack. | |||
/// </summary> | |||
public string SQSServiceUrl { get; set; } | |||
public string? SQSServiceUrl { get; set; } | |||
} | |||
} |
@@ -2,12 +2,12 @@ | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.1</TargetFramework> | |||
<AssemblyName>DotNetCore.CAP.AmazonSQS</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);AmazonSQS;SQS</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.AmazonSQS.xml</DocumentationFile> | |||
<GenerateDocumentationFile>true</GenerateDocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
@@ -22,8 +22,8 @@ namespace DotNetCore.CAP.AmazonSQS | |||
private readonly ILogger _logger; | |||
private readonly IOptions<AmazonSQSOptions> _sqsOptions; | |||
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1); | |||
private IAmazonSimpleNotificationService _snsClient; | |||
private IDictionary<string, string> _topicArnMaps; | |||
private IAmazonSimpleNotificationService? _snsClient; | |||
private IDictionary<string, string>? _topicArnMaps; | |||
public AmazonSQSTransport(ILogger<AmazonSQSTransport> logger, IOptions<AmazonSQSOptions> sqsOptions) | |||
{ | |||
@@ -41,7 +41,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
if (TryGetOrCreateTopicArn(message.GetName().NormalizeForAws(), out var arn)) | |||
{ | |||
string bodyJson = null; | |||
string? bodyJson = null; | |||
if (message.Body != null) | |||
{ | |||
bodyJson = Encoding.UTF8.GetString(message.Body); | |||
@@ -59,7 +59,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
MessageAttributes = attributes | |||
}; | |||
await _snsClient.PublishAsync(request); | |||
await _snsClient!.PublishAsync(request); | |||
_logger.LogDebug($"SNS topic message [{message.GetName().NormalizeForAws()}] has been published."); | |||
return OperateResult.Success; | |||
@@ -117,7 +117,7 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
_topicArnMaps = new Dictionary<string, string>(); | |||
string nextToken = null; | |||
string? nextToken = null; | |||
do | |||
{ | |||
var topics = nextToken == null | |||
@@ -143,15 +143,15 @@ namespace DotNetCore.CAP.AmazonSQS | |||
} | |||
} | |||
private bool TryGetOrCreateTopicArn(string topicName, out string topicArn) | |||
private bool TryGetOrCreateTopicArn(string topicName,[System.Diagnostics.CodeAnalysis.NotNullWhen(true)] out string? topicArn) | |||
{ | |||
topicArn = null; | |||
if (_topicArnMaps.TryGetValue(topicName, out topicArn)) | |||
if (_topicArnMaps!.TryGetValue(topicName, out topicArn)) | |||
{ | |||
return true; | |||
} | |||
var response = _snsClient.CreateTopicAsync(topicName).GetAwaiter().GetResult(); | |||
var response = _snsClient!.CreateTopicAsync(topicName).GetAwaiter().GetResult(); | |||
if (string.IsNullOrEmpty(response.TopicArn)) | |||
{ | |||
@@ -4,15 +4,15 @@ namespace DotNetCore.CAP.AmazonSQS | |||
{ | |||
class SQSReceivedMessage | |||
{ | |||
public string Message { get; set; } | |||
public string? Message { get; set; } | |||
public Dictionary<string, SQSReceivedMessageAttributes> MessageAttributes { get; set; } | |||
public Dictionary<string, SQSReceivedMessageAttributes> MessageAttributes { get; set; } = default!; | |||
} | |||
class SQSReceivedMessageAttributes | |||
{ | |||
public string Type { get; set; } | |||
public string? Type { get; set; } | |||
public string Value { get; set; } | |||
public string? Value { get; set; } | |||
} | |||
} |