From 05e0f53bb327e66f66f730ab29dff26f0de02f50 Mon Sep 17 00:00:00 2001 From: "mahmoud.samir" Date: Thu, 1 Apr 2021 00:42:51 +0300 Subject: [PATCH 1/6] add redis --- CAP.sln | 16 ++- .../CapOptions.Redis.Extensions.cs | 32 ++++++ .../CapOptions.Redis.PostConfigure.cs | 38 ++++++ src/DotNetCore.CAP.Redis/CapOptions.Redis.cs | 22 ++++ .../DotNetCore.CAP.Redis.csproj | 22 ++++ .../ICapOptionsExtension.Redis.cs | 41 +++++++ .../IConsumerClient.Redis.cs | 87 ++++++++++++++ .../IConsumerClientFactory.Redis.cs | 33 ++++++ .../IRedisCache.Manager.Default.cs | 108 ++++++++++++++++++ .../IRedisCache.Manager.Events.cs | 57 +++++++++ .../IRedisCache.Manager.Logger.cs | 27 +++++ .../IRedisCache.Manager.cs | 20 ++++ src/DotNetCore.CAP.Redis/ITransport.Redis.cs | 55 +++++++++ .../MethodMatcherCache.Extensions.cs | 22 ++++ .../TransportMessage.Redis.cs | 53 +++++++++ 15 files changed, 632 insertions(+), 1 deletion(-) create mode 100644 src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs create mode 100644 src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs create mode 100644 src/DotNetCore.CAP.Redis/CapOptions.Redis.cs create mode 100644 src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj create mode 100644 src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs create mode 100644 src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs create mode 100644 src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs create mode 100644 src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs create mode 100644 src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs create mode 100644 src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs create mode 100644 src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs create mode 100644 src/DotNetCore.CAP.Redis/ITransport.Redis.cs create mode 100644 src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs create mode 100644 src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs diff --git a/CAP.sln b/CAP.sln index a1b6644..12bcbda 100644 --- a/CAP.sln +++ b/CAP.sln @@ -69,7 +69,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AmazonSQS.InMemory", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{8B2FD3EA-E72B-4A82-B182-B87EC0C15D07}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Redis", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.Redis.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -165,6 +169,14 @@ Global {54F6C206-2A23-4971-AE5A-FC47EB772452}.Debug|Any CPU.Build.0 = Debug|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.ActiveCfg = Release|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.Build.0 = Release|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.Build.0 = Debug|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.ActiveCfg = Release|Any CPU + {462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.Build.0 = Release|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -192,6 +204,8 @@ Global {B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF} {8B2FD3EA-E72B-4A82-B182-B87EC0C15D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {54F6C206-2A23-4971-AE5A-FC47EB772452} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {462B6245-46F6-42BF-8CE6-588ACAA53190} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} + {375AF85D-8C81-47C6-BE5B-D0874D4971EA} = {3A6B6931-A123-477A-9469-8B468B5385AF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs new file mode 100644 index 0000000..6593a76 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs @@ -0,0 +1,32 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using System.Reflection; + +namespace DotNetCore.CAP.Redis +{ + public static class CapRedisOptionsExtensions + { + public static CapOptions UseRedis(this CapOptions options) => + options.UseRedis(_ => { }); + + public static CapOptions UseRedis(this CapOptions options, string connection) => + options.UseRedis(opt => opt.Configuration = ConfigurationOptions.Parse(connection)); + + + public static CapOptions UseRedis(this CapOptions options, Action configure) + { + if (configure is null) + throw new ArgumentNullException(nameof(configure)); + + options.RegisterExtension(new RedisOptionsExtension(configure)); + + return options; + } + } +} diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs new file mode 100644 index 0000000..e81e935 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs @@ -0,0 +1,38 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class CapRedisOptionsPostConfigure : IPostConfigureOptions + { + private readonly CapOptions capOptions; + + public CapRedisOptionsPostConfigure(IOptions options) + { + capOptions = options.Value; + } + + public void PostConfigure(string name, CapRedisOptions options) + { + var groupPrefix = string.IsNullOrWhiteSpace(capOptions.GroupNamePrefix) ? string.Empty : $"{capOptions.GroupNamePrefix}."; + + options.DefaultChannel = $"{groupPrefix}{capOptions.DefaultGroupName}"; + + options.Configuration ??= new ConfigurationOptions(); + + if (!options.Configuration.EndPoints.Any()) + { + options.Configuration.EndPoints.Add(IPAddress.Loopback, 0); + options.Configuration.SetDefaultPorts(); + } + } + } +} diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs new file mode 100644 index 0000000..e15c978 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs @@ -0,0 +1,22 @@ +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + public class CapRedisOptions + { + /// + /// Gets or sets the options of redis connections + /// + public ConfigurationOptions Configuration { get; set; } + + internal string Endpoint { get; set; } + + internal string DefaultChannel { get; set; } + } +} diff --git a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj new file mode 100644 index 0000000..fb6bdaf --- /dev/null +++ b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.1 + DotNetCore.CAP.Redis + $(PackageTags);Redis + + + + bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Redis.xml + 1701;1702;1705;CS1591 + + + + + + + + + + + diff --git a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs new file mode 100644 index 0000000..502c7b8 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs @@ -0,0 +1,41 @@ +using DotNetCore.CAP.Redis; +using DotNetCore.CAP; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisOptionsExtension : ICapOptionsExtension + { + private readonly Action configure; + public RedisOptionsExtension(Action configure) + { + if (configure is null) + { + throw new ArgumentNullException(nameof(configure)); + } + + this.configure = configure; + } + + public void AddServices(IServiceCollection services) + { + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + services.TryAddEnumerable(ServiceDescriptor.Singleton, CapRedisOptionsPostConfigure>()); + services.AddOptions().Configure(configure); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs new file mode 100644 index 0000000..56858bd --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs @@ -0,0 +1,87 @@ +using DotNetCore.CAP.Redis; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.Serialization; +using System.Text; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisConsumerClient : IConsumerClient + { + private readonly ILogger logger; + private readonly IRedisCacheManager redis; + private readonly CapRedisOptions options; + private readonly string groupId; + + public RedisConsumerClient( + string groubId, + IRedisCacheManager redis, + CapRedisOptions options, + ILogger logger + ) + { + this.groupId = groubId; + this.redis = redis; + this.options = options; + this.logger = logger; + } + + public event EventHandler OnMessageReceived; + + public event EventHandler OnLog; + + public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint); + + public void Subscribe(IEnumerable topics) + { + if (topics == null) throw new ArgumentNullException(nameof(topics)); + + redis.SubscribeAsync(groupId, topics, (channel, message) => + { + logger.LogDebug($"Redis message with name {message.GetName()} subscribed."); + + message.GroupId = groupId; + + OnMessageReceived?.Invoke(channel, message); + + return Task.CompletedTask; + }); + } + + public void Listening(TimeSpan timeout, CancellationToken cancellationToken) + { + cancellationToken.Register(async () => await redis.UnsubscribeAsync()); + + while (true) + { + cancellationToken.ThrowIfCancellationRequested(); + cancellationToken.WaitHandle.WaitOne(timeout); + } + } + + public void Commit(object sender) + { + // ignore + } + + public void Reject(object sender) + { + // ignore + } + + public void Dispose() + { + //ignore + } + + } +} diff --git a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs new file mode 100644 index 0000000..319166b --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs @@ -0,0 +1,33 @@ +using DotNetCore.CAP.Redis; +using DotNetCore.CAP; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisConsumerClientFactory : IConsumerClientFactory + { + private readonly CapRedisOptions redisOptions; + private readonly IRedisCacheManager redis; + private readonly ILogger logger; + + public RedisConsumerClientFactory(IOptions redisOptions, IRedisCacheManager redis, ILogger logger) + { + this.redisOptions = redisOptions.Value; + this.redis = redis; + this.logger = logger; + } + + public IConsumerClient Create(string groupId) + { + return new RedisConsumerClient(groupId, redis, redisOptions, logger); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs new file mode 100644 index 0000000..9ea0434 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs @@ -0,0 +1,108 @@ +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisCacheManager : IRedisCacheManager + { + private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1, 1); + private readonly CapRedisOptions options; + private readonly ILogger logger; + private ConnectionMultiplexer redis; + private ISubscriber subscriber; + private bool disposed; + + public RedisCacheManager(IOptions options, ILogger logger) + { + this.options = options.Value; + this.logger = logger; + _ = ConnectAsync(); + } + + public async Task SubscribeAsync(string channelName, IEnumerable topics, Func callback) + { + var channel = await GetChannel(channelName); + channel.OnMessage(channelMessage => + { + var message = RedisMessage.Create(channelMessage.Message); + + if (topics.Any(c => c == message.GetName())) + { + callback?.Invoke(channelMessage.SubscriptionChannel, message); + } + }); + } + + public async Task PublishAsync(string channelName, RedisValue message) + { + await subscriber.PublishAsync(channelName, message); + } + public async Task UnsubscribeAsync() + { + await subscriber?.UnsubscribeAllAsync(); + } + + public void Dispose() + { + if (!disposed) + { + subscriber?.UnsubscribeAll(); + redis?.Close(); + disposed = true; + } + } + + + private async Task GetChannel(string channelName) + { + await ConnectAsync().ConfigureAwait(false); + return await subscriber.SubscribeAsync(channelName).ConfigureAwait(false); + } + + private async Task ConnectAsync() + { + if (disposed == true) + throw new ObjectDisposedException(nameof(IRedisCacheManager)); + + if (redis != null) + { + subscriber ??= redis.GetSubscriber(); + return; + } + else + { + try + { + await connectionLock.WaitAsync().ConfigureAwait(false); + + if (redis != null) + { + return; + } + + var redisLogger = new RedisCacheLogger(logger); + + redis = await ConnectionMultiplexer.ConnectAsync(options.Configuration, redisLogger) + .ConfigureAwait(false); + + redis.LogEvents(logger); + + subscriber = redis.GetSubscriber(); + } + finally + { + connectionLock.Release(); + } + } + } + + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs new file mode 100644 index 0000000..507da02 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + static class RedisConnectionExtensions + { + public static void LogEvents(this IConnectionMultiplexer connection,ILogger logger) + { + if (connection is null) + { + throw new ArgumentNullException(nameof(connection)); + } + + if (logger is null) + { + throw new ArgumentNullException(nameof(logger)); + } + + _ = new RedisCacheManagerEvents(connection, logger); + } + } + + class RedisCacheManagerEvents + { + private readonly ILogger logger; + + public RedisCacheManagerEvents(IConnectionMultiplexer connection, ILogger logger) + { + this.logger = logger; + connection.ErrorMessage += Connection_ErrorMessage; + connection.ConnectionRestored += Connection_ConnectionRestored; + connection.ConnectionFailed += Connection_ConnectionFailed; + } + + private void Connection_ConnectionFailed(object sender, ConnectionFailedEventArgs e) + { + logger.LogError(e.Exception, $"Connection failed!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + } + + private void Connection_ConnectionRestored(object sender, ConnectionFailedEventArgs e) + { + logger.LogWarning(e.Exception, $"Connection restored back!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + } + + private void Connection_ErrorMessage(object sender, RedisErrorEventArgs e) + { + logger.LogError($"Server replied with error, {e.Message}, for endpoint:{e.EndPoint}"); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs new file mode 100644 index 0000000..7eb613b --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs @@ -0,0 +1,27 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisCacheLogger : TextWriter + { + private readonly ILogger logger; + + public RedisCacheLogger(ILogger logger) + { + this.logger = logger; + } + + + public override void WriteLine(string value) + { + logger.LogInformation(value); + } + public override Encoding Encoding => Encoding.UTF8; + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs new file mode 100644 index 0000000..dddca4c --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs @@ -0,0 +1,20 @@ +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + interface IRedisCacheManager : IDisposable + { + Task SubscribeAsync(string channelName, IEnumerable topics, Func callback); + Task PublishAsync(string channelName, RedisValue message); + Task UnsubscribeAsync(); + } +} diff --git a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs new file mode 100644 index 0000000..defcdd3 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using DotNetCore.CAP; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; + +namespace DotNetCore.CAP.Redis +{ + class RedisTransport : ITransport + { + private readonly IRedisCacheManager redis; + private readonly ILogger logger; + private readonly MethodMatcherCache selector; + private readonly CapRedisOptions options; + + public RedisTransport(IRedisCacheManager redis, MethodMatcherCache selector, IOptions options, ILogger logger) + { + this.redis = redis; + this.selector = selector; + this.options = options.Value; + this.logger = logger; + } + + public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint); + + public async Task SendAsync(TransportMessage message) + { + try + { + var redisMessage = new RedisMessage(message.Headers, message.Body).AsRedisValue(); + + var channelName = selector.GetGroupByTopic(message.GetName()) ?? options.DefaultChannel; + + await redis.PublishAsync(channelName, redisMessage); + + logger.LogDebug($"Redis message [{message.GetName()}] has been published."); + + return OperateResult.Success; + } + catch (Exception ex) + { + var wrapperEx = new PublisherSentFailedException(ex.Message, ex); + + return OperateResult.Failed(wrapperEx); + } + } + } +} diff --git a/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs b/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs new file mode 100644 index 0000000..349abf7 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs @@ -0,0 +1,22 @@ +using DotNetCore.CAP.Internal; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + static class MethodMatcherCacheExtensions + { + public static string GetGroupByTopic(this MethodMatcherCache source, string topicName) + { + var groupsMap = source.GetCandidatesMethodsOfGroupNameGrouped(); + + return (from groupMap in groupsMap + from topic in groupMap.Value + where topic.TopicName == topicName + select topic.Attribute.Group).FirstOrDefault(); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs new file mode 100644 index 0000000..44f3917 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs @@ -0,0 +1,53 @@ +using DotNetCore.CAP.Messages; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading.Tasks; +using Messages = DotNetCore.CAP.Messages; + +namespace DotNetCore.CAP.Redis +{ + class RedisMessage : TransportMessage + { + public RedisMessage(IDictionary headers, byte[] body) : base(headers, body) { } + + public string GroupId + { + get => Headers.TryGetValue(Messages.Headers.Group, out var value) ? value : default; + set => Headers.TryAdd(Messages.Headers.Group, value); + } + + internal RedisValue AsRedisValue() + { + return JsonSerializer.Serialize(new RedisMessageValue + { + Headers = Headers, + Body = Body + }, new JsonSerializerOptions(JsonSerializerDefaults.Web)); + } + + public static RedisMessage Create(RedisValue redisValue) + { + if (redisValue.IsNullOrEmpty) + return Empty(); + + var value = JsonSerializer.Deserialize(redisValue, new JsonSerializerOptions(JsonSerializerDefaults.Web)); + + return new RedisMessage(value.Headers, value.Body); + } + + public static RedisMessage Empty() + { + return new RedisMessage(new Dictionary(), Array.Empty()); + } + } + + class RedisMessageValue + { + public IDictionary Headers { get; set; } + public byte[] Body { get; set; } + } +} From 1522d1c2b1f76e2419d8a42ade80a1db28261138 Mon Sep 17 00:00:00 2001 From: "mahmoud.samir" Date: Thu, 1 Apr 2021 00:43:07 +0300 Subject: [PATCH 2/6] add redis sample --- .../Samples.Redis.SqlServer/AppDbContext.cs | 14 ++++ .../Controllers/HomeController.cs | 39 ++++++++++ samples/Samples.Redis.SqlServer/Program.cs | 26 +++++++ .../Samples.Redis.SqlServer.csproj | 26 +++++++ samples/Samples.Redis.SqlServer/Startup.cs | 76 +++++++++++++++++++ .../appsettings.Development.json | 9 +++ .../Samples.Redis.SqlServer/appsettings.json | 10 +++ 7 files changed, 200 insertions(+) create mode 100644 samples/Samples.Redis.SqlServer/AppDbContext.cs create mode 100644 samples/Samples.Redis.SqlServer/Controllers/HomeController.cs create mode 100644 samples/Samples.Redis.SqlServer/Program.cs create mode 100644 samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj create mode 100644 samples/Samples.Redis.SqlServer/Startup.cs create mode 100644 samples/Samples.Redis.SqlServer/appsettings.Development.json create mode 100644 samples/Samples.Redis.SqlServer/appsettings.json diff --git a/samples/Samples.Redis.SqlServer/AppDbContext.cs b/samples/Samples.Redis.SqlServer/AppDbContext.cs new file mode 100644 index 0000000..879342a --- /dev/null +++ b/samples/Samples.Redis.SqlServer/AppDbContext.cs @@ -0,0 +1,14 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Options; + +namespace Sample.Redis.SqlServer +{ + public class AppDbContext : DbContext + { + public AppDbContext(DbContextOptions optionsBuilder) : base(optionsBuilder) + { + + } + + } +} diff --git a/samples/Samples.Redis.SqlServer/Controllers/HomeController.cs b/samples/Samples.Redis.SqlServer/Controllers/HomeController.cs new file mode 100644 index 0000000..733ab85 --- /dev/null +++ b/samples/Samples.Redis.SqlServer/Controllers/HomeController.cs @@ -0,0 +1,39 @@ +using DotNetCore.CAP; +using DotNetCore.CAP.Messages; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Samples.Redis.SqlServer.Controllers +{ + [ApiController] + [Route("[controller]/[action]")] + public class HomeController : ControllerBase + { + private readonly ILogger _logger; + private readonly ICapPublisher publisher; + + public HomeController(ILogger logger, ICapPublisher publisher) + { + _logger = logger; + this.publisher = publisher; + } + + [HttpGet] + public async Task Publish() + { + await publisher.PublishAsync("test-message", DateTime.UtcNow); + } + + [CapSubscribe("test-message")] + [NonAction] + public void Subscribe(DateTime date, [FromCap] IDictionary headers) + { + var str = string.Join(",", headers.Select(kv => $"({kv.Key}:{kv.Value})")); + _logger.LogInformation($"test-message subscribed with value {date}, headers : {str}"); + } + } +} diff --git a/samples/Samples.Redis.SqlServer/Program.cs b/samples/Samples.Redis.SqlServer/Program.cs new file mode 100644 index 0000000..d4a4fc1 --- /dev/null +++ b/samples/Samples.Redis.SqlServer/Program.cs @@ -0,0 +1,26 @@ +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Samples.Redis.SqlServer +{ + public class Program + { + public static void Main(string[] args) + { + CreateHostBuilder(args).Build().Run(); + } + + public static IHostBuilder CreateHostBuilder(string[] args) => + Host.CreateDefaultBuilder(args) + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder.UseStartup(); + }); + } +} diff --git a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj new file mode 100644 index 0000000..1ef9648 --- /dev/null +++ b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj @@ -0,0 +1,26 @@ + + + + net5.0 + 78587bd3-9076-4357-869f-4f4652d35322 + Linux + ..\.. + ..\..\docker-compose.dcproj + + + + + + + + + + + + + + + + + + diff --git a/samples/Samples.Redis.SqlServer/Startup.cs b/samples/Samples.Redis.SqlServer/Startup.cs new file mode 100644 index 0000000..cd445bc --- /dev/null +++ b/samples/Samples.Redis.SqlServer/Startup.cs @@ -0,0 +1,76 @@ +using DotNetCore.CAP.Redis; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.HttpsPolicy; +using Microsoft.AspNetCore.Mvc; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.OpenApi.Models; +using Sample.Redis.SqlServer; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Samples.Redis.SqlServer +{ + public class Startup + { + public Startup(IConfiguration configuration) + { + Configuration = configuration; + } + + public IConfiguration Configuration { get; } + + // This method gets called by the runtime. Use this method to add services to the container. + public void ConfigureServices(IServiceCollection services) + { + + services.AddControllers(); + services.AddSwaggerGen(c => + { + c.SwaggerDoc("v1", new OpenApiInfo { Title = "Samples.Redis.SqlServer", Version = "v1" }); + }); + + services.AddDbContext(options => + { + options.UseSqlServer("data source=.;initial catalog=cap;integrated security=true"); + }); + + services.AddCap(options => + { + options.UseRedis(); + + options.UseEntityFramework(); + + options.DefaultGroupName = "Samples.Redis.SqlServer"; + }); + } + + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. + public void Configure(IApplicationBuilder app, IWebHostEnvironment env) + { + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + app.UseSwagger(); + app.UseSwaggerUI(c => c.SwaggerEndpoint("/swagger/v1/swagger.json", "Samples.Redis.SqlServer v1")); + } + + app.UseHttpsRedirection(); + + app.UseRouting(); + + app.UseAuthorization(); + + app.UseEndpoints(endpoints => + { + endpoints.MapControllers(); + }); + } + } +} diff --git a/samples/Samples.Redis.SqlServer/appsettings.Development.json b/samples/Samples.Redis.SqlServer/appsettings.Development.json new file mode 100644 index 0000000..8983e0f --- /dev/null +++ b/samples/Samples.Redis.SqlServer/appsettings.Development.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + } +} diff --git a/samples/Samples.Redis.SqlServer/appsettings.json b/samples/Samples.Redis.SqlServer/appsettings.json new file mode 100644 index 0000000..d9d9a9b --- /dev/null +++ b/samples/Samples.Redis.SqlServer/appsettings.json @@ -0,0 +1,10 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + }, + "AllowedHosts": "*" +} From 433460cf488fce2b9e0043a821e614da0060c8bb Mon Sep 17 00:00:00 2001 From: "mahmoud.samir" Date: Sat, 24 Apr 2021 00:02:47 +0300 Subject: [PATCH 3/6] add redis streams support --- .../Samples.Redis.SqlServer.csproj | 2 - .../CapOptions.Redis.PostConfigure.cs | 15 +-- src/DotNetCore.CAP.Redis/CapOptions.Redis.cs | 3 +- .../DotNetCore.CAP.Redis.csproj | 6 +- .../ICapOptionsExtension.Redis.cs | 4 +- .../IConnectionPool.Default.cs | 110 ++++++++++++++++ .../IConnectionPool.LazyConnection.cs | 67 ++++++++++ src/DotNetCore.CAP.Redis/IConnectionPool.cs | 13 ++ .../IConsumerClient.Redis.cs | 72 +++++++++-- .../IConsumerClientFactory.Redis.cs | 4 +- .../IProcessingServer.PollPendingTopic.cs | 71 ++++++++++ ...ager.Logger.cs => IRedis.Events.Logger.cs} | 4 +- ...che.Manager.Events.cs => IRedis.Events.cs} | 46 +++---- .../IRedisCache.Manager.Default.cs | 108 ---------------- .../IRedisCache.Manager.cs | 20 --- .../IRedisStream.Manager.Default.cs | 122 ++++++++++++++++++ .../IRedisStream.Manager.cs | 23 ++++ src/DotNetCore.CAP.Redis/ITransport.Redis.cs | 10 +- .../TransportMessage.Redis.cs | 51 ++++---- 19 files changed, 537 insertions(+), 214 deletions(-) create mode 100644 src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs create mode 100644 src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs create mode 100644 src/DotNetCore.CAP.Redis/IConnectionPool.cs create mode 100644 src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs rename src/DotNetCore.CAP.Redis/{IRedisCache.Manager.Logger.cs => IRedis.Events.Logger.cs} (85%) rename src/DotNetCore.CAP.Redis/{IRedisCache.Manager.Events.cs => IRedis.Events.cs} (73%) delete mode 100644 src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs delete mode 100644 src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs create mode 100644 src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs create mode 100644 src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs diff --git a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj index 1ef9648..3bfb741 100644 --- a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj +++ b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj @@ -4,8 +4,6 @@ net5.0 78587bd3-9076-4357-869f-4f4652d35322 Linux - ..\.. - ..\..\docker-compose.dcproj diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs index e81e935..7ab86bd 100644 --- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs @@ -13,21 +13,20 @@ namespace DotNetCore.CAP.Redis { class CapRedisOptionsPostConfigure : IPostConfigureOptions { - private readonly CapOptions capOptions; - - public CapRedisOptionsPostConfigure(IOptions options) + public CapRedisOptionsPostConfigure() { - capOptions = options.Value; } public void PostConfigure(string name, CapRedisOptions options) { - var groupPrefix = string.IsNullOrWhiteSpace(capOptions.GroupNamePrefix) ? string.Empty : $"{capOptions.GroupNamePrefix}."; - - options.DefaultChannel = $"{groupPrefix}{capOptions.DefaultGroupName}"; - options.Configuration ??= new ConfigurationOptions(); + if (options.StreamEntriesCount == default) + options.StreamEntriesCount = 10; + + if (options.ConnectionPoolSize == default) + options.ConnectionPoolSize= 10; + if (!options.Configuration.EndPoints.Any()) { options.Configuration.EndPoints.Add(IPAddress.Loopback, 0); diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs index e15c978..bf03d49 100644 --- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs @@ -17,6 +17,7 @@ namespace DotNetCore.CAP.Redis internal string Endpoint { get; set; } - internal string DefaultChannel { get; set; } + public uint StreamEntriesCount { get; set; } + public uint ConnectionPoolSize { get; set; } } } diff --git a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj index fb6bdaf..51e56cf 100644 --- a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj +++ b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj @@ -1,4 +1,4 @@ - + netstandard2.1 @@ -11,6 +11,10 @@ 1701;1702;1705;CS1591 + + + + diff --git a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs index 502c7b8..6cb5404 100644 --- a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs +++ b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs @@ -30,10 +30,10 @@ namespace DotNetCore.CAP.Redis public void AddServices(IServiceCollection services) { services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); - + services.AddSingleton(); services.TryAddEnumerable(ServiceDescriptor.Singleton, CapRedisOptionsPostConfigure>()); services.AddOptions().Configure(configure); } diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs new file mode 100644 index 0000000..0bc96a8 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs @@ -0,0 +1,110 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisConnectionPool : IRedisConnectionPool,IDisposable + { + private readonly ConcurrentBag connections = new ConcurrentBag(); + private readonly SemaphoreSlim poolLock = new SemaphoreSlim(1); + private readonly CapRedisOptions redisOptions; + private readonly ILoggerFactory loggerFactory; + private bool poolAlreadyConfigured = false; + private bool isDisposed; + + private AsyncLazyRedisConnection QuietConnection + { + get + { + if (poolAlreadyConfigured) + return connections.OrderBy(async c => (await c).ConnectionCapacity).First(); + else + return null; + } + } + + public RedisConnectionPool(IOptions options, ILoggerFactory loggerFactory) + { + redisOptions = options.Value; + this.loggerFactory = loggerFactory; + Init().GetAwaiter().GetResult(); + } + + public async Task ConnectAsync() + { + if (QuietConnection == null) + { + poolAlreadyConfigured = connections.Count(c => c.IsValueCreated) == redisOptions.ConnectionPoolSize; + if (QuietConnection != null) + return (await QuietConnection).Connection; + } + + foreach (var lazy in connections) + { + if (!lazy.IsValueCreated) + return (await lazy).Connection; + + var connection = await lazy; + if (connection.ConnectionCapacity == default) + return connection.Connection; + } + + return (await connections.OrderBy(async c => (await c).ConnectionCapacity).First()).Connection; + } + + private async Task Init() + { + try + { + await poolLock.WaitAsync(); + + if (connections.Any()) + return; + + for (int i = 0; i < redisOptions.ConnectionPoolSize; i++) + { + var connection = new AsyncLazyRedisConnection(redisOptions, loggerFactory.CreateLogger()); + + connections.Add(connection); + } + } + finally + { + poolLock.Release(); + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + void Dispose(bool disposing) + { + if (isDisposed) + return; + + if (disposing) + { + foreach (var connection in this.connections) + { + if (!connection.IsValueCreated) + continue; + + connection.GetAwaiter().GetResult().Dispose(); + } + } + + isDisposed = true; + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs new file mode 100644 index 0000000..4ffb5db --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs @@ -0,0 +1,67 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + public class AsyncLazyRedisConnection : Lazy> + { + public AsyncLazyRedisConnection(CapRedisOptions redisOptions, ILogger logger) + : base(() => ConnectAsync(redisOptions, logger)) + { + } + + public TaskAwaiter GetAwaiter() { return Value.GetAwaiter(); } + + static async Task ConnectAsync(CapRedisOptions redisOptions, ILogger logger) + { + var redisLogger = new RedisLogger(logger); + + var connection = await ConnectionMultiplexer.ConnectAsync(redisOptions.Configuration, redisLogger).ConfigureAwait(false); + + connection.LogEvents(logger); + + return new RedisConnection(connection); + } + } + + public class RedisConnection:IDisposable + { + private bool isDisposed; + + public RedisConnection(IConnectionMultiplexer connection) + { + Connection = connection ?? throw new ArgumentNullException(nameof(connection)); + } + + public IConnectionMultiplexer Connection { get; } + public long ConnectionCapacity => Connection.GetCounters().TotalOutstanding; + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + void Dispose(bool disposing) + { + if (isDisposed) + return; + + if (disposing) + { + Connection.Dispose(); + } + + isDisposed = true; + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.cs new file mode 100644 index 0000000..c619f0d --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IConnectionPool.cs @@ -0,0 +1,13 @@ +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + interface IRedisConnectionPool + { + Task ConnectAsync(); + } +} diff --git a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs index 56858bd..6ccc728 100644 --- a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs +++ b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs @@ -12,19 +12,21 @@ using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; +using StackExchange.Redis; namespace DotNetCore.CAP.Redis { class RedisConsumerClient : IConsumerClient { private readonly ILogger logger; - private readonly IRedisCacheManager redis; + private readonly IRedisStreamManager redis; private readonly CapRedisOptions options; private readonly string groupId; + private string[] topics; public RedisConsumerClient( string groubId, - IRedisCacheManager redis, + IRedisStreamManager redis, CapRedisOptions options, ILogger logger ) @@ -45,21 +47,17 @@ namespace DotNetCore.CAP.Redis { if (topics == null) throw new ArgumentNullException(nameof(topics)); - redis.SubscribeAsync(groupId, topics, (channel, message) => + foreach (var topic in topics) { - logger.LogDebug($"Redis message with name {message.GetName()} subscribed."); - - message.GroupId = groupId; - - OnMessageReceived?.Invoke(channel, message); + redis.CreateStreamWithConsumerGroupAsync(topic, groupId).GetAwaiter().GetResult(); + } - return Task.CompletedTask; - }); + this.topics = topics.ToArray(); } public void Listening(TimeSpan timeout, CancellationToken cancellationToken) { - cancellationToken.Register(async () => await redis.UnsubscribeAsync()); + _ = ListeningForMessagesAsync(timeout, cancellationToken); while (true) { @@ -68,9 +66,59 @@ namespace DotNetCore.CAP.Redis } } + private async Task ListeningForMessagesAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + //first time, we want to read our pending messages, in case we crashed and are recovering. + var pendingMsgs = redis.PollStreamsPendingMessagesAsync(topics, groupId, timeout, cancellationToken); + + await ConsumeMessages(pendingMsgs, StreamPosition.Beginning); + + //Once we consumed our history, we can start getting new messages. + var newMsgs = redis.PollStreamsLatestMessagesAsync(topics, groupId, timeout, cancellationToken); + + _ = ConsumeMessages(newMsgs, StreamPosition.NewMessages); + } + + private async Task ConsumeMessages(IAsyncEnumerable streamsSet, RedisValue position) + { + await foreach (var set in streamsSet) + { + foreach (var stream in set) + { + foreach (var entry in stream.Entries) + { + if (entry.IsNull) + return; + try + { + var message = RedisMessage.Create(entry, groupId); + OnMessageReceived?.Invoke((stream.Key.ToString(), groupId, entry.Id.ToString()), message); + } + catch (Exception ex) + { + logger.LogError(ex.Message, ex); + var logArgs = new LogMessageEventArgs + { + LogType = MqLogType.ConsumeError, + Reason = ex.ToString() + }; + OnLog?.Invoke(entry, logArgs); + } + finally + { + string positionName = position == StreamPosition.Beginning ? nameof(StreamPosition.Beginning) : nameof(StreamPosition.NewMessages); + logger.LogDebug($"Redis stream entry [{entry.Id}] [position : {positionName}] was delivered."); + } + } + } + } + } + public void Commit(object sender) { - // ignore + var (stream, group, id) = ((string stream, string group, string id))sender; + + redis.Ack(stream, group, id).GetAwaiter().GetResult(); } public void Reject(object sender) diff --git a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs index 319166b..ee0580b 100644 --- a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs +++ b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs @@ -15,10 +15,10 @@ namespace DotNetCore.CAP.Redis class RedisConsumerClientFactory : IConsumerClientFactory { private readonly CapRedisOptions redisOptions; - private readonly IRedisCacheManager redis; + private readonly IRedisStreamManager redis; private readonly ILogger logger; - public RedisConsumerClientFactory(IOptions redisOptions, IRedisCacheManager redis, ILogger logger) + public RedisConsumerClientFactory(IOptions redisOptions, IRedisStreamManager redis, ILogger logger) { this.redisOptions = redisOptions.Value; this.redis = redis; diff --git a/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs b/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs new file mode 100644 index 0000000..f33bd05 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs @@ -0,0 +1,71 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Linq; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Redis; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Kafka +{ + class PollPendingTopic : IProcessingServer + { + private readonly IRedisStreamManager redis; + private readonly ILogger logger; + private readonly CapRedisOptions options; + private readonly MethodMatcherCache selector; + + public PollPendingTopic( + IRedisStreamManager redis, + ILogger logger, + IOptions options, + MethodMatcherCache selector) + { + this.redis = redis; + this.logger = logger; + this.options = options.Value; + this.selector = selector; + } + + public void Start() + { + try + { + var streams = selector.GetAllTopics(); + + foreach (var stream in streams) + { + var streamExist=redis. + } + + topics.Value.First().TopicName + using var adminClient = new AdminClientBuilder(config).Build(); + + adminClient.CreateTopicsAsync(topics.Select(x => new TopicSpecification + { + Name = x + })).GetAwaiter().GetResult(); + + logger.LogInformation("Topic is automatically created successfully!"); + } + catch (CreateTopicsException ex) when (ex.Message.Contains("already exists")) + { + } + catch (Exception ex) + { + logger.LogError(ex, "An error was encountered when automatically creating topic!"); + } + finally + { + KafkaConsumerClientFactory.WaitCreateTopic.Set(); + } + } + + public void Dispose() + { + + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs b/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs similarity index 85% rename from src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs rename to src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs index 7eb613b..e5bbe53 100644 --- a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs +++ b/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs @@ -8,11 +8,11 @@ using System.Threading.Tasks; namespace DotNetCore.CAP.Redis { - class RedisCacheLogger : TextWriter + class RedisLogger : TextWriter { private readonly ILogger logger; - public RedisCacheLogger(ILogger logger) + public RedisLogger(ILogger logger) { this.logger = logger; } diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs b/src/DotNetCore.CAP.Redis/IRedis.Events.cs similarity index 73% rename from src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs rename to src/DotNetCore.CAP.Redis/IRedis.Events.cs index 507da02..7a0abfe 100644 --- a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs +++ b/src/DotNetCore.CAP.Redis/IRedis.Events.cs @@ -8,30 +8,12 @@ using System.Text; using System.Threading.Tasks; namespace DotNetCore.CAP.Redis -{ - static class RedisConnectionExtensions - { - public static void LogEvents(this IConnectionMultiplexer connection,ILogger logger) - { - if (connection is null) - { - throw new ArgumentNullException(nameof(connection)); - } - - if (logger is null) - { - throw new ArgumentNullException(nameof(logger)); - } - - _ = new RedisCacheManagerEvents(connection, logger); - } - } - - class RedisCacheManagerEvents +{ + class RedisEvents { private readonly ILogger logger; - public RedisCacheManagerEvents(IConnectionMultiplexer connection, ILogger logger) + public RedisEvents(IConnectionMultiplexer connection, ILogger logger) { this.logger = logger; connection.ErrorMessage += Connection_ErrorMessage; @@ -41,12 +23,12 @@ namespace DotNetCore.CAP.Redis private void Connection_ConnectionFailed(object sender, ConnectionFailedEventArgs e) { - logger.LogError(e.Exception, $"Connection failed!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + logger.LogError(e.Exception, $"Connection failed!, {e.Exception?.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); } private void Connection_ConnectionRestored(object sender, ConnectionFailedEventArgs e) { - logger.LogWarning(e.Exception, $"Connection restored back!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); + logger.LogWarning($"Connection restored back!, {e.Exception?.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}"); } private void Connection_ErrorMessage(object sender, RedisErrorEventArgs e) @@ -54,4 +36,22 @@ namespace DotNetCore.CAP.Redis logger.LogError($"Server replied with error, {e.Message}, for endpoint:{e.EndPoint}"); } } + + static class RedisConnectionExtensions + { + public static void LogEvents(this IConnectionMultiplexer connection, ILogger logger) + { + if (connection is null) + { + throw new ArgumentNullException(nameof(connection)); + } + + if (logger is null) + { + throw new ArgumentNullException(nameof(logger)); + } + + _ = new RedisEvents(connection, logger); + } + } } diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs deleted file mode 100644 index 9ea0434..0000000 --- a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs +++ /dev/null @@ -1,108 +0,0 @@ -using DotNetCore.CAP.Transport; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using StackExchange.Redis; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace DotNetCore.CAP.Redis -{ - class RedisCacheManager : IRedisCacheManager - { - private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1, 1); - private readonly CapRedisOptions options; - private readonly ILogger logger; - private ConnectionMultiplexer redis; - private ISubscriber subscriber; - private bool disposed; - - public RedisCacheManager(IOptions options, ILogger logger) - { - this.options = options.Value; - this.logger = logger; - _ = ConnectAsync(); - } - - public async Task SubscribeAsync(string channelName, IEnumerable topics, Func callback) - { - var channel = await GetChannel(channelName); - channel.OnMessage(channelMessage => - { - var message = RedisMessage.Create(channelMessage.Message); - - if (topics.Any(c => c == message.GetName())) - { - callback?.Invoke(channelMessage.SubscriptionChannel, message); - } - }); - } - - public async Task PublishAsync(string channelName, RedisValue message) - { - await subscriber.PublishAsync(channelName, message); - } - public async Task UnsubscribeAsync() - { - await subscriber?.UnsubscribeAllAsync(); - } - - public void Dispose() - { - if (!disposed) - { - subscriber?.UnsubscribeAll(); - redis?.Close(); - disposed = true; - } - } - - - private async Task GetChannel(string channelName) - { - await ConnectAsync().ConfigureAwait(false); - return await subscriber.SubscribeAsync(channelName).ConfigureAwait(false); - } - - private async Task ConnectAsync() - { - if (disposed == true) - throw new ObjectDisposedException(nameof(IRedisCacheManager)); - - if (redis != null) - { - subscriber ??= redis.GetSubscriber(); - return; - } - else - { - try - { - await connectionLock.WaitAsync().ConfigureAwait(false); - - if (redis != null) - { - return; - } - - var redisLogger = new RedisCacheLogger(logger); - - redis = await ConnectionMultiplexer.ConnectAsync(options.Configuration, redisLogger) - .ConfigureAwait(false); - - redis.LogEvents(logger); - - subscriber = redis.GetSubscriber(); - } - finally - { - connectionLock.Release(); - } - } - } - - } -} diff --git a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs b/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs deleted file mode 100644 index dddca4c..0000000 --- a/src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs +++ /dev/null @@ -1,20 +0,0 @@ -using DotNetCore.CAP.Transport; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using StackExchange.Redis; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace DotNetCore.CAP.Redis -{ - interface IRedisCacheManager : IDisposable - { - Task SubscribeAsync(string channelName, IEnumerable topics, Func callback); - Task PublishAsync(string channelName, RedisValue message); - Task UnsubscribeAsync(); - } -} diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs new file mode 100644 index 0000000..03408d2 --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs @@ -0,0 +1,122 @@ +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + class RedisStreamManager : IRedisStreamManager + { + private readonly CapRedisOptions options; + private readonly IRedisConnectionPool connectionsPool; + private readonly ILogger logger; + private IConnectionMultiplexer redis; + + public RedisStreamManager(IRedisConnectionPool connectionsPool, IOptions options, ILogger logger) + { + this.options = options.Value; + this.connectionsPool = connectionsPool; + this.logger = logger; + } + + public async Task CreateStreamWithConsumerGroupAsync(string stream, string consumerGroup) + { + await ConnectAsync(); + + //The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored + var database = redis.GetDatabase(); + var streamExist = await database.KeyTypeAsync(stream); + if (streamExist == RedisType.None) + { + await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages, true); + } + else + { + var groupInfo = await database.StreamGroupInfoAsync(stream); + if (groupInfo.Any(g => g.Name == consumerGroup)) + return; + await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages); + } + } + + public async Task PublishAsync(string stream, NameValueEntry[] message) + { + await ConnectAsync(); + + //The object returned from GetDatabase is a cheap pass - thru object, and does not need to be stored + await redis.GetDatabase().StreamAddAsync(stream, message); + } + + public async IAsyncEnumerable PollStreamsLatestMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token) + { + var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.NewMessages)); + + while (true) + { + var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token).ConfigureAwait(false); + + yield return result.streams; + + token.WaitHandle.WaitOne(pollDelay); + } + } + + public async IAsyncEnumerable PollStreamsPendingMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, [EnumeratorCancellation] CancellationToken token) + { + var positions = streams.Select(stream => new StreamPosition(stream, StreamPosition.Beginning)); + + while (true) + { + token.ThrowIfCancellationRequested(); + + var result = await TryReadConsumerGroup(consumerGroup, positions.ToArray(), token).ConfigureAwait(false); + + yield return result.streams; + + //Once we consumed our history of pending messages, we can break the loop. + if (result.canRead && result.streams.All(s => s.Entries.Length < options.StreamEntriesCount)) + break; + + token.WaitHandle.WaitOne(pollDelay); + } + } + + private async Task<(bool canRead, RedisStream[] streams)> TryReadConsumerGroup(string consumerGroup, StreamPosition[] positions, CancellationToken token) + { + try + { + await ConnectAsync(); + + var database = redis.GetDatabase(); + + var readSet = database.StreamReadGroupAsync(positions, consumerGroup, consumerGroup, (int)options.StreamEntriesCount); + + return (true, await readSet.ConfigureAwait(false)); + } + catch (Exception ex) + { + logger.LogError($"Redis error when trying read consumer group {consumerGroup}", ex); + return (false, Array.Empty()); + } + } + + public async Task Ack(string stream, string consumerGroup, string messageId) + { + await ConnectAsync(); + + await redis.GetDatabase().StreamAcknowledgeAsync(stream, consumerGroup, messageId).ConfigureAwait(false); + } + + private async Task ConnectAsync() + { + redis = await connectionsPool.ConnectAsync(); + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs new file mode 100644 index 0000000..b742edf --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs @@ -0,0 +1,23 @@ +using DotNetCore.CAP.Transport; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace DotNetCore.CAP.Redis +{ + interface IRedisStreamManager + { + Task CreateStreamWithConsumerGroupAsync(string stream, string consumerGroup); + Task PublishAsync(string stream, NameValueEntry[] message); + IAsyncEnumerable PollStreamsLatestMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, CancellationToken token); + IAsyncEnumerable PollStreamsPendingMessagesAsync(string[] streams, string consumerGroup, TimeSpan pollDelay, CancellationToken token); + Task Ack(string stream, string consumerGroup, string messageId); + } +} diff --git a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs index defcdd3..cea32bf 100644 --- a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs +++ b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs @@ -15,12 +15,12 @@ namespace DotNetCore.CAP.Redis { class RedisTransport : ITransport { - private readonly IRedisCacheManager redis; + private readonly IRedisStreamManager redis; private readonly ILogger logger; private readonly MethodMatcherCache selector; private readonly CapRedisOptions options; - public RedisTransport(IRedisCacheManager redis, MethodMatcherCache selector, IOptions options, ILogger logger) + public RedisTransport(IRedisStreamManager redis, MethodMatcherCache selector, IOptions options, ILogger logger) { this.redis = redis; this.selector = selector; @@ -34,11 +34,7 @@ namespace DotNetCore.CAP.Redis { try { - var redisMessage = new RedisMessage(message.Headers, message.Body).AsRedisValue(); - - var channelName = selector.GetGroupByTopic(message.GetName()) ?? options.DefaultChannel; - - await redis.PublishAsync(channelName, redisMessage); + await redis.PublishAsync(message.GetName(), message.AsStreamEntries()); logger.LogDebug($"Redis message [{message.GetName()}] has been published."); diff --git a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs index 44f3917..f12ff58 100644 --- a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs +++ b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs @@ -3,51 +3,50 @@ using StackExchange.Redis; using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; using System.Threading.Tasks; -using Messages = DotNetCore.CAP.Messages; namespace DotNetCore.CAP.Redis { - class RedisMessage : TransportMessage + static class RedisMessage { - public RedisMessage(IDictionary headers, byte[] body) : base(headers, body) { } + const string HEADERS = "headers"; + const string BODY = "body"; - public string GroupId + public static NameValueEntry[] AsStreamEntries(this TransportMessage message) { - get => Headers.TryGetValue(Messages.Headers.Group, out var value) ? value : default; - set => Headers.TryAdd(Messages.Headers.Group, value); + return new[]{ + new NameValueEntry(HEADERS,ToJson(message.Headers)), + new NameValueEntry(BODY,ToJson(message.Body)) + }; } - internal RedisValue AsRedisValue() + public static TransportMessage Create(StreamEntry streamEntry, string groupId = null) { - return JsonSerializer.Serialize(new RedisMessageValue - { - Headers = Headers, - Body = Body - }, new JsonSerializerOptions(JsonSerializerDefaults.Web)); - } + if (streamEntry.IsNull) + return null; - public static RedisMessage Create(RedisValue redisValue) - { - if (redisValue.IsNullOrEmpty) - return Empty(); + var headersRaw = streamEntry[HEADERS]; + if (headersRaw.IsNullOrEmpty) + throw new ArgumentException($"Redis stream entry with id {streamEntry.Id} missing cap headers"); + + var headers = JsonSerializer.Deserialize>(headersRaw); + + var bodyRaw = streamEntry[BODY]; + + var body = !bodyRaw.IsNullOrEmpty ? JsonSerializer.Deserialize(bodyRaw) : null; - var value = JsonSerializer.Deserialize(redisValue, new JsonSerializerOptions(JsonSerializerDefaults.Web)); + headers.TryAdd(Headers.Group, groupId); - return new RedisMessage(value.Headers, value.Body); + return new TransportMessage(headers, body); } - public static RedisMessage Empty() + private static string ToJson(object obj) { - return new RedisMessage(new Dictionary(), Array.Empty()); + return JsonSerializer.Serialize(obj, new JsonSerializerOptions(JsonSerializerDefaults.Web)); } - } - class RedisMessageValue - { - public IDictionary Headers { get; set; } - public byte[] Body { get; set; } } } From 93ee3f28a042a221c58ea09b8e6350d19117522f Mon Sep 17 00:00:00 2001 From: "Mahmoud S. Zeid" Date: Tue, 27 Apr 2021 12:52:36 +0300 Subject: [PATCH 4/6] change project name from Redis to RedisStreams ensure consumer group creation while reading from stream --- CAP.sln | 4 +- .../Samples.Redis.SqlServer.csproj | 2 +- samples/Samples.Redis.SqlServer/Startup.cs | 2 +- .../CapOptions.Redis.Extensions.cs | 10 +-- .../CapOptions.Redis.PostConfigure.cs | 6 +- src/DotNetCore.CAP.Redis/CapOptions.Redis.cs | 13 +++- ...roj => DotNetCore.CAP.RedisStreams.csproj} | 10 +-- .../ICapOptionsExtension.Redis.cs | 6 +- .../IConnectionPool.Default.cs | 6 +- .../IConnectionPool.LazyConnection.cs | 6 +- src/DotNetCore.CAP.Redis/IConnectionPool.cs | 6 +- .../IConsumerClient.Redis.cs | 6 +- .../IConsumerClientFactory.Redis.cs | 6 +- .../IProcessingServer.PollPendingTopic.cs | 71 ------------------- .../IRedis.Events.Logger.cs | 6 +- src/DotNetCore.CAP.Redis/IRedis.Events.cs | 6 +- .../IRedisStream.Manager.Default.cs | 21 ++++-- .../IRedisStream.Manager.Extensions.cs | 54 ++++++++++++++ .../IRedisStream.Manager.cs | 6 +- src/DotNetCore.CAP.Redis/ITransport.Redis.cs | 2 +- .../MethodMatcherCache.Extensions.cs | 22 ------ .../TransportMessage.Redis.cs | 6 +- 22 files changed, 126 insertions(+), 151 deletions(-) rename src/DotNetCore.CAP.Redis/{DotNetCore.CAP.Redis.csproj => DotNetCore.CAP.RedisStreams.csproj} (67%) delete mode 100644 src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs create mode 100644 src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs delete mode 100644 src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs diff --git a/CAP.sln b/CAP.sln index 12bcbda..b5d3f5c 100644 --- a/CAP.sln +++ b/CAP.sln @@ -71,9 +71,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Redis", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.Redis.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RedisStreams", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.RedisStreams.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj index 3bfb741..0b0b723 100644 --- a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj +++ b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj @@ -13,7 +13,7 @@ - + diff --git a/samples/Samples.Redis.SqlServer/Startup.cs b/samples/Samples.Redis.SqlServer/Startup.cs index cd445bc..c5ef3a1 100644 --- a/samples/Samples.Redis.SqlServer/Startup.cs +++ b/samples/Samples.Redis.SqlServer/Startup.cs @@ -1,4 +1,4 @@ -using DotNetCore.CAP.Redis; +using DotNetCore.CAP.RedisStreams; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.HttpsPolicy; diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs index 6593a76..c0fa094 100644 --- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs @@ -1,14 +1,15 @@ -using DotNetCore.CAP; +using System; +using DotNetCore.CAP; using DotNetCore.CAP.Internal; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Reflection; +using DotNetCore.CAP.RedisStreams; -namespace DotNetCore.CAP.Redis +namespace Microsoft.Extensions.DependencyInjection { public static class CapRedisOptionsExtensions { @@ -21,8 +22,7 @@ namespace DotNetCore.CAP.Redis public static CapOptions UseRedis(this CapOptions options, Action configure) { - if (configure is null) - throw new ArgumentNullException(nameof(configure)); + if (configure is null) throw new ArgumentNullException(nameof(configure)); options.RegisterExtension(new RedisOptionsExtension(configure)); diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs index 7ab86bd..5372416 100644 --- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs @@ -1,15 +1,15 @@ -using DotNetCore.CAP; +using System; +using DotNetCore.CAP; using DotNetCore.CAP.Internal; using Microsoft.Extensions.Options; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class CapRedisOptionsPostConfigure : IPostConfigureOptions { diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs index bf03d49..82eec0d 100644 --- a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs +++ b/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs @@ -1,12 +1,12 @@ -using StackExchange.Redis; -using System; +using System; +using StackExchange.Redis; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP { public class CapRedisOptions { @@ -17,7 +17,14 @@ namespace DotNetCore.CAP.Redis internal string Endpoint { get; set; } + /// + /// Gets or sets the count of entries consumed from stream + /// public uint StreamEntriesCount { get; set; } + + /// + /// Gets or sets the number of connections that can be used with redis server + /// public uint ConnectionPoolSize { get; set; } } } diff --git a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj similarity index 67% rename from src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj rename to src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj index 51e56cf..c4c9032 100644 --- a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj +++ b/src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj @@ -2,19 +2,15 @@ netstandard2.1 - DotNetCore.CAP.Redis - $(PackageTags);Redis + DotNetCore.CAP.RedisStreams + $(PackageTags);RedisStreams - bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Redis.xml + bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.RedisStreams.xml 1701;1702;1705;CS1591 - - - - diff --git a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs index 6cb5404..fd1b816 100644 --- a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs +++ b/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs @@ -1,4 +1,5 @@ -using DotNetCore.CAP.Redis; +using System; +using DotNetCore.CAP.RedisStreams; using DotNetCore.CAP; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Configuration; @@ -6,13 +7,12 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP { class RedisOptionsExtension : ICapOptionsExtension { diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs index 0bc96a8..21f7394 100644 --- a/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs +++ b/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs @@ -1,7 +1,7 @@ -using Microsoft.Extensions.Logging; +using System; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis; -using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -9,7 +9,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class RedisConnectionPool : IRedisConnectionPool,IDisposable { diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs index 4ffb5db..149c498 100644 --- a/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs +++ b/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs @@ -1,7 +1,7 @@ -using Microsoft.Extensions.Logging; +using System; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis; -using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -10,7 +10,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { public class AsyncLazyRedisConnection : Lazy> { diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.cs b/src/DotNetCore.CAP.Redis/IConnectionPool.cs index c619f0d..c1d644b 100644 --- a/src/DotNetCore.CAP.Redis/IConnectionPool.cs +++ b/src/DotNetCore.CAP.Redis/IConnectionPool.cs @@ -1,10 +1,10 @@ -using StackExchange.Redis; -using System; +using System; +using StackExchange.Redis; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { interface IRedisConnectionPool { diff --git a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs index 6ccc728..71ecd43 100644 --- a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs +++ b/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs @@ -1,9 +1,9 @@ -using DotNetCore.CAP.Redis; +using System; +using DotNetCore.CAP.RedisStreams; using DotNetCore.CAP.Messages; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; @@ -14,7 +14,7 @@ using System.Threading; using System.Threading.Tasks; using StackExchange.Redis; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class RedisConsumerClient : IConsumerClient { diff --git a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs index ee0580b..cb9b553 100644 --- a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs +++ b/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs @@ -1,16 +1,16 @@ -using DotNetCore.CAP.Redis; +using System; +using DotNetCore.CAP.RedisStreams; using DotNetCore.CAP; using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class RedisConsumerClientFactory : IConsumerClientFactory { diff --git a/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs b/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs deleted file mode 100644 index f33bd05..0000000 --- a/src/DotNetCore.CAP.Redis/IProcessingServer.PollPendingTopic.cs +++ /dev/null @@ -1,71 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Linq; -using DotNetCore.CAP.Internal; -using DotNetCore.CAP.Redis; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.Kafka -{ - class PollPendingTopic : IProcessingServer - { - private readonly IRedisStreamManager redis; - private readonly ILogger logger; - private readonly CapRedisOptions options; - private readonly MethodMatcherCache selector; - - public PollPendingTopic( - IRedisStreamManager redis, - ILogger logger, - IOptions options, - MethodMatcherCache selector) - { - this.redis = redis; - this.logger = logger; - this.options = options.Value; - this.selector = selector; - } - - public void Start() - { - try - { - var streams = selector.GetAllTopics(); - - foreach (var stream in streams) - { - var streamExist=redis. - } - - topics.Value.First().TopicName - using var adminClient = new AdminClientBuilder(config).Build(); - - adminClient.CreateTopicsAsync(topics.Select(x => new TopicSpecification - { - Name = x - })).GetAwaiter().GetResult(); - - logger.LogInformation("Topic is automatically created successfully!"); - } - catch (CreateTopicsException ex) when (ex.Message.Contains("already exists")) - { - } - catch (Exception ex) - { - logger.LogError(ex, "An error was encountered when automatically creating topic!"); - } - finally - { - KafkaConsumerClientFactory.WaitCreateTopic.Set(); - } - } - - public void Dispose() - { - - } - } -} diff --git a/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs b/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs index e5bbe53..6c9aa08 100644 --- a/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs +++ b/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs @@ -1,12 +1,12 @@ -using Microsoft.Extensions.Logging; -using System; +using System; +using Microsoft.Extensions.Logging; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class RedisLogger : TextWriter { diff --git a/src/DotNetCore.CAP.Redis/IRedis.Events.cs b/src/DotNetCore.CAP.Redis/IRedis.Events.cs index 7a0abfe..6b5229e 100644 --- a/src/DotNetCore.CAP.Redis/IRedis.Events.cs +++ b/src/DotNetCore.CAP.Redis/IRedis.Events.cs @@ -1,13 +1,13 @@ -using Microsoft.Extensions.Logging; +using System; +using Microsoft.Extensions.Logging; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class RedisEvents { diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs index 03408d2..dba44cd 100644 --- a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs +++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs @@ -1,8 +1,8 @@ -using DotNetCore.CAP.Transport; +using System; +using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; @@ -10,7 +10,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class RedisStreamManager : IRedisStreamManager { @@ -92,17 +92,28 @@ namespace DotNetCore.CAP.Redis { try { + token.ThrowIfCancellationRequested(); + + var createdPositions = new List(); + await ConnectAsync(); var database = redis.GetDatabase(); - var readSet = database.StreamReadGroupAsync(positions, consumerGroup, consumerGroup, (int)options.StreamEntriesCount); + await foreach (var position in database.TryCreateConsumerGroup(positions, consumerGroup, logger)) + { + createdPositions.Add(position); + } + + if (!createdPositions.Any()) return (false, Array.Empty()); + + var readSet = database.StreamReadGroupAsync(createdPositions.ToArray(), consumerGroup, consumerGroup, (int)options.StreamEntriesCount); return (true, await readSet.ConfigureAwait(false)); } catch (Exception ex) { - logger.LogError($"Redis error when trying read consumer group {consumerGroup}", ex); + logger.LogError(ex, $"Redis error when trying read consumer group {consumerGroup}"); return (false, Array.Empty()); } } diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs new file mode 100644 index 0000000..2a54a5d --- /dev/null +++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.Logging; +using StackExchange.Redis; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace StackExchange.Redis +{ + static class RedisStreamManagerExtensions + { + public static async IAsyncEnumerable TryCreateConsumerGroup(this IDatabase database, StreamPosition[] positions, string consumerGroup, ILogger logger = null) + { + foreach (var position in positions) + { + bool created = false; + try + { + var stream = position.Key; + var streamExist = await database.KeyTypeAsync(stream); + if (streamExist == RedisType.None) + { + if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages, true)) + { + logger.LogInformation($"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]"); + created = true; + } + } + else + { + var groupInfo = await database.StreamGroupInfoAsync(stream); + + if (groupInfo.All(g => g.Name != consumerGroup)) + { + if (await database.StreamCreateConsumerGroupAsync(stream, consumerGroup, StreamPosition.NewMessages)) + { + logger.LogInformation($"Redis stream [{position.Key}] created with consumer group [{consumerGroup}]"); + created = true; + } + } + } + } + catch (Exception ex) + { + logger?.LogError(ex, $"Redis error while creating consumer group [{consumerGroup}] of stream [{position.Key}]"); + } + + if (created) + yield return position; + } + } + } +} diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs index b742edf..03c2a31 100644 --- a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs +++ b/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs @@ -1,8 +1,8 @@ -using DotNetCore.CAP.Transport; +using System; +using DotNetCore.CAP.Transport; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; @@ -10,7 +10,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { interface IRedisStreamManager { diff --git a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs index cea32bf..a894397 100644 --- a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs +++ b/src/DotNetCore.CAP.Redis/ITransport.Redis.cs @@ -11,7 +11,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using StackExchange.Redis; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { class RedisTransport : ITransport { diff --git a/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs b/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs deleted file mode 100644 index 349abf7..0000000 --- a/src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs +++ /dev/null @@ -1,22 +0,0 @@ -using DotNetCore.CAP.Internal; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace DotNetCore.CAP.Redis -{ - static class MethodMatcherCacheExtensions - { - public static string GetGroupByTopic(this MethodMatcherCache source, string topicName) - { - var groupsMap = source.GetCandidatesMethodsOfGroupNameGrouped(); - - return (from groupMap in groupsMap - from topic in groupMap.Value - where topic.TopicName == topicName - select topic.Attribute.Group).FirstOrDefault(); - } - } -} diff --git a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs index f12ff58..33a8b0c 100644 --- a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs +++ b/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs @@ -1,6 +1,6 @@ -using DotNetCore.CAP.Messages; +using System; +using DotNetCore.CAP.Messages; using StackExchange.Redis; -using System; using System.Collections.Generic; using System.Linq; using System.Runtime.CompilerServices; @@ -8,7 +8,7 @@ using System.Text; using System.Text.Json; using System.Threading.Tasks; -namespace DotNetCore.CAP.Redis +namespace DotNetCore.CAP.RedisStreams { static class RedisMessage { From 671b77437a3b0a84d05df08f171a5829fdd15710 Mon Sep 17 00:00:00 2001 From: "Mahmoud S. Zeid" Date: Tue, 27 Apr 2021 12:56:09 +0300 Subject: [PATCH 5/6] rename project folder to RedisStreams --- CAP.sln | 14 +++++++------- .../Samples.Redis.SqlServer.csproj | 2 +- .../CapOptions.Redis.Extensions.cs | 0 .../CapOptions.Redis.PostConfigure.cs | 0 .../CapOptions.Redis.cs | 0 .../DotNetCore.CAP.RedisStreams.csproj | 0 .../ICapOptionsExtension.Redis.cs | 0 .../IConnectionPool.Default.cs | 0 .../IConnectionPool.LazyConnection.cs | 0 .../IConnectionPool.cs | 0 .../IConsumerClient.Redis.cs | 0 .../IConsumerClientFactory.Redis.cs | 0 .../IRedis.Events.Logger.cs | 0 .../IRedis.Events.cs | 0 .../IRedisStream.Manager.Default.cs | 0 .../IRedisStream.Manager.Extensions.cs | 0 .../IRedisStream.Manager.cs | 0 .../ITransport.Redis.cs | 0 .../TransportMessage.Redis.cs | 0 19 files changed, 8 insertions(+), 8 deletions(-) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/CapOptions.Redis.Extensions.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/CapOptions.Redis.PostConfigure.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/CapOptions.Redis.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/DotNetCore.CAP.RedisStreams.csproj (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/ICapOptionsExtension.Redis.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IConnectionPool.Default.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IConnectionPool.LazyConnection.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IConnectionPool.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IConsumerClient.Redis.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IConsumerClientFactory.Redis.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IRedis.Events.Logger.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IRedis.Events.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IRedisStream.Manager.Default.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IRedisStream.Manager.Extensions.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/IRedisStream.Manager.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/ITransport.Redis.cs (100%) rename src/{DotNetCore.CAP.Redis => DotNetCore.CAP.RedisStreams}/TransportMessage.Redis.cs (100%) diff --git a/CAP.sln b/CAP.sln index b5d3f5c..573ed8d 100644 --- a/CAP.sln +++ b/CAP.sln @@ -71,10 +71,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RedisStreams", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.RedisStreams.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RedisStreams", "src\DotNetCore.CAP.RedisStreams\DotNetCore.CAP.RedisStreams.csproj", "{54458B54-49CC-454C-82B2-4AED681D9D07}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -169,14 +169,14 @@ Global {54F6C206-2A23-4971-AE5A-FC47EB772452}.Debug|Any CPU.Build.0 = Debug|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.ActiveCfg = Release|Any CPU {54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.Build.0 = Release|Any CPU - {462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.Build.0 = Debug|Any CPU - {462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.ActiveCfg = Release|Any CPU - {462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.Build.0 = Release|Any CPU {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.Build.0 = Debug|Any CPU {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.ActiveCfg = Release|Any CPU {375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.Build.0 = Release|Any CPU + {54458B54-49CC-454C-82B2-4AED681D9D07}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {54458B54-49CC-454C-82B2-4AED681D9D07}.Debug|Any CPU.Build.0 = Debug|Any CPU + {54458B54-49CC-454C-82B2-4AED681D9D07}.Release|Any CPU.ActiveCfg = Release|Any CPU + {54458B54-49CC-454C-82B2-4AED681D9D07}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -204,8 +204,8 @@ Global {B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF} {8B2FD3EA-E72B-4A82-B182-B87EC0C15D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {54F6C206-2A23-4971-AE5A-FC47EB772452} = {3A6B6931-A123-477A-9469-8B468B5385AF} - {462B6245-46F6-42BF-8CE6-588ACAA53190} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} {375AF85D-8C81-47C6-BE5B-D0874D4971EA} = {3A6B6931-A123-477A-9469-8B468B5385AF} + {54458B54-49CC-454C-82B2-4AED681D9D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB} diff --git a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj index 0b0b723..e4f1307 100644 --- a/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj +++ b/samples/Samples.Redis.SqlServer/Samples.Redis.SqlServer.csproj @@ -13,7 +13,7 @@ - + diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs b/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs rename to src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.Extensions.cs diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs b/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.PostConfigure.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs rename to src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.PostConfigure.cs diff --git a/src/DotNetCore.CAP.Redis/CapOptions.Redis.cs b/src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/CapOptions.Redis.cs rename to src/DotNetCore.CAP.RedisStreams/CapOptions.Redis.cs diff --git a/src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj b/src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj similarity index 100% rename from src/DotNetCore.CAP.Redis/DotNetCore.CAP.RedisStreams.csproj rename to src/DotNetCore.CAP.RedisStreams/DotNetCore.CAP.RedisStreams.csproj diff --git a/src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs b/src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs rename to src/DotNetCore.CAP.RedisStreams/ICapOptionsExtension.Redis.cs diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs b/src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IConnectionPool.Default.cs rename to src/DotNetCore.CAP.RedisStreams/IConnectionPool.Default.cs diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs b/src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IConnectionPool.LazyConnection.cs rename to src/DotNetCore.CAP.RedisStreams/IConnectionPool.LazyConnection.cs diff --git a/src/DotNetCore.CAP.Redis/IConnectionPool.cs b/src/DotNetCore.CAP.RedisStreams/IConnectionPool.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IConnectionPool.cs rename to src/DotNetCore.CAP.RedisStreams/IConnectionPool.cs diff --git a/src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs b/src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs rename to src/DotNetCore.CAP.RedisStreams/IConsumerClient.Redis.cs diff --git a/src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs b/src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs rename to src/DotNetCore.CAP.RedisStreams/IConsumerClientFactory.Redis.cs diff --git a/src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs b/src/DotNetCore.CAP.RedisStreams/IRedis.Events.Logger.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IRedis.Events.Logger.cs rename to src/DotNetCore.CAP.RedisStreams/IRedis.Events.Logger.cs diff --git a/src/DotNetCore.CAP.Redis/IRedis.Events.cs b/src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IRedis.Events.cs rename to src/DotNetCore.CAP.RedisStreams/IRedis.Events.cs diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IRedisStream.Manager.Default.cs rename to src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Default.cs diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IRedisStream.Manager.Extensions.cs rename to src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs diff --git a/src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/IRedisStream.Manager.cs rename to src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.cs diff --git a/src/DotNetCore.CAP.Redis/ITransport.Redis.cs b/src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/ITransport.Redis.cs rename to src/DotNetCore.CAP.RedisStreams/ITransport.Redis.cs diff --git a/src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs b/src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs similarity index 100% rename from src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs rename to src/DotNetCore.CAP.RedisStreams/TransportMessage.Redis.cs From 3cddb176ffc1136e784bfd91d9df75c81e07826a Mon Sep 17 00:00:00 2001 From: "Mahmoud S. Zeid" Date: Tue, 27 Apr 2021 13:50:41 +0300 Subject: [PATCH 6/6] return stream position for exsiting stream/group --- .../IRedisStream.Manager.Extensions.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs index 2a54a5d..ca5e312 100644 --- a/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs +++ b/src/DotNetCore.CAP.RedisStreams/IRedisStream.Manager.Extensions.cs @@ -39,6 +39,8 @@ namespace StackExchange.Redis created = true; } } + else + created = true; } } catch (Exception ex)