Savorboard 7 роки тому
джерело
коміт
2825cceaa6
19 змінених файлів з 123 додано та 55 видалено
  1. +2
    -1
      src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs
  2. +2
    -7
      src/DotNetCore.CAP/Processor/IJobProcessor.InfiniteRetry.cs
  3. +1
    -1
      src/DotNetCore.CAP/Processor/IJobProcessor.MessageJob.Default.cs
  4. +10
    -7
      src/DotNetCore.CAP/Processor/IJobProcessor.PublishQueuer.cs
  5. +68
    -0
      src/DotNetCore.CAP/Processor/IJobProcessor.SubscribeQueuer.cs
  6. +1
    -1
      src/DotNetCore.CAP/Processor/IJobProcessor.cs
  7. +1
    -1
      src/DotNetCore.CAP/Processor/IMessageJobProcessor.cs
  8. +27
    -26
      src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
  9. +1
    -1
      src/DotNetCore.CAP/Processor/ProcessingContext.cs
  10. +1
    -1
      src/DotNetCore.CAP/Processor/RetryBehavior.cs
  11. +1
    -1
      src/DotNetCore.CAP/Processor/States/IState.Enqueued.cs
  12. +1
    -1
      src/DotNetCore.CAP/Processor/States/IState.Failed.cs
  13. +1
    -1
      src/DotNetCore.CAP/Processor/States/IState.Processing.cs
  14. +1
    -1
      src/DotNetCore.CAP/Processor/States/IState.Scheduled.cs
  15. +1
    -1
      src/DotNetCore.CAP/Processor/States/IState.Succeeded.cs
  16. +1
    -1
      src/DotNetCore.CAP/Processor/States/IState.cs
  17. +1
    -1
      src/DotNetCore.CAP/Processor/States/IStateChanger.Default.cs
  18. +1
    -1
      src/DotNetCore.CAP/Processor/States/IStateChanger.Extensions.cs
  19. +1
    -1
      src/DotNetCore.CAP/Processor/States/IStateChanger.cs

src/DotNetCore.CAP/Job/IAdditionalProcessor.cs → src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs Переглянути файл

@@ -2,9 +2,10 @@
using System.Collections.Generic;
using System.Text;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public interface IAdditionalProcessor : IJobProcessor
{

}
}

src/DotNetCore.CAP/Job/IJobProcessor.InfiniteRetry.cs → src/DotNetCore.CAP/Processor/IJobProcessor.InfiniteRetry.cs Переглянути файл

@@ -1,9 +1,8 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class InfiniteRetryProcessor : IJobProcessor
{
@@ -24,7 +23,6 @@ namespace DotNetCore.CAP.Job
{
while (!context.IsStopping)
{
Debug.WriteLine("InfiniteRetryProcessor在开线程:" + _inner.ToString() + " : " + DateTime.Now);
try
{
await _inner.ProcessAsync(context);
@@ -35,10 +33,7 @@ namespace DotNetCore.CAP.Job
}
catch (Exception ex)
{
_logger.LogWarning(
1,
ex,
"Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString());
_logger.LogWarning(1, ex, "Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString());
}
}
}

src/DotNetCore.CAP/Job/IJobProcessor.MessageJob.Default.cs → src/DotNetCore.CAP/Processor/IJobProcessor.MessageJob.Default.cs Переглянути файл

@@ -6,7 +6,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class DefaultMessageJobProcessor : IMessageJobProcessor
{

src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs → src/DotNetCore.CAP/Processor/IJobProcessor.PublishQueuer.cs Переглянути файл

@@ -2,15 +2,15 @@
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Job.States;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class JobQueuer : IJobProcessor
public class PublishQueuer : IJobProcessor
{
private ILogger _logger;
private CapOptions _options;
@@ -18,8 +18,10 @@ namespace DotNetCore.CAP.Job
private IServiceProvider _provider;
private TimeSpan _pollingDelay;

public JobQueuer(
ILogger<JobQueuer> logger,
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);

public PublishQueuer(
ILogger<PublishQueuer> logger,
IOptions<CapOptions> options,
IStateChanger stateChanger,
IServiceProvider provider)
@@ -57,8 +59,9 @@ namespace DotNetCore.CAP.Job

context.ThrowIfStopping();
WaitHandleEx.SentPulseEvent.Set();
await WaitHandleEx.WaitAnyAsync(WaitHandleEx.QueuePulseEvent,
DefaultMessageJobProcessor.PulseEvent.Set();

await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
}

+ 68
- 0
src/DotNetCore.CAP/Processor/IJobProcessor.SubscribeQueuer.cs Переглянути файл

@@ -0,0 +1,68 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Processor.States;
using DotNetCore.CAP.Models;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Processor
{
public class SubscribeQueuer : IJobProcessor
{
private ILogger _logger;
private CapOptions _options;
private IStateChanger _stateChanger;
private IServiceProvider _provider;
private TimeSpan _pollingDelay;

internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true);

public SubscribeQueuer(
ILogger<SubscribeQueuer> logger,
IOptions<CapOptions> options,
IStateChanger stateChanger,
IServiceProvider provider)
{
_logger = logger;
_options = options.Value;
_stateChanger = stateChanger;
_provider = provider;

_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay);
}

public async Task ProcessAsync(ProcessingContext context)
{
using (var scope = _provider.CreateScope())
{
CapReceivedMessage message;
var provider = scope.ServiceProvider;
var connection = provider.GetRequiredService<IStorageConnection>();

while (
!context.IsStopping &&
(message = await connection.GetNextReceviedMessageToBeEnqueuedAsync()) != null)

{
var state = new EnqueuedState();

using (var transaction = connection.CreateTransaction())
{
_stateChanger.ChangeState(message, state, transaction);
await transaction.CommitAsync();
}
}
}

context.ThrowIfStopping();

DefaultMessageJobProcessor.PulseEvent.Set();

await WaitHandleEx.WaitAnyAsync(PulseEvent,
context.CancellationToken.WaitHandle, _pollingDelay);
}
}
}

src/DotNetCore.CAP/Job/IJobProcessor.cs → src/DotNetCore.CAP/Processor/IJobProcessor.cs Переглянути файл

@@ -1,6 +1,6 @@
using System.Threading.Tasks;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public interface IJobProcessor
{

src/DotNetCore.CAP/Job/IMessageJobProcessor.cs → src/DotNetCore.CAP/Processor/IMessageJobProcessor.cs Переглянути файл

@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public interface IMessageJobProcessor : IJobProcessor
{

src/DotNetCore.CAP/Job/IProcessingServer.Job.cs → src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs Переглянути файл

@@ -3,14 +3,13 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DotNetCore.CAP.Infrastructure;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class JobProcessingServer : IProcessingServer, IDisposable
public class CapProcessingServer : IProcessingServer, IDisposable
{
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
@@ -19,13 +18,13 @@ namespace DotNetCore.CAP.Job
private readonly CapOptions _options;

private IJobProcessor[] _processors;
private IMessageJobProcessor[] _messageProcessors;
private IList<IMessageJobProcessor> _messageProcessors;
private ProcessingContext _context;
private Task _compositeTask;
private bool _disposed;

public JobProcessingServer(
ILogger<JobProcessingServer> logger,
public CapProcessingServer(
ILogger<CapProcessingServer> logger,
ILoggerFactory loggerFactory,
IServiceProvider provider,
IOptions<CapOptions> options)
@@ -40,9 +39,8 @@ namespace DotNetCore.CAP.Job
public void Start()
{
var processorCount = Environment.ProcessorCount;
processorCount = 1;
_processors = GetProcessors(processorCount);
_logger.ServerStarting(processorCount, processorCount);
_logger.ServerStarting(processorCount, _processors.Length);

_context = new ProcessingContext(_provider, _cts.Token);

@@ -62,19 +60,8 @@ namespace DotNetCore.CAP.Job

_logger.LogTrace("Pulsing the JobQueuer.");

WaitHandleEx.QueuePulseEvent.Set();
}

private bool AllProcessorsWaiting()
{
foreach (var processor in _messageProcessors)
{
if (!processor.Waiting)
{
return false;
}
}
return true;
PublishQueuer.PulseEvent.Set();
SubscribeQueuer.PulseEvent.Set();
}

public void Dispose()
@@ -101,6 +88,18 @@ namespace DotNetCore.CAP.Job
}
}

private bool AllProcessorsWaiting()
{
foreach (var processor in _messageProcessors)
{
if (!processor.Waiting)
{
return false;
}
}
return true;
}

private IJobProcessor InfiniteRetry(IJobProcessor inner)
{
return new InfiniteRetryProcessor(inner, _loggerFactory);
@@ -111,13 +110,15 @@ namespace DotNetCore.CAP.Job
var returnedProcessors = new List<IJobProcessor>();
for (int i = 0; i < processorCount; i++)
{
var messageProcessors = _provider.GetServices<IMessageJobProcessor>();
_messageProcessors = messageProcessors.ToArray();
returnedProcessors.AddRange(messageProcessors);
var messageProcessors = _provider.GetService<IMessageJobProcessor>();
_messageProcessors.Add(messageProcessors);
}
returnedProcessors.AddRange(_messageProcessors);

returnedProcessors.Add(_provider.GetService<PublishQueuer>());
returnedProcessors.Add(_provider.GetService<SubscribeQueuer>());

returnedProcessors.Add(_provider.GetService<JobQueuer>());
//returnedProcessors.Add(_provider.GetService<IAdditionalProcessor>());
returnedProcessors.Add(_provider.GetService<IAdditionalProcessor>());

return returnedProcessors.ToArray();
}

src/DotNetCore.CAP/Job/ProcessingContext.cs → src/DotNetCore.CAP/Processor/ProcessingContext.cs Переглянути файл

@@ -3,7 +3,7 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class ProcessingContext : IDisposable
{

src/DotNetCore.CAP/Job/RetryBehavior.cs → src/DotNetCore.CAP/Processor/RetryBehavior.cs Переглянути файл

@@ -1,6 +1,6 @@
using System;

namespace DotNetCore.CAP.Job
namespace DotNetCore.CAP.Processor
{
public class RetryBehavior
{

src/DotNetCore.CAP/Job/States/IState.Enqueued.cs → src/DotNetCore.CAP/Processor/States/IState.Enqueued.cs Переглянути файл

@@ -1,7 +1,7 @@
using System;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public class EnqueuedState : IState
{

src/DotNetCore.CAP/Job/States/IState.Failed.cs → src/DotNetCore.CAP/Processor/States/IState.Failed.cs Переглянути файл

@@ -1,7 +1,7 @@
using System;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public class FailedState : IState
{

src/DotNetCore.CAP/Job/States/IState.Processing.cs → src/DotNetCore.CAP/Processor/States/IState.Processing.cs Переглянути файл

@@ -1,7 +1,7 @@
using System;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public class ProcessingState : IState
{

src/DotNetCore.CAP/Job/States/IState.Scheduled.cs → src/DotNetCore.CAP/Processor/States/IState.Scheduled.cs Переглянути файл

@@ -1,7 +1,7 @@
using System;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public class ScheduledState : IState
{

src/DotNetCore.CAP/Job/States/IState.Succeeded.cs → src/DotNetCore.CAP/Processor/States/IState.Succeeded.cs Переглянути файл

@@ -1,7 +1,7 @@
using System;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public class SucceededState : IState
{

src/DotNetCore.CAP/Job/States/IState.cs → src/DotNetCore.CAP/Processor/States/IState.cs Переглянути файл

@@ -1,7 +1,7 @@
using System;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public interface IState
{

src/DotNetCore.CAP/Job/States/IStateChanger.Default.cs → src/DotNetCore.CAP/Processor/States/IStateChanger.Default.cs Переглянути файл

@@ -1,6 +1,6 @@
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public class StateChanger : IStateChanger
{

src/DotNetCore.CAP/Job/States/IStateChanger.Extensions.cs → src/DotNetCore.CAP/Processor/States/IStateChanger.Extensions.cs Переглянути файл

@@ -1,7 +1,7 @@
using System.Threading.Tasks;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public static class StateChangerExtensions
{

src/DotNetCore.CAP/Job/States/IStateChanger.cs → src/DotNetCore.CAP/Processor/States/IStateChanger.cs Переглянути файл

@@ -1,6 +1,6 @@
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.Job.States
namespace DotNetCore.CAP.Processor.States
{
public interface IStateChanger
{

Завантаження…
Відмінити
Зберегти