From 2825cceaa67f45ad8f529f2b72441b34cbe6b799 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 14 Jul 2017 00:26:22 +0800 Subject: [PATCH] rename namespace. --- .../IAdditionalProcessor.cs | 3 +- .../IJobProcessor.InfiniteRetry.cs | 9 +-- .../IJobProcessor.MessageJob.Default.cs | 2 +- .../IJobProcessor.PublishQueuer.cs} | 17 +++-- .../IJobProcessor.SubscribeQueuer.cs | 68 +++++++++++++++++++ .../{Job => Processor}/IJobProcessor.cs | 2 +- .../IMessageJobProcessor.cs | 2 +- .../IProcessingServer.Cap.cs} | 53 ++++++++------- .../{Job => Processor}/ProcessingContext.cs | 2 +- .../{Job => Processor}/RetryBehavior.cs | 2 +- .../States/IState.Enqueued.cs | 2 +- .../States/IState.Failed.cs | 2 +- .../States/IState.Processing.cs | 2 +- .../States/IState.Scheduled.cs | 2 +- .../States/IState.Succeeded.cs | 2 +- .../{Job => Processor}/States/IState.cs | 2 +- .../States/IStateChanger.Default.cs | 2 +- .../States/IStateChanger.Extensions.cs | 2 +- .../States/IStateChanger.cs | 2 +- 19 files changed, 123 insertions(+), 55 deletions(-) rename src/DotNetCore.CAP/{Job => Processor}/IAdditionalProcessor.cs (80%) rename src/DotNetCore.CAP/{Job => Processor}/IJobProcessor.InfiniteRetry.cs (73%) rename src/DotNetCore.CAP/{Job => Processor}/IJobProcessor.MessageJob.Default.cs (98%) rename src/DotNetCore.CAP/{Job/IJobProcessor.JobQueuer.cs => Processor/IJobProcessor.PublishQueuer.cs} (79%) create mode 100644 src/DotNetCore.CAP/Processor/IJobProcessor.SubscribeQueuer.cs rename src/DotNetCore.CAP/{Job => Processor}/IJobProcessor.cs (79%) rename src/DotNetCore.CAP/{Job => Processor}/IMessageJobProcessor.cs (83%) rename src/DotNetCore.CAP/{Job/IProcessingServer.Job.cs => Processor/IProcessingServer.Cap.cs} (78%) rename src/DotNetCore.CAP/{Job => Processor}/ProcessingContext.cs (97%) rename src/DotNetCore.CAP/{Job => Processor}/RetryBehavior.cs (98%) rename src/DotNetCore.CAP/{Job => Processor}/States/IState.Enqueued.cs (92%) rename src/DotNetCore.CAP/{Job => Processor}/States/IState.Failed.cs (91%) rename src/DotNetCore.CAP/{Job => Processor}/States/IState.Processing.cs (91%) rename src/DotNetCore.CAP/{Job => Processor}/States/IState.Scheduled.cs (91%) rename src/DotNetCore.CAP/{Job => Processor}/States/IState.Succeeded.cs (91%) rename src/DotNetCore.CAP/{Job => Processor}/States/IState.cs (88%) rename src/DotNetCore.CAP/{Job => Processor}/States/IStateChanger.Default.cs (96%) rename src/DotNetCore.CAP/{Job => Processor}/States/IStateChanger.Extensions.cs (95%) rename src/DotNetCore.CAP/{Job => Processor}/States/IStateChanger.cs (86%) diff --git a/src/DotNetCore.CAP/Job/IAdditionalProcessor.cs b/src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs similarity index 80% rename from src/DotNetCore.CAP/Job/IAdditionalProcessor.cs rename to src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs index c1e79f8..80073a5 100644 --- a/src/DotNetCore.CAP/Job/IAdditionalProcessor.cs +++ b/src/DotNetCore.CAP/Processor/IAdditionalProcessor.cs @@ -2,9 +2,10 @@ using System.Collections.Generic; using System.Text; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { public interface IAdditionalProcessor : IJobProcessor { + } } diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.InfiniteRetry.cs b/src/DotNetCore.CAP/Processor/IJobProcessor.InfiniteRetry.cs similarity index 73% rename from src/DotNetCore.CAP/Job/IJobProcessor.InfiniteRetry.cs rename to src/DotNetCore.CAP/Processor/IJobProcessor.InfiniteRetry.cs index 619fa0a..8787771 100644 --- a/src/DotNetCore.CAP/Job/IJobProcessor.InfiniteRetry.cs +++ b/src/DotNetCore.CAP/Processor/IJobProcessor.InfiniteRetry.cs @@ -1,9 +1,8 @@ using System; -using System.Diagnostics; using System.Threading.Tasks; using Microsoft.Extensions.Logging; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { public class InfiniteRetryProcessor : IJobProcessor { @@ -24,7 +23,6 @@ namespace DotNetCore.CAP.Job { while (!context.IsStopping) { - Debug.WriteLine("InfiniteRetryProcessor在开线程:" + _inner.ToString() + " : " + DateTime.Now); try { await _inner.ProcessAsync(context); @@ -35,10 +33,7 @@ namespace DotNetCore.CAP.Job } catch (Exception ex) { - _logger.LogWarning( - 1, - ex, - "Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString()); + _logger.LogWarning(1, ex, "Prcessor '{ProcessorName}' failed. Retrying...", _inner.ToString()); } } } diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.MessageJob.Default.cs b/src/DotNetCore.CAP/Processor/IJobProcessor.MessageJob.Default.cs similarity index 98% rename from src/DotNetCore.CAP/Job/IJobProcessor.MessageJob.Default.cs rename to src/DotNetCore.CAP/Processor/IJobProcessor.MessageJob.Default.cs index e880c36..a2b89ec 100644 --- a/src/DotNetCore.CAP/Job/IJobProcessor.MessageJob.Default.cs +++ b/src/DotNetCore.CAP/Processor/IJobProcessor.MessageJob.Default.cs @@ -6,7 +6,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { public class DefaultMessageJobProcessor : IMessageJobProcessor { diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs b/src/DotNetCore.CAP/Processor/IJobProcessor.PublishQueuer.cs similarity index 79% rename from src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs rename to src/DotNetCore.CAP/Processor/IJobProcessor.PublishQueuer.cs index 8973c17..0425431 100644 --- a/src/DotNetCore.CAP/Job/IJobProcessor.JobQueuer.cs +++ b/src/DotNetCore.CAP/Processor/IJobProcessor.PublishQueuer.cs @@ -2,15 +2,15 @@ using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Job.States; +using DotNetCore.CAP.Processor.States; using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { - public class JobQueuer : IJobProcessor + public class PublishQueuer : IJobProcessor { private ILogger _logger; private CapOptions _options; @@ -18,8 +18,10 @@ namespace DotNetCore.CAP.Job private IServiceProvider _provider; private TimeSpan _pollingDelay; - public JobQueuer( - ILogger logger, + internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); + + public PublishQueuer( + ILogger logger, IOptions options, IStateChanger stateChanger, IServiceProvider provider) @@ -57,8 +59,9 @@ namespace DotNetCore.CAP.Job context.ThrowIfStopping(); - WaitHandleEx.SentPulseEvent.Set(); - await WaitHandleEx.WaitAnyAsync(WaitHandleEx.QueuePulseEvent, + DefaultMessageJobProcessor.PulseEvent.Set(); + + await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); } } diff --git a/src/DotNetCore.CAP/Processor/IJobProcessor.SubscribeQueuer.cs b/src/DotNetCore.CAP/Processor/IJobProcessor.SubscribeQueuer.cs new file mode 100644 index 0000000..7e63d1a --- /dev/null +++ b/src/DotNetCore.CAP/Processor/IJobProcessor.SubscribeQueuer.cs @@ -0,0 +1,68 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Processor.States; +using DotNetCore.CAP.Models; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.Processor +{ + public class SubscribeQueuer : IJobProcessor + { + private ILogger _logger; + private CapOptions _options; + private IStateChanger _stateChanger; + private IServiceProvider _provider; + private TimeSpan _pollingDelay; + + internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); + + public SubscribeQueuer( + ILogger logger, + IOptions options, + IStateChanger stateChanger, + IServiceProvider provider) + { + _logger = logger; + _options = options.Value; + _stateChanger = stateChanger; + _provider = provider; + + _pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay); + } + + public async Task ProcessAsync(ProcessingContext context) + { + using (var scope = _provider.CreateScope()) + { + CapReceivedMessage message; + var provider = scope.ServiceProvider; + var connection = provider.GetRequiredService(); + + while ( + !context.IsStopping && + (message = await connection.GetNextReceviedMessageToBeEnqueuedAsync()) != null) + + { + var state = new EnqueuedState(); + + using (var transaction = connection.CreateTransaction()) + { + _stateChanger.ChangeState(message, state, transaction); + await transaction.CommitAsync(); + } + } + } + + context.ThrowIfStopping(); + + DefaultMessageJobProcessor.PulseEvent.Set(); + + await WaitHandleEx.WaitAnyAsync(PulseEvent, + context.CancellationToken.WaitHandle, _pollingDelay); + } + } +} diff --git a/src/DotNetCore.CAP/Job/IJobProcessor.cs b/src/DotNetCore.CAP/Processor/IJobProcessor.cs similarity index 79% rename from src/DotNetCore.CAP/Job/IJobProcessor.cs rename to src/DotNetCore.CAP/Processor/IJobProcessor.cs index d7ffc76..4b9c15c 100644 --- a/src/DotNetCore.CAP/Job/IJobProcessor.cs +++ b/src/DotNetCore.CAP/Processor/IJobProcessor.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { public interface IJobProcessor { diff --git a/src/DotNetCore.CAP/Job/IMessageJobProcessor.cs b/src/DotNetCore.CAP/Processor/IMessageJobProcessor.cs similarity index 83% rename from src/DotNetCore.CAP/Job/IMessageJobProcessor.cs rename to src/DotNetCore.CAP/Processor/IMessageJobProcessor.cs index a280c0a..2f60494 100644 --- a/src/DotNetCore.CAP/Job/IMessageJobProcessor.cs +++ b/src/DotNetCore.CAP/Processor/IMessageJobProcessor.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Text; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { public interface IMessageJobProcessor : IJobProcessor { diff --git a/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs similarity index 78% rename from src/DotNetCore.CAP/Job/IProcessingServer.Job.cs rename to src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs index 4724f53..87d12e7 100644 --- a/src/DotNetCore.CAP/Job/IProcessingServer.Job.cs +++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs @@ -3,14 +3,13 @@ using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { - public class JobProcessingServer : IProcessingServer, IDisposable + public class CapProcessingServer : IProcessingServer, IDisposable { private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; @@ -19,13 +18,13 @@ namespace DotNetCore.CAP.Job private readonly CapOptions _options; private IJobProcessor[] _processors; - private IMessageJobProcessor[] _messageProcessors; + private IList _messageProcessors; private ProcessingContext _context; private Task _compositeTask; private bool _disposed; - public JobProcessingServer( - ILogger logger, + public CapProcessingServer( + ILogger logger, ILoggerFactory loggerFactory, IServiceProvider provider, IOptions options) @@ -40,9 +39,8 @@ namespace DotNetCore.CAP.Job public void Start() { var processorCount = Environment.ProcessorCount; - processorCount = 1; _processors = GetProcessors(processorCount); - _logger.ServerStarting(processorCount, processorCount); + _logger.ServerStarting(processorCount, _processors.Length); _context = new ProcessingContext(_provider, _cts.Token); @@ -62,19 +60,8 @@ namespace DotNetCore.CAP.Job _logger.LogTrace("Pulsing the JobQueuer."); - WaitHandleEx.QueuePulseEvent.Set(); - } - - private bool AllProcessorsWaiting() - { - foreach (var processor in _messageProcessors) - { - if (!processor.Waiting) - { - return false; - } - } - return true; + PublishQueuer.PulseEvent.Set(); + SubscribeQueuer.PulseEvent.Set(); } public void Dispose() @@ -101,6 +88,18 @@ namespace DotNetCore.CAP.Job } } + private bool AllProcessorsWaiting() + { + foreach (var processor in _messageProcessors) + { + if (!processor.Waiting) + { + return false; + } + } + return true; + } + private IJobProcessor InfiniteRetry(IJobProcessor inner) { return new InfiniteRetryProcessor(inner, _loggerFactory); @@ -111,13 +110,15 @@ namespace DotNetCore.CAP.Job var returnedProcessors = new List(); for (int i = 0; i < processorCount; i++) { - var messageProcessors = _provider.GetServices(); - _messageProcessors = messageProcessors.ToArray(); - returnedProcessors.AddRange(messageProcessors); + var messageProcessors = _provider.GetService(); + _messageProcessors.Add(messageProcessors); } + returnedProcessors.AddRange(_messageProcessors); + + returnedProcessors.Add(_provider.GetService()); + returnedProcessors.Add(_provider.GetService()); - returnedProcessors.Add(_provider.GetService()); - //returnedProcessors.Add(_provider.GetService()); + returnedProcessors.Add(_provider.GetService()); return returnedProcessors.ToArray(); } diff --git a/src/DotNetCore.CAP/Job/ProcessingContext.cs b/src/DotNetCore.CAP/Processor/ProcessingContext.cs similarity index 97% rename from src/DotNetCore.CAP/Job/ProcessingContext.cs rename to src/DotNetCore.CAP/Processor/ProcessingContext.cs index 764ae80..f1fb203 100644 --- a/src/DotNetCore.CAP/Job/ProcessingContext.cs +++ b/src/DotNetCore.CAP/Processor/ProcessingContext.cs @@ -3,7 +3,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { public class ProcessingContext : IDisposable { diff --git a/src/DotNetCore.CAP/Job/RetryBehavior.cs b/src/DotNetCore.CAP/Processor/RetryBehavior.cs similarity index 98% rename from src/DotNetCore.CAP/Job/RetryBehavior.cs rename to src/DotNetCore.CAP/Processor/RetryBehavior.cs index 9d48d59..04cb667 100644 --- a/src/DotNetCore.CAP/Job/RetryBehavior.cs +++ b/src/DotNetCore.CAP/Processor/RetryBehavior.cs @@ -1,6 +1,6 @@ using System; -namespace DotNetCore.CAP.Job +namespace DotNetCore.CAP.Processor { public class RetryBehavior { diff --git a/src/DotNetCore.CAP/Job/States/IState.Enqueued.cs b/src/DotNetCore.CAP/Processor/States/IState.Enqueued.cs similarity index 92% rename from src/DotNetCore.CAP/Job/States/IState.Enqueued.cs rename to src/DotNetCore.CAP/Processor/States/IState.Enqueued.cs index 6588177..41d83ae 100644 --- a/src/DotNetCore.CAP/Job/States/IState.Enqueued.cs +++ b/src/DotNetCore.CAP/Processor/States/IState.Enqueued.cs @@ -1,7 +1,7 @@ using System; using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public class EnqueuedState : IState { diff --git a/src/DotNetCore.CAP/Job/States/IState.Failed.cs b/src/DotNetCore.CAP/Processor/States/IState.Failed.cs similarity index 91% rename from src/DotNetCore.CAP/Job/States/IState.Failed.cs rename to src/DotNetCore.CAP/Processor/States/IState.Failed.cs index c779856..9bbfbd6 100644 --- a/src/DotNetCore.CAP/Job/States/IState.Failed.cs +++ b/src/DotNetCore.CAP/Processor/States/IState.Failed.cs @@ -1,7 +1,7 @@ using System; using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public class FailedState : IState { diff --git a/src/DotNetCore.CAP/Job/States/IState.Processing.cs b/src/DotNetCore.CAP/Processor/States/IState.Processing.cs similarity index 91% rename from src/DotNetCore.CAP/Job/States/IState.Processing.cs rename to src/DotNetCore.CAP/Processor/States/IState.Processing.cs index a5564a8..db66b86 100644 --- a/src/DotNetCore.CAP/Job/States/IState.Processing.cs +++ b/src/DotNetCore.CAP/Processor/States/IState.Processing.cs @@ -1,7 +1,7 @@ using System; using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public class ProcessingState : IState { diff --git a/src/DotNetCore.CAP/Job/States/IState.Scheduled.cs b/src/DotNetCore.CAP/Processor/States/IState.Scheduled.cs similarity index 91% rename from src/DotNetCore.CAP/Job/States/IState.Scheduled.cs rename to src/DotNetCore.CAP/Processor/States/IState.Scheduled.cs index 6a38697..6113f3b 100644 --- a/src/DotNetCore.CAP/Job/States/IState.Scheduled.cs +++ b/src/DotNetCore.CAP/Processor/States/IState.Scheduled.cs @@ -1,7 +1,7 @@ using System; using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public class ScheduledState : IState { diff --git a/src/DotNetCore.CAP/Job/States/IState.Succeeded.cs b/src/DotNetCore.CAP/Processor/States/IState.Succeeded.cs similarity index 91% rename from src/DotNetCore.CAP/Job/States/IState.Succeeded.cs rename to src/DotNetCore.CAP/Processor/States/IState.Succeeded.cs index 384de56..b3b4f4d 100644 --- a/src/DotNetCore.CAP/Job/States/IState.Succeeded.cs +++ b/src/DotNetCore.CAP/Processor/States/IState.Succeeded.cs @@ -1,7 +1,7 @@ using System; using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public class SucceededState : IState { diff --git a/src/DotNetCore.CAP/Job/States/IState.cs b/src/DotNetCore.CAP/Processor/States/IState.cs similarity index 88% rename from src/DotNetCore.CAP/Job/States/IState.cs rename to src/DotNetCore.CAP/Processor/States/IState.cs index 2219dcb..0416fe9 100644 --- a/src/DotNetCore.CAP/Job/States/IState.cs +++ b/src/DotNetCore.CAP/Processor/States/IState.cs @@ -1,7 +1,7 @@ using System; using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public interface IState { diff --git a/src/DotNetCore.CAP/Job/States/IStateChanger.Default.cs b/src/DotNetCore.CAP/Processor/States/IStateChanger.Default.cs similarity index 96% rename from src/DotNetCore.CAP/Job/States/IStateChanger.Default.cs rename to src/DotNetCore.CAP/Processor/States/IStateChanger.Default.cs index b3f2d2a..1e4c99b 100644 --- a/src/DotNetCore.CAP/Job/States/IStateChanger.Default.cs +++ b/src/DotNetCore.CAP/Processor/States/IStateChanger.Default.cs @@ -1,6 +1,6 @@ using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public class StateChanger : IStateChanger { diff --git a/src/DotNetCore.CAP/Job/States/IStateChanger.Extensions.cs b/src/DotNetCore.CAP/Processor/States/IStateChanger.Extensions.cs similarity index 95% rename from src/DotNetCore.CAP/Job/States/IStateChanger.Extensions.cs rename to src/DotNetCore.CAP/Processor/States/IStateChanger.Extensions.cs index 27ec4e9..5ccc018 100644 --- a/src/DotNetCore.CAP/Job/States/IStateChanger.Extensions.cs +++ b/src/DotNetCore.CAP/Processor/States/IStateChanger.Extensions.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public static class StateChangerExtensions { diff --git a/src/DotNetCore.CAP/Job/States/IStateChanger.cs b/src/DotNetCore.CAP/Processor/States/IStateChanger.cs similarity index 86% rename from src/DotNetCore.CAP/Job/States/IStateChanger.cs rename to src/DotNetCore.CAP/Processor/States/IStateChanger.cs index 662e4e1..1cba968 100644 --- a/src/DotNetCore.CAP/Job/States/IStateChanger.cs +++ b/src/DotNetCore.CAP/Processor/States/IStateChanger.cs @@ -1,6 +1,6 @@ using DotNetCore.CAP.Models; -namespace DotNetCore.CAP.Job.States +namespace DotNetCore.CAP.Processor.States { public interface IStateChanger {