diff --git a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs index db0d73f..dec70ec 100644 --- a/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs +++ b/src/DotNetCore.CAP.SqlServer/CAP.SqlServerCapOptionsExtension.cs @@ -4,6 +4,7 @@ using System; using DotNetCore.CAP.Processor; using DotNetCore.CAP.SqlServer; +using DotNetCore.CAP.SqlServer.Diagnostics; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; @@ -22,6 +23,7 @@ namespace DotNetCore.CAP public void AddServices(IServiceCollection services) { services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs new file mode 100644 index 0000000..72ac344 --- /dev/null +++ b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Data.SqlClient; +using System.Reflection; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.SqlServer.Diagnostics +{ + internal class DiagnosticObserver : IObserver> + { + private readonly IDispatcher _dispatcher; + private readonly ConcurrentDictionary> _bufferList; + + public DiagnosticObserver(IDispatcher dispatcher, + ConcurrentDictionary> bufferList) + { + _dispatcher = dispatcher; + _bufferList = bufferList; + } + + private const string SqlClientPrefix = "System.Data.SqlClient."; + + public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter"; + public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError"; + + public void OnCompleted() + { + + } + + public void OnError(Exception error) + { + + } + + public void OnNext(KeyValuePair evt) + { + if (evt.Key == SqlAfterCommitTransaction) + { + var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection"); + var transactionKey = sqlConnection.ClientConnectionId; + if (_bufferList.TryRemove(transactionKey, out var msgList)) + { + foreach (var message in msgList) + { + _dispatcher.EnqueueToPublish(message); + } + } + } + else if (evt.Key == SqlErrorCommitTransaction) + { + var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection"); + var transactionKey = sqlConnection.ClientConnectionId; + + _bufferList.TryRemove(transactionKey, out _); + } + } + + static object GetProperty(object _this, string propertyName) + { + return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this); + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticProcessorObserver.cs b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticProcessorObserver.cs new file mode 100644 index 0000000..d7e5102 --- /dev/null +++ b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticProcessorObserver.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using DotNetCore.CAP.Models; + +namespace DotNetCore.CAP.SqlServer.Diagnostics +{ + public class DiagnosticProcessorObserver : IObserver + { + private readonly IDispatcher _dispatcher; + public const string DiagnosticListenerName = "SqlClientDiagnosticListener"; + + public ConcurrentDictionary> BufferList { get; } + + public DiagnosticProcessorObserver(IDispatcher dispatcher) + { + _dispatcher = dispatcher; + BufferList = new ConcurrentDictionary>(); + } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + } + + public void OnNext(DiagnosticListener listener) + { + if (listener.Name == DiagnosticListenerName) + { + listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList)); + } + } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs index a8e051e..19d1eec 100644 --- a/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/ICapTransaction.SqlServer.cs @@ -1,8 +1,12 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; +using System.Collections.Generic; using System.Data; -using System.Diagnostics; +using System.Data.SqlClient; +using DotNetCore.CAP.Models; +using DotNetCore.CAP.SqlServer.Diagnostics; using Microsoft.EntityFrameworkCore.Infrastructure; using Microsoft.EntityFrameworkCore.Storage; @@ -11,50 +15,68 @@ namespace DotNetCore.CAP { public class SqlServerCapTransaction : CapTransactionBase { - public SqlServerCapTransaction(IDispatcher dispatcher) : base(dispatcher) + private readonly DiagnosticProcessorObserver _diagnosticProcessor; + + public SqlServerCapTransaction(IDispatcher dispatcher, + DiagnosticProcessorObserver diagnosticProcessor) : base(dispatcher) { + _diagnosticProcessor = diagnosticProcessor; } - public override void Commit() + protected override void AddToSent(CapPublishedMessage msg) { - Debug.Assert(DbTransaction != null); - - switch (DbTransaction) + var transactionKey = ((SqlConnection)((IDbTransaction)DbTransaction).Connection).ClientConnectionId; + if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list)) { - case IDbTransaction dbTransaction: - dbTransaction.Commit(); - break; - case IDbContextTransaction dbContextTransaction: - dbContextTransaction.Commit(); - break; + list.Add(msg); } + else + { + var msgList = new List(1) { msg }; + _diagnosticProcessor.BufferList.TryAdd(transactionKey, msgList); + } + } - Flush(); + public override void Commit() + { + throw new NotImplementedException(); } public override void Rollback() { - Debug.Assert(DbTransaction != null); - - switch (DbTransaction) - { - case IDbTransaction dbTransaction: - dbTransaction.Rollback(); - break; - case IDbContextTransaction dbContextTransaction: - dbContextTransaction.Rollback(); - break; - } + throw new NotImplementedException(); } public override void Dispose() { - (DbTransaction as IDbTransaction)?.Dispose(); + } } public static class CapTransactionExtensions { + public static ICapTransaction Begin(this ICapTransaction transaction, + IDbTransaction dbTransaction, bool autoCommit = false) + { + transaction.DbTransaction = dbTransaction; + transaction.AutoCommit = autoCommit; + + return transaction; + } + + public static IDbTransaction BeginTransaction(this IDbConnection dbConnection, + ICapPublisher publisher, bool autoCommit = false) + { + if (dbConnection.State == ConnectionState.Closed) + { + dbConnection.Open(); + } + + var dbTransaction = dbConnection.BeginTransaction(); + var capTransaction = publisher.Transaction.Begin(dbTransaction, autoCommit); + return (IDbTransaction)capTransaction.DbTransaction; + } + public static ICapTransaction Begin(this ICapTransaction transaction, IDbContextTransaction dbTransaction, bool autoCommit = false) { @@ -64,7 +86,7 @@ namespace DotNetCore.CAP return transaction; } - public static IDbContextTransaction BeginAndJoinToTransaction(this DatabaseFacade database, + public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, ICapPublisher publisher, bool autoCommit = false) { var trans = database.BeginTransaction(); diff --git a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs index 5b6dbd5..f6995c3 100644 --- a/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IStorage.SqlServer.cs @@ -4,10 +4,12 @@ using System; using System.Data; using System.Data.SqlClient; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Dapper; using DotNetCore.CAP.Dashboard; +using DotNetCore.CAP.SqlServer.Diagnostics; using Microsoft.Extensions.Logging; namespace DotNetCore.CAP.SqlServer @@ -18,12 +20,15 @@ namespace DotNetCore.CAP.SqlServer private readonly IDbConnection _existingConnection = null; private readonly ILogger _logger; private readonly SqlServerOptions _options; + private readonly DiagnosticProcessorObserver _diagnosticProcessorObserver; public SqlServerStorage(ILogger logger, CapOptions capOptions, - SqlServerOptions options) + SqlServerOptions options, + DiagnosticProcessorObserver diagnosticProcessorObserver) { _options = options; + _diagnosticProcessorObserver = diagnosticProcessorObserver; _logger = logger; _capOptions = capOptions; } @@ -53,6 +58,8 @@ namespace DotNetCore.CAP.SqlServer } _logger.LogDebug("Ensuring all create database tables script are applied."); + + DiagnosticListener.AllListeners.Subscribe(_diagnosticProcessorObserver); } protected virtual string CreateDbTablesScript(string schema) diff --git a/src/DotNetCore.CAP.SqlServer/IStorageTransaction.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorageTransaction.SqlServer.cs index 687639d..96cc849 100644 --- a/src/DotNetCore.CAP.SqlServer/IStorageTransaction.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IStorageTransaction.SqlServer.cs @@ -14,7 +14,6 @@ namespace DotNetCore.CAP.SqlServer { private readonly IDbConnection _dbConnection; - private readonly IDbTransaction _dbTransaction; private readonly string _schema; public SqlServerStorageTransaction(SqlServerStorageConnection connection) @@ -24,7 +23,6 @@ namespace DotNetCore.CAP.SqlServer _dbConnection = new SqlConnection(options.ConnectionString); _dbConnection.Open(); - _dbTransaction = _dbConnection.BeginTransaction(IsolationLevel.ReadCommitted); } public void UpdateMessage(CapPublishedMessage message) @@ -36,7 +34,7 @@ namespace DotNetCore.CAP.SqlServer var sql = $"UPDATE [{_schema}].[Published] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; - _dbConnection.Execute(sql, message, _dbTransaction); + _dbConnection.Execute(sql, message); } public void UpdateMessage(CapReceivedMessage message) @@ -48,18 +46,16 @@ namespace DotNetCore.CAP.SqlServer var sql = $"UPDATE [{_schema}].[Received] SET [Retries] = @Retries,[Content] = @Content,[ExpiresAt] = @ExpiresAt,[StatusName]=@StatusName WHERE Id=@Id;"; - _dbConnection.Execute(sql, message, _dbTransaction); + _dbConnection.Execute(sql, message); } public Task CommitAsync() { - _dbTransaction.Commit(); return Task.CompletedTask; } public void Dispose() { - _dbTransaction.Dispose(); _dbConnection.Dispose(); } } diff --git a/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs b/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs index 9c40b85..218fedb 100644 --- a/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs +++ b/test/DotNetCore.CAP.SqlServer.Test/DatabaseTestHost.cs @@ -2,16 +2,18 @@ using System; using System.Data; using System.Data.SqlClient; using Dapper; +using DotNetCore.CAP.SqlServer.Diagnostics; using Microsoft.Extensions.Logging; using Moq; namespace DotNetCore.CAP.SqlServer.Test { - public abstract class DatabaseTestHost:IDisposable + public abstract class DatabaseTestHost : IDisposable { protected ILogger Logger; protected CapOptions CapOptions; protected SqlServerOptions SqlSeverOptions; + protected DiagnosticProcessorObserver DiagnosticProcessorObserver; public bool SqlObjectInstalled; @@ -23,6 +25,8 @@ namespace DotNetCore.CAP.SqlServer.Test .SetupProperty(x => x.ConnectionString, ConnectionUtil.GetConnectionString()) .Object; + DiagnosticProcessorObserver = new Mock().Object; + InitializeDatabase(); } @@ -42,7 +46,7 @@ IF NOT EXISTS (SELECT * FROM sysdatabases WHERE name = N'{databaseName}') CREATE DATABASE [{databaseName}];"); } - new SqlServerStorage(Logger, CapOptions, SqlSeverOptions).InitializeAsync().GetAwaiter().GetResult(); + new SqlServerStorage(Logger, CapOptions, SqlSeverOptions, DiagnosticProcessorObserver).InitializeAsync().GetAwaiter().GetResult(); SqlObjectInstalled = true; }