Преглед изворни кода

Improvements to AWS provider (#962)

* Create AWS topic if if does not exists

* Compact SQS policy to wildcards for same group

* Improve naming in ITransport.AmazonSQS

Co-authored-by: Andrii Labyntsev <lab.andrii@gmail.com>
master
Andrii Labyntsev пре 3 година
committed by GitHub
родитељ
комит
4aeca71f4d
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 измењених фајлова са 127 додато и 21 уклоњено
  1. +85
    -12
      src/DotNetCore.CAP.AmazonSQS/AmazonPolicyExtensions.cs
  2. +18
    -2
      src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
  3. +24
    -7
      src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs

+ 85
- 12
src/DotNetCore.CAP.AmazonSQS/AmazonPolicyExtensions.cs Прегледај датотеку

@@ -102,10 +102,10 @@ namespace DotNetCore.CAP.AmazonSQS
/// "AWS": "*"
/// },
/// "Action": "sqs:SendMessage",
/// "Resource": "arn:aws:sqs:us-east-1:MyQueue",
/// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1",
/// "Condition": {
/// "ArnLike": {
/// "aws:SourceArn": "arn:aws:sns:us-east-1:FirstTopic"
/// "aws:SourceArn": "arn:aws:sns:us-east-1:MyQueue-FirstTopic"
/// }
/// }
/// },
@@ -115,13 +115,26 @@ namespace DotNetCore.CAP.AmazonSQS
/// "AWS": "*"
/// },
/// "Action": "sqs:SendMessage",
/// "Resource": "arn:aws:sqs:us-east-1:MyQueue",
/// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1",
/// "Condition": {
/// "ArnLike": {
/// "aws:SourceArn": "arn:aws:sns:us-east-1:SecondTopic"
/// "aws:SourceArn": "arn:aws:sns:us-east-1:MyQueue-SecondTopic"
/// }
/// }
/// }]
/// },
/// {
/// "Effect": "Allow",
/// "Principal": {
/// "AWS": "*"
/// },
/// "Action": "sqs:SendMessage",
/// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1",
/// "Condition": {
/// "ArnLike": {
/// "aws:SourceArn": "arn:aws:sns:us-east-1:MyQueue2-FirstTopic"
/// }
/// }
/// },]
/// }
/// </code>
/// into compacted single statement:
@@ -135,13 +148,13 @@ namespace DotNetCore.CAP.AmazonSQS
/// "AWS": "*"
/// },
/// "Action": "sqs:SendMessage",
/// "Resource": "arn:aws:sqs:us-east-1:MyQueue",
/// "Resource": "arn:aws:sqs:us-east-1:MyQueue-v1",
/// "Condition": {
/// "ArnLike": {
/// "aws:SourceArn": [
/// "arn:aws:sns:us-east-1:FirstTopic",
/// "arn:aws:sns:us-east-1:SecondTopic"
/// ]
/// "arn:aws:sns:us-east-1:MyQueue-*",
/// "arn:aws:sns:us-east-1:MyQueue2-FirstTopic"
/// ]
/// }
/// }
/// }]
@@ -161,7 +174,12 @@ namespace DotNetCore.CAP.AmazonSQS
.Where(s => s.Principals.All(r => string.Equals(r.Id, "*", StringComparison.OrdinalIgnoreCase)))
.ToList();

if (statementsToCompact.Count < 2)
var groupName = GetGroupName(sqsQueueArn);
if (groupName != null)
{
groupName = $":{groupName}-";
}
if (statementsToCompact.Count < 2 && groupName == null)
{
return;
}
@@ -172,11 +190,66 @@ namespace DotNetCore.CAP.AmazonSQS
policy.Statements.Remove(statement);
foreach (var topicArn in statement.Conditions.SelectMany(c => c.Values))
{
topicArns.Add(topicArn);
topicArns.Add(
groupName != null && topicArn.Contains(groupName, StringComparison.InvariantCultureIgnoreCase)
? $"{GetArnGroupPrefix(topicArn)}-*"
: topicArn);
}
}

policy.AddSqsPermissions(topicArns, sqsQueueArn);
policy.AddSqsPermissions(topicArns.OrderBy(a => a), sqsQueueArn);
}

/// <summary>
/// Extract group prefix from ARN
/// For example for ARN:
/// arn:aws:sns:us-east-1:MyQueue-FirstTopic
/// group prefix will be extracted:
/// arn:aws:sns:us-east-1:MyQueue
/// </summary>
/// <param name="arn">Source ARN</param>
/// <returns>Group prefix or null if group not present</returns>
private static string GetArnGroupPrefix(string arn)
{
const char separator = '-';
if (string.IsNullOrEmpty(arn) || !arn.Contains(separator))
{
return null;
}

var groupPaths = arn.Split(separator);
if (groupPaths.Length < 2)
{
return null;
}

return string.Join(separator, groupPaths.Take(groupPaths.Length - 1));
}
/// <summary>
/// Extract group name from ARN
/// For example for ARN:
/// arn:aws:sns:us-east-1:MyQueue-FirstTopic
/// group name will be extracted:
/// MyQueue
/// </summary>
/// <param name="arn">Source ARN</param>
/// <returns>Group name or null if group not present</returns>
private static string GetGroupName(string arn)
{
const char separator = ':';
if (string.IsNullOrEmpty(arn) || !arn.Contains(separator))
{
return null;
}

var name = arn.Split(separator).LastOrDefault();
if(string.IsNullOrEmpty(name))
{
return null;
}

return GetArnGroupPrefix(name);
}
}
}

+ 18
- 2
src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs Прегледај датотеку

@@ -77,8 +77,7 @@ namespace DotNetCore.CAP.AmazonSQS

Connect();

_snsClient.SubscribeQueueToTopicsAsync(topics.ToList(), _sqsClient, _queueUrl)
.GetAwaiter().GetResult();
SubscribeToTopics(topics).GetAwaiter().GetResult();
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
@@ -248,6 +247,23 @@ namespace DotNetCore.CAP.AmazonSQS
var setAttributes = new Dictionary<string, string> { { "Policy", policy.ToJson() } };
await _sqsClient.SetAttributesAsync(_queueUrl, setAttributes).ConfigureAwait(false);
}
private async Task SubscribeToTopics(IEnumerable<string> topics)
{
var queueAttributes = await _sqsClient.GetAttributesAsync(_queueUrl).ConfigureAwait(false);

var sqsQueueArn = queueAttributes["QueueArn"];
foreach (var topicArn in topics)
{
await _snsClient.SubscribeAsync(new SubscribeRequest
{
TopicArn = topicArn,
Protocol = "sqs",
Endpoint = sqsQueueArn,
})
.ConfigureAwait(false);
}
}

#endregion
}

+ 24
- 7
src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs Прегледај датотеку

@@ -37,9 +37,9 @@ namespace DotNetCore.CAP.AmazonSQS
{
try
{
await TryAddTopicArns();
await FetchExistingTopicArns();

if (_topicArnMaps.TryGetValue(message.GetName().NormalizeForAws(), out var arn))
if (TryGetOrCreateTopicArn(message.GetName().NormalizeForAws(), out var arn))
{
string bodyJson = null;
if (message.Body != null)
@@ -89,11 +89,11 @@ namespace DotNetCore.CAP.AmazonSQS
}
}

public async Task<bool> TryAddTopicArns()
private async Task FetchExistingTopicArns()
{
if (_topicArnMaps != null)
{
return true;
return;
}

await _semaphore.WaitAsync();
@@ -122,8 +122,6 @@ namespace DotNetCore.CAP.AmazonSQS
nextToken = topics.NextToken;
}
while (!string.IsNullOrEmpty(nextToken));

return true;
}
}
catch (Exception e)
@@ -134,8 +132,27 @@ namespace DotNetCore.CAP.AmazonSQS
{
_semaphore.Release();
}
}
private bool TryGetOrCreateTopicArn(string topicName, out string topicArn)
{
topicArn = null;
if (_topicArnMaps.TryGetValue(topicName, out topicArn))
{
return true;
}

var response = _snsClient.CreateTopicAsync(topicName).GetAwaiter().GetResult();

return false;
if (string.IsNullOrEmpty(response.TopicArn))
{
return false;
}
topicArn = response.TopicArn;
_topicArnMaps.Add(topicName, topicArn);
return true;
}
}
}

Loading…
Откажи
Сачувај