Browse Source

Add EnableClientLog option for Apache Pulsar transport.

master
Savorboard 3 years ago
parent
commit
9fc0176f54
5 changed files with 20 additions and 4 deletions
  1. +2
    -0
      src/DotNetCore.CAP.Pulsar/CAP.PulsarOptions.cs
  2. +1
    -0
      src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj
  3. +2
    -0
      src/DotNetCore.CAP.Pulsar/IConnectionFactory.Default.cs
  4. +6
    -2
      src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs
  5. +9
    -2
      src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs

+ 2
- 0
src/DotNetCore.CAP.Pulsar/CAP.PulsarOptions.cs View File

@@ -13,6 +13,8 @@ namespace DotNetCore.CAP
{ {
public string ServiceUrl { get; set; } = default!; public string ServiceUrl { get; set; } = default!;


public bool EnableClientLog { get; set; } = false;

public TlsOptions? TlsOptions { get; set; } public TlsOptions? TlsOptions { get; set; }
} }
} }


+ 1
- 0
src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj View File

@@ -4,6 +4,7 @@
<TargetFramework>netstandard2.1</TargetFramework> <TargetFramework>netstandard2.1</TargetFramework>
<Nullable>enable</Nullable> <Nullable>enable</Nullable>
<PackageTags>$(PackageTags);Pulsar</PackageTags> <PackageTags>$(PackageTags);Pulsar</PackageTags>
<NoWarn>CS0067</NoWarn>
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>


+ 2
- 0
src/DotNetCore.CAP.Pulsar/IConnectionFactory.Default.cs View File

@@ -13,12 +13,14 @@ namespace DotNetCore.CAP.Pulsar
{ {
public class ConnectionFactory : IConnectionFactory, IAsyncDisposable public class ConnectionFactory : IConnectionFactory, IAsyncDisposable
{ {
private readonly ILogger<ConnectionFactory> _logger;
private PulsarClient? _client; private PulsarClient? _client;
private readonly PulsarOptions _options; private readonly PulsarOptions _options;
private readonly ConcurrentDictionary<string, Task<IProducer<byte[]>>> _topicProducers; private readonly ConcurrentDictionary<string, Task<IProducer<byte[]>>> _topicProducers;


public ConnectionFactory(ILogger<ConnectionFactory> logger, IOptions<PulsarOptions> options) public ConnectionFactory(ILogger<ConnectionFactory> logger, IOptions<PulsarOptions> options)
{ {
_logger = logger;
_options = options.Value; _options = options.Value;
_topicProducers = new ConcurrentDictionary<string, Task<IProducer<byte[]>>>(); _topicProducers = new ConcurrentDictionary<string, Task<IProducer<byte[]>>>();




+ 6
- 2
src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs View File

@@ -7,6 +7,8 @@ using System.Reflection;
using System.Threading; using System.Threading;
using DotNetCore.CAP.Messages; using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport; using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Pulsar.Client.Api; using Pulsar.Client.Api;
using Pulsar.Client.Common; using Pulsar.Client.Common;
@@ -41,7 +43,7 @@ namespace DotNetCore.CAP.Pulsar
} }


var serviceName = Assembly.GetEntryAssembly()?.GetName().Name.ToLower(); var serviceName = Assembly.GetEntryAssembly()?.GetName().Name.ToLower();
_consumerClient = _client.NewConsumer() _consumerClient = _client.NewConsumer()
.Topics(topics) .Topics(topics)
.SubscriptionName(_groupId) .SubscriptionName(_groupId)
@@ -87,5 +89,7 @@ namespace DotNetCore.CAP.Pulsar
{ {
_consumerClient?.DisposeAsync(); _consumerClient?.DisposeAsync();
} }
}

}
} }

+ 9
- 2
src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs View File

@@ -2,7 +2,9 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using DotNetCore.CAP.Transport; using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Pulsar.Client.Api;


namespace DotNetCore.CAP.Pulsar namespace DotNetCore.CAP.Pulsar
{ {
@@ -11,10 +13,15 @@ namespace DotNetCore.CAP.Pulsar
private readonly IConnectionFactory _connection; private readonly IConnectionFactory _connection;
private readonly IOptions<PulsarOptions> _pulsarOptions; private readonly IOptions<PulsarOptions> _pulsarOptions;


public PulsarConsumerClientFactory(IConnectionFactory connection, IOptions<PulsarOptions> pulsarOptions)
public PulsarConsumerClientFactory(IConnectionFactory connection, ILoggerFactory loggerFactory, IOptions<PulsarOptions> pulsarOptions)
{ {
_connection = connection; _connection = connection;
_pulsarOptions = pulsarOptions; _pulsarOptions = pulsarOptions;

if (_pulsarOptions.Value.EnableClientLog)
{
PulsarClient.Logger = loggerFactory.CreateLogger<PulsarClient>();
}
} }


public IConsumerClient Create(string groupId) public IConsumerClient Create(string groupId)
@@ -22,7 +29,7 @@ namespace DotNetCore.CAP.Pulsar
try try
{ {
var client = _connection.RentClient(); var client = _connection.RentClient();
var consumerClient = new PulsarConsumerClient(client,groupId, _pulsarOptions);
var consumerClient = new PulsarConsumerClient(client, groupId, _pulsarOptions);
return consumerClient; return consumerClient;
} }
catch (System.Exception e) catch (System.Exception e)


Loading…
Cancel
Save