소스 검색

Add AWS SQS Support (#603)

master
Savorboard 4 년 전
committed by GitHub
부모
커밋
55079a3127
No known key found for this signature in database GPG 키 ID: 4AEE18F83AFDEB23
11개의 변경된 파일469개의 추가작업 그리고 0개의 파일을 삭제
  1. +7
    -0
      CAP.sln
  2. +1
    -0
      CAP.sln.DotSettings
  3. +166
    -0
      src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
  4. +31
    -0
      src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs
  5. +17
    -0
      src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs
  6. +30
    -0
      src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs
  7. +30
    -0
      src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs
  8. +23
    -0
      src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj
  9. +125
    -0
      src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs
  10. +18
    -0
      src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs
  11. +21
    -0
      src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs

+ 7
- 0
CAP.sln 파일 보기

@@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "samples\Sample.ConsoleApp\Sample.ConsoleApp.csproj", "{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AmazonSQS", "src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj", "{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -147,6 +149,10 @@ Global
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.Build.0 = Release|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -171,6 +177,7 @@ Global
{93176BAE-914B-4BED-9DE3-01FFB4F27FC5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{43475E00-51B7-443D-BC2D-FC21F9D8A0B4} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 1
- 0
CAP.sln.DotSettings 파일 보기

@@ -1,4 +1,5 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=DB/@EntryIndexedValue">DB</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=SNS/@EntryIndexedValue">SNS</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Mongo/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Postgre/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

+ 166
- 0
src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs 파일 보기

@@ -0,0 +1,166 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS;
using Amazon.SQS.Model;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Headers = DotNetCore.CAP.Messages.Headers;

namespace DotNetCore.CAP.AmazonSQS
{
internal sealed class AmazonSQSConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

private readonly string _groupId;
private readonly AmazonSQSOptions _amazonSQSOptions;

private IAmazonSimpleNotificationService _snsClient;
private IAmazonSQS _sqsClient;
private string _queueUrl = string.Empty;

public AmazonSQSConsumerClient(string groupId, IOptions<AmazonSQSOptions> options)
{
_groupId = groupId;
_amazonSQSOptions = options.Value;
}

public event EventHandler<TransportMessage> OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("AmazonSQS", _queueUrl);

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}

Connect(initSNS: true, initSQS: false);

var topicArns = new List<string>();
foreach (var topic in topics)
{
var createTopicRequest = new CreateTopicRequest(topic.NormalizeForAws());

var createTopicResponse = _snsClient.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult();

topicArns.Add(createTopicResponse.TopicArn);
}

Connect(initSNS: false, initSQS: true);

_snsClient.SubscribeQueueToTopicsAsync(topicArns, _sqsClient, _queueUrl)
.GetAwaiter().GetResult();
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
Connect();

var request = new ReceiveMessageRequest(_queueUrl)
{
WaitTimeSeconds = 5,
MaxNumberOfMessages = 1
};

while (true)
{
var response = _sqsClient.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult();

if (response.Messages.Count == 1)
{
var messageObj = JsonConvert.DeserializeObject<SQSReceivedMessage>(response.Messages[0].Body);

var header = messageObj.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value);
var body = messageObj.Message;

var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null);

message.Headers.Add(Headers.Group, _groupId);

OnMessageReceived?.Invoke(response.Messages[0].ReceiptHandle, message);
}
else
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
}
}

public void Commit(object sender)
{
_sqsClient.DeleteMessageAsync(_queueUrl, (string)sender);
}

public void Reject(object sender)
{
_sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3000);
}

public void Dispose()
{
_sqsClient?.Dispose();
_snsClient?.Dispose();
}

public void Connect(bool initSNS = true, bool initSQS = true)
{
if (_snsClient != null && _sqsClient != null)
{
return;
}

if (_snsClient == null && initSNS)
{
ConnectionLock.Wait();

try
{
_snsClient = _amazonSQSOptions.Credentials != null
? new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region)
: new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Region);
}
finally
{
ConnectionLock.Release();
}
}

if (_sqsClient == null && initSQS)
{
ConnectionLock.Wait();

try
{

_sqsClient = _amazonSQSOptions.Credentials != null
? new AmazonSQSClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region)
: new AmazonSQSClient(_amazonSQSOptions.Region);

// If provide the name of an existing queue along with the exact names and values
// of all the queue's attributes, <code>CreateQueue</code> returns the queue URL for
// the existing queue.
_queueUrl = _sqsClient.CreateQueueAsync(_groupId.NormalizeForAws()).GetAwaiter().GetResult().QueueUrl;
}
finally
{
ConnectionLock.Release();
}
}
}
}
}

+ 31
- 0
src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs 파일 보기

@@ -0,0 +1,31 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.AmazonSQS
{
internal sealed class AmazonSQSConsumerClientFactory : IConsumerClientFactory
{
private readonly IOptions<AmazonSQSOptions> _amazonSQSOptions;

public AmazonSQSConsumerClientFactory(IOptions<AmazonSQSOptions> amazonSQSOptions)
{
_amazonSQSOptions = amazonSQSOptions;
}

public IConsumerClient Create(string groupId)
{
try
{
var client = new AmazonSQSConsumerClient(groupId, _amazonSQSOptions);
return client;
}
catch (System.Exception e)
{
throw new BrokerConnectionException(e);
}
}
}
}

+ 17
- 0
src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs 파일 보기

@@ -0,0 +1,17 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Amazon;
using Amazon.Runtime;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
// ReSharper disable once InconsistentNaming
public class AmazonSQSOptions
{
public RegionEndpoint Region { get; set; }

public AWSCredentials Credentials { get; set; }
}
}

+ 30
- 0
src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs 파일 보기

@@ -0,0 +1,30 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.AmazonSQS;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal sealed class AmazonSQSOptionsExtension : ICapOptionsExtension
{
private readonly Action<AmazonSQSOptions> _configure;

public AmazonSQSOptionsExtension(Action<AmazonSQSOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();
services.Configure(_configure);
services.AddSingleton<ITransport, AmazonSQSTransport>();
services.AddSingleton<IConsumerClientFactory, AmazonSQSConsumerClientFactory>();
}
}
}

+ 30
- 0
src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs 파일 보기

@@ -0,0 +1,30 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Amazon;
using DotNetCore.CAP;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
public static CapOptions UseAmazonSQS(this CapOptions options, RegionEndpoint region)
{
return options.UseAmazonSQS(opt => { opt.Region = region; });
}

public static CapOptions UseAmazonSQS(this CapOptions options, Action<AmazonSQSOptions> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

options.RegisterExtension(new AmazonSQSOptionsExtension(configure));

return options;
}
}
}

+ 23
- 0
src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj 파일 보기

@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.AmazonSQS</AssemblyName>
<PackageTags>$(PackageTags);AmazonSQS;SQS</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.AmazonSQS.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="AWSSDK.SimpleNotificationService" Version="3.3.101.182" />
<PackageReference Include="AWSSDK.SQS" Version="3.3.102.125" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 125
- 0
src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs 파일 보기

@@ -0,0 +1,125 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.AmazonSQS
{
internal sealed class AmazonSQSTransport : ITransport
{
private readonly ILogger _logger;
private readonly IOptions<AmazonSQSOptions> _sqsOptions;
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
private IAmazonSimpleNotificationService _snsClient;
private IDictionary<string, string> _topicArnMaps;

public AmazonSQSTransport(ILogger<AmazonSQSTransport> logger, IOptions<AmazonSQSOptions> sqsOptions)
{
_logger = logger;
_sqsOptions = sqsOptions;
}

public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", string.Empty);

public async Task<OperateResult> SendAsync(TransportMessage message)
{
try
{
await TryAddTopicArns();

if (_topicArnMaps.TryGetValue(message.GetName().NormalizeForAws(), out var arn))
{
string bodyJson = null;
if (message.Body != null)
{
bodyJson = Encoding.UTF8.GetString(message.Body);
}

var attributes = message.Headers.Where(x => x.Value != null).ToDictionary(x => x.Key,
x => new MessageAttributeValue
{
StringValue = x.Value,
DataType = "String"
});
var request = new PublishRequest(arn, bodyJson)
{
MessageAttributes = attributes
};

await _snsClient.PublishAsync(request);

_logger.LogDebug($"SNS topic message [{message.GetName().NormalizeForAws()}] has been published.");
}
else
{
_logger.LogWarning($"Can't be found SNS topics for [{message.GetName().NormalizeForAws()}]");
}
return OperateResult.Success;
}
catch (Exception ex)
{
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);
var errors = new OperateError
{
Code = ex.HResult.ToString(),
Description = ex.Message
};

return OperateResult.Failed(wrapperEx, errors);
}
}

public async Task<bool> TryAddTopicArns()
{
if (_topicArnMaps != null)
{
return true;
}

await _semaphore.WaitAsync();

try
{
_snsClient = _sqsOptions.Value.Credentials != null
? new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Credentials, _sqsOptions.Value.Region)
: new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Region);

if (_topicArnMaps == null)
{
_topicArnMaps = new Dictionary<string, string>();
var topics = await _snsClient.ListTopicsAsync();
topics.Topics.ForEach(x =>
{
var name = x.TopicArn.Split(':').Last();
_topicArnMaps.Add(name, x.TopicArn);
});

return true;
}
}
catch (Exception e)
{
_logger.LogError(e, "Init topics from aws sns error!");
}
finally
{
_semaphore.Release();
}

return false;
}
}
}

+ 18
- 0
src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs 파일 보기

@@ -0,0 +1,18 @@
using System.Collections.Generic;

namespace DotNetCore.CAP.AmazonSQS
{
class SQSReceivedMessage
{
public string Message { get; set; }

public Dictionary<string, SQSReceivedMessageAttributes> MessageAttributes { get; set; }
}

class SQSReceivedMessageAttributes
{
public string Type { get; set; }

public string Value { get; set; }
}
}

+ 21
- 0
src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs 파일 보기

@@ -0,0 +1,21 @@
using System;

namespace DotNetCore.CAP.AmazonSQS
{
public static class TopicNormalizer
{
public static string NormalizeForAws(this string origin)
{
if (origin.Length > 256)
{
throw new ArgumentOutOfRangeException(nameof(origin) + " character string length must between 1~256!");
}
return origin.Replace(".", "-").Replace(":", "_");
}

public static string DeNormalizeForAws(this string origin)
{
return origin.Replace("-", ".").Replace("_", ":");
}
}
}

불러오는 중...
취소
저장