diff --git a/samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs b/samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs index c7e32dc..bb75269 100644 --- a/samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs +++ b/samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs @@ -2,6 +2,7 @@ using System.Threading.Tasks; using Dapper; using DotNetCore.CAP; +using DotNetCore.CAP.Messages; using Microsoft.AspNetCore.Mvc; using Microsoft.Data.SqlClient; @@ -77,6 +78,8 @@ namespace Sample.RabbitMQ.SqlServer.Controllers [CapSubscribe("sample.rabbitmq.sqlserver", Group = "group.test2")] public void Subscriber2(Person p, [FromCap]CapHeader header) { + var id = header[Headers.MessageId]; + Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}"); } } diff --git a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index 2139ee1..d2f21cc 100644 --- a/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -34,10 +34,11 @@ namespace DotNetCore.CAP.Processor _sender = sender; _executor = executor; - _publishedChannel = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = true }); + _publishedChannel = Channel.CreateUnbounded(new UnboundedChannelOptions() { SingleReader = false, SingleWriter = true }); _receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>(); - Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + Task.WhenAll(Enumerable.Range(0, options.Value.ProducerThreadCount) + .Select(_ => Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray()); Task.WhenAll(Enumerable.Range(0, options.Value.ConsumerThreadCount) .Select(_ => Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());