Przeglądaj źródła

modify job processor executor step.

master
yangxiaodong 7 lat temu
rodzic
commit
83064c25b6
1 zmienionych plików z 29 dodań i 39 usunięć
  1. +29
    -39
      src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs

+ 29
- 39
src/DotNetCore.CAP.RabbitMQ/IProcessor.RabbitJobProcessor.cs Wyświetl plik

@@ -81,68 +81,58 @@ namespace DotNetCore.CAP.RabbitMQ
{
var provider = scopedContext.Provider;
var messageStore = provider.GetRequiredService<ICapMessageStore>();
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
try
{
var message = await messageStore.GetNextSentMessageToBeEnqueuedAsync();
if (message != null)
{
var sp = Stopwatch.StartNew();
message.StatusName = StatusName.Processing;
await messageStore.UpdateSentMessageAsync(message);

var jobResult = ExecuteJob(message.KeyName, message.Content);
ExecuteJob(message.KeyName, message.Content);

sp.Stop();

if (jobResult)
{
await messageStore.RemoveSentMessageAsync(message);
_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
message.StatusName = StatusName.Succeeded;
await messageStore.UpdateSentMessageAsync(message);

_logger.JobExecuted(sp.Elapsed.TotalSeconds);
}
}
catch (Exception)
catch (Exception ex)
{
_logger.ExceptionOccuredWhileExecutingJob(message.KeyName, ex);
return false;
}
}
return true;
}

private bool ExecuteJob(string topic, string content)
private void ExecuteJob(string topic, string content)
{
try
var factory = new ConnectionFactory()
{
var factory = new ConnectionFactory()
{
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
var body = Encoding.UTF8.GetBytes(content);

channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, _rabbitMQOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName,
routingKey: topic,
basicProperties: null,
body: body);

return true;
}
}
catch (Exception ex)
HostName = _rabbitMQOptions.HostName,
UserName = _rabbitMQOptions.UserName,
Port = _rabbitMQOptions.Port,
Password = _rabbitMQOptions.Password,
VirtualHost = _rabbitMQOptions.VirtualHost,
RequestedConnectionTimeout = _rabbitMQOptions.RequestedConnectionTimeout,
SocketReadTimeout = _rabbitMQOptions.SocketReadTimeout,
SocketWriteTimeout = _rabbitMQOptions.SocketWriteTimeout
};

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
_logger.ExceptionOccuredWhileExecutingJob(topic, ex);
return false;
var body = Encoding.UTF8.GetBytes(content);

channel.ExchangeDeclare(_rabbitMQOptions.TopicExchangeName, _rabbitMQOptions.EXCHANGE_TYPE);
channel.BasicPublish(exchange: _rabbitMQOptions.TopicExchangeName,
routingKey: topic,
basicProperties: null,
body: body);
}
}
}

Ładowanie…
Anuluj
Zapisz