diff --git a/CAP.sln b/CAP.sln index a1b6644..12bcbda 100644 --- a/CAP.sln +++ b/CAP.sln @@ -69,7 +69,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AmazonSQS.InMemory", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{8B2FD3EA-E72B-4A82-B182-B87EC0C15D07}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Redis", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.Redis.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -165,6 +169,14 @@ Global {54F6C206-2A23-4971-AE5A-FC47EB772452}.Debug|Any CPU.Build.0 = Debug|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.ActiveCfg = Release|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.Build.0 = Release|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.Build.0 = Debug|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.ActiveCfg = Release|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.Build.0 = Release|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -192,6 +204,8 @@ Global {B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF} {8B2FD3EA-E72B-4A82-B182-B87EC0C15D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {54F6C206-2A23-4971-AE5A-FC47EB772452} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {462B6245-46F6-42BF-8CE6-588ACAA53190} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {375AF85D-8C81-47C6-BE5B-D0874D4971EA} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs new file mode 100644 index 0000000..6593a76 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs @@ -0,0 +1,32 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Reflection; + +namespace DotNetCore.CAP.Redis +{ + public static class CapRedisOptionsExtensions + { + public static CapOptions UseRedis(this CapOptions options) => + options.UseRedis(_ => { }); + + public static CapOptions UseRedis(this CapOptions options, string connection) => + options.UseRedis(opt => opt.Configuration = ConfigurationOptions.Parse(connection)); + + + public static CapOptions UseRedis(this CapOptions options, Action configure) + { + if (configure is null) + throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new RedisOptionsExtension(configure)); + + return options; + } + } +} diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs new file mode 100644 index 0000000..e81e935 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs @@ -0,0 +1,38 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class CapRedisOptionsPostConfigure : IPostConfigureOptions + { + private readonly CapOptions capOptions; + + public CapRedisOptionsPostConfigure(IOptions options) + { + capOptions = options.Value; + } + + public void PostConfigure(string name, CapRedisOptions options) + { + var groupPrefix = string.IsNullOrWhiteSpace(capOptions.GroupNamePrefix) ? string.Empty : $"{capOptions.GroupNamePrefix}."; + + options.DefaultChannel = $"{groupPrefix}{capOptions.DefaultGroupName}"; + + options.Configuration ??= new ConfigurationOptions(); + + if (!options.Configuration.EndPoints.Any()) + { + options.Configuration.EndPoints.Add(IPAddress.Loopback, 0); + options.Configuration.SetDefaultPorts(); + } + } + } +} diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs new file mode 100644 index 0000000..e15c978 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs @@ -0,0 +1,22 @@ +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + public class CapRedisOptions + { + /// + /// Gets or sets the options of redis connections + /// + public ConfigurationOptions Configuration { get; set; } + + internal string Endpoint { get; set; } + + internal string DefaultChannel { get; set; } + } +} diff --git a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj new file mode 100644 index 0000000..fb6bdaf --- /dev/null +++ b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.1 + DotNetCore.CAP.Redis + $(PackageTags);Redis + + + + bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Redis.xml + 1701;1702;1705;CS1591 + + + + + + + + + + + diff --git a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs new file mode 100644 index 0000000..502c7b8 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs @@ -0,0 +1,41 @@ +using DotNetCore.CAP.Redis; +using DotNetCore.CAP; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisOptionsExtension : ICapOptionsExtension + { + private readonly Action configure; + public RedisOptionsExtension(Action configure) + { + if (configure is null) + { + throw new ArgumentNullException(nameof(configure)); + } + + this.configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + services.TryAddEnumerable(ServiceDescriptor.Singleton, CapRedisOptionsPostConfigure>()); + services.AddOptions().Configure(configure); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs new file mode 100644 index 0000000..56858bd --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs @@ -0,0 +1,87 @@ +using DotNetCore.CAP.Redis; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisConsumerClient : IConsumerClient + { + private readonly ILogger logger; + private readonly IRedisCacheManager redis; + private readonly CapRedisOptions options; + private readonly string groupId; + + public RedisConsumerClient( + string groubId, + IRedisCacheManager redis, + CapRedisOptions options, + ILogger logger + ) + { + this.groupId = groubId; + this.redis = redis; + this.options = options; + this.logger = logger; + } + + public event EventHandler OnMessageReceived; + + public event EventHandler OnLog; + + public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint); + + public void Subscribe(IEnumerable topics) + { + if (topics == null) throw new ArgumentNullException(nameof(topics)); + + redis.SubscribeAsync(groupId, topics, (channel, message) => + { + logger.LogDebug($"Redis message with name {message.GetName()} subscribed."); + + message.GroupId = groupId; + + OnMessageReceived?.Invoke(channel, message); + + return Task.CompletedTask; + }); + } + + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + cancellationToken.Register(async () => await redis.UnsubscribeAsync()); + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); + } + } + + public void Commit(object sender) + { + // ignore + } + + public void Reject(object sender) + { + // ignore + } + + public void Dispose() + { + //ignore + } + + } +} diff --git a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs new file mode 100644 index 0000000..319166b --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs @@ -0,0 +1,33 @@ +using DotNetCore.CAP.Redis; +using DotNetCore.CAP; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisConsumerClientFactory : IConsumerClientFactory + { + private readonly CapRedisOptions redisOptions; + private readonly IRedisCacheManager redis; + private readonly ILogger logger; + + public RedisConsumerClientFactory(IOptions redisOptions, IRedisCacheManager redis, ILogger logger) + { + this.redisOptions = redisOptions.Value; + this.redis = redis; + this.logger = logger; + } + + public IConsumerClient Create(string groupId) + { + return new RedisConsumerClient(groupId, redis, redisOptions, logger); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs new file mode 100644 index 0000000..9ea0434 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs @@ -0,0 +1,108 @@ +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisCacheManager : IRedisCacheManager + { + private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1, 1); + private readonly CapRedisOptions options; + private readonly ILogger logger; + private ConnectionMultiplexer redis; + private ISubscriber subscriber; + private bool disposed; + + public RedisCacheManager(IOptions options, ILogger logger) + { + this.options = options.Value; + this.logger = logger; + _ = ConnectAsync(); + } + + public async Task SubscribeAsync(string channelName, IEnumerable topics, Func callback) + { + var channel = await GetChannel(channelName); + channel.OnMessage(channelMessage => + { + var message = RedisMessage.Create(channelMessage.Message); + + if (topics.Any(c => c == message.GetName())) + { + callback?.Invoke(channelMessage.SubscriptionChannel, message); + } + }); + } + + public async Task PublishAsync(string channelName, RedisValue message) + { + await subscriber.PublishAsync(channelName, message); + } + public async Task UnsubscribeAsync() + { + await subscriber?.UnsubscribeAllAsync(); + } + + public void Dispose() + { + if (!disposed) + { + subscriber?.UnsubscribeAll(); + redis?.Close(); + disposed = true; + } + } + + + private async Task GetChannel(string channelName) + { + await ConnectAsync().ConfigureAwait(false); + return await subscriber.SubscribeAsync(channelName).ConfigureAwait(false); + } + + private async Task ConnectAsync() + { + if (disposed == true) + throw new ObjectDisposedException(nameof(IRedisCacheManager)); + + if (redis != null) + { + subscriber ??= redis.GetSubscriber(); + return; + } + else + { + try + { + await connectionLock.WaitAsync().ConfigureAwait(false); + + if (redis != null) + { + return; + } + + var redisLogger = new RedisCacheLogger(logger); + + redis = await ConnectionMultiplexer.ConnectAsync(options.Configuration, redisLogger) + .ConfigureAwait(false); + + redis.LogEvents(logger); + + subscriber = redis.GetSubscriber(); + } + finally + { + connectionLock.Release(); + } + } + } + + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs new file mode 100644 index 0000000..507da02 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + static class RedisConnectionExtensions + { + public static void LogEvents(this IConnectionMultiplexer connection,ILogger logger) + { + if (connection is null) + { + throw new ArgumentNullException(nameof(connection)); + } + + if (logger is null) + { + throw new ArgumentNullException(nameof(logger)); + } + + _ = new RedisCacheManagerEvents(connection, logger); + } + } + + class RedisCacheManagerEvents + { + private readonly ILogger logger; + + public RedisCacheManagerEvents(IConnectionMultiplexer connection, ILogger logger) + { + this.logger = logger; + connection.ErrorMessage += Connection_ErrorMessage; + connection.ConnectionRestored += Connection_ConnectionRestored; + connection.ConnectionFailed += Connection_ConnectionFailed; + } + + private void Connection_ConnectionFailed(object sender, ConnectionFailedEventArgs e) + { + logger.LogError(e.Exception, $"Connection failed!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + } + + private void Connection_ConnectionRestored(object sender, ConnectionFailedEventArgs e) + { + logger.LogWarning(e.Exception, $"Connection restored back!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + } + + private void Connection_ErrorMessage(object sender, RedisErrorEventArgs e) + { + logger.LogError($"Server replied with error, {e.Message}, for endpoint:{e.EndPoint}"); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs new file mode 100644 index 0000000..7eb613b --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs @@ -0,0 +1,27 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisCacheLogger : TextWriter + { + private readonly ILogger logger; + + public RedisCacheLogger(ILogger logger) + { + this.logger = logger; + } + + + public override void WriteLine(string value) + { + logger.LogInformation(value); + } + public override Encoding Encoding => Encoding.UTF8; + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs new file mode 100644 index 0000000..dddca4c --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs @@ -0,0 +1,20 @@ +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + interface IRedisCacheManager : IDisposable + { + Task SubscribeAsync(string channelName, IEnumerable topics, Func callback); + Task PublishAsync(string channelName, RedisValue message); + Task UnsubscribeAsync(); + } +} diff --git a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs new file mode 100644 index 0000000..defcdd3 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; + +namespace DotNetCore.CAP.Redis +{ + class RedisTransport : ITransport + { + private readonly IRedisCacheManager redis; + private readonly ILogger logger; + private readonly MethodMatcherCache selector; + private readonly CapRedisOptions options; + + public RedisTransport(IRedisCacheManager redis, MethodMatcherCache selector, IOptions options, ILogger logger) + { + this.redis = redis; + this.selector = selector; + this.options = options.Value; + this.logger = logger; + } + + public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint); + + public async Task SendAsync(TransportMessage message) + { + try + { + var redisMessage = new RedisMessage(message.Headers, message.Body).AsRedisValue(); + + var channelName = selector.GetGroupByTopic(message.GetName()) ?? options.DefaultChannel; + + await redis.PublishAsync(channelName, redisMessage); + + logger.LogDebug($"Redis message [{message.GetName()}] has been published."); + + return OperateResult.Success; + } + catch (Exception ex) + { + var wrapperEx = new PublisherSentFailedException(ex.Message, ex); + + return OperateResult.Failed(wrapperEx); + } + } + } +} diff --git a/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs b/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs new file mode 100644 index 0000000..349abf7 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs @@ -0,0 +1,22 @@ +using DotNetCore.CAP.Internal; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + static class MethodMatcherCacheExtensions + { + public static string GetGroupByTopic(this MethodMatcherCache source, string topicName) + { + var groupsMap = source.GetCandidatesMethodsOfGroupNameGrouped(); + + return (from groupMap in groupsMap + from topic in groupMap.Value + where topic.TopicName == topicName + select topic.Attribute.Group).FirstOrDefault(); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs new file mode 100644 index 0000000..44f3917 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs @@ -0,0 +1,53 @@ +using DotNetCore.CAP.Messages; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using Messages = DotNetCore.CAP.Messages; + +namespace DotNetCore.CAP.Redis +{ + class RedisMessage : TransportMessage + { + public RedisMessage(IDictionary headers, byte[] body) : base(headers, body) { } + + public string GroupId + { + get => Headers.TryGetValue(Messages.Headers.Group, out var value) ? value : default; + set => Headers.TryAdd(Messages.Headers.Group, value); + } + + internal RedisValue AsRedisValue() + { + return JsonSerializer.Serialize(new RedisMessageValue + { + Headers = Headers, + Body = Body + }, new JsonSerializerOptions(JsonSerializerDefaults.Web)); + } + + public static RedisMessage Create(RedisValue redisValue) + { + if (redisValue.IsNullOrEmpty) + return Empty(); + + var value = JsonSerializer.Deserialize(redisValue, new JsonSerializerOptions(JsonSerializerDefaults.Web)); + + return new RedisMessage(value.Headers, value.Body); + } + + public static RedisMessage Empty() + { + return new RedisMessage(new Dictionary(), Array.Empty()); + } + } + + class RedisMessageValue + { + public IDictionary Headers { get; set; } + public byte[] Body { get; set; } + } +}