Savorboard пре 7 година
родитељ
комит
8e107693da
1 измењених фајлова са 43 додато и 7 уклоњено
  1. +43
    -7
      src/DotNetCore.CAP/Processor/IProcessor.Failed.cs

src/DotNetCore.CAP/Processor/IProcessor.FailedJob.cs → src/DotNetCore.CAP/Processor/IProcessor.Failed.cs Прегледај датотеку

@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using Microsoft.Extensions.DependencyInjection;
@@ -8,26 +11,32 @@ using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Processor
{
public class FailedJobProcessor : IProcessor
public class FailedProcessor : IProcessor
{
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1);
private readonly ILogger _logger;
private readonly CapOptions _options;
private readonly IServiceProvider _provider;
private readonly IStateChanger _stateChanger;
private readonly ISubscriberExecutor _subscriberExecutor;
private readonly IPublishExecutor _publishExecutor;
private readonly TimeSpan _waitingInterval;

public FailedJobProcessor(
public FailedProcessor(
IOptions<CapOptions> options,
ILogger<FailedJobProcessor> logger,
ILogger<FailedProcessor> logger,
IServiceProvider provider,
IStateChanger stateChanger)
IStateChanger stateChanger,
ISubscriberExecutor subscriberExecutor,
IPublishExecutor publishExecutor)
{
_options = options.Value;
_logger = logger;
_provider = provider;
_stateChanger = stateChanger;
_waitingInterval = TimeSpan.FromSeconds(_options.FailedMessageWaitingInterval);
_subscriberExecutor = subscriberExecutor;
_publishExecutor = publishExecutor;
_waitingInterval = TimeSpan.FromSeconds(_options.FailedRetryInterval);
}

public async Task ProcessAsync(ProcessingContext context)
@@ -57,6 +66,9 @@ namespace DotNetCore.CAP.Processor

foreach (var message in messages)
{
if (message.Retries > _options.FailedRetryCount)
continue;

if (!hasException)
try
{
@@ -70,7 +82,18 @@ namespace DotNetCore.CAP.Processor

using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
try
{
await _publishExecutor.PublishAsync(message.Name, message.Content);

_stateChanger.ChangeState(message, new SucceededState(), transaction);
}
catch (Exception e)
{
message.Content = Helper.AddExceptionProperty(message.Content, e);
message.Retries++;
transaction.UpdateMessage(message);
}
await transaction.CommitAsync();
}

@@ -87,6 +110,9 @@ namespace DotNetCore.CAP.Processor

foreach (var message in messages)
{
if (message.Retries > _options.FailedRetryCount)
continue;

if (!hasException)
try
{
@@ -100,7 +126,17 @@ namespace DotNetCore.CAP.Processor

using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, new EnqueuedState(), transaction);
var ret = await _subscriberExecutor.ExecuteAsync(message);
if (ret.Succeeded)
{
_stateChanger.ChangeState(message, new SucceededState(), transaction);
}
else
{
message.Retries++;
message.Content = Helper.AddExceptionProperty(message.Content, ret.Exception);
transaction.UpdateMessage(message);
}
await transaction.CommitAsync();
}


Loading…
Откажи
Сачувај