Bladeren bron

WIP: Add NATS support. #595

master
Savorboard 4 jaren geleden
bovenliggende
commit
d838f97cb2
13 gewijzigde bestanden met toevoegingen van 479 en 1 verwijderingen
  1. +7
    -0
      CAP.sln
  2. +2
    -0
      CAP.sln.DotSettings
  3. +32
    -0
      src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs
  4. +30
    -0
      src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs
  5. +40
    -0
      src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs
  6. +23
    -0
      src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
  7. +83
    -0
      src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs
  8. +16
    -0
      src/DotNetCore.CAP.NATS/IConnectionPool.cs
  9. +60
    -0
      src/DotNetCore.CAP.NATS/ITransport.NATS.cs
  10. +147
    -0
      src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
  11. +32
    -0
      src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs
  12. +3
    -0
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  13. +4
    -1
      src/DotNetCore.CAP/Transport/MqLogType.cs

+ 7
- 0
CAP.sln Bestand weergeven

@@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Test", "test
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "samples\Sample.ConsoleApp\Sample.ConsoleApp.csproj", "{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.NATS", "src\DotNetCore.CAP.NATS\DotNetCore.CAP.NATS.csproj", "{25A1B3A1-DD74-436C-9956-17E04FE7643D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -147,6 +149,10 @@ Global
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266}.Release|Any CPU.Build.0 = Release|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{25A1B3A1-DD74-436C-9956-17E04FE7643D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -171,6 +177,7 @@ Global
{93176BAE-914B-4BED-9DE3-01FFB4F27FC5} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{75CC45E6-BF06-40F4-977D-10DCC05B2EFA} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{2B0F467E-ABBD-4A51-BF38-D4F609DB6266} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{25A1B3A1-DD74-436C-9956-17E04FE7643D} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 2
- 0
CAP.sln.DotSettings Bestand weergeven

@@ -1,4 +1,6 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=DB/@EntryIndexedValue">DB</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=NATS/@EntryIndexedValue">NATS</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Mongo/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=NATS/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Postgre/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>

+ 32
- 0
src/DotNetCore.CAP.NATS/CAP.NATSCapOptionsExtension.cs Bestand weergeven

@@ -0,0 +1,32 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.NATS;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal sealed class NATSCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<NATSOptions> _configure;

public NATSCapOptionsExtension(Action<NATSOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();

services.Configure(_configure);

services.AddSingleton<ITransport, NATSTransport>();
services.AddSingleton<IConsumerClientFactory, NATSConsumerClientFactory>();
services.AddSingleton<IConnectionPool, ConnectionPool>();
}
}
}

+ 30
- 0
src/DotNetCore.CAP.NATS/CAP.NATSOptions.cs Bestand weergeven

@@ -0,0 +1,30 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using NATS.Client;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
/// <summary>
/// Provides programmatic configuration for the CAP NATS project.
/// </summary>
public class NATSOptions
{
/// <summary>
/// Gets or sets the server url/urls used to connect to the NATs server.
/// </summary>
/// <remarks>This may contain username/password information.</remarks>
public string Servers { get; set; }

/// <summary>
/// connection pool size, default is 10
/// </summary>
public int ConnectionPoolSize { get; set; } = 10;

/// <summary>
/// Used to setup all NATs client options
/// </summary>
public Options Options { get; set; }
}
}

+ 40
- 0
src/DotNetCore.CAP.NATS/CAP.Options.Extensions.cs Bestand weergeven

@@ -0,0 +1,40 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
/// <summary>
/// Configuration to use NATS in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="bootstrapServers">NATS bootstrap server urls.</param>
public static CapOptions UseNATS(this CapOptions options, string bootstrapServers)
{
return options.UseNATS(opt => { opt.Servers = bootstrapServers; });
}

/// <summary>
/// Configuration to use NATS in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="configure">Provides programmatic configuration for the NATS.</param>
/// <returns></returns>
public static CapOptions UseNATS(this CapOptions options, Action<NATSOptions> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

options.RegisterExtension(new NATSCapOptionsExtension(configure));

return options;
}
}
}

+ 23
- 0
src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj Bestand weergeven

@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.NATS</AssemblyName>
<PackageTags>$(PackageTags);NATS</PackageTags>
</PropertyGroup>

<PropertyGroup>
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors>
<NoWarn>NU1701;CS1591</NoWarn>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.NATS.xml</DocumentationFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NATS.Client" Version="0.10.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 83
- 0
src/DotNetCore.CAP.NATS/IConnectionPool.Default.cs Bestand weergeven

@@ -0,0 +1,83 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NATS.Client;

namespace DotNetCore.CAP.NATS
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private readonly NATSOptions _options;
private readonly ConcurrentQueue<IConnection> _connectionPool;
private readonly ConnectionFactory _connectionFactory;
private int _pCount;
private int _maxSize;
public ConnectionPool(ILogger<ConnectionPool> logger, IOptions<NATSOptions> options)
{
_options = options.Value;
_connectionPool = new ConcurrentQueue<IConnection>();
_connectionFactory = new ConnectionFactory();
_maxSize = _options.ConnectionPoolSize;

logger.LogDebug("NATS configuration: {0}", options.Value.Options);
}

public string ServersAddress => _options.Servers;

public IConnection RentConnection()
{
if (_connectionPool.TryDequeue(out var connection))
{
Interlocked.Decrement(ref _pCount);

return connection;
}

if (_options.Options != null)
{
if (_options.Servers != null)
{
_options.Options.Url = _options.Servers;
}
connection = _connectionFactory.CreateConnection(_options.Options);
}
else
{
connection = _connectionFactory.CreateConnection(_options.Servers);
}
return connection;
}

public bool Return(IConnection connection)
{
if (Interlocked.Increment(ref _pCount) <= _maxSize)
{
_connectionPool.Enqueue(connection);

return true;
}

Interlocked.Decrement(ref _pCount);

return false;
}

public void Dispose()
{
_maxSize = 0;

while (_connectionPool.TryDequeue(out var context))
{
context.Dispose();

}
}
}
}

+ 16
- 0
src/DotNetCore.CAP.NATS/IConnectionPool.cs Bestand weergeven

@@ -0,0 +1,16 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using NATS.Client;

namespace DotNetCore.CAP.NATS
{
public interface IConnectionPool
{
string ServersAddress { get; }

IConnection RentConnection();

bool Return(IConnection connection);
}
}

+ 60
- 0
src/DotNetCore.CAP.NATS/ITransport.NATS.cs Bestand weergeven

@@ -0,0 +1,60 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.NATS
{
internal class NATSTransport : ITransport
{
private readonly IConnectionPool _connectionPool;
private readonly ILogger _logger;

public NATSTransport(ILogger<NATSTransport> logger, IConnectionPool connectionPool)
{
_logger = logger;
_connectionPool = connectionPool;
}

public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _connectionPool.ServersAddress);

public Task<OperateResult> SendAsync(TransportMessage message)
{
var connection = _connectionPool.RentConnection();

try
{
var binFormatter = new BinaryFormatter();
using var mStream = new MemoryStream();
binFormatter.Serialize(mStream, message);

connection.Publish(message.GetName(), mStream.ToArray());

_logger.LogDebug($"kafka topic message [{message.GetName()}] has been published.");

return Task.FromResult(OperateResult.Success);
}
catch (Exception ex)
{
var warpEx = new PublisherSentFailedException(ex.Message, ex);

return Task.FromResult(OperateResult.Failed(warpEx));
}
finally
{
var returned = _connectionPool.Return(connection);
if (!returned)
{
connection.Dispose();
}
}
}
}
}

+ 147
- 0
src/DotNetCore.CAP.NATS/NATSConsumerClient.cs Bestand weergeven

@@ -0,0 +1,147 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;
using NATS.Client;

namespace DotNetCore.CAP.NATS
{
internal sealed class NATSConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);

private readonly string _groupId;
private readonly NATSOptions _natsOptions;
private readonly IList<IAsyncSubscription> _asyncSubscriptions;

private IConnection _consumerClient;

public NATSConsumerClient(string groupId, IOptions<NATSOptions> options)
{
_groupId = groupId;
_asyncSubscriptions = new List<IAsyncSubscription>();
_natsOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
}

public event EventHandler<TransportMessage> OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("NATS", _natsOptions.Servers);

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}

Connect();

foreach (var topic in topics)
{
_asyncSubscriptions.Add(_consumerClient.SubscribeAsync(topic, _groupId));
}
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
Connect();

foreach (var subscription in _asyncSubscriptions)
{
subscription.MessageHandler += Subscription_MessageHandler;
subscription.Start();
}

while (true)
{
cancellationToken.ThrowIfCancellationRequested();
cancellationToken.WaitHandle.WaitOne(timeout);
}
// ReSharper disable once FunctionNeverReturns
}

private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e)
{
using var mStream = new MemoryStream();
var binFormatter = new BinaryFormatter();

mStream.Write(e.Message.Data, 0, e.Message.Data.Length);
mStream.Position = 0;

var message = binFormatter.Deserialize(mStream) as TransportMessage;

OnMessageReceived?.Invoke(sender, message);
}

public void Commit(object sender)
{
//TODO : Only NATS Streaming Server Support
}

public void Reject(object sender)
{
//TODO : Only NATS Streaming Server Support
}

public void Dispose()
{
_consumerClient?.Dispose();
}

public void Connect()
{
if (_consumerClient != null)
{
return;
}

ConnectionLock.Wait();

try
{
if (_consumerClient == null)
{
var opts = _natsOptions.Options ?? ConnectionFactory.GetDefaultOptions();
opts.Url = _natsOptions.Servers ?? opts.Url;
opts.ClosedEventHandler = ConnectedEventHandler;
opts.DisconnectedEventHandler = ConnectedEventHandler;
opts.AsyncErrorEventHandler = AsyncErrorEventHandler;
_consumerClient = new ConnectionFactory().CreateConnection(opts);
}
}
finally
{
ConnectionLock.Release();
}
}

private void ConnectedEventHandler(object sender, ConnEventArgs e)
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ServerConnError,
Reason = $"An error occurred during connect NATS --> {e.Error}"
};
OnLog?.Invoke(null, logArgs);
}

private void AsyncErrorEventHandler(object sender, ErrEventArgs e)
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.AsyncErrorEvent,
Reason = $"An error occurred out of band --> {e.Error}"
};
OnLog?.Invoke(null, logArgs);
}
}
}

+ 32
- 0
src/DotNetCore.CAP.NATS/NATSConsumerClientFactory.cs Bestand weergeven

@@ -0,0 +1,32 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.NATS
{
internal sealed class NATSConsumerClientFactory : IConsumerClientFactory
{
private readonly IOptions<NATSOptions> _natsOptions;

public NATSConsumerClientFactory(IOptions<NATSOptions> natsOptions)
{
_natsOptions = natsOptions;
}

public IConsumerClient Create(string groupId)
{
try
{
var client = new NATSConsumerClient(groupId, _natsOptions);
client.Connect();
return client;
}
catch (System.Exception e)
{
throw new BrokerConnectionException(e);
}
}
}
}

+ 3
- 0
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs Bestand weergeven

@@ -271,6 +271,9 @@ namespace DotNetCore.CAP.Internal
case MqLogType.ExceptionReceived:
_logger.LogError("AzureServiceBus subscriber received an error. --> " + logmsg.Reason);
break;
case MqLogType.AsyncErrorEvent:
_logger.LogError("NATS subscriber received an error. --> " + logmsg.Reason);
break;
default:
throw new ArgumentOutOfRangeException();
}


+ 4
- 1
src/DotNetCore.CAP/Transport/MqLogType.cs Bestand weergeven

@@ -18,7 +18,10 @@ namespace DotNetCore.CAP.Transport
ServerConnError,

//AzureServiceBus
ExceptionReceived
ExceptionReceived,

//NATS
AsyncErrorEvent
}

public class LogMessageEventArgs : EventArgs


Laden…
Annuleren
Opslaan