From 4aeca71f4d55c3aa31593398e417bc6730e8d1b7 Mon Sep 17 00:00:00 2001 From: Andrii Labyntsev Date: Wed, 28 Jul 2021 05:24:25 +0300 Subject: [PATCH] Improvements to AWS provider (#962) * Create AWS topic if if does not exists * Compact SQS policy to wildcards for same group * Improve naming in ITransport.AmazonSQS Co-authored-by: Andrii Labyntsev --- .../AmazonPolicyExtensions.cs | 97 ++++++++++++++++--- .../AmazonSQSConsumerClient.cs | 20 +++- .../ITransport.AmazonSQS.cs | 31 ++++-- 3 files changed, 127 insertions(+), 21 deletions(-) diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonPolicyExtensions.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonPolicyExtensions.cs index 0cfef22..d3af025 100644 --- a/src/DotNetCore.CAP.AmazonSQS/AmazonPolicyExtensions.cs +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonPolicyExtensions.cs @@ -102,10 +102,10 @@ namespace DotNetCore.CAP.AmazonSQS /// "AWS": "*" /// }, /// "Action": "sqs:SendMessage", - /// "Resource": "arn:aws:sqs:us-east-1:MyQueue", + /// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1", /// "Condition": { /// "ArnLike": { - /// "aws:SourceArn": "arn:aws:sns:us-east-1:FirstTopic" + /// "aws:SourceArn": "arn:aws:sns:us-east-1:MyQueue-FirstTopic" /// } /// } /// }, @@ -115,13 +115,26 @@ namespace DotNetCore.CAP.AmazonSQS /// "AWS": "*" /// }, /// "Action": "sqs:SendMessage", - /// "Resource": "arn:aws:sqs:us-east-1:MyQueue", + /// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1", /// "Condition": { /// "ArnLike": { - /// "aws:SourceArn": "arn:aws:sns:us-east-1:SecondTopic" + /// "aws:SourceArn": "arn:aws:sns:us-east-1:MyQueue-SecondTopic" /// } /// } - /// }] + /// }, + /// { + /// "Effect": "Allow", + /// "Principal": { + /// "AWS": "*" + /// }, + /// "Action": "sqs:SendMessage", + /// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1", + /// "Condition": { + /// "ArnLike": { + /// "aws:SourceArn": "arn:aws:sns:us-east-1:MyQueue2-FirstTopic" + /// } + /// } + /// },] /// } /// /// into compacted single statement: @@ -135,13 +148,13 @@ namespace DotNetCore.CAP.AmazonSQS /// "AWS": "*" /// }, /// "Action": "sqs:SendMessage", - /// "Resource": "arn:aws:sqs:us-east-1:MyQueue", + /// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1", /// "Condition": { /// "ArnLike": { /// "aws:SourceArn": [ - /// "arn:aws:sns:us-east-1:FirstTopic", - /// "arn:aws:sns:us-east-1:SecondTopic" - /// ] + /// "arn:aws:sns:us-east-1:MyQueue-*", + /// "arn:aws:sns:us-east-1:MyQueue2-FirstTopic" + /// ] /// } /// } /// }] @@ -161,7 +174,12 @@ namespace DotNetCore.CAP.AmazonSQS .Where(s => s.Principals.All(r => string.Equals(r.Id, "*", StringComparison.OrdinalIgnoreCase))) .ToList(); - if (statementsToCompact.Count < 2) + var groupName = GetGroupName(sqsQueueArn); + if (groupName != null) + { + groupName = $":{groupName}-"; + } + if (statementsToCompact.Count < 2 && groupName == null) { return; } @@ -172,11 +190,66 @@ namespace DotNetCore.CAP.AmazonSQS policy.Statements.Remove(statement); foreach (var topicArn in statement.Conditions.SelectMany(c => c.Values)) { - topicArns.Add(topicArn); + topicArns.Add( + groupName != null && topicArn.Contains(groupName, StringComparison.InvariantCultureIgnoreCase) + ? $"{GetArnGroupPrefix(topicArn)}-*" + : topicArn); } } - policy.AddSqsPermissions(topicArns, sqsQueueArn); + policy.AddSqsPermissions(topicArns.OrderBy(a => a), sqsQueueArn); + } + + /// + /// Extract group prefix from ARN + /// For example for ARN: + /// arn:aws:sns:us-east-1:MyQueue-FirstTopic + /// group prefix will be extracted: + /// arn:aws:sns:us-east-1:MyQueue + /// + /// Source ARN + /// Group prefix or null if group not present + private static string GetArnGroupPrefix(string arn) + { + const char separator = '-'; + if (string.IsNullOrEmpty(arn) || !arn.Contains(separator)) + { + return null; + } + + var groupPaths = arn.Split(separator); + if (groupPaths.Length < 2) + { + return null; + } + + return string.Join(separator, groupPaths.Take(groupPaths.Length - 1)); + } + + /// + /// Extract group name from ARN + /// For example for ARN: + /// arn:aws:sns:us-east-1:MyQueue-FirstTopic + /// group name will be extracted: + /// MyQueue + /// + /// Source ARN + /// Group name or null if group not present + private static string GetGroupName(string arn) + { + const char separator = ':'; + if (string.IsNullOrEmpty(arn) || !arn.Contains(separator)) + { + return null; + } + + var name = arn.Split(separator).LastOrDefault(); + if(string.IsNullOrEmpty(name)) + { + return null; + } + + return GetArnGroupPrefix(name); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs index 9ad9beb..a6b6c5c 100644 --- a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs +++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs @@ -77,8 +77,7 @@ namespace DotNetCore.CAP.AmazonSQS Connect(); - _snsClient.SubscribeQueueToTopicsAsync(topics.ToList(), _sqsClient, _queueUrl) - .GetAwaiter().GetResult(); + SubscribeToTopics(topics).GetAwaiter().GetResult(); } public void Listening(TimeSpan timeout, CancellationToken cancellationToken) @@ -248,6 +247,23 @@ namespace DotNetCore.CAP.AmazonSQS var setAttributes = new Dictionary { { "Policy", policy.ToJson() } }; await _sqsClient.SetAttributesAsync(_queueUrl, setAttributes).ConfigureAwait(false); } + + private async Task SubscribeToTopics(IEnumerable topics) + { + var queueAttributes = await _sqsClient.GetAttributesAsync(_queueUrl).ConfigureAwait(false); + + var sqsQueueArn = queueAttributes["QueueArn"]; + foreach (var topicArn in topics) + { + await _snsClient.SubscribeAsync(new SubscribeRequest + { + TopicArn = topicArn, + Protocol = "sqs", + Endpoint = sqsQueueArn, + }) + .ConfigureAwait(false); + } + } #endregion } diff --git a/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs b/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs index 8923de3..c8499e9 100644 --- a/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs +++ b/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs @@ -37,9 +37,9 @@ namespace DotNetCore.CAP.AmazonSQS { try { - await TryAddTopicArns(); + await FetchExistingTopicArns(); - if (_topicArnMaps.TryGetValue(message.GetName().NormalizeForAws(), out var arn)) + if (TryGetOrCreateTopicArn(message.GetName().NormalizeForAws(), out var arn)) { string bodyJson = null; if (message.Body != null) @@ -89,11 +89,11 @@ namespace DotNetCore.CAP.AmazonSQS } } - public async Task TryAddTopicArns() + private async Task FetchExistingTopicArns() { if (_topicArnMaps != null) { - return true; + return; } await _semaphore.WaitAsync(); @@ -122,8 +122,6 @@ namespace DotNetCore.CAP.AmazonSQS nextToken = topics.NextToken; } while (!string.IsNullOrEmpty(nextToken)); - - return true; } } catch (Exception e) @@ -134,8 +132,27 @@ namespace DotNetCore.CAP.AmazonSQS { _semaphore.Release(); } + } + + private bool TryGetOrCreateTopicArn(string topicName, out string topicArn) + { + topicArn = null; + if (_topicArnMaps.TryGetValue(topicName, out topicArn)) + { + return true; + } + + var response = _snsClient.CreateTopicAsync(topicName).GetAwaiter().GetResult(); - return false; + if (string.IsNullOrEmpty(response.TopicArn)) + { + return false; + } + + topicArn = response.TopicArn; + + _topicArnMaps.Add(topicName, topicArn); + return true; } } } \ No newline at end of file