Browse Source

Support custom multiple producer threads for sending。 #731

master
Savorboard 3 years ago
parent
commit
3528432277
2 changed files with 6 additions and 2 deletions
  1. +3
    -0
      samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs
  2. +3
    -2
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs

+ 3
- 0
samples/Sample.RabbitMQ.SqlServer/Controllers/ValuesController.cs View File

@@ -2,6 +2,7 @@
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP;
using DotNetCore.CAP.Messages;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Data.SqlClient;

@@ -77,6 +78,8 @@ namespace Sample.RabbitMQ.SqlServer.Controllers
[CapSubscribe("sample.rabbitmq.sqlserver", Group = "group.test2")]
public void Subscriber2(Person p, [FromCap]CapHeader header)
{
var id = header[Headers.MessageId];

Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {p}");
}
}


+ 3
- 2
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs View File

@@ -34,10 +34,11 @@ namespace DotNetCore.CAP.Processor
_sender = sender;
_executor = executor;

_publishedChannel = Channel.CreateUnbounded<MediumMessage>(new UnboundedChannelOptions() { SingleReader = true, SingleWriter = true });
_publishedChannel = Channel.CreateUnbounded<MediumMessage>(new UnboundedChannelOptions() { SingleReader = false, SingleWriter = true });
_receivedChannel = Channel.CreateUnbounded<(MediumMessage, ConsumerExecutorDescriptor)>();

Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
Task.WhenAll(Enumerable.Range(0, options.Value.ProducerThreadCount)
.Select(_ => Task.Factory.StartNew(Sending, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());

Task.WhenAll(Enumerable.Range(0, options.Value.ConsumerThreadCount)
.Select(_ => Task.Factory.StartNew(Processing, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default)).ToArray());


Loading…
Cancel
Save