From 20b90339d77b13ec6a26e9c5c9b5f2822f93df93 Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Tue, 11 Jul 2017 15:40:01 +0800 Subject: [PATCH] Refactor message storage. --- .../CAP.BuilderExtensions.cs | 5 +- .../CapDbContext.cs | 25 ++- .../CapMessageStore.cs | 1 + .../EFFetchedMessage.cs | 70 ++++++++ .../EFStorageConnection.cs | 170 +++++------------- .../EFStorageTransaction.cs | 68 ++++--- .../HelperExtensions.cs | 30 ++++ src/DotNetCore.CAP/ICapMessageStore.cs | 1 + src/DotNetCore.CAP/ICapPublisher.Default.cs | 1 + .../IConsumerHandler.Default.cs | 1 + src/DotNetCore.CAP/IFetchedMessage.cs | 2 +- src/DotNetCore.CAP/Models/CapQueue.cs | 14 ++ .../Models/CapReceivedMessage.cs | 3 +- src/DotNetCore.CAP/Models/CapSentMessage.cs | 1 + .../EFMessageStoreTest.cs | 1 + test/DotNetCore.CAP.Test/CAP.BuilderTest.cs | 2 +- test/DotNetCore.CAP.Test/NoopMessageStore.cs | 1 + test/Shared/MessageManagerTestBase.cs | 1 + 18 files changed, 230 insertions(+), 167 deletions(-) create mode 100644 src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs create mode 100644 src/DotNetCore.CAP.EntityFrameworkCore/HelperExtensions.cs create mode 100644 src/DotNetCore.CAP/Models/CapQueue.cs diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs index 5cd1553..d7bc4d3 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CAP.BuilderExtensions.cs @@ -21,7 +21,7 @@ namespace Microsoft.Extensions.DependencyInjection builder.Services.AddScoped>(); builder.Services.AddScoped(); - builder.Services.AddScoped>(); + builder.Services.AddScoped(); return builder; } @@ -30,10 +30,11 @@ namespace Microsoft.Extensions.DependencyInjection public static CapBuilder AddEntityFrameworkStores(this CapBuilder builder, Action options) where TContext : DbContext { + builder.Services.AddScoped>(); builder.Services.AddScoped(); - builder.Services.AddScoped>(); + builder.Services.AddScoped(); builder.Services.Configure(options); return builder; diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs index 829a2fb..b255f1d 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapDbContext.cs @@ -1,4 +1,6 @@ -using DotNetCore.CAP.Infrastructure; +using System.Data.Common; +using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; using Microsoft.EntityFrameworkCore; namespace DotNetCore.CAP.EntityFrameworkCore @@ -8,6 +10,8 @@ namespace DotNetCore.CAP.EntityFrameworkCore /// public class CapDbContext : DbContext { + private readonly EFOptions _efOptions; + /// /// Initializes a new instance of the . /// @@ -17,18 +21,26 @@ namespace DotNetCore.CAP.EntityFrameworkCore /// Initializes a new instance of the . /// /// The options to be used by a . - public CapDbContext(DbContextOptions options) : base(options) { } + public CapDbContext(DbContextOptions options, EFOptions efOptions) + : base(options) { + _efOptions = efOptions; + } /// /// Gets or sets the of Messages. /// public DbSet CapSentMessages { get; set; } + + public DbSet CapQueue { get; set; } + /// /// Gets or sets the of Messages. /// public DbSet CapReceivedMessages { get; set; } + public DbConnection GetDbConnection() => Database.GetDbConnection(); + /// /// Configures the schema for the identity framework. /// @@ -37,15 +49,20 @@ namespace DotNetCore.CAP.EntityFrameworkCore /// protected override void OnModelCreating(ModelBuilder modelBuilder) { + modelBuilder.HasDefaultSchema(_efOptions.Schema); + modelBuilder.Entity(b => { b.HasKey(m => m.Id); - b.Property(p => p.StatusName).HasMaxLength(50); + b.HasIndex(x => x.StatusName); + b.Property(p => p.StatusName).IsRequired().HasMaxLength(50); }); modelBuilder.Entity(b => { - b.Property(p => p.StatusName).HasMaxLength(50); + b.HasKey(m => m.Id); + b.HasIndex(x => x.StatusName); + b.Property(p => p.StatusName).IsRequired().HasMaxLength(50); }); } } diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs b/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs index bc3fedf..24bf796 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/CapMessageStore.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; using Microsoft.EntityFrameworkCore; namespace DotNetCore.CAP.EntityFrameworkCore diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs new file mode 100644 index 0000000..171b8dd --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFFetchedMessage.cs @@ -0,0 +1,70 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Text; +using System.Threading; +using Microsoft.EntityFrameworkCore.Storage; + +namespace DotNetCore.CAP.EntityFrameworkCore +{ + public class EFFetchedMessage : IFetchedMessage + { + private readonly IDbConnection _connection; + private readonly IDbContextTransaction _transaction; + private readonly Timer _timer; + private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1); + private readonly object _lockObject = new object(); + + public EFFetchedMessage(string messageId, + IDbConnection connection, + IDbContextTransaction transaction) + { + MessageId = messageId; + _connection = connection; + _transaction = transaction; + _timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval); + } + + public string MessageId { get; } + + public void RemoveFromQueue() + { + lock (_lockObject) + { + _transaction.Commit(); + } + } + + public void Requeue() + { + lock (_lockObject) + { + _transaction.Rollback(); + } + } + + public void Dispose() + { + lock (_lockObject) + { + _timer?.Dispose(); + _transaction.Dispose(); + _connection.Dispose(); + } + } + + private void ExecuteKeepAliveQuery(object obj) + { + lock (_lockObject) + { + try + { + _connection?.Execute("SELECT 1", _transaction.GetDbTransaction()); + } + catch + { + } + } + } + } +} diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs index 257f682..3026424 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageConnection.cs @@ -1,20 +1,15 @@ using System; using System.Data; -using System.Data.SqlClient; using System.Linq; using System.Threading.Tasks; -using Dapper; +using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; using Microsoft.EntityFrameworkCore; -using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Options; -using MR.AspNetCore.Jobs.Models; -using MR.AspNetCore.Jobs.Server; -using MR.AspNetCore.Jobs.Server.States; namespace DotNetCore.CAP.EntityFrameworkCore { - public class EFStorageConnection : IStorageConnection where TContext : DbContext + public class EFStorageConnection : IStorageConnection { private readonly CapDbContext _context; private readonly EFOptions _options; @@ -31,67 +26,11 @@ namespace DotNetCore.CAP.EntityFrameworkCore public EFOptions Options => _options; - - - public Task StoreCronJobAsync(CronJob job) - { - if (job == null) throw new ArgumentNullException(nameof(job)); - - _context.Add(job); - return _context.SaveChangesAsync(); - } - - public Task AttachCronJobAsync(CronJob job) - { - if (job == null) throw new ArgumentNullException(nameof(job)); - - _context.Attach(job); - return Task.FromResult(true); - } - - public Task UpdateCronJobAsync(CronJob job) - { - if (job == null) throw new ArgumentNullException(nameof(job)); - - return _context.SaveChangesAsync(); - } - - public Task GetCronJobsAsync() - { - return _context.CronJobs.ToArrayAsync(); - } - - public async Task RemoveCronJobAsync(string name) - { - var cronJob = await _context.CronJobs.FirstOrDefaultAsync(j => j.Name == name); - if (cronJob != null) - { - _context.Remove(cronJob); - await _context.SaveChangesAsync(); - } - } - public IStorageTransaction CreateTransaction() { return new EFStorageTransaction(this); } - public void Dispose() - { - } - - private DateTime? NormalizeDateTime(DateTime? dateTime) - { - if (!dateTime.HasValue) return dateTime; - if (dateTime == DateTime.MinValue) - { - return new DateTime(1754, 1, 1, 0, 0, 0, DateTimeKind.Utc); - } - return dateTime; - } - - - public Task StoreSentMessageAsync(CapSentMessage message) { if (message == null) throw new ArgumentNullException(nameof(message)); @@ -107,96 +46,83 @@ namespace DotNetCore.CAP.EntityFrameworkCore return _context.CapSentMessages.FirstOrDefaultAsync(x => x.Id == id); } - public Task FetchNextJobAsync() + public async Task FetchNextSentMessageAsync() { - + // var sql = $@" + //DELETE TOP (1) + //FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast, updlock, rowlock) + //OUTPUT DELETED.Id"; + + var queueFirst = await _context.CapQueue.FirstOrDefaultAsync(); + if (queueFirst == null) + return null; + + _context.CapQueue.Remove(queueFirst); + + var connection = _context.Database.GetDbConnection(); + var transaction = _context.Database.CurrentTransaction; + transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); + return new EFFetchedMessage(queueFirst.MessageId, connection, transaction); } - public async Task GetNextJobToBeEnqueuedAsync() + public Task GetNextSentMessageToBeEnqueuedAsync() { - var sql = $@" -SELECT TOP (1) * -FROM [{_options.Schema}].[{nameof(JobsDbContext.Jobs)}] WITH (readpast) -WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{ScheduledState.StateName}'"; + // var sql = $@" + //SELECT TOP (1) * + //FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast) + //WHERE (Due IS NULL OR Due < GETUTCDATE()) AND StateName = '{StatusName.Enqueued}'"; - var connection = _context.GetDbConnection(); + // var connection = _context.GetDbConnection(); - var job = (await connection.QueryAsync(sql)).FirstOrDefault(); + // var message = _context.CapSentMessages.FromSql(sql).FirstOrDefaultAsync(); - if (job != null) + var message = _context.CapSentMessages.Where(x => x.StatusName == StatusName.Enqueued).FirstOrDefaultAsync(); + + if (message != null) { - _context.Attach(job); + _context.Attach(message); } - return job; + return message; } - public Task FetchNextSentMessageAsync() + public Task StoreReceivedMessageAsync(CapReceivedMessage message) { - var sql = $@" -DELETE TOP (1) -FROM [{_options.Schema}].[{nameof(CapDbContext.CapSentMessages)}] WITH (readpast, updlock, rowlock) -OUTPUT DELETED.Id"; + if (message == null) throw new ArgumentNullException(nameof(message)); - //return FetchNextDelayedMessageCoreAsync(sql); - throw new NotImplementedException(); - } + message.LastRun = NormalizeDateTime(message.LastRun); - //private async Task FetchNextDelayedMessageCoreAsync(string sql, object args = null) - //{ - // FetchedMessage fetchedJob = null; - // var connection = _context.Database.GetDbConnection(); - // var transaction = _context.Database.CurrentTransaction; - // transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); - - // try - // { - // fetchedJob = - // (await _context...QueryAsync(sql, args, transaction.GetDbTransaction())) - // .FirstOrDefault(); - // } - // catch (SqlException) - // { - // transaction.Dispose(); - // throw; - // } - - // if (fetchedJob == null) - // { - // transaction.Rollback(); - // transaction.Dispose(); - // return null; - // } - - // return new SqlServerFetchedJob( - // fetchedJob.JobId, - // connection, - // transaction); - //} + _context.Add(message); + return _context.SaveChangesAsync(); + } - public Task GetNextSentMessageToBeEnqueuedAsync() + public Task GetReceivedMessageAsync(string id) { - throw new NotImplementedException(); + return _context.CapReceivedMessages.FirstOrDefaultAsync(x => x.Id == id); } - public Task StoreReceivedMessageAsync(CapReceivedMessage message) + public Task FetchNextReceivedMessageAsync() { throw new NotImplementedException(); } - public Task GetReceivedMessageAsync(string id) + public Task GetNextReceviedMessageToBeEnqueuedAsync() { throw new NotImplementedException(); } - public Task FetchNextReceivedMessageAsync() + private DateTime? NormalizeDateTime(DateTime? dateTime) { - throw new NotImplementedException(); + if (!dateTime.HasValue) return dateTime; + if (dateTime == DateTime.MinValue) + { + return new DateTime(1754, 1, 1, 0, 0, 0, DateTimeKind.Utc); + } + return dateTime; } - public Task GetNextReceviedMessageToBeEnqueuedAsync() + public void Dispose() { - throw new NotImplementedException(); } } } diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs index d511d6a..47cb24c 100644 --- a/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs +++ b/src/DotNetCore.CAP.EntityFrameworkCore/EFStorageTransaction.cs @@ -4,64 +4,60 @@ using DotNetCore.CAP.Models; namespace DotNetCore.CAP.EntityFrameworkCore { - public class EFStorageTransaction : IStorageTransaction, IDisposable - { - private EFStorageConnection _connection; + public class EFStorageTransaction + : IStorageTransaction, IDisposable + { + private EFStorageConnection _connection; - public EFStorageTransaction(EFStorageConnection connection) - { - _connection = connection; - } - - public void UpdateJob(Job job) - { - if (job == null) throw new ArgumentNullException(nameof(job)); - - // NOOP. EF will detect changes. - } - - public void EnqueueJob(Job job) - { - - } - - public Task CommitAsync() - { - return _connection.Context.SaveChangesAsync(); - } - - public void Dispose() - { - } + public EFStorageTransaction(EFStorageConnection connection) + { + _connection = connection; + } public void UpdateMessage(CapSentMessage message) { - throw new NotImplementedException(); + if (message == null) throw new ArgumentNullException(nameof(message)); + + // NOOP. EF will detect changes. } public void UpdateMessage(CapReceivedMessage message) { - throw new NotImplementedException(); + if (message == null) throw new ArgumentNullException(nameof(message)); + + // NOOP. EF will detect changes. } public void EnqueueMessage(CapSentMessage message) { - if (job == null) throw new ArgumentNullException(nameof(job)); + if (message == null) throw new ArgumentNullException(nameof(message)); - _connection.Context.Add(new JobQueue + _connection.Context.Add(new CapQueue { - JobId = job.Id + MessageId = message.Id, + Type = 0 }); } public void EnqueueMessage(CapReceivedMessage message) { - if (job == null) throw new ArgumentNullException(nameof(job)); + if (message == null) throw new ArgumentNullException(nameof(message)); - _connection.Context.Add(new JobQueue + _connection.Context.Add(new CapQueue { - JobId = job.Id + MessageId = message.Id, + Type = 1 }); } + + + public Task CommitAsync() + { + return _connection.Context.SaveChangesAsync(); + } + + public void Dispose() + { + } } } diff --git a/src/DotNetCore.CAP.EntityFrameworkCore/HelperExtensions.cs b/src/DotNetCore.CAP.EntityFrameworkCore/HelperExtensions.cs new file mode 100644 index 0000000..02342d6 --- /dev/null +++ b/src/DotNetCore.CAP.EntityFrameworkCore/HelperExtensions.cs @@ -0,0 +1,30 @@ +using System; +using System.Collections.Generic; +using System.Data; + +namespace DotNetCore.CAP.EntityFrameworkCore +{ + static class HelperExtensions + { + public static void Execute(this IDbConnection connection, string sql, IDbTransaction transcation = null) + { + try + { + connection.Open(); + using (var command = connection.CreateCommand()) + { + command.CommandText = "SELELCT 1"; + if (transcation != null) + command.Transaction = transcation; + command.ExecuteNonQuery(); + } + } + finally + { + connection.Close(); + } + } + + + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/ICapMessageStore.cs b/src/DotNetCore.CAP/ICapMessageStore.cs index 4440a6a..958a749 100644 --- a/src/DotNetCore.CAP/ICapMessageStore.cs +++ b/src/DotNetCore.CAP/ICapMessageStore.cs @@ -1,5 +1,6 @@ using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; namespace DotNetCore.CAP { diff --git a/src/DotNetCore.CAP/ICapPublisher.Default.cs b/src/DotNetCore.CAP/ICapPublisher.Default.cs index 454e6e5..334ce87 100644 --- a/src/DotNetCore.CAP/ICapPublisher.Default.cs +++ b/src/DotNetCore.CAP/ICapPublisher.Default.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; using Microsoft.Extensions.Logging; namespace DotNetCore.CAP diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs index aebf720..cf2936a 100644 --- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs +++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs @@ -5,6 +5,7 @@ using System.Threading.Tasks; using DotNetCore.CAP.Abstractions; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; diff --git a/src/DotNetCore.CAP/IFetchedMessage.cs b/src/DotNetCore.CAP/IFetchedMessage.cs index b1c8b76..2dc2f68 100644 --- a/src/DotNetCore.CAP/IFetchedMessage.cs +++ b/src/DotNetCore.CAP/IFetchedMessage.cs @@ -4,7 +4,7 @@ namespace DotNetCore.CAP { public interface IFetchedMessage : IDisposable { - int MessageId { get; } + string MessageId { get; } void RemoveFromQueue(); diff --git a/src/DotNetCore.CAP/Models/CapQueue.cs b/src/DotNetCore.CAP/Models/CapQueue.cs new file mode 100644 index 0000000..bb64de1 --- /dev/null +++ b/src/DotNetCore.CAP/Models/CapQueue.cs @@ -0,0 +1,14 @@ +namespace DotNetCore.CAP.Models +{ + public class CapQueue + { + public int Id { get; set; } + + public string MessageId { get; set; } + + /// + /// 0 is CapSentMessage, 1 is CapReceviedMessage + /// + public int Type { get; set; } + } +} diff --git a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs index 09878a7..ce1df51 100644 --- a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs +++ b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs @@ -1,4 +1,5 @@ using System; +using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Models { @@ -33,7 +34,7 @@ namespace DotNetCore.CAP.Models public DateTime Added { get; set; } - public DateTime LastRun { get; set; } + public DateTime? LastRun { get; set; } public int Retries { get; set; } diff --git a/src/DotNetCore.CAP/Models/CapSentMessage.cs b/src/DotNetCore.CAP/Models/CapSentMessage.cs index 5d088ae..f615fa7 100644 --- a/src/DotNetCore.CAP/Models/CapSentMessage.cs +++ b/src/DotNetCore.CAP/Models/CapSentMessage.cs @@ -1,4 +1,5 @@ using System; +using DotNetCore.CAP.Infrastructure; namespace DotNetCore.CAP.Models { diff --git a/test/DotNetCore.CAP.EntityFrameworkCore.Test/EFMessageStoreTest.cs b/test/DotNetCore.CAP.EntityFrameworkCore.Test/EFMessageStoreTest.cs index c138173..2897482 100644 --- a/test/DotNetCore.CAP.EntityFrameworkCore.Test/EFMessageStoreTest.cs +++ b/test/DotNetCore.CAP.EntityFrameworkCore.Test/EFMessageStoreTest.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Xunit; diff --git a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs index d526c75..e4810fd 100644 --- a/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs +++ b/test/DotNetCore.CAP.Test/CAP.BuilderTest.cs @@ -1,8 +1,8 @@ using System; using System.Threading; using System.Threading.Tasks; -using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Job; +using DotNetCore.CAP.Models; using Microsoft.Extensions.DependencyInjection; using Xunit; diff --git a/test/DotNetCore.CAP.Test/NoopMessageStore.cs b/test/DotNetCore.CAP.Test/NoopMessageStore.cs index 7100a21..dfa7be4 100644 --- a/test/DotNetCore.CAP.Test/NoopMessageStore.cs +++ b/test/DotNetCore.CAP.Test/NoopMessageStore.cs @@ -2,6 +2,7 @@ using System.Threading; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; namespace DotNetCore.CAP.Test { diff --git a/test/Shared/MessageManagerTestBase.cs b/test/Shared/MessageManagerTestBase.cs index 0f981e2..f375bf2 100644 --- a/test/Shared/MessageManagerTestBase.cs +++ b/test/Shared/MessageManagerTestBase.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Models; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging;