diff --git a/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs b/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs index 75af7aa..f7763d1 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.Options.Extensions.cs @@ -17,10 +17,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UseSqlServer(this CapOptions options, Action configure) { - if (configure == null) - { - throw new ArgumentNullException(nameof(configure)); - } + if (configure == null) throw new ArgumentNullException(nameof(configure)); configure += x => x.Version = options.Version; @@ -38,10 +35,7 @@ namespace Microsoft.Extensions.DependencyInjection public static CapOptions UseEntityFramework(this CapOptions options, Action configure) where TContext : DbContext { - if (configure == null) - { - throw new ArgumentNullException(nameof(configure)); - } + if (configure == null) throw new ArgumentNullException(nameof(configure)); options.RegisterExtension(new SqlServerCapOptionsExtension(x => { diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs index 3fa9f9b..ff25fa6 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs @@ -2,7 +2,7 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using DotNetCore.CAP.Processor; +using DotNetCore.CAP.Persistence; using DotNetCore.CAP.SqlServer; using DotNetCore.CAP.SqlServer.Diagnostics; using Microsoft.Extensions.DependencyInjection; @@ -25,12 +25,8 @@ namespace DotNetCore.CAP services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(x => (SqlServerPublisher)x.GetService()); - services.AddSingleton(); - + services.AddSingleton(); + services.AddSingleton(); services.AddTransient(); services.Configure(_configure); diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs index 730e823..513e82e 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerOptions.cs @@ -28,17 +28,12 @@ namespace DotNetCore.CAP public void Configure(SqlServerOptions options) { - if (options.DbContextType != null) - { - using (var scope = _serviceScopeFactory.CreateScope()) - { - var provider = scope.ServiceProvider; - using (var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType)) - { - options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; - } - } - } + if (options.DbContextType == null) return; + + using var scope = _serviceScopeFactory.CreateScope(); + var provider = scope.ServiceProvider; + using var dbContext = (DbContext)provider.GetRequiredService(options.DbContextType); + options.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs index 46d1b55..432185a 100644 --- a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs +++ b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs @@ -4,9 +4,9 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; -using System.Data.SqlClient; using System.Reflection; -using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Persistence; +using Microsoft.Data.SqlClient; namespace DotNetCore.CAP.SqlServer.Diagnostics { @@ -16,11 +16,11 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter"; public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError"; - private readonly ConcurrentDictionary> _bufferList; + private readonly ConcurrentDictionary> _bufferList; private readonly IDispatcher _dispatcher; public DiagnosticObserver(IDispatcher dispatcher, - ConcurrentDictionary> bufferList) + ConcurrentDictionary> bufferList) { _dispatcher = dispatcher; _bufferList = bufferList; @@ -41,12 +41,10 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection"); var transactionKey = sqlConnection.ClientConnectionId; if (_bufferList.TryRemove(transactionKey, out var msgList)) - { foreach (var message in msgList) { _dispatcher.EnqueueToPublish(message); } - } } else if (evt.Key == SqlErrorCommitTransaction) { diff --git a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticProcessorObserver.cs b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticProcessorObserver.cs index 2e952a2..53b22cc 100644 --- a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticProcessorObserver.cs +++ b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticProcessorObserver.cs @@ -5,7 +5,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; -using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Persistence; namespace DotNetCore.CAP.SqlServer.Diagnostics { @@ -17,10 +17,10 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics public DiagnosticProcessorObserver(IDispatcher dispatcher) { _dispatcher = dispatcher; - BufferList = new ConcurrentDictionary>(); + BufferList = new ConcurrentDictionary>(); } - public ConcurrentDictionary> BufferList { get; } + public ConcurrentDictionary> BufferList { get; } public void OnCompleted() { @@ -33,9 +33,7 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics public void OnNext(DiagnosticListener listener) { if (listener.Name == DiagnosticListenerName) - { listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList)); - } } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj index 3fe74f7..25257d7 100644 --- a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj +++ b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj @@ -1,26 +1,27 @@  - netstandard2.0 + netstandard2.1 DotNetCore.CAP.SqlServer $(PackageTags);SQL Server - + - + bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml 1701;1702;1705;CS1591 + 8 - + - - - - + + + + - + \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs deleted file mode 100644 index 59461eb..0000000 --- a/src/DotNetCore.CAP.SqlServer/ICapPublisher.SqlServer.cs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Data; -using System.Data.SqlClient; -using System.Threading; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Abstractions; -using DotNetCore.CAP.Messages; -using Microsoft.EntityFrameworkCore.Storage; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.SqlServer -{ - public class SqlServerPublisher : CapPublisherBase, ICallbackPublisher - { - private readonly SqlServerOptions _options; - - public SqlServerPublisher(IServiceProvider provider) : base(provider) - { - _options = ServiceProvider.GetService>().Value; - } - - public async Task PublishCallbackAsync(CapPublishedMessage message) - { - await PublishAsyncInternal(message); - } - - protected override async Task ExecuteAsync(CapPublishedMessage message, ICapTransaction transaction = null, - CancellationToken cancel = default(CancellationToken)) - { - if (transaction == null) - { - using (var connection = new SqlConnection(_options.ConnectionString)) - { - await connection.ExecuteAsync(PrepareSql(), message); - return; - } - } - - var dbTrans = transaction.DbTransaction as IDbTransaction; - if (dbTrans == null && transaction.DbTransaction is IDbContextTransaction dbContextTrans) - { - dbTrans = dbContextTrans.GetDbTransaction(); - } - - var conn = dbTrans?.Connection; - await conn.ExecuteAsync(PrepareSql(), message, dbTrans); - } - - #region private methods - - private string PrepareSql() - { - return - $"INSERT INTO {_options.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])VALUES(@Id,'{_options.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; - } - - #endregion private methods - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs index 006676b..817c983 100644 --- a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs @@ -4,10 +4,12 @@ using System; using System.Collections.Generic; using System.Data; -using System.Data.SqlClient; +using System.Threading; +using System.Threading.Tasks; using DotNetCore.CAP.Internal; -using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Persistence; using DotNetCore.CAP.SqlServer.Diagnostics; +using Microsoft.Data.SqlClient; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Storage; @@ -28,14 +30,12 @@ namespace DotNetCore.CAP { var sqlServerOptions = serviceProvider.GetService>().Value; if (sqlServerOptions.DbContextType != null) - { _dbContext = serviceProvider.GetService(sqlServerOptions.DbContextType) as DbContext; - } _diagnosticProcessor = serviceProvider.GetRequiredService(); } - protected override void AddToSent(CapPublishedMessage msg) + protected override void AddToSent(MediumMessage msg) { if (DbTransaction is NoopTransaction) { @@ -47,24 +47,19 @@ namespace DotNetCore.CAP if (dbTransaction == null) { if (DbTransaction is IDbContextTransaction dbContextTransaction) - { dbTransaction = dbContextTransaction.GetDbTransaction(); - } - if (dbTransaction == null) - { - throw new ArgumentNullException(nameof(DbTransaction)); - } + if (dbTransaction == null) throw new ArgumentNullException(nameof(DbTransaction)); } - var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId; + var transactionKey = ((SqlConnection) dbTransaction.Connection).ClientConnectionId; if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list)) { list.Add(msg); } else { - var msgList = new List(1) { msg }; + var msgList = new List(1) {msg}; _diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList); } } @@ -86,6 +81,23 @@ namespace DotNetCore.CAP } } + public override async Task CommitAsync(CancellationToken cancellationToken = default) + { + switch (DbTransaction) + { + case NoopTransaction _: + Flush(); + break; + case IDbTransaction dbTransaction: + dbTransaction.Commit(); + break; + case IDbContextTransaction dbContextTransaction: + await _dbContext.SaveChangesAsync(cancellationToken); + await dbContextTransaction.CommitAsync(cancellationToken); + break; + } + } + public override void Rollback() { switch (DbTransaction) @@ -99,6 +111,19 @@ namespace DotNetCore.CAP } } + public override async Task RollbackAsync(CancellationToken cancellationToken = default) + { + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Rollback(); + break; + case IDbContextTransaction dbContextTransaction: + await dbContextTransaction.RollbackAsync(cancellationToken); + break; + } + } + public override void Dispose() { switch (DbTransaction) @@ -110,6 +135,7 @@ namespace DotNetCore.CAP dbContextTransaction.Dispose(); break; } + DbTransaction = null; } } @@ -144,15 +170,12 @@ namespace DotNetCore.CAP public static IDbTransaction BeginTransaction(this IDbConnection dbConnection, ICapPublisher publisher, bool autoCommit = false) { - if (dbConnection.State == ConnectionState.Closed) - { - dbConnection.Open(); - } + if (dbConnection.State == ConnectionState.Closed) dbConnection.Open(); var dbTransaction = dbConnection.BeginTransaction(); publisher.Transaction.Value = publisher.ServiceProvider.GetService(); var capTransaction = publisher.Transaction.Value.Begin(dbTransaction, autoCommit); - return (IDbTransaction)capTransaction.DbTransaction; + return (IDbTransaction) capTransaction.DbTransaction; } /// diff --git a/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs deleted file mode 100644 index ba147d4..0000000 --- a/src/DotNetCore.CAP.SqlServer/ICollectProcessor.SqlServer.cs +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Data.SqlClient; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Processor; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.SqlServer -{ - public class SqlServerCollectProcessor : ICollectProcessor - { - private const int MaxBatch = 1000; - - private static readonly string[] Tables = - { - "Published", "Received" - }; - - private readonly ILogger _logger; - private readonly SqlServerOptions _options; - private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); - private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); - - public SqlServerCollectProcessor( - ILogger logger, - IOptions sqlServerOptions) - { - _logger = logger; - _options = sqlServerOptions.Value; - } - - public async Task ProcessAsync(ProcessingContext context) - { - foreach (var table in Tables) - { - _logger.LogDebug($"Collecting expired data from table [{_options.Schema}].[{table}]."); - - int removedCount; - do - { - using (var connection = new SqlConnection(_options.ConnectionString)) - { - removedCount = await connection.ExecuteAsync($@" -DELETE TOP (@count) -FROM [{_options.Schema}].[{table}] WITH (readpast) -WHERE ExpiresAt < @now;", new {now = DateTime.Now, count = MaxBatch}); - } - - if (removedCount != 0) - { - await context.WaitAsync(_delay); - context.ThrowIfStopping(); - } - } while (removedCount != 0); - } - - await context.WaitAsync(_waitingInterval); - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs new file mode 100644 index 0000000..a665447 --- /dev/null +++ b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs @@ -0,0 +1,219 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Data; +using System.Threading; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using DotNetCore.CAP.Serialization; +using Microsoft.Data.SqlClient; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Options; + +namespace DotNetCore.CAP.SqlServer +{ + public class SqlServerDataStorage : IDataStorage + { + private readonly IOptions _capOptions; + private readonly IOptions _options; + + public SqlServerDataStorage( + IOptions capOptions, + IOptions options) + { + _options = options; + _capOptions = capOptions; + } + + public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) + { + var sql = + $"UPDATE [{_options.Value.Schema}].[Published] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; + await using var connection = new SqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = message.DbId, + message.Retries, + message.ExpiresAt, + StatusName = state.ToString("G") + }); + } + + public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) + { + var sql = + $"UPDATE [{_options.Value.Schema}].[Received] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; + await using var connection = new SqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = message.DbId, + message.Retries, + message.ExpiresAt, + StatusName = state.ToString("G") + }); + } + + public async Task StoreMessageAsync(string name, Message content, object dbTransaction = null, + CancellationToken cancellationToken = default) + { + var sql = $"INSERT INTO {_options.Value.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + + $"VALUES(@Id,'{_options.Value}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + var message = new MediumMessage + { + DbId = content.GetId(), + Origin = content, + Content = StringSerializer.Serialize(content), + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + + var po = new + { + Id = message.DbId, + Name = name, + message.Content, + message.Retries, + message.Added, + message.ExpiresAt, + StatusName = StatusName.Scheduled + }; + + if (dbTransaction == null) + { + await using var connection = new SqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, po); + } + else + { + var dbTrans = dbTransaction as IDbTransaction; + if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans) + dbTrans = dbContextTrans.GetDbTransaction(); + + var conn = dbTrans?.Connection; + await conn.ExecuteAsync(sql, po, dbTrans); + } + + return message; + } + + public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content) + { + var sql = + $"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + await using var connection = new SqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = SnowflakeId.Default().NextId().ToString(), + Group = group, + Name = name, + Content = content, + Retries = _capOptions.Value.FailedRetryCount, + Added = DateTime.Now, + ExpiresAt = DateTime.Now.AddDays(15), + StatusName = nameof(StatusName.Failed) + }); + } + + public async Task StoreReceivedMessageAsync(string name, string group, Message message) + { + var sql = + $"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + var mdMessage = new MediumMessage + { + DbId = SnowflakeId.Default().NextId().ToString(), + Origin = message, + Added = DateTime.Now, + ExpiresAt = null, + Retries = 0 + }; + var content = StringSerializer.Serialize(mdMessage.Origin); + await using var connection = new SqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = mdMessage.DbId, + Group = group, + Name = name, + Content = content, + mdMessage.Retries, + mdMessage.Added, + mdMessage.ExpiresAt, + StatusName = nameof(StatusName.Scheduled) + }); + return mdMessage; + } + + public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, + CancellationToken token = default) + { + await using var connection = new SqlConnection(_options.Value.ConnectionString); + return await connection.ExecuteAsync( + $"DELETE TOP (@batchCount) FROM [{_options.Value.Schema}].[{table}] WITH (readpast) WHERE ExpiresAt < @timeout;", + new {timeout, batchCount}); + } + + public async Task> GetPublishedMessagesOfNeedRetry() + { + var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); + var sql = $"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + + $"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; + + var result = new List(); + await using var connection = new SqlConnection(_options.Value.ConnectionString); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) + { + result.Add(new MediumMessage + { + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(3)), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5) + }); + } + + return result; + } + + public async Task> GetReceivedMessagesOfNeedRetry() + { + var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); + var sql = + $"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + + $"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; + + var result = new List(); + + await using var connection = new SqlConnection(_options.Value.ConnectionString); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) + { + result.Add(new MediumMessage + { + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(3)), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5) + }); + } + + return result; + } + + public IMonitoringApi GetMonitoringApi() + { + return new SqlServerMonitoringApi(_options); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IDbContextTransaction.CAP.cs b/src/DotNetCore.CAP.SqlServer/IDbContextTransaction.CAP.cs index 980dc57..e476bb0 100644 --- a/src/DotNetCore.CAP.SqlServer/IDbContextTransaction.CAP.cs +++ b/src/DotNetCore.CAP.SqlServer/IDbContextTransaction.CAP.cs @@ -2,6 +2,8 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; +using System.Threading; +using System.Threading.Tasks; using DotNetCore.CAP; // ReSharper disable once CheckNamespace @@ -18,6 +20,8 @@ namespace Microsoft.EntityFrameworkCore.Storage TransactionId = dbContextTransaction.TransactionId; } + public Guid TransactionId { get; } + public void Dispose() { _transaction.Dispose(); @@ -33,6 +37,19 @@ namespace Microsoft.EntityFrameworkCore.Storage _transaction.Rollback(); } - public Guid TransactionId { get; } + public async Task CommitAsync(CancellationToken cancellationToken = default) + { + await _transaction.CommitAsync(cancellationToken); + } + + public async Task RollbackAsync(CancellationToken cancellationToken = default) + { + await _transaction.RollbackAsync(cancellationToken); + } + + public ValueTask DisposeAsync() + { + return new ValueTask(Task.Run(() => _transaction.Dispose())); + } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs index a353c51..6172944 100644 --- a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs @@ -5,11 +5,13 @@ using System; using System.Collections.Generic; using System.Data; using System.Linq; +using System.Threading.Tasks; using Dapper; -using DotNetCore.CAP.Dashboard; -using DotNetCore.CAP.Dashboard.Monitoring; -using DotNetCore.CAP.Infrastructure; +using DotNetCore.CAP.Internal; using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using Microsoft.Data.SqlClient; using Microsoft.Extensions.Options; namespace DotNetCore.CAP.SqlServer @@ -17,12 +19,10 @@ namespace DotNetCore.CAP.SqlServer internal class SqlServerMonitoringApi : IMonitoringApi { private readonly SqlServerOptions _options; - private readonly SqlServerStorage _storage; - public SqlServerMonitoringApi(IStorage storage, IOptions options) + public SqlServerMonitoringApi(IOptions options) { _options = options.Value ?? throw new ArgumentNullException(nameof(options)); - _storage = storage as SqlServerStorage ?? throw new ArgumentNullException(nameof(storage)); } public StatisticsDto GetStatistics() @@ -56,39 +56,27 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; { var tableName = type == MessageType.Publish ? "Published" : "Received"; return UseConnection(connection => - GetHourlyTimelineStats(connection, tableName, StatusName.Failed)); + GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed))); } public IDictionary HourlySucceededJobs(MessageType type) { var tableName = type == MessageType.Publish ? "Published" : "Received"; return UseConnection(connection => - GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded)); + GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded))); } public IList Messages(MessageQueryDto queryDto) { var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received"; var where = string.Empty; - if (!string.IsNullOrEmpty(queryDto.StatusName)) - { - where += " and statusname=@StatusName"; - } + if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and statusname=@StatusName"; - if (!string.IsNullOrEmpty(queryDto.Name)) - { - where += " and name=@Name"; - } + if (!string.IsNullOrEmpty(queryDto.Name)) where += " and name=@Name"; - if (!string.IsNullOrEmpty(queryDto.Group)) - { - where += " and [group]=@Group"; - } + if (!string.IsNullOrEmpty(queryDto.Group)) where += " and [group]=@Group"; - if (!string.IsNullOrEmpty(queryDto.Content)) - { - where += " and content like '%@Content%'"; - } + if (!string.IsNullOrEmpty(queryDto.Content)) where += " and content like '%@Content%'"; var sqlQuery2008 = $@"select * from @@ -113,22 +101,36 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; public int PublishedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Failed)); + return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Failed))); } public int PublishedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Succeeded)); + return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Succeeded))); } public int ReceivedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Failed)); + return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Failed))); } public int ReceivedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Succeeded)); + return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Succeeded))); + } + + public async Task GetPublishedMessageAsync(long id) + { + var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}"; + await using var connection = new SqlConnection(_options.ConnectionString); + return await connection.QueryFirstOrDefaultAsync(sql); + } + + public async Task GetReceivedMessageAsync(long id) + { + var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}"; + await using var connection = new SqlConnection(_options.ConnectionString); + return await connection.QueryFirstOrDefaultAsync(sql); } private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName) @@ -142,7 +144,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; private T UseConnection(Func action) { - return _storage.UseConnection(action); + return action(new SqlConnection(_options.ConnectionString)); } private Dictionary GetHourlyTimelineStats(IDbConnection connection, string tableName, @@ -195,10 +197,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; foreach (var key in keyMaps.Keys) { - if (!valuesMap.ContainsKey(key)) - { - valuesMap.Add(key, 0); - } + if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0); } var result = new Dictionary(); @@ -211,4 +210,11 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; return result; } } + + + internal class TimelineCounter + { + public string Key { get; set; } + public int Count { get; set; } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IStorageConnection.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorageConnection.SqlServer.cs deleted file mode 100644 index 3ec78ed..0000000 --- a/src/DotNetCore.CAP.SqlServer/IStorageConnection.SqlServer.cs +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Collections.Generic; -using System.Data.SqlClient; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Infrastructure; -using DotNetCore.CAP.Messages; -using Microsoft.Extensions.Options; - -namespace DotNetCore.CAP.SqlServer -{ - public class SqlServerStorageConnection : IStorageConnection - { - private readonly CapOptions _capOptions; - - public SqlServerStorageConnection( - IOptions options, - IOptions capOptions) - { - _capOptions = capOptions.Value; - Options = options.Value; - } - - public SqlServerOptions Options { get; } - - public IStorageTransaction CreateTransaction() - { - return new SqlServerStorageTransaction(this); - } - - public async Task GetPublishedMessageAsync(long id) - { - var sql = $@"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Id={id}"; - - using (var connection = new SqlConnection(Options.ConnectionString)) - { - return await connection.QueryFirstOrDefaultAsync(sql); - } - } - - public async Task> GetPublishedMessagesOfNeedRetry() - { - var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); - var sql = - $"SELECT TOP (200) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; - - using (var connection = new SqlConnection(Options.ConnectionString)) - { - return await connection.QueryAsync(sql); - } - } - - public void StoreReceivedMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = $@" -INSERT INTO [{Options.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) -VALUES(@Id,'{_capOptions.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; - - using (var connection = new SqlConnection(Options.ConnectionString)) - { - connection.Execute(sql, message); - } - } - - public async Task GetReceivedMessageAsync(long id) - { - var sql = $@"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Id={id}"; - using (var connection = new SqlConnection(Options.ConnectionString)) - { - return await connection.QueryFirstOrDefaultAsync(sql); - } - } - - public async Task> GetReceivedMessagesOfNeedRetry() - { - var fourMinsAgo = DateTime.Now.AddMinutes(-4).ToString("O"); - var sql = - $"SELECT TOP (200) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.FailedRetryCount} AND Version='{_capOptions.Version}' AND Added<'{fourMinsAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; - using (var connection = new SqlConnection(Options.ConnectionString)) - { - return await connection.QueryAsync(sql); - } - } - - public bool ChangePublishedState(long messageId, string state) - { - var sql = - $"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}"; - - using (var connection = new SqlConnection(Options.ConnectionString)) - { - return connection.Execute(sql) > 0; - } - } - - public bool ChangeReceivedState(long messageId, string state) - { - var sql = - $"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}"; - - using (var connection = new SqlConnection(Options.ConnectionString)) - { - return connection.Execute(sql) > 0; - } - } - } -} \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs similarity index 60% rename from src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs rename to src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs index f2eec37..e1c83e1 100644 --- a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs @@ -1,59 +1,49 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. -using System; -using System.Data; -using System.Data.SqlClient; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Dapper; -using DotNetCore.CAP.Dashboard; +using DotNetCore.CAP.Persistence; using DotNetCore.CAP.SqlServer.Diagnostics; +using Microsoft.Data.SqlClient; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; namespace DotNetCore.CAP.SqlServer { - public class SqlServerStorage : IStorage + public class SqlServerStorageInitializer : IStorageInitializer { + private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver; private readonly ILogger _logger; - private readonly IOptions _capOptions; private readonly IOptions _options; - private readonly IDbConnection _existingConnection = null; - private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver; - public SqlServerStorage( - ILogger logger, - IOptions capOptions, + public SqlServerStorageInitializer( + ILogger logger, IOptions options, DiagnosticProcessorObserver diagnosticProcessorObserver) { _options = options; _diagnosticProcessorObserver = diagnosticProcessorObserver; _logger = logger; - _capOptions = capOptions; } - public IStorageConnection GetConnection() + public string GetPublishedTableName() { - return new SqlServerStorageConnection(_options, _capOptions); + return $"[{_options.Value.Schema}].[Published]"; } - public IMonitoringApi GetMonitoringApi() + public string GetReceivedTableName() { - return new SqlServerMonitoringApi(this, _options); + return $"[{_options.Value.Schema}].[Received]"; } - public async Task InitializeAsync(CancellationToken cancellationToken = default(CancellationToken)) + public async Task InitializeAsync(CancellationToken cancellationToken) { - if (cancellationToken.IsCancellationRequested) - { - return; - } + if (cancellationToken.IsCancellationRequested) return; var sql = CreateDbTablesScript(_options.Value.Schema); - using (var connection = new SqlConnection(_options.Value.ConnectionString)) { await connection.ExecuteAsync(sql); @@ -64,6 +54,7 @@ namespace DotNetCore.CAP.SqlServer DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver); } + protected virtual string CreateDbTablesScript(string schema) { var batchSql = @@ -111,45 +102,5 @@ CREATE TABLE [{schema}].[Published]( END;"; return batchSql; } - - internal T UseConnection(Func func) - { - IDbConnection connection = null; - - try - { - connection = CreateAndOpenConnection(); - return func(connection); - } - finally - { - ReleaseConnection(connection); - } - } - - internal IDbConnection CreateAndOpenConnection() - { - var connection = _existingConnection ?? new SqlConnection(_options.Value.ConnectionString); - - if (connection.State == ConnectionState.Closed) - { - connection.Open(); - } - - return connection; - } - - internal bool IsExistingConnection(IDbConnection connection) - { - return connection != null && ReferenceEquals(connection, _existingConnection); - } - - internal void ReleaseConnection(IDbConnection connection) - { - if (connection != null && !IsExistingConnection(connection)) - { - connection.Dispose(); - } - } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IStorageTransaction.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorageTransaction.SqlServer.cs deleted file mode 100644 index 33fcce7..0000000 --- a/src/DotNetCore.CAP.SqlServer/IStorageTransaction.SqlServer.cs +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright (c) .NET Core Community. All rights reserved. -// Licensed under the MIT License. See License.txt in the project root for license information. - -using System; -using System.Data; -using System.Data.SqlClient; -using System.Threading.Tasks; -using Dapper; -using DotNetCore.CAP.Messages; - -namespace DotNetCore.CAP.SqlServer -{ - public class SqlServerStorageTransaction : IStorageTransaction - { - private readonly IDbConnection _dbConnection; - private readonly string _schema; - - public SqlServerStorageTransaction(SqlServerStorageConnection connection) - { - var options = connection.Options; - _schema = options.Schema; - - _dbConnection = new SqlConnection(options.ConnectionString); - _dbConnection.Open(); - } - - public void UpdateMessage(CapPublishedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = - $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; - _dbConnection.Execute(sql, message); - } - - public void UpdateMessage(CapReceivedMessage message) - { - if (message == null) - { - throw new ArgumentNullException(nameof(message)); - } - - var sql = - $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; - _dbConnection.Execute(sql, message); - } - - public Task CommitAsync() - { - return Task.CompletedTask; - } - - public void Dispose() - { - _dbConnection.Dispose(); - } - } -} \ No newline at end of file