diff --git a/CAP.sln b/CAP.sln
index b5be9a6..c494fab 100644
--- a/CAP.sln
+++ b/CAP.sln
@@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "samples\Sample.ConsoleApp\Sample.ConsoleApp.csproj", "{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AmazonSQS", "src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj", "{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -147,6 +149,10 @@ Global
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.Build.0 = Release|Any CPU
+ {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {43475E00-51B7-443D-BC2D-FC21F9D8A0B4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -171,6 +177,7 @@ Global
{93176BAE-914B-4BED-9DE3-01FFB4F27FC5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF}
+ {43475E00-51B7-443D-BC2D-FC21F9D8A0B4} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}
diff --git a/CAP.sln.DotSettings b/CAP.sln.DotSettings
index 3eef960..486eace 100644
--- a/CAP.sln.DotSettings
+++ b/CAP.sln.DotSettings
@@ -1,4 +1,5 @@
DB
+ SNS
True
True
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
new file mode 100644
index 0000000..2bd6ee4
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs
@@ -0,0 +1,166 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using Amazon.SimpleNotificationService;
+using Amazon.SimpleNotificationService.Model;
+using Amazon.SQS;
+using Amazon.SQS.Model;
+using DotNetCore.CAP.Messages;
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+using Headers = DotNetCore.CAP.Messages.Headers;
+
+namespace DotNetCore.CAP.AmazonSQS
+{
+ internal sealed class AmazonSQSConsumerClient : IConsumerClient
+ {
+ private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
+
+ private readonly string _groupId;
+ private readonly AmazonSQSOptions _amazonSQSOptions;
+
+ private IAmazonSimpleNotificationService _snsClient;
+ private IAmazonSQS _sqsClient;
+ private string _queueUrl = string.Empty;
+
+ public AmazonSQSConsumerClient(string groupId, IOptions options)
+ {
+ _groupId = groupId;
+ _amazonSQSOptions = options.Value;
+ }
+
+ public event EventHandler OnMessageReceived;
+
+ public event EventHandler OnLog;
+
+ public BrokerAddress BrokerAddress => new BrokerAddress("AmazonSQS", _queueUrl);
+
+ public void Subscribe(IEnumerable topics)
+ {
+ if (topics == null)
+ {
+ throw new ArgumentNullException(nameof(topics));
+ }
+
+ Connect(initSNS: true, initSQS: false);
+
+ var topicArns = new List();
+ foreach (var topic in topics)
+ {
+ var createTopicRequest = new CreateTopicRequest(topic.NormalizeForAws());
+
+ var createTopicResponse = _snsClient.CreateTopicAsync(createTopicRequest).GetAwaiter().GetResult();
+
+ topicArns.Add(createTopicResponse.TopicArn);
+ }
+
+ Connect(initSNS: false, initSQS: true);
+
+ _snsClient.SubscribeQueueToTopicsAsync(topicArns, _sqsClient, _queueUrl)
+ .GetAwaiter().GetResult();
+ }
+
+ public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
+ {
+ Connect();
+
+ var request = new ReceiveMessageRequest(_queueUrl)
+ {
+ WaitTimeSeconds = 5,
+ MaxNumberOfMessages = 1
+ };
+
+ while (true)
+ {
+ var response = _sqsClient.ReceiveMessageAsync(request, cancellationToken).GetAwaiter().GetResult();
+
+ if (response.Messages.Count == 1)
+ {
+ var messageObj = JsonConvert.DeserializeObject(response.Messages[0].Body);
+
+ var header = messageObj.MessageAttributes.ToDictionary(x => x.Key, x => x.Value.Value);
+ var body = messageObj.Message;
+
+ var message = new TransportMessage(header, body != null ? Encoding.UTF8.GetBytes(body) : null);
+
+ message.Headers.Add(Headers.Group, _groupId);
+
+ OnMessageReceived?.Invoke(response.Messages[0].ReceiptHandle, message);
+ }
+ else
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ cancellationToken.WaitHandle.WaitOne(timeout);
+ }
+ }
+ }
+
+ public void Commit(object sender)
+ {
+ _sqsClient.DeleteMessageAsync(_queueUrl, (string)sender);
+ }
+
+ public void Reject(object sender)
+ {
+ _sqsClient.ChangeMessageVisibilityAsync(_queueUrl, (string)sender, 3000);
+ }
+
+ public void Dispose()
+ {
+ _sqsClient?.Dispose();
+ _snsClient?.Dispose();
+ }
+
+ public void Connect(bool initSNS = true, bool initSQS = true)
+ {
+ if (_snsClient != null && _sqsClient != null)
+ {
+ return;
+ }
+
+ if (_snsClient == null && initSNS)
+ {
+ ConnectionLock.Wait();
+
+ try
+ {
+ _snsClient = _amazonSQSOptions.Credentials != null
+ ? new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region)
+ : new AmazonSimpleNotificationServiceClient(_amazonSQSOptions.Region);
+ }
+ finally
+ {
+ ConnectionLock.Release();
+ }
+ }
+
+ if (_sqsClient == null && initSQS)
+ {
+ ConnectionLock.Wait();
+
+ try
+ {
+
+ _sqsClient = _amazonSQSOptions.Credentials != null
+ ? new AmazonSQSClient(_amazonSQSOptions.Credentials, _amazonSQSOptions.Region)
+ : new AmazonSQSClient(_amazonSQSOptions.Region);
+
+ // If provide the name of an existing queue along with the exact names and values
+ // of all the queue's attributes, CreateQueue
returns the queue URL for
+ // the existing queue.
+ _queueUrl = _sqsClient.CreateQueueAsync(_groupId.NormalizeForAws()).GetAwaiter().GetResult().QueueUrl;
+ }
+ finally
+ {
+ ConnectionLock.Release();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs
new file mode 100644
index 0000000..7665991
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClientFactory.cs
@@ -0,0 +1,31 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.Options;
+
+namespace DotNetCore.CAP.AmazonSQS
+{
+ internal sealed class AmazonSQSConsumerClientFactory : IConsumerClientFactory
+ {
+ private readonly IOptions _amazonSQSOptions;
+
+ public AmazonSQSConsumerClientFactory(IOptions amazonSQSOptions)
+ {
+ _amazonSQSOptions = amazonSQSOptions;
+ }
+
+ public IConsumerClient Create(string groupId)
+ {
+ try
+ {
+ var client = new AmazonSQSConsumerClient(groupId, _amazonSQSOptions);
+ return client;
+ }
+ catch (System.Exception e)
+ {
+ throw new BrokerConnectionException(e);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs
new file mode 100644
index 0000000..00c751c
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptions.cs
@@ -0,0 +1,17 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using Amazon;
+using Amazon.Runtime;
+
+// ReSharper disable once CheckNamespace
+namespace DotNetCore.CAP
+{
+ // ReSharper disable once InconsistentNaming
+ public class AmazonSQSOptions
+ {
+ public RegionEndpoint Region { get; set; }
+
+ public AWSCredentials Credentials { get; set; }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs
new file mode 100644
index 0000000..8f246fc
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/CAP.AmazonSQSOptionsExtension.cs
@@ -0,0 +1,30 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using DotNetCore.CAP.AmazonSQS;
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.DependencyInjection;
+
+// ReSharper disable once CheckNamespace
+namespace DotNetCore.CAP
+{
+ internal sealed class AmazonSQSOptionsExtension : ICapOptionsExtension
+ {
+ private readonly Action _configure;
+
+ public AmazonSQSOptionsExtension(Action configure)
+ {
+ _configure = configure;
+ }
+
+ public void AddServices(IServiceCollection services)
+ {
+ services.AddSingleton();
+
+ services.Configure(_configure);
+ services.AddSingleton();
+ services.AddSingleton();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs
new file mode 100644
index 0000000..23ede03
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/CAP.Options.Extensions.cs
@@ -0,0 +1,30 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using Amazon;
+using DotNetCore.CAP;
+
+// ReSharper disable once CheckNamespace
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class CapOptionsExtensions
+ {
+ public static CapOptions UseAmazonSQS(this CapOptions options, RegionEndpoint region)
+ {
+ return options.UseAmazonSQS(opt => { opt.Region = region; });
+ }
+
+ public static CapOptions UseAmazonSQS(this CapOptions options, Action configure)
+ {
+ if (configure == null)
+ {
+ throw new ArgumentNullException(nameof(configure));
+ }
+
+ options.RegisterExtension(new AmazonSQSOptionsExtension(configure));
+
+ return options;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj b/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj
new file mode 100644
index 0000000..5a7116b
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/DotNetCore.CAP.AmazonSQS.csproj
@@ -0,0 +1,23 @@
+
+
+
+ netstandard2.0
+ DotNetCore.CAP.AmazonSQS
+ $(PackageTags);AmazonSQS;SQS
+
+
+
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.AmazonSQS.xml
+ 1701;1702;1705;CS1591
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs b/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs
new file mode 100644
index 0000000..2ab3bf6
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/ITransport.AmazonSQS.cs
@@ -0,0 +1,125 @@
+// Copyright (c) .NET Core Community. All rights reserved.
+// Licensed under the MIT License. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Amazon.SimpleNotificationService;
+using Amazon.SimpleNotificationService.Model;
+using DotNetCore.CAP.Internal;
+using DotNetCore.CAP.Messages;
+using DotNetCore.CAP.Transport;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+
+namespace DotNetCore.CAP.AmazonSQS
+{
+ internal sealed class AmazonSQSTransport : ITransport
+ {
+ private readonly ILogger _logger;
+ private readonly IOptions _sqsOptions;
+ private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
+ private IAmazonSimpleNotificationService _snsClient;
+ private IDictionary _topicArnMaps;
+
+ public AmazonSQSTransport(ILogger logger, IOptions sqsOptions)
+ {
+ _logger = logger;
+ _sqsOptions = sqsOptions;
+ }
+
+ public BrokerAddress BrokerAddress => new BrokerAddress("RabbitMQ", string.Empty);
+
+ public async Task SendAsync(TransportMessage message)
+ {
+ try
+ {
+ await TryAddTopicArns();
+
+ if (_topicArnMaps.TryGetValue(message.GetName().NormalizeForAws(), out var arn))
+ {
+ string bodyJson = null;
+ if (message.Body != null)
+ {
+ bodyJson = Encoding.UTF8.GetString(message.Body);
+ }
+
+ var attributes = message.Headers.Where(x => x.Value != null).ToDictionary(x => x.Key,
+ x => new MessageAttributeValue
+ {
+ StringValue = x.Value,
+ DataType = "String"
+ });
+
+ var request = new PublishRequest(arn, bodyJson)
+ {
+ MessageAttributes = attributes
+ };
+
+ await _snsClient.PublishAsync(request);
+
+ _logger.LogDebug($"SNS topic message [{message.GetName().NormalizeForAws()}] has been published.");
+ }
+ else
+ {
+ _logger.LogWarning($"Can't be found SNS topics for [{message.GetName().NormalizeForAws()}]");
+ }
+ return OperateResult.Success;
+ }
+ catch (Exception ex)
+ {
+ var wrapperEx = new PublisherSentFailedException(ex.Message, ex);
+ var errors = new OperateError
+ {
+ Code = ex.HResult.ToString(),
+ Description = ex.Message
+ };
+
+ return OperateResult.Failed(wrapperEx, errors);
+ }
+ }
+
+ public async Task TryAddTopicArns()
+ {
+ if (_topicArnMaps != null)
+ {
+ return true;
+ }
+
+ await _semaphore.WaitAsync();
+
+ try
+ {
+ _snsClient = _sqsOptions.Value.Credentials != null
+ ? new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Credentials, _sqsOptions.Value.Region)
+ : new AmazonSimpleNotificationServiceClient(_sqsOptions.Value.Region);
+
+ if (_topicArnMaps == null)
+ {
+ _topicArnMaps = new Dictionary();
+ var topics = await _snsClient.ListTopicsAsync();
+ topics.Topics.ForEach(x =>
+ {
+ var name = x.TopicArn.Split(':').Last();
+ _topicArnMaps.Add(name, x.TopicArn);
+ });
+
+ return true;
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.LogError(e, "Init topics from aws sns error!");
+ }
+ finally
+ {
+ _semaphore.Release();
+ }
+
+ return false;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs b/src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs
new file mode 100644
index 0000000..893b418
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/SQSReceivedMessage.cs
@@ -0,0 +1,18 @@
+using System.Collections.Generic;
+
+namespace DotNetCore.CAP.AmazonSQS
+{
+ class SQSReceivedMessage
+ {
+ public string Message { get; set; }
+
+ public Dictionary MessageAttributes { get; set; }
+ }
+
+ class SQSReceivedMessageAttributes
+ {
+ public string Type { get; set; }
+
+ public string Value { get; set; }
+ }
+}
diff --git a/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs
new file mode 100644
index 0000000..f6d0965
--- /dev/null
+++ b/src/DotNetCore.CAP.AmazonSQS/TopicNormalizer.cs
@@ -0,0 +1,21 @@
+using System;
+
+namespace DotNetCore.CAP.AmazonSQS
+{
+ public static class TopicNormalizer
+ {
+ public static string NormalizeForAws(this string origin)
+ {
+ if (origin.Length > 256)
+ {
+ throw new ArgumentOutOfRangeException(nameof(origin) + " character string length must between 1~256!");
+ }
+ return origin.Replace(".", "-").Replace(":", "_");
+ }
+
+ public static string DeNormalizeForAws(this string origin)
+ {
+ return origin.Replace("-", ".").Replace("_", ":");
+ }
+ }
+}