@@ -34,14 +34,6 @@ namespace BPA.Component.ApolloClientTester | |||||
/// </summary> | /// </summary> | ||||
private static void TestUseRedisSingleton() | private static void TestUseRedisSingleton() | ||||
{ | { | ||||
// var cities = new List<City> | |||||
// { | |||||
// new City {Name = "成都"}, | |||||
// new City {Name = "上海"}, | |||||
// new City {Name = "北京"} | |||||
// }; | |||||
// var json = cities.ToJson(); | |||||
var builder = new ConfigurationBuilder(); | var builder = new ConfigurationBuilder(); | ||||
builder.AddEnvironmentVariables(); | builder.AddEnvironmentVariables(); | ||||
IConfiguration configuration = builder.Build(); | IConfiguration configuration = builder.Build(); | ||||
@@ -4,7 +4,6 @@ | |||||
"BPA.Component.ApolloClientWebTester": { | "BPA.Component.ApolloClientWebTester": { | ||||
"commandName": "Project", | "commandName": "Project", | ||||
"launchBrowser": true, | "launchBrowser": true, | ||||
"launchUrl": "testconfig", | |||||
"applicationUrl": "http://localhost:5022", | "applicationUrl": "http://localhost:5022", | ||||
"environmentVariables": { | "environmentVariables": { | ||||
"ASPNETCORE_ENVIRONMENT": "Development", | "ASPNETCORE_ENVIRONMENT": "Development", | ||||
@@ -6,151 +6,150 @@ using Microsoft.Extensions.Logging; | |||||
using RabbitMQ.Client; | using RabbitMQ.Client; | ||||
using RabbitMQ.Client.Events; | using RabbitMQ.Client.Events; | ||||
namespace BPA.Component.RabbitMQClient.Base | |||||
namespace BPA.Component.RabbitMQClient.Base; | |||||
/// <summary> | |||||
/// BaseConsumer,必须重写父类的Dequeue,以便接受消息 | |||||
/// </summary> | |||||
/// <typeparam name="TMsgBody"></typeparam> | |||||
/// <typeparam name="TConsumer"></typeparam> | |||||
public abstract class BaseConsumer<TMsgBody, TConsumer> : IDisposable | |||||
{ | { | ||||
/// <summary> | /// <summary> | ||||
/// BaseConsumer ,必须重写父类的 Dequeue,以便接受消息 | |||||
/// logger | |||||
/// </summary> | /// </summary> | ||||
/// <typeparam name="TMsgBody"></typeparam> | |||||
/// <typeparam name="TConsumer"></typeparam> | |||||
public abstract class BaseConsumer<TMsgBody, TConsumer> : IDisposable | |||||
{ | |||||
/// <summary> | |||||
/// logger | |||||
/// </summary> | |||||
public readonly ILogger<TConsumer> Logger; | |||||
public readonly ILogger<TConsumer> Logger; | |||||
/// <summary> | |||||
/// Config | |||||
/// </summary> | |||||
internal RabbitMqQueueConfig Config; | |||||
/// <summary> | |||||
/// Config | |||||
/// </summary> | |||||
internal RabbitMqQueueConfig Config; | |||||
/// <summary> | |||||
/// 消费者ID | |||||
/// </summary> | |||||
public string ConsumeId { private set; get; } | |||||
/// <summary> | |||||
/// 消费者ID | |||||
/// </summary> | |||||
public string ConsumeId { private set; get; } | |||||
/// <summary> | |||||
/// BaseConsumer | |||||
/// </summary> | |||||
/// <param name="logger"></param> | |||||
public BaseConsumer(ILogger<TConsumer> logger) | |||||
{ | |||||
this.Logger = logger; | |||||
} | |||||
/// <summary> | |||||
/// BaseConsumer | |||||
/// </summary> | |||||
/// <param name="logger"></param> | |||||
public BaseConsumer(ILogger<TConsumer> logger) | |||||
{ | |||||
this.Logger = logger; | |||||
} | |||||
/// <summary> | |||||
/// 初始化 | |||||
/// </summary> | |||||
/// <param name="config"></param> | |||||
internal void Init(RabbitMqQueueConfig config) | |||||
{ | |||||
this.Config = config; | |||||
InitConsumer(); | |||||
config.InitConsumer = InitConsumer; | |||||
} | |||||
/// <summary> | |||||
/// 初始化 | |||||
/// </summary> | |||||
/// <param name="config"></param> | |||||
internal void Init(RabbitMqQueueConfig config) | |||||
{ | |||||
this.Config = config; | |||||
InitConsumer(); | |||||
config.InitConsumer = InitConsumer; | |||||
} | |||||
/// <summary> | |||||
/// 重新连接 | |||||
/// </summary> | |||||
internal void ReStart() => Config.ReStart(); | |||||
/// <summary> | |||||
/// 重新连接 | |||||
/// </summary> | |||||
internal void ReStart() => Config.ReStart(); | |||||
/// <summary> | |||||
/// 定义消费者 | |||||
/// </summary> | |||||
private void InitConsumer() | |||||
/// <summary> | |||||
/// 定义消费者 | |||||
/// </summary> | |||||
private void InitConsumer() | |||||
{ | |||||
if (Config.Channel == null) | |||||
throw new Exception("请先 UseQueue"); | |||||
//初始化消费者 | |||||
var consumer = new EventingBasicConsumer(Config.Channel); | |||||
consumer.Received += (model, ea) => | |||||
{ | { | ||||
if (Config.Channel == null) | |||||
throw new Exception("请先 UseQueue"); | |||||
//初始化消费者 | |||||
var consumer = new EventingBasicConsumer(Config.Channel); | |||||
consumer.Received += (model, ea) => | |||||
TMsgBody msgBody = default; | |||||
var json = Encoding.UTF8.GetString(ea.Body.ToArray()); | |||||
try | |||||
{ | |||||
msgBody = Newtonsoft.Json.JsonConvert.DeserializeObject<TMsgBody>(json, RabbitMQProvider.JsonSerializerSettings); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex, "消息队列{queueName}消费消息失败:{message}", Config.Option.QueueName, json); | |||||
if (Config.Option.IsAutoAck) | |||||
Config.Channel.BasicAck(ea.DeliveryTag, false); | |||||
return; | |||||
} | |||||
if (Config.Option.IsAutoAck) | |||||
{ | { | ||||
TMsgBody msgBody = default; | |||||
var json = Encoding.UTF8.GetString(ea.Body.ToArray()); | |||||
try | try | ||||
{ | { | ||||
msgBody = Newtonsoft.Json.JsonConvert.DeserializeObject<TMsgBody>(json, RabbitMQProvider.JsonSerializerSettings); | |||||
Dequeue(msgBody, ea); | |||||
} | } | ||||
catch (Exception ex) | catch (Exception ex) | ||||
{ | { | ||||
Logger.LogError(ex, "消息队列{queueName}消费消息失败:{message}", Config.Option.QueueName, json); | Logger.LogError(ex, "消息队列{queueName}消费消息失败:{message}", Config.Option.QueueName, json); | ||||
if (Config.Option.IsAutoAck) | |||||
Config.Channel.BasicAck(ea.DeliveryTag, false); | |||||
return; | |||||
Config.Channel.BasicAck(ea.DeliveryTag, false); | |||||
} | } | ||||
if (Config.Option.IsAutoAck) | |||||
} | |||||
else | |||||
{ | |||||
try | |||||
{ | { | ||||
try | |||||
{ | |||||
Dequeue(msgBody, ea); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex, "消息队列{queueName}消费消息失败:{message}", Config.Option.QueueName, json); | |||||
var result = Dequeue(msgBody, ea); | |||||
if (Config.Channel.IsClosed) return; | |||||
if (result) //消息处理成功,ack提交 | |||||
Config.Channel.BasicAck(ea.DeliveryTag, false); | Config.Channel.BasicAck(ea.DeliveryTag, false); | ||||
} | |||||
else //消息处理失败,把消息重新放入队列 | |||||
Config.Channel.BasicNack(ea.DeliveryTag, false, true); | |||||
} | } | ||||
else | |||||
catch (Exception ex) | |||||
{ | { | ||||
try | |||||
{ | |||||
var result = Dequeue(msgBody, ea); | |||||
if (Config.Channel.IsClosed) return; | |||||
if (result)//消息处理成功,ack提交 | |||||
Config.Channel.BasicAck(ea.DeliveryTag, false); | |||||
else //消息处理失败,把消息重新放入队列 | |||||
Config.Channel.BasicNack(ea.DeliveryTag, false, true); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex, "消息队列{queueName}消费消息失败:{message}", Config.Option.QueueName, json); | |||||
if (Config.Channel.IsClosed) return; | |||||
//消息处理失败,把消息重新放入队列 | |||||
Config.Channel.BasicNack(ea.DeliveryTag, false, true); | |||||
} | |||||
Logger.LogError(ex, "消息队列{queueName}消费消息失败:{message}", Config.Option.QueueName, json); | |||||
if (Config.Channel.IsClosed) return; | |||||
//消息处理失败,把消息重新放入队列 | |||||
Config.Channel.BasicNack(ea.DeliveryTag, false, true); | |||||
} | } | ||||
}; | |||||
Console.WriteLine(); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* 初始化消费者 : start ---------------------------"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [{nameof(Config.Option.QueueName)}]={ Config.Option.QueueName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [FullName]={ this.GetType().FullName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [HashCode]={ this.GetHashCode()}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [ConsumerID]={ ConsumeId}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [Exchange]={Config.Option.ExchangeName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* 初始化消费者 : end ---------------------------"); | |||||
ConsumeId = Config.Channel.BasicConsume(queue: Config.Option.QueueName, autoAck: Config.Option.IsAutoAck, consumer: consumer); | |||||
} | |||||
/// <summary> | |||||
/// 消息出列,消息接受者处理成功或者失败后返回true | false, | |||||
/// 如果消息AutoAck=false(<see cref="QueueOption.IsAutoAck"/>) ,那么当返回true是,提交ack,返回false是消息重新入列 | |||||
/// </summary> | |||||
/// <param name="msgBody">消息内容</param> | |||||
/// <param name=" eventArgs">原始事件信息</param> | |||||
public abstract bool Dequeue(TMsgBody msgBody, BasicDeliverEventArgs eventArgs); | |||||
/// <summary> | |||||
/// 释放资源 | |||||
/// </summary> | |||||
public void Dispose() | |||||
{ | |||||
Config.Channel?.Close(); | |||||
Config.Channel?.Dispose(); | |||||
} | |||||
} | |||||
}; | |||||
Console.WriteLine(); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* 初始化消费者 : start ---------------------------"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [{nameof(Config.Option.QueueName)}]={Config.Option.QueueName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [FullName]={this.GetType().FullName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [HashCode]={this.GetHashCode()}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [ConsumerID]={ConsumeId}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* [Exchange]={Config.Option.ExchangeName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* 初始化消费者 : end ---------------------------"); | |||||
ConsumeId = Config.Channel.BasicConsume(queue: Config.Option.QueueName, autoAck: Config.Option.IsAutoAck, consumer: consumer); | |||||
} | |||||
/// <summary> | |||||
/// 关闭通道 | |||||
/// </summary> | |||||
internal void ColseChannel() | |||||
{ | |||||
Config.Channel?.Close(); | |||||
} | |||||
/// <summary> | |||||
/// 消息出列,消息接受者处理成功或者失败后返回true | false, | |||||
/// 如果消息AutoAck=false(<see cref="QueueOption.IsAutoAck"/>) ,那么当返回true是,提交ack,返回false是消息重新入列 | |||||
/// </summary> | |||||
/// <param name="msgBody">消息内容</param> | |||||
/// <param name=" eventArgs">原始事件信息</param> | |||||
public abstract bool Dequeue(TMsgBody msgBody, BasicDeliverEventArgs eventArgs); | |||||
/// <summary> | |||||
/// 判断通道是否关闭 | |||||
/// </summary> | |||||
internal bool ChannelIsColsed => Config.Channel == null || Config.Channel.IsClosed; | |||||
/// <summary> | |||||
/// 释放资源 | |||||
/// </summary> | |||||
public void Dispose() | |||||
{ | |||||
Config.Channel?.Close(); | |||||
Config.Channel?.Dispose(); | |||||
} | |||||
/// <summary> | |||||
/// 关闭通道 | |||||
/// </summary> | |||||
internal void ColseChannel() | |||||
{ | |||||
Config.Channel?.Close(); | |||||
} | } | ||||
} | |||||
/// <summary> | |||||
/// 判断通道是否关闭 | |||||
/// </summary> | |||||
internal bool ChannelIsColsed => Config.Channel == null || Config.Channel.IsClosed; | |||||
} |
@@ -44,16 +44,17 @@ namespace BPA.Component.RabbitMQClient.Base | |||||
routingKey ??= string.Empty; | routingKey ??= string.Empty; | ||||
var properties = config.Channel.CreateBasicProperties(); | var properties = config.Channel.CreateBasicProperties(); | ||||
//需要注意的是BasicProperties.deliveryMode,Non-persistent (1) or persistent (2) 这里指的是消息的持久化, | //需要注意的是BasicProperties.deliveryMode,Non-persistent (1) or persistent (2) 这里指的是消息的持久化, | ||||
properties.DeliveryMode = (byte)config.Option.DeliveryMode; | |||||
properties.DeliveryMode = (byte) config.Option.DeliveryMode; | |||||
properties.Persistent = config.Option.DeliveryMode == DeliveryModeType.Persistent; | properties.Persistent = config.Option.DeliveryMode == DeliveryModeType.Persistent; | ||||
if (config.Option.Type == ExchangeType.X_Delayed_Message && expiration.HasValue) | if (config.Option.Type == ExchangeType.X_Delayed_Message && expiration.HasValue) | ||||
{ | { | ||||
(properties.Headers ??= new Dictionary<string, object>()).Add("x-delay", expiration.Value.ToString()); | (properties.Headers ??= new Dictionary<string, object>()).Add("x-delay", expiration.Value.ToString()); | ||||
} | } | ||||
else if (expiration.HasValue) //毫秒 | |||||
else if (expiration.HasValue) //毫秒 | |||||
{ | { | ||||
properties.Expiration = expiration.Value.ToString(); | properties.Expiration = expiration.Value.ToString(); | ||||
} | } | ||||
var json = JsonConvert.SerializeObject(msgBody, RabbitMQProvider.JsonSerializerSettings); | var json = JsonConvert.SerializeObject(msgBody, RabbitMQProvider.JsonSerializerSettings); | ||||
var bodyBytes = Encoding.UTF8.GetBytes(json); | var bodyBytes = Encoding.UTF8.GetBytes(json); | ||||
config.Channel.BasicPublish(config.Option.Name, routingKey, properties, bodyBytes); | config.Channel.BasicPublish(config.Option.Name, routingKey, properties, bodyBytes); | ||||
@@ -71,9 +72,9 @@ namespace BPA.Component.RabbitMQClient.Base | |||||
routingKey ??= string.Empty; | routingKey ??= string.Empty; | ||||
var properties = config.Channel.CreateBasicProperties(); | var properties = config.Channel.CreateBasicProperties(); | ||||
//需要注意的是BasicProperties.deliveryMode,Non-persistent (1) or persistent (2) 这里指的是消息的持久化, | //需要注意的是BasicProperties.deliveryMode,Non-persistent (1) or persistent (2) 这里指的是消息的持久化, | ||||
properties.DeliveryMode = (byte)config.Option.DeliveryMode; | |||||
properties.DeliveryMode = (byte) config.Option.DeliveryMode; | |||||
properties.Persistent = config.Option.DeliveryMode == DeliveryModeType.Persistent; | properties.Persistent = config.Option.DeliveryMode == DeliveryModeType.Persistent; | ||||
if (expiration.HasValue)//毫秒 | |||||
if (expiration.HasValue) //毫秒 | |||||
properties.Expiration = expiration.Value.ToString(); | properties.Expiration = expiration.Value.ToString(); | ||||
var json = JsonConvert.SerializeObject(msgBody, RabbitMQProvider.JsonSerializerSettings); | var json = JsonConvert.SerializeObject(msgBody, RabbitMQProvider.JsonSerializerSettings); | ||||
var bodyBytes = Encoding.UTF8.GetBytes(json); | var bodyBytes = Encoding.UTF8.GetBytes(json); | ||||
@@ -98,4 +99,4 @@ namespace BPA.Component.RabbitMQClient.Base | |||||
Sender.Invoke(msgBody, routingKey, expiration); | Sender.Invoke(msgBody, routingKey, expiration); | ||||
} | } | ||||
} | } | ||||
} | |||||
} |
@@ -23,7 +23,7 @@ namespace BPA.Component.RabbitMQClient.Extensions | |||||
/// <param name="service"></param> | /// <param name="service"></param> | ||||
/// <param name="configFunc"></param> | /// <param name="configFunc"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IServiceCollection UseRabbitMQProvider(this IServiceCollection service, Func<IServiceProvider, RabbitMqConnectionConfig> configFunc) | |||||
public static IServiceCollection AddRabbitMQProvider(this IServiceCollection service, Func<IServiceProvider, RabbitMqConnectionConfig> configFunc) | |||||
{ | { | ||||
if (configFunc == null) | if (configFunc == null) | ||||
throw new ArgumentNullException(nameof(configFunc)); | throw new ArgumentNullException(nameof(configFunc)); | ||||
@@ -38,7 +38,7 @@ namespace BPA.Component.RabbitMQClient.Extensions | |||||
/// <param name="service"></param> | /// <param name="service"></param> | ||||
/// <param name="buildRabbitMQProvider"></param> | /// <param name="buildRabbitMQProvider"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IServiceCollection UseRabbitMQProvider<TRabbitMQProvider>(this IServiceCollection service, Func<IServiceProvider, TRabbitMQProvider> buildRabbitMQProvider) where TRabbitMQProvider : RabbitMQProvider, new() | |||||
public static IServiceCollection AddRabbitMQProvider<TRabbitMQProvider>(this IServiceCollection service, Func<IServiceProvider, TRabbitMQProvider> buildRabbitMQProvider) where TRabbitMQProvider : RabbitMQProvider, new() | |||||
{ | { | ||||
service.AddSingleton(sp => buildRabbitMQProvider(sp)); | service.AddSingleton(sp => buildRabbitMQProvider(sp)); | ||||
return service; | return service; | ||||
@@ -51,7 +51,7 @@ namespace BPA.Component.RabbitMQClient.Extensions | |||||
/// <typeparam name="TMsgBody"></typeparam> | /// <typeparam name="TMsgBody"></typeparam> | ||||
/// <param name="services"></param> | /// <param name="services"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IServiceCollection UseRabbitMQProducer<TProducer, TMsgBody>(this IServiceCollection services) where TProducer : BaseProducer<TMsgBody, TProducer> | |||||
public static IServiceCollection AddRabbitMQProducer<TProducer, TMsgBody>(this IServiceCollection services) where TProducer : BaseProducer<TMsgBody, TProducer> | |||||
{ | { | ||||
services.AddSingleton<TProducer>(); | services.AddSingleton<TProducer>(); | ||||
return services; | return services; | ||||
@@ -64,7 +64,7 @@ namespace BPA.Component.RabbitMQClient.Extensions | |||||
/// <typeparam name="TMsgBody"></typeparam> | /// <typeparam name="TMsgBody"></typeparam> | ||||
/// <param name="services"></param> | /// <param name="services"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IServiceCollection UseRabbitMQConsumer<TConsumer, TMsgBody>(this IServiceCollection services) where TConsumer : BaseConsumer<TMsgBody, TConsumer> | |||||
public static IServiceCollection AddRabbitMQConsumer<TConsumer, TMsgBody>(this IServiceCollection services) where TConsumer : BaseConsumer<TMsgBody, TConsumer> | |||||
{ | { | ||||
services.AddTransient<TConsumer>(); | services.AddTransient<TConsumer>(); | ||||
return services; | return services; | ||||
@@ -1,4 +1,5 @@ | |||||
using BPA.Component.RabbitMQClient.Base; | |||||
using System; | |||||
using BPA.Component.RabbitMQClient.Base; | |||||
using BPA.Component.RabbitMQClient.Config; | using BPA.Component.RabbitMQClient.Config; | ||||
using BPA.Component.RabbitMQClient.Options; | using BPA.Component.RabbitMQClient.Options; | ||||
using BPA.Component.RabbitMQClient.Result; | using BPA.Component.RabbitMQClient.Result; | ||||
@@ -39,7 +40,8 @@ namespace BPA.Component.RabbitMQClient.Extensions | |||||
/// </summary> | /// </summary> | ||||
/// <typeparam name="TProducer"></typeparam> | /// <typeparam name="TProducer"></typeparam> | ||||
/// <typeparam name="TMsgBody"></typeparam> | /// <typeparam name="TMsgBody"></typeparam> | ||||
/// <param name="config"></param> | |||||
/// <param name="config"></param> | |||||
/// <exception cref="ArgumentNullException"></exception> | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IRegisterProducerResult UseProducer<TMsgBody, TProducer>(this RabbitMqQueueConfig config) | public static IRegisterProducerResult UseProducer<TMsgBody, TProducer>(this RabbitMqQueueConfig config) | ||||
where TProducer : BaseProducer<TMsgBody, TProducer> | where TProducer : BaseProducer<TMsgBody, TProducer> | ||||
@@ -75,11 +75,11 @@ namespace BPA.Component.RabbitMQClient.Provider | |||||
private void BuildConnectionFactory() | private void BuildConnectionFactory() | ||||
{ | { | ||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: start ---------------------------"); | Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: start ---------------------------"); | ||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [{MQConfig.HostName }]={ MQConfig.HostName }"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [{MQConfig.Port}]={ MQConfig.Port}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [{MQConfig.Password}]={ MQConfig.Password}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [{MQConfig.UserName}]={ MQConfig.UserName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [{MQConfig.VirtualHost}]={ MQConfig.VirtualHost}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [HostName]={ MQConfig.HostName }"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [Port]={ MQConfig.Port}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [Password]={ MQConfig.Password}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [UserName]={ MQConfig.UserName}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: [VirtualHost]={ MQConfig.VirtualHost}"); | |||||
Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: end ---------------------------"); | Console.WriteLine($"*bpa.rabbitmqclient* build connetion info: end ---------------------------"); | ||||
var factory = new ConnectionFactory() | var factory = new ConnectionFactory() | ||||
{ | { | ||||
@@ -10,11 +10,14 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\BPA.Component.RabbitMQClient\BPA.Component.RabbitMQClient.csproj" /> | |||||
<ProjectReference Include="..\..\BPA.Component.ApolloClient\BPA.Component.ApolloClient\BPA.Component.ApolloClient.csproj" /> | |||||
<ProjectReference Include="..\..\BPA.Component.LogClient\BPA.Component.LogClient\BPA.Component.LogClient.csproj" /> | |||||
<ProjectReference Include="..\BPA.Component.RabbitMQClient\BPA.Component.RabbitMQClient.csproj" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="BPA.Component.RabbitMQClient" Version="1.0.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="6.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" /> | <PackageReference Include="Microsoft.Extensions.Logging" Version="6.0.0" /> | ||||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" /> | <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="6.0.0" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
@@ -22,7 +22,6 @@ namespace BPA.Component.RabbitMQClientTester.Default | |||||
/// 消息出列 | /// 消息出列 | ||||
/// </summary> | /// </summary> | ||||
/// <param name="msgBody"></param> | /// <param name="msgBody"></param> | ||||
/// <param name="queueConfig"></param> | |||||
/// <param name="eventArgs"></param> | /// <param name="eventArgs"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public override bool Dequeue(TestMsgBody msgBody, BasicDeliverEventArgs eventArgs) | public override bool Dequeue(TestMsgBody msgBody, BasicDeliverEventArgs eventArgs) | ||||
@@ -30,6 +29,7 @@ namespace BPA.Component.RabbitMQClientTester.Default | |||||
Console.WriteLine($"\nTestDefaultQueueConsumer_1_消息出列:" + | Console.WriteLine($"\nTestDefaultQueueConsumer_1_消息出列:" + | ||||
$"\n\t\t\t[{GetHashCode()}]-[{ConsumeId}]" + | $"\n\t\t\t[{GetHashCode()}]-[{ConsumeId}]" + | ||||
$"\n\t\t\t[{nameof(msgBody.Id)}]={msgBody.Id},{nameof(msgBody.Date)}]={msgBody.Date}"); | $"\n\t\t\t[{nameof(msgBody.Id)}]={msgBody.Id},{nameof(msgBody.Date)}]={msgBody.Date}"); | ||||
//返回false 代表消费失败,消息会返回队列,返回true代表消费成功 | //返回false 代表消费失败,消息会返回队列,返回true代表消费成功 | ||||
return true; | return true; | ||||
} | } | ||||
@@ -17,8 +17,9 @@ namespace BPA.Component.RabbitMQClientTester.Default | |||||
/// 消息入列,可以不重写,如果重写必须调用父类的 Enqueue | /// 消息入列,可以不重写,如果重写必须调用父类的 Enqueue | ||||
/// </summary> | /// </summary> | ||||
/// <param name="msgBody"></param> | /// <param name="msgBody"></param> | ||||
/// <param name="routingKey"></param> | |||||
/// <param name="expiration"></param> | /// <param name="expiration"></param> | ||||
public override void Enqueue(TestMsgBody msgBody, string routingKey = null, long? expiration = null) | |||||
public override void Enqueue(TestMsgBody msgBody, string? routingKey = null, long? expiration = null) | |||||
{ | { | ||||
Console.WriteLine($"TestDefaultQueueProducer.消息入列:[{nameof(msgBody.Id)}]={msgBody.Id},{nameof(msgBody.Date)}]={msgBody.Date}"); | Console.WriteLine($"TestDefaultQueueProducer.消息入列:[{nameof(msgBody.Id)}]={msgBody.Id},{nameof(msgBody.Date)}]={msgBody.Date}"); | ||||
base.Enqueue(msgBody, routingKey, expiration); | base.Enqueue(msgBody, routingKey, expiration); | ||||
@@ -22,14 +22,13 @@ namespace BPA.Component.RabbitMQClientTester.Delays | |||||
/// 消息出列 | /// 消息出列 | ||||
/// </summary> | /// </summary> | ||||
/// <param name="msgBody"></param> | /// <param name="msgBody"></param> | ||||
/// <param name="queueConfig"></param> | |||||
/// <param name="eventArgs"></param> | /// <param name="eventArgs"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public override bool Dequeue(TestMsgBody msgBody, BasicDeliverEventArgs eventArgs) | public override bool Dequeue(TestMsgBody msgBody, BasicDeliverEventArgs eventArgs) | ||||
{ | { | ||||
Console.WriteLine($"\n\t\t Delay:{((int)(DateTime.Now - msgBody.Date).TotalSeconds) * 1000} , {Newtonsoft.Json.JsonConvert.SerializeObject(msgBody)}"); | |||||
Console.WriteLine($"TestDelayQueueConsumer.消息出列: Delay:{(int) (DateTime.Now - msgBody.Date).TotalSeconds * 1000},{Newtonsoft.Json.JsonConvert.SerializeObject(msgBody)}"); | |||||
//返回false 代表消费失败,消息会返回队列,返回true代表消费成功 | //返回false 代表消费失败,消息会返回队列,返回true代表消费成功 | ||||
return true; | return true; | ||||
} | } | ||||
} | } | ||||
} | |||||
} |
@@ -1,37 +0,0 @@ | |||||
using BPA.Component.RabbitMQClient.Base; | |||||
using BPA.Component.RabbitMQClientTester.Modesl; | |||||
using Microsoft.Extensions.Logging; | |||||
using RabbitMQ.Client.Events; | |||||
namespace BPA.Component.RabbitMQClientTester.Dynamic | |||||
{ | |||||
/// <summary> | |||||
/// 测试自定义交换+动态机队列消费者1 | |||||
/// </summary> | |||||
public class TestCustomDynamicQueueConsumer : BaseConsumer<TestMsgBody, TestCustomDynamicQueueConsumer> | |||||
{ | |||||
private readonly IServiceProvider serviceProvider; | |||||
public TestCustomDynamicQueueConsumer(IServiceProvider serviceProvider, ILogger<TestCustomDynamicQueueConsumer> logger) : base(logger) | |||||
{ | |||||
this.serviceProvider = serviceProvider; | |||||
} | |||||
/// <summary> | |||||
/// 消息出列 | |||||
/// </summary> | |||||
/// <param name="msgBody"></param> | |||||
/// <param name="queueConfig"></param> | |||||
/// <param name="eventArgs"></param> | |||||
/// <returns></returns> | |||||
public override bool Dequeue(TestMsgBody msgBody, BasicDeliverEventArgs eventArgs) | |||||
{ | |||||
Console.WriteLine($"\nTestDynamicQueueConsumer_1_消息出列:" + | |||||
$"\n\t\t\t[{GetHashCode()}]-[{ConsumeId}]" + | |||||
$"\n\t\t\t[{nameof(msgBody.Id)}]={msgBody.Id},{nameof(msgBody.Date)}]={msgBody.Date}"); | |||||
//返回false 代表消费失败,消息会返回队列,返回true代表消费成功 | |||||
return true; | |||||
} | |||||
} | |||||
} |
@@ -1,27 +0,0 @@ | |||||
using BPA.Component.RabbitMQClient.Base; | |||||
using BPA.Component.RabbitMQClientTester.Modesl; | |||||
using Microsoft.Extensions.Logging; | |||||
namespace BPA.Component.RabbitMQClientTester.Dynamic | |||||
{ | |||||
/// <summary> | |||||
/// 测试自定义交换机+动态队列生产者 | |||||
/// </summary> | |||||
public class TestCustomDynamicQueueProducer : BaseProducer<TestMsgBody, TestCustomDynamicQueueProducer> | |||||
{ | |||||
public TestCustomDynamicQueueProducer(ILogger<TestCustomDynamicQueueProducer> logger) : base(logger) | |||||
{ | |||||
} | |||||
/// <summary> | |||||
/// 消息入列,可以不重写,如果重写必须调用父类的 Enqueue | |||||
/// </summary> | |||||
/// <param name="msgBody"></param> | |||||
/// <param name="expiration"></param> | |||||
public override void Enqueue(TestMsgBody msgBody, string routingKey = null, long? expiration = null) | |||||
{ | |||||
Console.WriteLine($"TestDynamicQueueProducer.消息入列:[{nameof(msgBody.Id)}]={msgBody.Id},{nameof(msgBody.Date)}]={msgBody.Date}"); | |||||
base.Enqueue(msgBody, routingKey, expiration); | |||||
} | |||||
} | |||||
} |
@@ -14,11 +14,6 @@ namespace BPA.Component.RabbitMQClientTester.Modesl | |||||
/// </summary> | /// </summary> | ||||
public readonly static QueueName TestDefaultQueue1 = QueueName.Build(nameof(TestDefaultQueue1)); | public readonly static QueueName TestDefaultQueue1 = QueueName.Build(nameof(TestDefaultQueue1)); | ||||
/// <summary> | |||||
/// 测试队列2,消息体 <see cref="TestMsgBody"/> | |||||
/// </summary> | |||||
public readonly static QueueName TestDefaultQueue2 = QueueName.Build(nameof(TestDefaultQueue2)); | |||||
#endregion | #endregion | ||||
#region 自定义交换机 | #region 自定义交换机 | ||||
@@ -40,24 +35,6 @@ namespace BPA.Component.RabbitMQClientTester.Modesl | |||||
#endregion | #endregion | ||||
#region 自定义交换机+动态队列 | |||||
/// <summary> | |||||
/// 测试自定义+动态队列交换机名称 | |||||
/// </summary> | |||||
public static readonly ExchangeName TestCustomDynamicQueueExchange = ExchangeName.Build(nameof(TestCustomDynamicQueueExchange)); | |||||
/// <summary> | |||||
/// 测试队列+动态队列,消息体 <see cref="TestMsgBody"/> | |||||
/// 一定要BuildDynamic ,属性必须用get 方法 | |||||
/// </summary> | |||||
public static QueueName TestCustomDynamicQueue | |||||
{ | |||||
get => QueueName.BuildDynamic(nameof(TestCustomDynamicQueue)); | |||||
} | |||||
#endregion | |||||
#region | #region | ||||
/// <summary> | /// <summary> | ||||
@@ -75,11 +52,6 @@ namespace BPA.Component.RabbitMQClientTester.Modesl | |||||
/// </summary> | /// </summary> | ||||
public static readonly QueueName TestDelayQueue1 = QueueName.Build(nameof(TestDelayQueue1)); | public static readonly QueueName TestDelayQueue1 = QueueName.Build(nameof(TestDelayQueue1)); | ||||
/// <summary> | |||||
/// 延时队列,消息体 <see cref="TestMsgBody"/> | |||||
/// </summary> | |||||
public static readonly QueueName TestDelayQueue2 = QueueName.Build(nameof(TestDelayQueue2)); | |||||
#endregion | #endregion | ||||
} | } | ||||
} | } |
@@ -1,26 +1,27 @@ | |||||
using BPA.Component.RabbitMQClient.Connection; | |||||
using BPA.Component.ApolloClient; | |||||
using BPA.Component.RabbitMQClient.Connection; | |||||
using Com.Ctrip.Framework.Apollo; | |||||
using Microsoft.Extensions.Configuration; | |||||
namespace BPA.Component.RabbitMQClientTester.Modesl | namespace BPA.Component.RabbitMQClientTester.Modesl | ||||
{ | { | ||||
/// <summary> | /// <summary> | ||||
/// test项目配置 | /// test项目配置 | ||||
/// </summary> | /// </summary> | ||||
public static class TestConfig | |||||
public class TestConfig : ApolloBPAConfig<TestConfig> | |||||
{ | { | ||||
#region 当前应用配置 | #region 当前应用配置 | ||||
/// <summary> | /// <summary> | ||||
/// RabbitMQ配置 | /// RabbitMQ配置 | ||||
/// </summary> | |||||
public static RabbitMqConnectionConfig RabbitMQConfig = new RabbitMqConnectionConfig | |||||
{ | |||||
HostName = "192.168.0.149", | |||||
Password = "admin", | |||||
Port = 5672, | |||||
UserName = "admin", | |||||
VirtualHost = "admin_vhost" | |||||
}; | |||||
/// </summary> | |||||
[AutoWrite] | |||||
public RabbitMqConnectionConfig RabbitMqConfig { get; set; } | |||||
#endregion | #endregion | ||||
public TestConfig(ApolloConfigurationManager apolloConfigurationManager, IConfiguration configuration) : base(apolloConfigurationManager, configuration) | |||||
{ | |||||
} | |||||
} | } | ||||
} | } |
@@ -15,6 +15,9 @@ | |||||
/// </summary> | /// </summary> | ||||
public DateTime Date{set;get;} | public DateTime Date{set;get;} | ||||
/// <summary> | |||||
/// Message | |||||
/// </summary> | |||||
public string Message { set; get; } | public string Message { set; get; } | ||||
} | } | ||||
} | } |
@@ -1,11 +1,13 @@ | |||||
using BPA.Component.RabbitMQClient.Enums; | |||||
using BPA.Component.ApolloClient; | |||||
using BPA.Component.LogClient.Extensions; | |||||
using BPA.Component.RabbitMQClient.Enums; | |||||
using BPA.Component.RabbitMQClient.Extensions; | using BPA.Component.RabbitMQClient.Extensions; | ||||
using BPA.Component.RabbitMQClient.Provider; | using BPA.Component.RabbitMQClient.Provider; | ||||
using BPA.Component.RabbitMQClientTester.Custom; | using BPA.Component.RabbitMQClientTester.Custom; | ||||
using BPA.Component.RabbitMQClientTester.Default; | using BPA.Component.RabbitMQClientTester.Default; | ||||
using BPA.Component.RabbitMQClientTester.Delays; | using BPA.Component.RabbitMQClientTester.Delays; | ||||
using BPA.Component.RabbitMQClientTester.Dynamic; | |||||
using BPA.Component.RabbitMQClientTester.Modesl; | using BPA.Component.RabbitMQClientTester.Modesl; | ||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
@@ -15,78 +17,45 @@ namespace BPA.Component.RabbitMQClientTester | |||||
{ | { | ||||
private static void Main(string[] args) | private static void Main(string[] args) | ||||
{ | { | ||||
Console.WriteLine("1=测试默认交换机, 2=测试自定义交换机, 3=测试自定义交换机 + 动态队列实现真正广播,4=延时"); | |||||
var cmd = Console.ReadLine(); | |||||
switch (cmd) | |||||
{ | |||||
case "1": | |||||
TestDefaultExchange(); | |||||
break; | |||||
case "2": | |||||
TestCustomExchange(); | |||||
break; | |||||
case "3": | |||||
TestCustomExchange_Dynamic_Queue(); | |||||
break; | |||||
case "4": | |||||
TestDelay(); | |||||
break; | |||||
default: | |||||
Console.WriteLine("命令错误"); | |||||
break; | |||||
} | |||||
DelayExchange(); | |||||
Console.ReadLine(); | Console.ReadLine(); | ||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 测试默认交换机 | |||||
/// 默认交换机 | |||||
/// </summary> | /// </summary> | ||||
public static void TestDefaultExchange() | |||||
private static void DefaultExchange() | |||||
{ | { | ||||
var services = new ServiceCollection(); | |||||
services.AddLogging(configure => { configure.AddConsole(); }); | |||||
//注入RabbitMQProvider | |||||
services.UseRabbitMQProvider(sp => TestConfig.RabbitMQConfig); | |||||
//注入生产者,单例 | |||||
services.UseRabbitMQProducer<TestDefaultQueueProducer, TestMsgBody>(); | |||||
//注入消费者,多例 | |||||
services.UseRabbitMQConsumer<TestDefaultQueueConsumer1, TestMsgBody>(); | |||||
var builder = new ConfigurationBuilder(); | |||||
builder.AddEnvironmentVariables(); | |||||
//创建容器提供者 | |||||
var sp = services.BuildServiceProvider(); | |||||
var configuration = builder.Build(); | |||||
configuration.AddApolloConfiguration(); | |||||
//获取rabbitMqProvider | |||||
var rabbitMqProvider = sp.UseRabbitMQProvider<RabbitMQProvider>(); | |||||
//使用默认交换机的话,需要用队列配置对象 启用消息生产者 | |||||
var services = new ServiceCollection(); | |||||
services.AddBPALog(); | |||||
services.AddApollo<TestConfig>(); | |||||
services.AddRabbitMQProvider(provider => provider.GetService<TestConfig>()?.RabbitMqConfig); | |||||
services.AddRabbitMQProducer<TestDefaultQueueProducer, TestMsgBody>(); | |||||
services.AddRabbitMQConsumer<TestDefaultQueueConsumer1, TestMsgBody>(); | |||||
#region 启用消费者1 | |||||
var serviceProvider = services.BuildServiceProvider(); | |||||
var rabbitMqProvider = serviceProvider.UseRabbitMQProvider<RabbitMQProvider>(); | |||||
//启用消费者1, 3个消费者实例 | |||||
var queue1 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestDefaultQueue1); | var queue1 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestDefaultQueue1); | ||||
//额外配置 | |||||
queue1.ConfigOption(option => { }); | |||||
//启用队列 | |||||
queue1.ConfigOption(_ => { }); | |||||
queue1.UseQueue(); | queue1.UseQueue(); | ||||
//使用默认交换机的话,需要用队列配置对象 启用消息生产者 | |||||
queue1.UseProducer<TestMsgBody, TestDefaultQueueProducer>(); | queue1.UseProducer<TestMsgBody, TestDefaultQueueProducer>(); | ||||
var producer = sp.GetService<TestDefaultQueueProducer>(); | |||||
var producer = serviceProvider.GetService<TestDefaultQueueProducer>(); | |||||
if (producer == null) throw new ArgumentNullException(); | |||||
for (var i = 0; i <= 20; i++) | for (var i = 0; i <= 20; i++) | ||||
{ | { | ||||
producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = i}); | producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = i}); | ||||
} | } | ||||
//启用一个消费者 | |||||
queue1.UseConsumer<TestMsgBody, TestDefaultQueueConsumer1>(); | |||||
//启用多个消费者 | |||||
queue1.UseMultipleConsumer<TestMsgBody, TestDefaultQueueConsumer1>(3); | |||||
#endregion | |||||
queue1.UseConsumer<TestMsgBody, TestDefaultQueueConsumer1>(); //启用一个消费者 | |||||
queue1.UseMultipleConsumer<TestMsgBody, TestDefaultQueueConsumer1>(3); //启用多个消费者 | |||||
var id = 0; | var id = 0; | ||||
while (true) | while (true) | ||||
@@ -94,228 +63,95 @@ namespace BPA.Component.RabbitMQClientTester | |||||
//生产消息 | //生产消息 | ||||
producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = id++}); | producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = id++}); | ||||
Console.ReadLine(); | Console.ReadLine(); | ||||
Thread.Sleep(1000); | |||||
} | } | ||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 测试自定义交换机 | /// 测试自定义交换机 | ||||
/// </summary> | /// </summary> | ||||
public static void TestCustomExchange() | |||||
private static void CustomExchange() | |||||
{ | { | ||||
var services = new ServiceCollection(); | |||||
services.AddLogging(configure => { configure.AddConsole(); }); | |||||
//注入RabbitMQProvider | |||||
services.UseRabbitMQProvider(sp => TestConfig.RabbitMQConfig); | |||||
//注入生产者,单例 | |||||
services.UseRabbitMQProducer<TestCustomQueueProducer, TestMsgBody>(); | |||||
//注入消费者,多例 | |||||
services.UseRabbitMQConsumer<TestCustomQueueConsumer1, TestMsgBody>(); | |||||
//注入消费者2,多例 | |||||
services.UseRabbitMQConsumer<TestCustomQueueConsumer2, TestMsgBody>(); | |||||
var builder = new ConfigurationBuilder(); | |||||
builder.AddEnvironmentVariables(); | |||||
//创建容器提供者 | |||||
var sp = services.BuildServiceProvider(); | |||||
var configuration = builder.Build(); | |||||
configuration.AddApolloConfiguration(); | |||||
//获取rabbitMqProvider | |||||
var rabbitMqProvider = sp.UseRabbitMQProvider<RabbitMQProvider>(); | |||||
var services = new ServiceCollection(); | |||||
services.AddBPALog(); | |||||
services.AddApollo<TestConfig>(); | |||||
services.AddRabbitMQProvider(provider => provider.GetService<TestConfig>()?.RabbitMqConfig); | |||||
services.AddRabbitMQProducer<TestCustomQueueProducer, TestMsgBody>(); | |||||
services.AddRabbitMQConsumer<TestCustomQueueConsumer1, TestMsgBody>(); | |||||
services.AddRabbitMQConsumer<TestCustomQueueConsumer2, TestMsgBody>(); | |||||
#region 初始化生产者 | |||||
var serviceProvider = services.BuildServiceProvider(); | |||||
var rabbitMqProvider = serviceProvider.UseRabbitMQProvider<RabbitMQProvider>(); | |||||
//构建 TestQueue 的队列配置信息,启用生产者,启用消费者 | |||||
var exchangeConfig = rabbitMqProvider.BuildExchangeConfig(MqNameConfig.TestCustomExchange, ExchangeType.Fanout); | |||||
//exchangeConfig.ConfigOption(option => option.Type = ExchangeType.Fanout);//扇形模式 | |||||
var exchangeConfig = rabbitMqProvider.BuildExchangeConfig(MqNameConfig.TestCustomExchange); | |||||
exchangeConfig.UseExchange(); | exchangeConfig.UseExchange(); | ||||
//启用生成者,单例 | |||||
exchangeConfig.UseProducer<TestMsgBody, TestCustomQueueProducer>(); | exchangeConfig.UseProducer<TestMsgBody, TestCustomQueueProducer>(); | ||||
#endregion | |||||
#region 启用消费者1 | |||||
//启用消费者1 ,3个消费者实例 | |||||
var queue1 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestCustomQueue1, MqNameConfig.TestCustomExchange); | var queue1 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestCustomQueue1, MqNameConfig.TestCustomExchange); | ||||
//额外配置 | |||||
queue1.ConfigOption(option => { }); | |||||
//启用队列 | |||||
queue1.ConfigOption(_ => { }); | |||||
queue1.UseQueue(); | queue1.UseQueue(); | ||||
//启用一个消费者 | |||||
queue1.UseConsumer<TestMsgBody, TestCustomQueueConsumer1>(); | queue1.UseConsumer<TestMsgBody, TestCustomQueueConsumer1>(); | ||||
//启用多个消费者 | |||||
queue1.UseConsumer<TestMsgBody, TestCustomQueueConsumer1>(); | |||||
queue1.UseMultipleConsumer<TestMsgBody, TestCustomQueueConsumer1>(3); | queue1.UseMultipleConsumer<TestMsgBody, TestCustomQueueConsumer1>(3); | ||||
#endregion | |||||
#region 启用消费者2 | |||||
//启用消费者1 ,3个消费者实例 | |||||
var queue2 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestCustomQueue2, MqNameConfig.TestCustomExchange); | var queue2 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestCustomQueue2, MqNameConfig.TestCustomExchange); | ||||
//额外配置 | |||||
queue2.ConfigOption(option => { }); | |||||
//启用队列 | |||||
queue2.ConfigOption(_ => { }); | |||||
queue2.UseQueue(); | queue2.UseQueue(); | ||||
//启用一个消费者 | |||||
queue2.UseConsumer<TestMsgBody, TestCustomQueueConsumer2>(); | queue2.UseConsumer<TestMsgBody, TestCustomQueueConsumer2>(); | ||||
//启用多个消费者 | |||||
queue2.UseMultipleConsumer<TestMsgBody, TestCustomQueueConsumer2>(3); | queue2.UseMultipleConsumer<TestMsgBody, TestCustomQueueConsumer2>(3); | ||||
#endregion | |||||
var id = 0; | var id = 0; | ||||
while (true) | while (true) | ||||
{ | { | ||||
var producer = sp.GetService<TestCustomQueueProducer>(); | |||||
//生产消息 | |||||
var producer = serviceProvider.GetService<TestCustomQueueProducer>(); | |||||
producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = id++}); | producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = id++}); | ||||
Console.ReadLine(); | |||||
Thread.Sleep(1000); | |||||
} | } | ||||
} | } | ||||
/// <summary> | |||||
/// 测试自定义交换机+动态队列实现 真正广播 这种广播模式 队列不会持久化 所有消息有丢失的风险 | |||||
/// </summary> | |||||
public static void TestCustomExchange_Dynamic_Queue() | |||||
{ | |||||
var services = new ServiceCollection(); | |||||
services.AddLogging(configure => { configure.AddConsole(); }); | |||||
//注入RabbitMQProvider | |||||
services.UseRabbitMQProvider(sp => TestConfig.RabbitMQConfig); | |||||
//注入生产者,单例 | |||||
services.UseRabbitMQProducer<TestCustomDynamicQueueProducer, TestMsgBody>(); | |||||
//注入消费者,多例 | |||||
services.UseRabbitMQConsumer<TestCustomDynamicQueueConsumer, TestMsgBody>(); | |||||
//注入消费者2,多例 | |||||
services.UseRabbitMQConsumer<TestCustomDynamicQueueConsumer, TestMsgBody>(); | |||||
//创建容器提供者 | |||||
var sp = services.BuildServiceProvider(); | |||||
//获取rabbitMqProvider | |||||
var rabbitMqProvider = sp.UseRabbitMQProvider<RabbitMQProvider>(); | |||||
#region 初始化生产者 | |||||
//构建 TestQueue 的队列配置信息,启用生产者,启用消费者 | |||||
var exchangeConfig = rabbitMqProvider.BuildExchangeConfig(MqNameConfig.TestCustomDynamicQueueExchange, ExchangeType.Fanout); | |||||
//exchangeConfig.ConfigOption(option => option.Type = ExchangeType.Fanout);//扇形模式 | |||||
exchangeConfig.UseExchange(); | |||||
//启用生成者,单例 | |||||
exchangeConfig.UseProducer<TestMsgBody, TestCustomDynamicQueueProducer>(); | |||||
#endregion | |||||
//看着MqNameConfig.TestCustomDynamicQueue是一个名字,实际上是或生成多个 | |||||
#region 启用消费者1 | |||||
//启用消费者1 ,3个消费者实例 | |||||
var queue1 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestCustomDynamicQueue, | |||||
MqNameConfig.TestCustomDynamicQueueExchange); | |||||
//额外配置 | |||||
queue1.ConfigOption(option => { }); | |||||
//启用队列 | |||||
queue1.UseQueue(); | |||||
//启用一个消费者 | |||||
queue1.UseConsumer<TestMsgBody, TestCustomDynamicQueueConsumer>(); | |||||
//启用多个消费者 | |||||
#endregion | |||||
#region 启用消费者2 | |||||
//启用消费者1 ,3个消费者实例 | |||||
var queue2 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestCustomDynamicQueue, | |||||
MqNameConfig.TestCustomDynamicQueueExchange); | |||||
//额外配置 | |||||
queue2.ConfigOption(option => { }); | |||||
//启用队列 | |||||
queue2.UseQueue(); | |||||
//启用一个消费者 | |||||
queue2.UseConsumer<TestMsgBody, TestCustomDynamicQueueConsumer>(); | |||||
#endregion | |||||
var id = 0; | |||||
while (true) | |||||
{ | |||||
var producer = sp.GetService<TestCustomDynamicQueueProducer>(); | |||||
//生产消息 | |||||
producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = id++}); | |||||
Console.ReadLine(); | |||||
} | |||||
} | |||||
/// <summary> | /// <summary> | ||||
/// 测试延时 | /// 测试延时 | ||||
/// </summary> | /// </summary> | ||||
public static void TestDelay() | |||||
private static void DelayExchange() | |||||
{ | { | ||||
var services = new ServiceCollection(); | |||||
services.AddLogging(configure => { configure.AddConsole(); }); | |||||
var builder = new ConfigurationBuilder(); | |||||
builder.AddEnvironmentVariables(); | |||||
//注入RabbitMQProvider | |||||
services.UseRabbitMQProvider(sp => TestConfig.RabbitMQConfig); | |||||
//注入生产者,单例 | |||||
services.UseRabbitMQProducer<TestDelayQueueProducer, TestMsgBody>(); | |||||
//注入消费者,多例 | |||||
services.UseRabbitMQConsumer<TestDelayQueueConsumer, TestMsgBody>(); | |||||
var configuration = builder.Build(); | |||||
configuration.AddApolloConfiguration(); | |||||
//创建容器提供者 | |||||
var sp = services.BuildServiceProvider(); | |||||
//获取rabbitMqProvider | |||||
var rabbitMqProvider = sp.UseRabbitMQProvider<RabbitMQProvider>(); | |||||
var services = new ServiceCollection(); | |||||
services.AddBPALog(); | |||||
services.AddApollo<TestConfig>(); | |||||
services.AddRabbitMQProvider(provider => provider.GetService<TestConfig>()?.RabbitMqConfig); | |||||
services.AddRabbitMQProducer<TestDelayQueueProducer, TestMsgBody>(); | |||||
services.AddRabbitMQConsumer<TestDelayQueueConsumer, TestMsgBody>(); | |||||
#region 初始化生产者 和交换机1 | |||||
var serviceProvider = services.BuildServiceProvider(); | |||||
var rabbitMqProvider = serviceProvider.UseRabbitMQProvider<RabbitMQProvider>(); | |||||
//构建 TestQueue 的队列配置信息,启用生产者,启用消费者 | |||||
var exchangeConfig1 = rabbitMqProvider.BuildExchangeConfig(MqNameConfig.TestDelayExchange1, ExchangeType.X_Delayed_Message); | var exchangeConfig1 = rabbitMqProvider.BuildExchangeConfig(MqNameConfig.TestDelayExchange1, ExchangeType.X_Delayed_Message); | ||||
exchangeConfig1.UseExchange(); | exchangeConfig1.UseExchange(); | ||||
//启用生成者,单例 | |||||
exchangeConfig1.UseProducer<TestMsgBody, TestDelayQueueProducer>(); | exchangeConfig1.UseProducer<TestMsgBody, TestDelayQueueProducer>(); | ||||
//场景1:消息--发送到(延时消息)--交换机1---队列1---消费者 | |||||
//场景2:消息--发送到(延时消息)--交换机1---队列1---过期(延时)---交换机2---队列2---消费者 | |||||
rabbitMqProvider.BuildExchangeConfig(MqNameConfig.TestDelayExchange2).UseExchange(); | |||||
#endregion | |||||
//交换机2 | |||||
rabbitMqProvider.BuildExchangeConfig(MqNameConfig.TestDelayExchange2, ExchangeType.Direct).UseExchange(); | |||||
// 启用队列1 ,队列1中消息过期后会移交到 交换机2 | |||||
var queue1 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestDelayQueue1, MqNameConfig.TestDelayExchange1); | var queue1 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestDelayQueue1, MqNameConfig.TestDelayExchange1); | ||||
////额外配置 | |||||
//queue1.ConfigOption(option => | |||||
//{ | |||||
// //消息过期后 将消息移动到 交换机2 | |||||
// option.SetDeadLetterExchangeName(MqNameConfig.TestDelayExchange2); | |||||
// //路由 用 队列2的名字 | |||||
// option.SetDeadLetterRoutingKey(MqNameConfig.TestDelayQueue2); | |||||
//}); | |||||
//启用队列 | |||||
queue1.UseQueue(); | queue1.UseQueue(); | ||||
queue1.UseConsumer<TestMsgBody, TestDelayQueueConsumer>(); | queue1.UseConsumer<TestMsgBody, TestDelayQueueConsumer>(); | ||||
////队列2从交换机2中拿到数据 | |||||
//var queue2 = rabbitMqProvider.BuildQueueConfig(MqNameConfig.TestDelayQueue2, MqNameConfig.TestDelayExchange2); | |||||
//queue2.ConfigOption(option => | |||||
//{ | |||||
// option.RoutingKey = MqNameConfig.TestDelayQueue2; | |||||
//}); | |||||
////启用队列 | |||||
//queue2.UseQueue(); | |||||
//queue2.UseConsumer<TestMsgBody, TestDelayQueueConsumer>(); | |||||
var id = 0; | var id = 0; | ||||
while (true) | while (true) | ||||
{ | { | ||||
//消息--发送到(延时消息)--交换机1---队列1----过期(延时)--交换机2--队列2--消费者 | |||||
var producer = sp.GetService<TestDelayQueueProducer>(); | |||||
//生产消息 | |||||
var producer = serviceProvider.GetService<TestDelayQueueProducer>(); | |||||
var delay = new Random().Next(1, 15) * 1000L; | var delay = new Random().Next(1, 15) * 1000L; | ||||
producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = id++, Message = $"延时{delay}"}, null, delay); | producer.Enqueue(new TestMsgBody {Date = DateTime.Now, Id = id++, Message = $"延时{delay}"}, null, delay); | ||||
Console.ReadLine(); | Console.ReadLine(); | ||||
@@ -1,12 +1,14 @@ | |||||
{ | { | ||||
"$schema": "http://json.schemastore.org/launchsettings.json", | |||||
"profiles": { | "profiles": { | ||||
"BPA.Component.LogClientTester": { | |||||
"BPA.Component.RabbitMQClientTester": { | |||||
"commandName": "Project", | "commandName": "Project", | ||||
"environmentVariables": { | "environmentVariables": { | ||||
"APOLLO_META_SERVER_URL": "http://192.168.0.147:7010", | |||||
"APOLLO_COMMON_NAMESPACE": "BPA.test.common", | |||||
"APP_NAME": "BPA.test" | |||||
"ASPNETCORE_ENVIRONMENT": "Development", | |||||
"APOLLO_META_SERVER_URL": "http://10.2.1.21:28080", | |||||
"APOLLO_COMMON_NAMESPACE": "Dev.Common", | |||||
"APP_NAME": "BPA-DEV" | |||||
} | } | ||||
} | } | ||||
} | } | ||||
} | |||||
} |
@@ -7,7 +7,11 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3"/> | |||||
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.2.3" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<Folder Include="Controllers" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
</Project> | </Project> |
@@ -1,32 +0,0 @@ | |||||
using Microsoft.AspNetCore.Mvc; | |||||
namespace BPA.Component.RabbitMQClientWebTester.Controllers; | |||||
[ApiController] | |||||
[Route("[controller]")] | |||||
public class WeatherForecastController : ControllerBase | |||||
{ | |||||
private static readonly string[] Summaries = new[] | |||||
{ | |||||
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" | |||||
}; | |||||
private readonly ILogger<WeatherForecastController> _logger; | |||||
public WeatherForecastController(ILogger<WeatherForecastController> logger) | |||||
{ | |||||
_logger = logger; | |||||
} | |||||
[HttpGet(Name = "GetWeatherForecast")] | |||||
public IEnumerable<WeatherForecast> Get() | |||||
{ | |||||
return Enumerable.Range(1, 5).Select(index => new WeatherForecast | |||||
{ | |||||
Date = DateTime.Now.AddDays(index), | |||||
TemperatureC = Random.Shared.Next(-20, 55), | |||||
Summary = Summaries[Random.Shared.Next(Summaries.Length)] | |||||
}) | |||||
.ToArray(); | |||||
} | |||||
} |
@@ -1,30 +1,15 @@ | |||||
{ | { | ||||
"$schema": "https://json.schemastore.org/launchsettings.json", | |||||
"iisSettings": { | |||||
"windowsAuthentication": false, | |||||
"anonymousAuthentication": true, | |||||
"iisExpress": { | |||||
"applicationUrl": "http://localhost:57481", | |||||
"sslPort": 44386 | |||||
} | |||||
}, | |||||
"$schema": "http://json.schemastore.org/launchsettings.json", | |||||
"profiles": { | "profiles": { | ||||
"BPA.Component.RabbitMQClientWebTester": { | "BPA.Component.RabbitMQClientWebTester": { | ||||
"commandName": "Project", | "commandName": "Project", | ||||
"dotnetRunMessages": true, | |||||
"launchBrowser": true, | |||||
"launchUrl": "swagger", | |||||
"applicationUrl": "https://localhost:7262;http://localhost:5262", | |||||
"environmentVariables": { | |||||
"ASPNETCORE_ENVIRONMENT": "Development" | |||||
} | |||||
}, | |||||
"IIS Express": { | |||||
"commandName": "IISExpress", | |||||
"launchBrowser": true, | "launchBrowser": true, | ||||
"launchUrl": "swagger", | |||||
"applicationUrl": "http://localhost:5023", | |||||
"environmentVariables": { | "environmentVariables": { | ||||
"ASPNETCORE_ENVIRONMENT": "Development" | |||||
"ASPNETCORE_ENVIRONMENT": "Development", | |||||
"APOLLO_META_SERVER_URL": "http://10.2.1.21:28080", | |||||
"APOLLO_COMMON_NAMESPACE": "Dev.Common", | |||||
"APP_NAME": "BPA-DEV" | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -1,12 +0,0 @@ | |||||
namespace BPA.Component.RabbitMQClientWebTester; | |||||
public class WeatherForecast | |||||
{ | |||||
public DateTime Date { get; set; } | |||||
public int TemperatureC { get; set; } | |||||
public int TemperatureF => 32 + (int) (TemperatureC / 0.5556); | |||||
public string? Summary { get; set; } | |||||
} |
@@ -1,8 +0,0 @@ | |||||
{ | |||||
"Logging": { | |||||
"LogLevel": { | |||||
"Default": "Information", | |||||
"Microsoft.AspNetCore": "Warning" | |||||
} | |||||
} | |||||
} |
@@ -1,9 +0,0 @@ | |||||
{ | |||||
"Logging": { | |||||
"LogLevel": { | |||||
"Default": "Information", | |||||
"Microsoft.AspNetCore": "Warning" | |||||
} | |||||
}, | |||||
"AllowedHosts": "*" | |||||
} |
@@ -60,8 +60,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BPA.Component.ApolloClientW | |||||
EndProject | EndProject | ||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BPA.Component.ClickHouseClientTester", "BPA.Component.ClickHouseClient\BPA.Component.ClickHouseClientTester\BPA.Component.ClickHouseClientTester.csproj", "{F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}" | Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BPA.Component.ClickHouseClientTester", "BPA.Component.ClickHouseClient\BPA.Component.ClickHouseClientTester\BPA.Component.ClickHouseClientTester.csproj", "{F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}" | ||||
EndProject | EndProject | ||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BPA.Component.RabbitMQClientWebTester", "BPA.Component.RabbitMQClient\BPA.Component.RabbitMQClientWebTester\BPA.Component.RabbitMQClientWebTester.csproj", "{0545F9FD-8302-440D-AF94-AD671F130479}" | |||||
EndProject | |||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tester", "Tester", "{C09304A6-255C-4693-B704-8C1FBD57CF85}" | Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tester", "Tester", "{C09304A6-255C-4693-B704-8C1FBD57CF85}" | ||||
EndProject | EndProject | ||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Template", "Template", "{114E6F3D-9A66-47DF-AB8E-1BE6EC85213E}" | Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Template", "Template", "{114E6F3D-9A66-47DF-AB8E-1BE6EC85213E}" | ||||
@@ -180,10 +178,6 @@ Global | |||||
{F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}.Debug|Any CPU.Build.0 = Debug|Any CPU | {F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}.Release|Any CPU.ActiveCfg = Release|Any CPU | {F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
{F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}.Release|Any CPU.Build.0 = Release|Any CPU | {F81FDB53-2CCD-47EF-8766-7CBEF3EDEB8D}.Release|Any CPU.Build.0 = Release|Any CPU | ||||
{0545F9FD-8302-440D-AF94-AD671F130479}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{0545F9FD-8302-440D-AF94-AD671F130479}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{0545F9FD-8302-440D-AF94-AD671F130479}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{0545F9FD-8302-440D-AF94-AD671F130479}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{F8581079-3BE5-47BB-93CA-AC20753157B6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | {F8581079-3BE5-47BB-93CA-AC20753157B6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | ||||
{F8581079-3BE5-47BB-93CA-AC20753157B6}.Debug|Any CPU.Build.0 = Debug|Any CPU | {F8581079-3BE5-47BB-93CA-AC20753157B6}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{F8581079-3BE5-47BB-93CA-AC20753157B6}.Release|Any CPU.ActiveCfg = Release|Any CPU | {F8581079-3BE5-47BB-93CA-AC20753157B6}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
@@ -215,7 +209,6 @@ Global | |||||
{9BB79240-0400-4843-BFD3-7C8EE316C0A4} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | {9BB79240-0400-4843-BFD3-7C8EE316C0A4} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | ||||
{38640EA1-3DBE-492A-96B3-AA3AC5EBC16B} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | {38640EA1-3DBE-492A-96B3-AA3AC5EBC16B} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | ||||
{BCCC8684-C155-4FC2-A648-1DD5E550B568} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | {BCCC8684-C155-4FC2-A648-1DD5E550B568} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | ||||
{0545F9FD-8302-440D-AF94-AD671F130479} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | |||||
{78463667-1A86-48ED-B459-0AE2C1CBC668} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | {78463667-1A86-48ED-B459-0AE2C1CBC668} = {C09304A6-255C-4693-B704-8C1FBD57CF85} | ||||
{F8581079-3BE5-47BB-93CA-AC20753157B6} = {114E6F3D-9A66-47DF-AB8E-1BE6EC85213E} | {F8581079-3BE5-47BB-93CA-AC20753157B6} = {114E6F3D-9A66-47DF-AB8E-1BE6EC85213E} | ||||
{2B2DAA5B-0BD9-4DAD-A53E-1C52B7F14E0D} = {114E6F3D-9A66-47DF-AB8E-1BE6EC85213E} | {2B2DAA5B-0BD9-4DAD-A53E-1C52B7F14E0D} = {114E6F3D-9A66-47DF-AB8E-1BE6EC85213E} | ||||