Browse Source

add redis

master
mahmoud.samir 3 years ago
parent
commit
05e0f53bb3
15 changed files with 632 additions and 1 deletions
  1. +15
    -1
      CAP.sln
  2. +32
    -0
      src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs
  3. +38
    -0
      src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs
  4. +22
    -0
      src/DotNetCore.CAP.Redis/CapOptions.Redis.cs
  5. +22
    -0
      src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj
  6. +41
    -0
      src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs
  7. +87
    -0
      src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs
  8. +33
    -0
      src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs
  9. +108
    -0
      src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs
  10. +57
    -0
      src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs
  11. +27
    -0
      src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs
  12. +20
    -0
      src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs
  13. +55
    -0
      src/DotNetCore.CAP.Redis/ITransport.Redis.cs
  14. +22
    -0
      src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs
  15. +53
    -0
      src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs

+ 15
- 1
CAP.sln View File

@@ -69,7 +69,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.AmazonSQS.InMemory",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{8B2FD3EA-E72B-4A82-B182-B87EC0C15D07}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.RabbitMQ.Postgres.DashboardAuth", "samples\Sample.RabbitMQ.Postgres.DashboardAuth\Sample.RabbitMQ.Postgres.DashboardAuth.csproj", "{54F6C206-2A23-4971-AE5A-FC47EB772452}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Redis", "src\DotNetCore.CAP.Redis\DotNetCore.CAP.Redis.csproj", "{462B6245-46F6-42BF-8CE6-588ACAA53190}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Samples.Redis.SqlServer", "samples\Samples.Redis.SqlServer\Samples.Redis.SqlServer.csproj", "{375AF85D-8C81-47C6-BE5B-D0874D4971EA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -165,6 +169,14 @@ Global
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Debug|Any CPU.Build.0 = Debug|Any CPU
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.ActiveCfg = Release|Any CPU
{54F6C206-2A23-4971-AE5A-FC47EB772452}.Release|Any CPU.Build.0 = Release|Any CPU
{462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{462B6245-46F6-42BF-8CE6-588ACAA53190}.Debug|Any CPU.Build.0 = Debug|Any CPU
{462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.ActiveCfg = Release|Any CPU
{462B6245-46F6-42BF-8CE6-588ACAA53190}.Release|Any CPU.Build.0 = Release|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{375AF85D-8C81-47C6-BE5B-D0874D4971EA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -192,6 +204,8 @@ Global
{B187DD15-092D-4B72-9807-50856607D237} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{8B2FD3EA-E72B-4A82-B182-B87EC0C15D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{54F6C206-2A23-4971-AE5A-FC47EB772452} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{462B6245-46F6-42BF-8CE6-588ACAA53190} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{375AF85D-8C81-47C6-BE5B-D0874D4971EA} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 32
- 0
src/DotNetCore.CAP.Redis/CapOptions.Redis.Extensions.cs View File

@@ -0,0 +1,32 @@
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Reflection;

namespace DotNetCore.CAP.Redis
{
public static class CapRedisOptionsExtensions
{
public static CapOptions UseRedis(this CapOptions options) =>
options.UseRedis(_ => { });

public static CapOptions UseRedis(this CapOptions options, string connection) =>
options.UseRedis(opt => opt.Configuration = ConfigurationOptions.Parse(connection));


public static CapOptions UseRedis(this CapOptions options, Action<CapRedisOptions> configure)
{
if (configure is null)
throw new ArgumentNullException(nameof(configure));

options.RegisterExtension(new RedisOptionsExtension(configure));

return options;
}
}
}

+ 38
- 0
src/DotNetCore.CAP.Redis/CapOptions.Redis.PostConfigure.cs View File

@@ -0,0 +1,38 @@
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
class CapRedisOptionsPostConfigure : IPostConfigureOptions<CapRedisOptions>
{
private readonly CapOptions capOptions;

public CapRedisOptionsPostConfigure(IOptions<CapOptions> options)
{
capOptions = options.Value;
}

public void PostConfigure(string name, CapRedisOptions options)
{
var groupPrefix = string.IsNullOrWhiteSpace(capOptions.GroupNamePrefix) ? string.Empty : $"{capOptions.GroupNamePrefix}.";
options.DefaultChannel = $"{groupPrefix}{capOptions.DefaultGroupName}";

options.Configuration ??= new ConfigurationOptions();

if (!options.Configuration.EndPoints.Any())
{
options.Configuration.EndPoints.Add(IPAddress.Loopback, 0);
options.Configuration.SetDefaultPorts();
}
}
}
}

+ 22
- 0
src/DotNetCore.CAP.Redis/CapOptions.Redis.cs View File

@@ -0,0 +1,22 @@
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
public class CapRedisOptions
{
/// <summary>
/// Gets or sets the options of redis connections
/// </summary>
public ConfigurationOptions Configuration { get; set; }

internal string Endpoint { get; set; }

internal string DefaultChannel { get; set; }
}
}

+ 22
- 0
src/DotNetCore.CAP.Redis/DotNetCore.CAP.Redis.csproj View File

@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.Redis</AssemblyName>
<PackageTags>$(PackageTags);Redis</PackageTags>
</PropertyGroup>

<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.Redis.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="StackExchange.Redis" Version="2.2.4" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 41
- 0
src/DotNetCore.CAP.Redis/ICapOptionsExtension.Redis.cs View File

@@ -0,0 +1,41 @@
using DotNetCore.CAP.Redis;
using DotNetCore.CAP;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
class RedisOptionsExtension : ICapOptionsExtension
{
private readonly Action<CapRedisOptions> configure;
public RedisOptionsExtension(Action<CapRedisOptions> configure)
{
if (configure is null)
{
throw new ArgumentNullException(nameof(configure));
}

this.configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();
services.AddSingleton<IRedisCacheManager, RedisCacheManager>();
services.AddSingleton<IConsumerClientFactory, RedisConsumerClientFactory>();
services.AddSingleton<ITransport, RedisTransport>();

services.TryAddEnumerable(ServiceDescriptor.Singleton<IPostConfigureOptions<CapRedisOptions>, CapRedisOptionsPostConfigure>());
services.AddOptions<CapRedisOptions>().Configure(configure);
}
}
}

+ 87
- 0
src/DotNetCore.CAP.Redis/IConsumerClient.Redis.cs View File

@@ -0,0 +1,87 @@
using DotNetCore.CAP.Redis;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
class RedisConsumerClient : IConsumerClient
{
private readonly ILogger<RedisConsumerClient> logger;
private readonly IRedisCacheManager redis;
private readonly CapRedisOptions options;
private readonly string groupId;

public RedisConsumerClient(
string groubId,
IRedisCacheManager redis,
CapRedisOptions options,
ILogger<RedisConsumerClient> logger
)
{
this.groupId = groubId;
this.redis = redis;
this.options = options;
this.logger = logger;
}

public event EventHandler<TransportMessage> OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint);

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null) throw new ArgumentNullException(nameof(topics));

redis.SubscribeAsync(groupId, topics, (channel, message) =>
{
logger.LogDebug($"Redis message with name {message.GetName()} subscribed.");

message.GroupId = groupId;

OnMessageReceived?.Invoke(channel, message);

return Task.CompletedTask;
});
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
cancellationToken.Register(async () => await redis.UnsubscribeAsync());

while (true)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
}

public void Commit(object sender)
{
// ignore
}

public void Reject(object sender)
{
// ignore
}

public void Dispose()
{
//ignore
}

}
}

+ 33
- 0
src/DotNetCore.CAP.Redis/IConsumerClientFactory.Redis.cs View File

@@ -0,0 +1,33 @@
using DotNetCore.CAP.Redis;
using DotNetCore.CAP;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
class RedisConsumerClientFactory : IConsumerClientFactory
{
private readonly CapRedisOptions redisOptions;
private readonly IRedisCacheManager redis;
private readonly ILogger<RedisConsumerClient> logger;

public RedisConsumerClientFactory(IOptions<CapRedisOptions> redisOptions, IRedisCacheManager redis, ILogger<RedisConsumerClient> logger)
{
this.redisOptions = redisOptions.Value;
this.redis = redis;
this.logger = logger;
}

public IConsumerClient Create(string groupId)
{
return new RedisConsumerClient(groupId, redis, redisOptions, logger);
}
}
}

+ 108
- 0
src/DotNetCore.CAP.Redis/IRedisCache.Manager.Default.cs View File

@@ -0,0 +1,108 @@
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
class RedisCacheManager : IRedisCacheManager
{
private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1, 1);
private readonly CapRedisOptions options;
private readonly ILogger<RedisCacheManager> logger;
private ConnectionMultiplexer redis;
private ISubscriber subscriber;
private bool disposed;

public RedisCacheManager(IOptions<CapRedisOptions> options, ILogger<RedisCacheManager> logger)
{
this.options = options.Value;
this.logger = logger;
_ = ConnectAsync();
}

public async Task SubscribeAsync(string channelName, IEnumerable<string> topics, Func<RedisChannel, RedisMessage, Task> callback)
{
var channel = await GetChannel(channelName);
channel.OnMessage(channelMessage =>
{
var message = RedisMessage.Create(channelMessage.Message);

if (topics.Any(c => c == message.GetName()))
{
callback?.Invoke(channelMessage.SubscriptionChannel, message);
}
});
}

public async Task PublishAsync(string channelName, RedisValue message)
{
await subscriber.PublishAsync(channelName, message);
}
public async Task UnsubscribeAsync()
{
await subscriber?.UnsubscribeAllAsync();
}

public void Dispose()
{
if (!disposed)
{
subscriber?.UnsubscribeAll();
redis?.Close();
disposed = true;
}
}


private async Task<ChannelMessageQueue> GetChannel(string channelName)
{
await ConnectAsync().ConfigureAwait(false);
return await subscriber.SubscribeAsync(channelName).ConfigureAwait(false);
}

private async Task ConnectAsync()
{
if (disposed == true)
throw new ObjectDisposedException(nameof(IRedisCacheManager));

if (redis != null)
{
subscriber ??= redis.GetSubscriber();
return;
}
else
{
try
{
await connectionLock.WaitAsync().ConfigureAwait(false);

if (redis != null)
{
return;
}

var redisLogger = new RedisCacheLogger(logger);

redis = await ConnectionMultiplexer.ConnectAsync(options.Configuration, redisLogger)
.ConfigureAwait(false);

redis.LogEvents(logger);

subscriber = redis.GetSubscriber();
}
finally
{
connectionLock.Release();
}
}
}

}
}

+ 57
- 0
src/DotNetCore.CAP.Redis/IRedisCache.Manager.Events.cs View File

@@ -0,0 +1,57 @@
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
static class RedisConnectionExtensions
{
public static void LogEvents(this IConnectionMultiplexer connection,ILogger logger)
{
if (connection is null)
{
throw new ArgumentNullException(nameof(connection));
}

if (logger is null)
{
throw new ArgumentNullException(nameof(logger));
}

_ = new RedisCacheManagerEvents(connection, logger);
}
}

class RedisCacheManagerEvents
{
private readonly ILogger logger;

public RedisCacheManagerEvents(IConnectionMultiplexer connection, ILogger logger)
{
this.logger = logger;
connection.ErrorMessage += Connection_ErrorMessage;
connection.ConnectionRestored += Connection_ConnectionRestored;
connection.ConnectionFailed += Connection_ConnectionFailed;
}

private void Connection_ConnectionFailed(object sender, ConnectionFailedEventArgs e)
{
logger.LogError(e.Exception, $"Connection failed!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}");
}

private void Connection_ConnectionRestored(object sender, ConnectionFailedEventArgs e)
{
logger.LogWarning(e.Exception, $"Connection restored back!, {e.Exception.Message}, for endpoint:{e.EndPoint}, failure type:{e.FailureType}, connection type:{e.ConnectionType}");
}

private void Connection_ErrorMessage(object sender, RedisErrorEventArgs e)
{
logger.LogError($"Server replied with error, {e.Message}, for endpoint:{e.EndPoint}");
}
}
}

+ 27
- 0
src/DotNetCore.CAP.Redis/IRedisCache.Manager.Logger.cs View File

@@ -0,0 +1,27 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
class RedisCacheLogger : TextWriter
{
private readonly ILogger logger;

public RedisCacheLogger(ILogger logger)
{
this.logger = logger;
}

public override void WriteLine(string value)
{
logger.LogInformation(value);
}
public override Encoding Encoding => Encoding.UTF8;
}
}

+ 20
- 0
src/DotNetCore.CAP.Redis/IRedisCache.Manager.cs View File

@@ -0,0 +1,20 @@
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
interface IRedisCacheManager : IDisposable
{
Task SubscribeAsync(string channelName, IEnumerable<string> topics, Func<RedisChannel,RedisMessage, Task> callback);
Task PublishAsync(string channelName, RedisValue message);
Task UnsubscribeAsync();
}
}

+ 55
- 0
src/DotNetCore.CAP.Redis/ITransport.Redis.cs View File

@@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using DotNetCore.CAP;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using StackExchange.Redis;

namespace DotNetCore.CAP.Redis
{
class RedisTransport : ITransport
{
private readonly IRedisCacheManager redis;
private readonly ILogger<RedisTransport> logger;
private readonly MethodMatcherCache selector;
private readonly CapRedisOptions options;

public RedisTransport(IRedisCacheManager redis, MethodMatcherCache selector, IOptions<CapRedisOptions> options, ILogger<RedisTransport> logger)
{
this.redis = redis;
this.selector = selector;
this.options = options.Value;
this.logger = logger;
}

public BrokerAddress BrokerAddress => new BrokerAddress("redis", options.Endpoint);

public async Task<OperateResult> SendAsync(TransportMessage message)
{
try
{
var redisMessage = new RedisMessage(message.Headers, message.Body).AsRedisValue();

var channelName = selector.GetGroupByTopic(message.GetName()) ?? options.DefaultChannel;

await redis.PublishAsync(channelName, redisMessage);

logger.LogDebug($"Redis message [{message.GetName()}] has been published.");

return OperateResult.Success;
}
catch (Exception ex)
{
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);

return OperateResult.Failed(wrapperEx);
}
}
}
}

+ 22
- 0
src/DotNetCore.CAP.Redis/MethodMatcherCache.Extensions.cs View File

@@ -0,0 +1,22 @@
using DotNetCore.CAP.Internal;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Redis
{
static class MethodMatcherCacheExtensions
{
public static string GetGroupByTopic(this MethodMatcherCache source, string topicName)
{
var groupsMap = source.GetCandidatesMethodsOfGroupNameGrouped();

return (from groupMap in groupsMap
from topic in groupMap.Value
where topic.TopicName == topicName
select topic.Attribute.Group).FirstOrDefault();
}
}
}

+ 53
- 0
src/DotNetCore.CAP.Redis/TransportMessage.Redis.cs View File

@@ -0,0 +1,53 @@
using DotNetCore.CAP.Messages;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Messages = DotNetCore.CAP.Messages;

namespace DotNetCore.CAP.Redis
{
class RedisMessage : TransportMessage
{
public RedisMessage(IDictionary<string, string> headers, byte[] body) : base(headers, body) { }

public string GroupId
{
get => Headers.TryGetValue(Messages.Headers.Group, out var value) ? value : default;
set => Headers.TryAdd(Messages.Headers.Group, value);
}

internal RedisValue AsRedisValue()
{
return JsonSerializer.Serialize(new RedisMessageValue
{
Headers = Headers,
Body = Body
}, new JsonSerializerOptions(JsonSerializerDefaults.Web));
}

public static RedisMessage Create(RedisValue redisValue)
{
if (redisValue.IsNullOrEmpty)
return Empty();

var value = JsonSerializer.Deserialize<RedisMessageValue>(redisValue, new JsonSerializerOptions(JsonSerializerDefaults.Web));

return new RedisMessage(value.Headers, value.Body);
}

public static RedisMessage Empty()
{
return new RedisMessage(new Dictionary<string, string>(), Array.Empty<byte>());
}
}

class RedisMessageValue
{
public IDictionary<string, string> Headers { get; set; }
public byte[] Body { get; set; }
}
}

Loading…
Cancel
Save