Przeglądaj źródła

Change exhcange type with options.

master
yangxiaodong 7 lat temu
rodzic
commit
3d7eb4bcbf
2 zmienionych plików z 14 dodań i 22 usunięć
  1. +5
    -12
      src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs
  2. +9
    -10
      src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs

+ 5
- 12
src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs Wyświetl plik

@@ -94,16 +94,9 @@ namespace DotNetCore.CAP.RabbitMQ

sp.Stop();

if (!jobResult)
if (jobResult)
{
_logger.JobFailed(new Exception("topic send failed"));
}
else
{
//TODO : the state will be deleted when release.
message.StatusName = StatusName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);

await messageStore.RemoveSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
@@ -135,10 +128,10 @@ namespace DotNetCore.CAP.RabbitMQ
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "test",
type: "topic");
var body = Encoding.UTF8.GetBytes(content);
channel.BasicPublish(exchange: "test",

channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, _rabbitMQOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName,
routingKey: topic,
basicProperties: null,
body: body);


+ 9
- 10
src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs Wyświetl plik

@@ -9,10 +9,8 @@ namespace DotNetCore.CAP.RabbitMQ
{
public class RabbitMQConsumerClient : IConsumerClient
{
public const string TYPE = "topic";

private string _queueName;
private readonly string _exchange;
private readonly string _exchageName;
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;

private IConnectionFactory _connectionFactory;
@@ -21,10 +19,11 @@ namespace DotNetCore.CAP.RabbitMQ

public event EventHandler<MessageBase> MessageReceieved;

public RabbitMQConsumerClient(string exchange, RabbitMQOptions options)
public RabbitMQConsumerClient(string queueName, RabbitMQOptions options)
{
_exchange = exchange;
_queueName = queueName;
_rabbitMQOptions = options;
_exchageName = options.TopicExchangeName;

InitClient();
}
@@ -45,8 +44,8 @@ namespace DotNetCore.CAP.RabbitMQ

_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: _exchange, type: TYPE);
_queueName = _channel.QueueDeclare().QueueName;
_channel.ExchangeDeclare(exchange: _exchageName, type: _rabbitMQOptions.EXCHANGE_TYPE);
_channel.QueueDeclare(_queueName);
}

public void Listening(TimeSpan timeout)
@@ -62,12 +61,12 @@ namespace DotNetCore.CAP.RabbitMQ

public void Subscribe(string topic)
{
_channel.QueueBind(_queueName, _exchange, topic);
_channel.QueueBind(_queueName, _exchageName, topic);
}

public void Subscribe(string topic, int partition)
{
_channel.QueueBind(_queueName, _exchange, topic);
_channel.QueueBind(_queueName, _exchageName, topic);
}

public void Dispose()


Ładowanie…
Anuluj
Zapisz