Browse Source

Code refactoring

master
杨晓东 4 years ago
parent
commit
79e34ca2c2
11 changed files with 149 additions and 279 deletions
  1. +2
    -2
      CAP.sln
  2. +0
    -0
      samples/Sample.Pulsar.InMemory/Sample.Pulsar.InMemory.csproj
  3. +3
    -3
      src/DotNetCore.CAP.Pulsar/CAP.Options.Extensions.cs
  4. +1
    -1
      src/DotNetCore.CAP.Pulsar/CAP.PulsarCapOptionsExtension.cs
  5. +22
    -59
      src/DotNetCore.CAP.Pulsar/CAP.PulsarOptions.cs
  6. +77
    -0
      src/DotNetCore.CAP.Pulsar/IConnectionFactory.Default.cs
  7. +4
    -3
      src/DotNetCore.CAP.Pulsar/IConnectionFactory.cs
  8. +0
    -76
      src/DotNetCore.CAP.Pulsar/IConnectionPool.Default.cs
  9. +12
    -31
      src/DotNetCore.CAP.Pulsar/ITransport.Pulsar.cs
  10. +22
    -100
      src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs
  11. +6
    -4
      src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs

+ 2
- 2
CAP.sln View File

@@ -67,9 +67,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.ConsoleApp", "sample
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.AmazonSQS", "src\DotNetCore.CAP.AmazonSQS\DotNetCore.CAP.AmazonSQS.csproj", "{43475E00-51B7-443D-BC2D-FC21F9D8A0B4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.Pulsar", "src\DotNetCore.CAP.Pulsar\DotNetCore.CAP.Pulsar.csproj", "{73408EA6-1025-463C-88BC-A20769E44BC4}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Pulsar", "src\DotNetCore.CAP.Pulsar\DotNetCore.CAP.Pulsar.csproj", "{73408EA6-1025-463C-88BC-A20769E44BC4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample.PulsarInMemory", "samples\Sample.Pulsar.InMemory\Sample.PulsarInMemory.csproj", "{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "samples\Sample.Pulsar.InMemory\Sample.Pulsar.InMemory.csproj", "{AFF0A34A-F938-4F75-9A96-9FC3DC0BF6DF}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution


samples/Sample.Pulsar.InMemory/Sample.PulsarInMemory.csproj → samples/Sample.Pulsar.InMemory/Sample.Pulsar.InMemory.csproj View File


+ 3
- 3
src/DotNetCore.CAP.Pulsar/CAP.Options.Extensions.cs View File

@@ -13,10 +13,10 @@ namespace Microsoft.Extensions.DependencyInjection
/// Configuration to use pulsar in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="bootstrapServers">Pulsar bootstrap server urls.</param>
public static CapOptions UsePulsar(this CapOptions options, string bootstrapServers)
/// <param name="serverUrl">Pulsar bootstrap server urls.</param>
public static CapOptions UsePulsar(this CapOptions options, string serverUrl)
{
return options.UsePulsar(opt => { opt.Servers = bootstrapServers; });
return options.UsePulsar(opt => { opt.ServiceUrl = serverUrl; });
}

/// <summary>


+ 1
- 1
src/DotNetCore.CAP.Pulsar/CAP.PulsarCapOptionsExtension.cs View File

@@ -26,7 +26,7 @@ namespace DotNetCore.CAP

services.AddSingleton<ITransport, PulsarTransport>();
services.AddSingleton<IConsumerClientFactory, PulsarConsumerClientFactory>();
services.AddSingleton<IConnectionPool, ConnectionPool>();
services.AddSingleton<IConnectionFactory, ConnectionFactory>();
}
}
}

+ 22
- 59
src/DotNetCore.CAP.Pulsar/CAP.PulsarOptions.cs View File

@@ -1,74 +1,37 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using Pulsar.Client.Api;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
using Pulsar;

/// <summary>
/// Provides programmatic configuration for the CAP kafka project.
/// Provides programmatic configuration for the CAP pulsar project.
/// </summary>
public class PulsarOptions
{
/// <summary>
/// librdkafka configuration parameters (refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
/// <para>
/// Topic configuration parameters are specified via the "default.topic.config" sub-dictionary config parameter.
/// </para>
/// </summary>
public readonly ConcurrentDictionary<string, string> MainConfig;

private IEnumerable<KeyValuePair<string, string>> _pulsarConfig;


public PulsarOptions()
{
MainConfig = new ConcurrentDictionary<string, string>();
}

/// <summary>
/// Producer connection pool size, default is 10
/// </summary>
public int ConnectionPoolSize { get; set; } = 10;
public string ServiceUrl { get; set; }

/// <summary>
/// The `bootstrap.servers` item config of <see cref="MainConfig" />.
/// <para>
/// Initial list of brokers as a CSV list of broker host or host:port.
/// </para>
/// </summary>
public string Servers { get; set; }

/// <summary>
/// If you need to get offset and partition and so on.., you can use this function to write additional header into <see cref="CapHeader"/>
/// </summary>
//public Func<ConsumeResult<byte[]>, List<KeyValuePair<string, string>>> CustomHeaders { get; set; }

internal IEnumerable<KeyValuePair<string, string>> AsPulsarConfig()
{
if (_pulsarConfig == null)
{
if (string.IsNullOrWhiteSpace(Servers))
{
throw new ArgumentNullException(nameof(Servers));
}

MainConfig["bootstrap.servers"] = Servers;
MainConfig["queue.buffering.max.ms"] = "10";
MainConfig["enable.auto.commit"] = "false";
MainConfig["log.connection.close"] = "false";
MainConfig["request.timeout.ms"] = "3000";
MainConfig["message.timeout.ms"] = "5000";
public TlsOptions TlsOptions { get; set; }
}
}

_pulsarConfig = MainConfig.AsEnumerable();
}
namespace DotNetCore.CAP.Pulsar
{
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;

return _pulsarConfig;
}
public class TlsOptions
{
private static readonly global::Pulsar.Client.Api.PulsarClientConfiguration Default =
global::Pulsar.Client.Api.PulsarClientConfiguration.Default;

public bool UseTls { get; set; } = Default.UseTls;
public bool TlsHostnameVerificationEnable { get; set; } = Default.TlsHostnameVerificationEnable;
public bool TlsAllowInsecureConnection { get; set; } = Default.TlsAllowInsecureConnection;
public X509Certificate2 TlsTrustCertificate { get; set; } = Default.TlsTrustCertificate;
public global::Pulsar.Client.Api.Authentication Authentication { get; set; } = Default.Authentication;
public SslProtocols TlsProtocols { get; set; } = Default.TlsProtocols;
}
}

+ 77
- 0
src/DotNetCore.CAP.Pulsar/IConnectionFactory.Default.cs View File

@@ -0,0 +1,77 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Pulsar.Client.Api;

namespace DotNetCore.CAP.Pulsar
{
public class ConnectionFactory : IConnectionFactory, IAsyncDisposable
{
private PulsarClient _client;
private readonly PulsarOptions _options;
private readonly ConcurrentDictionary<string, Task<IProducer<byte[]>>> _topicProducers;

public ConnectionFactory(ILogger<ConnectionFactory> logger, IOptions<PulsarOptions> options)
{
_options = options.Value;
_topicProducers = new ConcurrentDictionary<string, Task<IProducer<byte[]>>>();

logger.LogDebug("CAP Pulsar configuration: {0}", JsonConvert.SerializeObject(_options, Formatting.Indented));
}

public string ServersAddress => _options.ServiceUrl;

public async Task<IProducer<byte[]>> CreateProducerAsync(string topic)
{
_client ??= RentClient();

async Task<IProducer<byte[]>> ValueFactory(string top)
{
return await _client.NewProducer()
.Topic(top)
.CreateAsync();
}

//connection may lost
return await _topicProducers.GetOrAdd(topic, ValueFactory);
}

public PulsarClient RentClient()
{
lock (this)
{
if (_client == null)
{
var builder = new PulsarClientBuilder().ServiceUrl(_options.ServiceUrl);
if (_options.TlsOptions != null)
{
builder.EnableTls(_options.TlsOptions.UseTls);
builder.EnableTlsHostnameVerification(_options.TlsOptions.TlsHostnameVerificationEnable);
builder.AllowTlsInsecureConnection(_options.TlsOptions.TlsAllowInsecureConnection);
builder.TlsTrustCertificate(_options.TlsOptions.TlsTrustCertificate);
builder.Authentication(_options.TlsOptions.Authentication);
builder.TlsProtocols(_options.TlsOptions.TlsProtocols);
}

_client = builder.Build();
}

return _client;
}
}

public async ValueTask DisposeAsync()
{
foreach (var value in _topicProducers.Values)
{
_ = (await value).DisposeAsync();
}
}
}
}

src/DotNetCore.CAP.Pulsar/IConnectionPool.cs → src/DotNetCore.CAP.Pulsar/IConnectionFactory.cs View File

@@ -1,16 +1,17 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading.Tasks;
using Pulsar.Client.Api;

namespace DotNetCore.CAP.Pulsar
{
public interface IConnectionPool
public interface IConnectionFactory
{
string ServersAddress { get; }

IProducer<byte[]> RentProducer();
Task<IProducer<byte[]>> CreateProducerAsync(string topic);

bool Return(IProducer<byte[]> producer);
PulsarClient RentClient();
}
}

+ 0
- 76
src/DotNetCore.CAP.Pulsar/IConnectionPool.Default.cs View File

@@ -1,76 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Threading;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Pulsar.Client.Api;

namespace DotNetCore.CAP.Pulsar
{
public class ConnectionPool : IConnectionPool, IDisposable
{
private readonly PulsarClient _client;
private readonly PulsarOptions _options;
private readonly ConcurrentQueue<IProducer<byte[]>> _producerPool;
private int _pCount;
private int _maxSize;

public ConnectionPool(ILogger<ConnectionPool> logger, IOptions<PulsarOptions> options)
{
_options = options.Value;
_client = new PulsarClientBuilder().ServiceUrl(_options.Servers).Build();
_producerPool = new ConcurrentQueue<IProducer<byte[]>>();
_maxSize = _options.ConnectionPoolSize;

logger.LogDebug("CAP Pulsar configuration: {0}", JsonConvert.SerializeObject(_options.AsPulsarConfig(), Formatting.Indented));
}

public string ServersAddress => _options.Servers;

public IProducer<byte[]> RentProducer()
{
// TODO: need topic to rent producer
if (_producerPool.TryDequeue(out var producer))
{
Interlocked.Decrement(ref _pCount);

return producer;
}

var options = _options;
producer = _client.NewProducer().Topic($"persistent://public/default/captesttopic").CreateAsync().Result;

return producer;
}

public bool Return(IProducer<byte[]> producer)
{
if (Interlocked.Increment(ref _pCount) <= _maxSize)
{
_producerPool.Enqueue(producer);

return true;
}

Interlocked.Decrement(ref _pCount);

return false;
}

public void Dispose()
{
_maxSize = 0;

while (_producerPool.TryDequeue(out var context))
{
context.DisposeAsync();

}
}
}
}

+ 12
- 31
src/DotNetCore.CAP.Pulsar/ITransport.Pulsar.cs View File

@@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
@@ -12,39 +13,27 @@ namespace DotNetCore.CAP.Pulsar
{
internal class PulsarTransport : ITransport
{
private readonly IConnectionPool _connectionPool;
private readonly IConnectionFactory _connectionFactory;
private readonly ILogger _logger;

public PulsarTransport(ILogger<PulsarTransport> logger, IConnectionPool connectionPool)
public PulsarTransport(ILogger<PulsarTransport> logger, IConnectionFactory connectionFactory)
{
_logger = logger;
_connectionPool = connectionPool;
_connectionFactory = connectionFactory;
}

public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _connectionPool.ServersAddress);
public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _connectionFactory.ServersAddress);

public async Task<OperateResult> SendAsync(TransportMessage message)
{
var producer = _connectionPool.RentProducer();
var producer = await _connectionFactory.CreateProducerAsync(message.GetName());

try
{
// var headers = new H. Headers();

/*foreach (var header in message.Headers)
{
headers.Add(header.Value != null
? new Header(header.Key, Encoding.UTF8.GetBytes(header.Value))
: new Header(header.Key, null));
}*/

var result = await producer.SendAsync(message.Body);
/*var result = await producer.SendAsync(message.GetName(), new Message
{
Headers = headers,
Key = message.Headers.TryGetValue(KafkaHeaders.KafkaKey, out string kafkaMessageKey) && !string.IsNullOrEmpty(kafkaMessageKey) ? kafkaMessageKey : message.GetId(),
Value = message.Body
});*/
var headerDic = new Dictionary<string, string>(message.Headers);
headerDic.TryGetValue(PulsarHeaders.PulsarKey, out var key);
var pulsarMessage = producer.NewMessage(message.Body, key, headerDic);
var result = await producer.SendAsync(pulsarMessage);

if (result.Type != null)
{
@@ -57,17 +46,9 @@ namespace DotNetCore.CAP.Pulsar
}
catch (Exception ex)
{
var wapperEx = new PublisherSentFailedException(ex.Message, ex);
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);

return OperateResult.Failed(wapperEx);
}
finally
{
var returned = _connectionPool.Return(producer);
if (!returned)
{
await producer.DisposeAsync();
}
return OperateResult.Failed(wrapperEx);
}
}
}

+ 22
- 100
src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs View File

@@ -3,9 +3,8 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;
@@ -16,24 +15,23 @@ namespace DotNetCore.CAP.Pulsar
{
internal sealed class PulsarConsumerClient : IConsumerClient
{
private static readonly SemaphoreSlim ConnectionLock = new SemaphoreSlim(initialCount: 1, maxCount: 1);
private static PulsarClient _client;
private readonly string _groupId;
private readonly PulsarOptions _pulsarOptions;
private IConsumer<byte[]> _consumerClient;

public PulsarConsumerClient(string groupId, IOptions<PulsarOptions> options)
public PulsarConsumerClient(PulsarClient client,string groupId, IOptions<PulsarOptions> options)
{
_client = client;
_groupId = groupId;
_pulsarOptions = options.Value ?? throw new ArgumentNullException(nameof(options));
_pulsarOptions = options.Value;
}

public event EventHandler<TransportMessage> OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _pulsarOptions.Servers);
public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _pulsarOptions.ServiceUrl);

public void Subscribe(IEnumerable<string> topics)
{
@@ -42,95 +40,44 @@ namespace DotNetCore.CAP.Pulsar
throw new ArgumentNullException(nameof(topics));
}

Connect();
var serviceName = Assembly.GetEntryAssembly()?.GetName().Name.ToLower();

_consumerClient = _client.NewConsumer()
.Topic(topics.First())
.SubscriptionName("test")
.ConsumerName("testconsumer")
.Topics(topics)
.SubscriptionName(_groupId)
.ConsumerName(serviceName)
.SubscriptionType(SubscriptionType.Shared)
.SubscribeAsync().Result;
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
Connect();

var task = ProcessMessage(_consumerClient, (message) =>
while (!cancellationToken.IsCancellationRequested)
{
//var messageText = Encoding.UTF8.GetString(message.Data);
return Task.CompletedTask;
}, cancellationToken);
task.RunSynchronously();
// ReSharper disable once FunctionNeverReturns
}
var consumerResult = _consumerClient.ReceiveAsync().Result;

internal async Task ProcessMessage(IConsumer<byte[]> consumer,
Func<Message<byte[]>, Task> f, CancellationToken ct)
{
try
{
while (!ct.IsCancellationRequested)
var headers = new Dictionary<string, string>(consumerResult.Properties.Count);
foreach (var header in consumerResult.Properties)
{
bool success = false;
Message<byte[]> consumerResult = consumer.ReceiveAsync().Result;
try
{
await f(consumerResult);
success = true;
Dictionary<string, string> headers = new Dictionary<string, string>();
headers.Add(Messages.Headers.MessageId, consumerResult.MessageId.EntryId.ToString());
headers.Add(Messages.Headers.MessageName, consumer.Topic);
headers.Add(Messages.Headers.Group, _groupId);
TransportMessage result = new TransportMessage(headers, consumerResult.Data);
OnMessageReceived?.Invoke(consumerResult, result);
}
catch (Exception e)
{
Reject(consumerResult);
//logger.LogError(e, "Can't process message {0}, MessageId={1}", consumer.Topic, message.MessageId);
}

if (success)
{
Commit(consumerResult);
}
headers.Add(header.Key, header.Value);
}
headers.Add(Headers.Group, _groupId);

var message = new TransportMessage(headers, consumerResult.Data);

OnMessageReceived?.Invoke(consumerResult.MessageId, message);
}
catch (Exception ex)
{
//logger.LogError(ex, "ProcessMessages failed for {0}", consumer.Topic);
}
// ReSharper disable once FunctionNeverReturns
// ReSharper disable once FunctionNeverReturns
}

public void Commit(object sender)
{
try
{
Message<byte[]> mSender = (Message<byte[]>) sender;
_consumerClient.AcknowledgeAsync(mSender.MessageId);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
_consumerClient.AcknowledgeAsync((MessageId)sender);
}

public void Reject(object sender)
{
try
{
Message<byte[]> mSender = (Message<byte[]>) sender;
_consumerClient.NegativeAcknowledge(mSender.MessageId);
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
//_consumerClient.NegativeAcknowledge(sender);//Assign(_consumerClient.Assignment);
_consumerClient.NegativeAcknowledge((MessageId)sender);
}

public void Dispose()
@@ -138,31 +85,6 @@ namespace DotNetCore.CAP.Pulsar
_consumerClient?.DisposeAsync();
}

public void Connect()
{
if (_client != null)
{
return;
}

ConnectionLock.Wait();

try
{
if (_client == null)
{
_pulsarOptions.MainConfig["group.id"] = _groupId;
_pulsarOptions.MainConfig["auto.offset.reset"] = "earliest";
var config = _pulsarOptions.AsPulsarConfig();
_client = new PulsarClientBuilder().ServiceUrl(_pulsarOptions.MainConfig["bootstrap.servers"]).Build();
}
}
finally
{
ConnectionLock.Release();
}
}

private void ConsumerClient_OnConsumeError(IConsumer<byte[]> consumer, Exception e)
{
var logArgs = new LogMessageEventArgs


+ 6
- 4
src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs View File

@@ -8,10 +8,12 @@ namespace DotNetCore.CAP.Pulsar
{
internal sealed class PulsarConsumerClientFactory : IConsumerClientFactory
{
private readonly IConnectionFactory _connection;
private readonly IOptions<PulsarOptions> _pulsarOptions;

public PulsarConsumerClientFactory(IOptions<PulsarOptions> pulsarOptions)
public PulsarConsumerClientFactory(IConnectionFactory connection, IOptions<PulsarOptions> pulsarOptions)
{
_connection = connection;
_pulsarOptions = pulsarOptions;
}

@@ -19,9 +21,9 @@ namespace DotNetCore.CAP.Pulsar
{
try
{
var client = new PulsarConsumerClient(groupId, _pulsarOptions);
client.Connect();
return client;
var client = _connection.RentClient();
var consumerClient = new PulsarConsumerClient(client,groupId, _pulsarOptions);
return consumerClient;
}
catch (System.Exception e)
{


Loading…
Cancel
Save