浏览代码

Merge pull request #1009 from dotnetcore/supports/pulsar

Add transport support for pulsar
master
油条 3 年前
committed by GitHub
父节点
当前提交
c99a899a0a
找不到此签名对应的密钥 GPG 密钥 ID: 4AEE18F83AFDEB23
共有 16 个文件被更改,包括 554 次插入1 次删除
  1. +15
    -1
      CAP.sln
  2. +32
    -0
      samples/Sample.Pulsar.InMemory/Controllers/ValuesController.cs
  3. +20
    -0
      samples/Sample.Pulsar.InMemory/Program.cs
  4. +14
    -0
      samples/Sample.Pulsar.InMemory/Sample.Pulsar.InMemory.csproj
  5. +38
    -0
      samples/Sample.Pulsar.InMemory/Startup.cs
  6. +12
    -0
      samples/Sample.Pulsar.InMemory/appsettings.json
  7. +40
    -0
      src/DotNetCore.CAP.Pulsar/CAP.Options.Extensions.cs
  8. +32
    -0
      src/DotNetCore.CAP.Pulsar/CAP.PulsarCapOptionsExtension.cs
  9. +37
    -0
      src/DotNetCore.CAP.Pulsar/CAP.PulsarOptions.cs
  10. +23
    -0
      src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj
  11. +77
    -0
      src/DotNetCore.CAP.Pulsar/IConnectionFactory.Default.cs
  12. +17
    -0
      src/DotNetCore.CAP.Pulsar/IConnectionFactory.cs
  13. +55
    -0
      src/DotNetCore.CAP.Pulsar/ITransport.Pulsar.cs
  14. +98
    -0
      src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs
  15. +34
    -0
      src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs
  16. +10
    -0
      src/DotNetCore.CAP.Pulsar/PulsarHeaders.cs

+ 15
- 1
CAP.sln 查看文件

@@ -76,7 +76,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.RedisStreams
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Dashboard.Auth", "samples\Sample.Dashboard.Auth\Sample.Dashboard.Auth.csproj", "{6E059983-DE89-4D53-88F5-D9083BCE257F}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DotNetCore.CAP.MultiModuleSubscriberTests", "test\DotNetCore.CAP.MultiModuleSubscriberTests\DotNetCore.CAP.MultiModuleSubscriberTests.csproj", "{23684403-7DA8-489A-8A1E-8056D7683E18}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.MultiModuleSubscriberTests", "test\DotNetCore.CAP.MultiModuleSubscriberTests\DotNetCore.CAP.MultiModuleSubscriberTests.csproj", "{23684403-7DA8-489A-8A1E-8056D7683E18}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DotNetCore.CAP.Pulsar", "src\DotNetCore.CAP.Pulsar\DotNetCore.CAP.Pulsar.csproj", "{AB7A10CB-2C7E-49CE-AA21-893772FF6546}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Sample.Pulsar.InMemory", "samples\Sample.Pulsar.InMemory\Sample.Pulsar.InMemory.csproj", "{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -184,6 +188,14 @@ Global
{23684403-7DA8-489A-8A1E-8056D7683E18}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23684403-7DA8-489A-8A1E-8056D7683E18}.Release|Any CPU.ActiveCfg = Release|Any CPU
{23684403-7DA8-489A-8A1E-8056D7683E18}.Release|Any CPU.Build.0 = Release|Any CPU
{AB7A10CB-2C7E-49CE-AA21-893772FF6546}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AB7A10CB-2C7E-49CE-AA21-893772FF6546}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AB7A10CB-2C7E-49CE-AA21-893772FF6546}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AB7A10CB-2C7E-49CE-AA21-893772FF6546}.Release|Any CPU.Build.0 = Release|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -214,6 +226,8 @@ Global
{54458B54-49CC-454C-82B2-4AED681D9D07} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{6E059983-DE89-4D53-88F5-D9083BCE257F} = {3A6B6931-A123-477A-9469-8B468B5385AF}
{23684403-7DA8-489A-8A1E-8056D7683E18} = {C09CDAB0-6DD4-46E9-B7F3-3EF2A4741EA0}
{AB7A10CB-2C7E-49CE-AA21-893772FF6546} = {9B2AE124-6636-4DE9-83A3-70360DABD0C4}
{B1D95CCD-0123-41D4-8CCB-9F834ED8D5C5} = {3A6B6931-A123-477A-9469-8B468B5385AF}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2E70565D-94CF-40B4-BFE1-AC18D5F736AB}


+ 32
- 0
samples/Sample.Pulsar.InMemory/Controllers/ValuesController.cs 查看文件

@@ -0,0 +1,32 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;

namespace Sample.Pulsar.InMemory.Controllers
{
[Route("api/[controller]")]
public class ValuesController : Controller, ICapSubscribe
{
private readonly ICapPublisher _capBus;

public ValuesController(ICapPublisher producer)
{
_capBus = producer;
}

[Route("~/without/transaction")]
public async Task<IActionResult> WithoutTransaction()
{
await _capBus.PublishAsync("persistent://public/default/captesttopic", DateTime.Now);

return Ok();
}

[CapSubscribe("persistent://public/default/captesttopic")]
public void Test2T2(string value)
{
Console.WriteLine("Subscriber output message: " + value);
}
}
}

+ 20
- 0
samples/Sample.Pulsar.InMemory/Program.cs 查看文件

@@ -0,0 +1,20 @@
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;

namespace Sample.Pulsar.InMemory
{
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
}

+ 14
- 0
samples/Sample.Pulsar.InMemory/Sample.Pulsar.InMemory.csproj 查看文件

@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk.Web">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.Dashboard\DotNetCore.CAP.Dashboard.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP.Pulsar\DotNetCore.CAP.Pulsar.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

</Project>

+ 38
- 0
samples/Sample.Pulsar.InMemory/Startup.cs 查看文件

@@ -0,0 +1,38 @@
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

namespace Sample.Pulsar.InMemory
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
string pulsarUri = Configuration.GetValue("AppSettings:PulsarUri", "pulsar//localhost:6650");
services.AddCap(x =>
{
x.UseInMemoryStorage();
x.UsePulsar(pulsarUri);
x.UseDashboard();
});

services.AddControllers();
}

public void Configure(IApplicationBuilder app)
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
}

+ 12
- 0
samples/Sample.Pulsar.InMemory/appsettings.json 查看文件

@@ -0,0 +1,12 @@
{
"Logging": {
"IncludeScopes": false,
"LogLevel": {
"Default": "Debug"
}
},
"AppSettings": {
"PulsarUri": "pulsar://localhost:6650",
"PulsarTopic": "persistent://public/default/captesttopic"
}
}

+ 40
- 0
src/DotNetCore.CAP.Pulsar/CAP.Options.Extensions.cs 查看文件

@@ -0,0 +1,40 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions
{
/// <summary>
/// Configuration to use pulsar in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="serverUrl">Pulsar bootstrap server urls.</param>
public static CapOptions UsePulsar(this CapOptions options, string serverUrl)
{
return options.UsePulsar(opt => { opt.ServiceUrl = serverUrl; });
}

/// <summary>
/// Configuration to use pulsar in CAP.
/// </summary>
/// <param name="options">CAP configuration options</param>
/// <param name="configure">Provides programmatic configuration for the pulsar .</param>
/// <returns></returns>
public static CapOptions UsePulsar(this CapOptions options, Action<PulsarOptions> configure)
{
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}

options.RegisterExtension(new PulsarCapOptionsExtension(configure));

return options;
}
}
}

+ 32
- 0
src/DotNetCore.CAP.Pulsar/CAP.PulsarCapOptionsExtension.cs 查看文件

@@ -0,0 +1,32 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using DotNetCore.CAP.Pulsar;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.DependencyInjection;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
internal sealed class PulsarCapOptionsExtension : ICapOptionsExtension
{
private readonly Action<PulsarOptions> _configure;

public PulsarCapOptionsExtension(Action<PulsarOptions> configure)
{
_configure = configure;
}

public void AddServices(IServiceCollection services)
{
services.AddSingleton<CapMessageQueueMakerService>();

services.Configure(_configure);

services.AddSingleton<ITransport, PulsarTransport>();
services.AddSingleton<IConsumerClientFactory, PulsarConsumerClientFactory>();
services.AddSingleton<IConnectionFactory, ConnectionFactory>();
}
}
}

+ 37
- 0
src/DotNetCore.CAP.Pulsar/CAP.PulsarOptions.cs 查看文件

@@ -0,0 +1,37 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP
{
using Pulsar;

/// <summary>
/// Provides programmatic configuration for the CAP pulsar project.
/// </summary>
public class PulsarOptions
{
public string ServiceUrl { get; set; }

public TlsOptions TlsOptions { get; set; }
}
}

namespace DotNetCore.CAP.Pulsar
{
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;

public class TlsOptions
{
private static readonly global::Pulsar.Client.Api.PulsarClientConfiguration Default =
global::Pulsar.Client.Api.PulsarClientConfiguration.Default;

public bool UseTls { get; set; } = Default.UseTls;
public bool TlsHostnameVerificationEnable { get; set; } = Default.TlsHostnameVerificationEnable;
public bool TlsAllowInsecureConnection { get; set; } = Default.TlsAllowInsecureConnection;
public X509Certificate2 TlsTrustCertificate { get; set; } = Default.TlsTrustCertificate;
public global::Pulsar.Client.Api.Authentication Authentication { get; set; } = Default.Authentication;
public SslProtocols TlsProtocols { get; set; } = Default.TlsProtocols;
}
}

+ 23
- 0
src/DotNetCore.CAP.Pulsar/DotNetCore.CAP.Pulsar.csproj 查看文件

@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<AssemblyName>DotNetCore.CAP.Pulsar</AssemblyName>
<PackageTags>$(PackageTags);Pulsar</PackageTags>
</PropertyGroup>

<PropertyGroup>
<WarningsAsErrors>NU1605;NU1701</WarningsAsErrors>
<NoWarn>NU1701;CS1591</NoWarn>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.Pulsar.xml</DocumentationFile>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Pulsar.Client" Version="2.8.1" />
</ItemGroup>

</Project>

+ 77
- 0
src/DotNetCore.CAP.Pulsar/IConnectionFactory.Default.cs 查看文件

@@ -0,0 +1,77 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Pulsar.Client.Api;

namespace DotNetCore.CAP.Pulsar
{
public class ConnectionFactory : IConnectionFactory, IAsyncDisposable
{
private PulsarClient _client;
private readonly PulsarOptions _options;
private readonly ConcurrentDictionary<string, Task<IProducer<byte[]>>> _topicProducers;

public ConnectionFactory(ILogger<ConnectionFactory> logger, IOptions<PulsarOptions> options)
{
_options = options.Value;
_topicProducers = new ConcurrentDictionary<string, Task<IProducer<byte[]>>>();

logger.LogDebug("CAP Pulsar configuration: {0}", JsonConvert.SerializeObject(_options, Formatting.Indented));
}

public string ServersAddress => _options.ServiceUrl;

public async Task<IProducer<byte[]>> CreateProducerAsync(string topic)
{
_client ??= RentClient();

async Task<IProducer<byte[]>> ValueFactory(string top)
{
return await _client.NewProducer()
.Topic(top)
.CreateAsync();
}

//connection may lost
return await _topicProducers.GetOrAdd(topic, ValueFactory);
}

public PulsarClient RentClient()
{
lock (this)
{
if (_client == null)
{
var builder = new PulsarClientBuilder().ServiceUrl(_options.ServiceUrl);
if (_options.TlsOptions != null)
{
builder.EnableTls(_options.TlsOptions.UseTls);
builder.EnableTlsHostnameVerification(_options.TlsOptions.TlsHostnameVerificationEnable);
builder.AllowTlsInsecureConnection(_options.TlsOptions.TlsAllowInsecureConnection);
builder.TlsTrustCertificate(_options.TlsOptions.TlsTrustCertificate);
builder.Authentication(_options.TlsOptions.Authentication);
builder.TlsProtocols(_options.TlsOptions.TlsProtocols);
}

_client = builder.BuildAsync().Result;
}

return _client;
}
}

public async ValueTask DisposeAsync()
{
foreach (var value in _topicProducers.Values)
{
_ = (await value).DisposeAsync();
}
}
}
}

+ 17
- 0
src/DotNetCore.CAP.Pulsar/IConnectionFactory.cs 查看文件

@@ -0,0 +1,17 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Threading.Tasks;
using Pulsar.Client.Api;

namespace DotNetCore.CAP.Pulsar
{
public interface IConnectionFactory
{
string ServersAddress { get; }

Task<IProducer<byte[]>> CreateProducerAsync(string topic);

PulsarClient RentClient();
}
}

+ 55
- 0
src/DotNetCore.CAP.Pulsar/ITransport.Pulsar.cs 查看文件

@@ -0,0 +1,55 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.Pulsar
{
internal class PulsarTransport : ITransport
{
private readonly IConnectionFactory _connectionFactory;
private readonly ILogger _logger;

public PulsarTransport(ILogger<PulsarTransport> logger, IConnectionFactory connectionFactory)
{
_logger = logger;
_connectionFactory = connectionFactory;
}

public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _connectionFactory.ServersAddress);

public async Task<OperateResult> SendAsync(TransportMessage message)
{
var producer = await _connectionFactory.CreateProducerAsync(message.GetName());

try
{
var headerDic = new Dictionary<string, string>(message.Headers);
headerDic.TryGetValue(PulsarHeaders.PulsarKey, out var key);
var pulsarMessage = producer.NewMessage(message.Body, key, headerDic);
var result = await producer.SendAsync(pulsarMessage);

if (result.Type != null)
{
_logger.LogDebug($"pulsar topic message [{message.GetName()}] has been published.");

return OperateResult.Success;
}

throw new PublisherSentFailedException("pulsar message persisted failed!");
}
catch (Exception ex)
{
var wrapperEx = new PublisherSentFailedException(ex.Message, ex);

return OperateResult.Failed(wrapperEx);
}
}
}
}

+ 98
- 0
src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs 查看文件

@@ -0,0 +1,98 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;
using Pulsar.Client.Api;
using Pulsar.Client.Common;

namespace DotNetCore.CAP.Pulsar
{
internal sealed class PulsarConsumerClient : IConsumerClient
{
private static PulsarClient _client;
private readonly string _groupId;
private readonly PulsarOptions _pulsarOptions;
private IConsumer<byte[]> _consumerClient;

public PulsarConsumerClient(PulsarClient client,string groupId, IOptions<PulsarOptions> options)
{
_client = client;
_groupId = groupId;
_pulsarOptions = options.Value;
}

public event EventHandler<TransportMessage> OnMessageReceived;

public event EventHandler<LogMessageEventArgs> OnLog;

public BrokerAddress BrokerAddress => new BrokerAddress("Pulsar", _pulsarOptions.ServiceUrl);

public void Subscribe(IEnumerable<string> topics)
{
if (topics == null)
{
throw new ArgumentNullException(nameof(topics));
}

var serviceName = Assembly.GetEntryAssembly()?.GetName().Name.ToLower();

_consumerClient = _client.NewConsumer()
.Topics(topics)
.SubscriptionName(_groupId)
.ConsumerName(serviceName)
.SubscriptionType(SubscriptionType.Shared)
.SubscribeAsync().Result;
}

public void Listening(TimeSpan timeout, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
var consumerResult = _consumerClient.ReceiveAsync().Result;

var headers = new Dictionary<string, string>(consumerResult.Properties.Count);
foreach (var header in consumerResult.Properties)
{
headers.Add(header.Key, header.Value);
}
headers.Add(Headers.Group, _groupId);

var message = new TransportMessage(headers, consumerResult.Data);

OnMessageReceived?.Invoke(consumerResult.MessageId, message);
}
// ReSharper disable once FunctionNeverReturns
}

public void Commit(object sender)
{
_consumerClient.AcknowledgeAsync((MessageId)sender);
}

public void Reject(object sender)
{
_consumerClient.NegativeAcknowledge((MessageId)sender);
}

public void Dispose()
{
_consumerClient?.DisposeAsync();
}

private void ConsumerClient_OnConsumeError(IConsumer<byte[]> consumer, Exception e)
{
var logArgs = new LogMessageEventArgs
{
LogType = MqLogType.ServerConnError,
Reason = $"An error occurred during connect pulsar --> {e.Message}"
};
OnLog?.Invoke(null, logArgs);
}
}
}

+ 34
- 0
src/DotNetCore.CAP.Pulsar/PulsarConsumerClientFactory.cs 查看文件

@@ -0,0 +1,34 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Pulsar
{
internal sealed class PulsarConsumerClientFactory : IConsumerClientFactory
{
private readonly IConnectionFactory _connection;
private readonly IOptions<PulsarOptions> _pulsarOptions;

public PulsarConsumerClientFactory(IConnectionFactory connection, IOptions<PulsarOptions> pulsarOptions)
{
_connection = connection;
_pulsarOptions = pulsarOptions;
}

public IConsumerClient Create(string groupId)
{
try
{
var client = _connection.RentClient();
var consumerClient = new PulsarConsumerClient(client,groupId, _pulsarOptions);
return consumerClient;
}
catch (System.Exception e)
{
throw new BrokerConnectionException(e);
}
}
}
}

+ 10
- 0
src/DotNetCore.CAP.Pulsar/PulsarHeaders.cs 查看文件

@@ -0,0 +1,10 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace DotNetCore.CAP.Pulsar
{
public static class PulsarHeaders
{
public const string PulsarKey = "cap-pulsar-key";
}
}

正在加载...
取消
保存