From 768a62c3a489a65988083ccac77cb02827ccf6d5 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sat, 15 Jul 2017 01:40:32 +0800 Subject: [PATCH] refactor --- samples/Sample.Kafka/AppDbContext.cs | 3 +- .../Controllers/ValuesController.cs | 13 +++-- samples/Sample.Kafka/Sample.Kafka.csproj | 1 - samples/Sample.Kafka/Startup.cs | 9 +-- .../CAP.BuilderExtensions.cs | 4 +- .../CAP.EFOptions.cs | 2 +- .../CapDbContext.cs | 5 +- .../CapPublisher.cs | 55 +++++++++++++++++++ .../EFStorageConnection.cs | 2 +- .../CAP.ServiceCollectionExtensions.cs | 4 +- src/DotNetCore.CAP/DotNetCore.CAP.csproj | 2 +- ...sage.Default.cs => IDispatcher.Default.cs} | 6 +- .../{IMessageProcessor.cs => IDispatcher.cs} | 2 +- .../Processor/IProcessingServer.Cap.cs | 11 ++-- .../Processor/IProcessor.PublishQueuer.cs | 2 +- .../Processor/IProcessor.SubscribeQueuer.cs | 2 +- 16 files changed, 93 insertions(+), 30 deletions(-) create mode 100644 src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs rename src/DotNetCore.CAP/Processor/{IProcessor.Message.Default.cs => IDispatcher.Default.cs} (94%) rename src/DotNetCore.CAP/Processor/{IMessageProcessor.cs => IDispatcher.cs} (74%) diff --git a/samples/Sample.Kafka/AppDbContext.cs b/samples/Sample.Kafka/AppDbContext.cs index 8140218..fe320e1 100644 --- a/samples/Sample.Kafka/AppDbContext.cs +++ b/samples/Sample.Kafka/AppDbContext.cs @@ -13,7 +13,8 @@ namespace Sample.Kafka { protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { - optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); + //optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); + optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); } } } diff --git a/samples/Sample.Kafka/Controllers/ValuesController.cs b/samples/Sample.Kafka/Controllers/ValuesController.cs index ff42daf..6ed8a5c 100644 --- a/samples/Sample.Kafka/Controllers/ValuesController.cs +++ b/samples/Sample.Kafka/Controllers/ValuesController.cs @@ -1,9 +1,10 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP; -using DotNetCore.CAP.Kafka; +using DotNetCore.CAP.RabbitMQ; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Mvc; +using Dapper; namespace Sample.Kafka.Controllers { @@ -11,10 +12,12 @@ namespace Sample.Kafka.Controllers public class ValuesController : Controller, ICapSubscribe { private readonly ICapPublisher _producer; + private readonly AppDbContext _dbContext ; - public ValuesController(ICapPublisher producer) + public ValuesController(ICapPublisher producer, AppDbContext dbContext) { _producer = producer; + _dbContext = dbContext; } [Route("/")] @@ -33,11 +36,11 @@ namespace Sample.Kafka.Controllers } [Route("~/send")] - public async Task SendTopic([FromServices] AppDbContext dbContext) + public async Task SendTopic() { - using (var trans = dbContext.Database.BeginTransaction()) + using (var trans = _dbContext.Database.BeginTransaction()) { - await _producer.PublishAsync("zzwl.topic.finace.callBack", new Person { Name = "Test", Age = 11 }); + await _producer.PublishAsync("zzwl.topic.finace.callBack",""); trans.Commit(); } diff --git a/samples/Sample.Kafka/Sample.Kafka.csproj b/samples/Sample.Kafka/Sample.Kafka.csproj index 506780b..3bb9d98 100644 --- a/samples/Sample.Kafka/Sample.Kafka.csproj +++ b/samples/Sample.Kafka/Sample.Kafka.csproj @@ -25,7 +25,6 @@ - diff --git a/samples/Sample.Kafka/Startup.cs b/samples/Sample.Kafka/Startup.cs index 8de4c57..715b5c5 100644 --- a/samples/Sample.Kafka/Startup.cs +++ b/samples/Sample.Kafka/Startup.cs @@ -28,13 +28,14 @@ namespace Sample.Kafka services.AddCap() .AddEntityFrameworkStores(x=> { - x.ConnectionString = "Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"; + //x.ConnectionString = "Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"; + x.ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"; }) .AddRabbitMQ(x => { - x.HostName = "192.168.2.206"; - x.UserName = "admin"; - x.Password = "123123"; + x.HostName = "localhost"; + // x.UserName = "admin"; + // x.Password = "123123"; }); //.AddKafka(x => x.Servers = ""); diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs index c12ae54..fc6d53f 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs @@ -23,8 +23,10 @@ namespace Microsoft.Extensions.DependencyInjection builder.Services.AddSingleton(); builder.Services.AddScoped(); - builder.Services.AddTransient(); + builder.Services.AddScoped(); + builder.Services.AddTransient(); + builder.Services.Configure(actionOptions); var sqlServerOptions = new SqlServerOptions(); diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs index df2240b..396d62e 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.EFOptions.cs @@ -12,7 +12,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore public EFOptions() { - ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=WebApp1;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"; + ConnectionString = "Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"; } /// diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs index 882e742..0f06315 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs @@ -47,7 +47,7 @@ namespace DotNetCore.CAP.EntityFrameworkCore /// protected override void OnModelCreating(ModelBuilder modelBuilder) { - //_sqlServerOptions = new SqlServerOptions(); + _sqlServerOptions = new SqlServerOptions(); modelBuilder.HasDefaultSchema(_sqlServerOptions.Schema); modelBuilder.Entity(b => @@ -67,7 +67,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { - // optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); + // optionsBuilder.UseSqlServer("Server=192.168.2.206;Initial Catalog=Test;User Id=cmswuliu;Password=h7xY81agBn*Veiu3;MultipleActiveResultSets=True"); + optionsBuilder.UseSqlServer("Server=DESKTOP-M9R8T31;Initial Catalog=Test;User Id=sa;Password=P@ssw0rd;MultipleActiveResultSets=True"); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs new file mode 100644 index 0000000..8aadb9d --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapPublisher.cs @@ -0,0 +1,55 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Text; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; +using DotNetCore.CAP.Processor; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.DependencyInjection; + +namespace DotNetCore.CAP.EntityFrameworkCore +{ + public class CapPublisher : ICapPublisher + { + private readonly SqlServerOptions _options; + private readonly IServiceProvider _provider; + private readonly DbContext _dbContext; + + public CapPublisher(SqlServerOptions options, IServiceProvider provider) + { + _options = options; + _provider = provider; + _dbContext = (DbContext)_provider.GetService(_options.DbContextType); + } + + public async Task PublishAsync(string topic, string content) + { + var connection = _dbContext.Database.GetDbConnection(); + var transaction = _dbContext.Database.CurrentTransaction; + transaction = transaction ?? await _dbContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); + var dbTransaction = transaction.GetDbTransaction(); + + var message = new CapSentMessage + { + KeyName = topic, + Content = content, + StatusName = StatusName.Scheduled + }; + + var sql = "INSERT INTO [cap].[CapSentMessages] ([Id],[Added],[Content],[KeyName],[ExpiresAt],[Retries],[StatusName])VALUES(@Id,@Added,@Content,@KeyName,@ExpiresAt,@Retries,@StatusName)"; + await connection.ExecuteAsync(sql, message, transaction: dbTransaction); + + PublishQueuer.PulseEvent.Set(); + } + + public Task PublishAsync(string topic, T contentObj) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs index b8e278e..8b38284 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs @@ -89,7 +89,7 @@ WHERE StatusName = '{StatusName.Scheduled}'"; var sql = $@" SELECT TOP (1) * FROM [{_options.Schema}].[{nameof(CapDbContext.CapReceivedMessages)}] WITH (readpast) -WHERE StateName = '{StatusName.Enqueued}'"; +WHERE StatusName = '{StatusName.Enqueued}'"; var connection = _context.GetDbConnection(); var message = (await connection.QueryAsync(sql)).FirstOrDefault(); diff --git a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs index e38bd2b..dd993ea 100644 --- a/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs +++ b/src/DotNetCore.CAP/CAP.ServiceCollectionExtensions.cs @@ -55,14 +55,14 @@ namespace Microsoft.Extensions.DependencyInjection //Processors services.AddTransient(); services.AddTransient(); - services.AddTransient(); + services.AddTransient(); //Executors services.AddSingleton(); services.AddSingleton(); - services.TryAddScoped(); + // services.TryAddScoped(); return new CapBuilder(services); } diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj index a59f6cd..5d73615 100644 --- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj +++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj @@ -15,7 +15,7 @@ - + diff --git a/src/DotNetCore.CAP/Processor/IProcessor.Message.Default.cs b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs similarity index 94% rename from src/DotNetCore.CAP/Processor/IProcessor.Message.Default.cs rename to src/DotNetCore.CAP/Processor/IDispatcher.Default.cs index a9f6542..be0aa4f 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.Message.Default.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.Default.cs @@ -8,7 +8,7 @@ using Microsoft.Extensions.Options; namespace DotNetCore.CAP.Processor { - public class DefaultMessageProcessor : IMessageProcessor + public class DefaultDispatcher : IDispatcher { private readonly IQueueExecutorFactory _queueExecutorFactory; private readonly IServiceProvider _provider; @@ -19,11 +19,11 @@ namespace DotNetCore.CAP.Processor internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); - public DefaultMessageProcessor( + public DefaultDispatcher( IServiceProvider provider, IQueueExecutorFactory queueExecutorFactory, IOptions capOptions, - ILogger logger) + ILogger logger) { _logger = logger; _queueExecutorFactory = queueExecutorFactory; diff --git a/src/DotNetCore.CAP/Processor/IMessageProcessor.cs b/src/DotNetCore.CAP/Processor/IDispatcher.cs similarity index 74% rename from src/DotNetCore.CAP/Processor/IMessageProcessor.cs rename to src/DotNetCore.CAP/Processor/IDispatcher.cs index e26b6fa..f358ac6 100644 --- a/src/DotNetCore.CAP/Processor/IMessageProcessor.cs +++ b/src/DotNetCore.CAP/Processor/IDispatcher.cs @@ -4,7 +4,7 @@ using System.Text; namespace DotNetCore.CAP.Processor { - public interface IMessageProcessor : IProcessor + public interface IDispatcher : IProcessor { bool Waiting { get; } } diff --git a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs index d5bc699..8aae553 100644 --- a/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs +++ b/src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs @@ -18,7 +18,7 @@ namespace DotNetCore.CAP.Processor private readonly CapOptions _options; private IProcessor[] _processors; - private IList _messageProcessors; + private IList _messageDispatchers; private ProcessingContext _context; private Task _compositeTask; private bool _disposed; @@ -34,6 +34,7 @@ namespace DotNetCore.CAP.Processor _provider = provider; _options = options.Value; _cts = new CancellationTokenSource(); + _messageDispatchers = new List(); } public void Start() @@ -90,7 +91,7 @@ namespace DotNetCore.CAP.Processor private bool AllProcessorsWaiting() { - foreach (var processor in _messageProcessors) + foreach (var processor in _messageDispatchers) { if (!processor.Waiting) { @@ -110,10 +111,10 @@ namespace DotNetCore.CAP.Processor var returnedProcessors = new List(); for (int i = 0; i < processorCount; i++) { - var messageProcessors = _provider.GetService(); - _messageProcessors.Add(messageProcessors); + var messageProcessors = _provider.GetService(); + _messageDispatchers.Add(messageProcessors); } - returnedProcessors.AddRange(_messageProcessors); + returnedProcessors.AddRange(_messageDispatchers); returnedProcessors.Add(_provider.GetService()); returnedProcessors.Add(_provider.GetService()); diff --git a/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs b/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs index cc9f644..9de598e 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.PublishQueuer.cs @@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor context.ThrowIfStopping(); - DefaultMessageProcessor.PulseEvent.Set(); + DefaultDispatcher.PulseEvent.Set(); await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay); diff --git a/src/DotNetCore.CAP/Processor/IProcessor.SubscribeQueuer.cs b/src/DotNetCore.CAP/Processor/IProcessor.SubscribeQueuer.cs index 7d1b7c2..17c2425 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.SubscribeQueuer.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.SubscribeQueuer.cs @@ -59,7 +59,7 @@ namespace DotNetCore.CAP.Processor context.ThrowIfStopping(); - DefaultMessageProcessor.PulseEvent.Set(); + DefaultDispatcher.PulseEvent.Set(); await WaitHandleEx.WaitAnyAsync(PulseEvent, context.CancellationToken.WaitHandle, _pollingDelay);