|
|
@@ -13,9 +13,7 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics |
|
|
|
{ |
|
|
|
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>> |
|
|
|
{ |
|
|
|
public const string SqlAfterCommitTransaction = "System.Data.SqlClient.WriteTransactionCommitAfter"; |
|
|
|
public const string SqlAfterCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitAfter"; |
|
|
|
public const string SqlErrorCommitTransaction = "System.Data.SqlClient.WriteTransactionCommitError"; |
|
|
|
public const string SqlErrorCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitError"; |
|
|
|
|
|
|
|
private readonly ConcurrentDictionary<Guid, List<MediumMessage>> _bufferList; |
|
|
@@ -38,9 +36,9 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics |
|
|
|
|
|
|
|
public void OnNext(KeyValuePair<string, object> evt) |
|
|
|
{ |
|
|
|
if (evt.Key == SqlAfterCommitTransaction || evt.Key == SqlAfterCommitTransactionMicrosoft) |
|
|
|
if (evt.Key == SqlAfterCommitTransactionMicrosoft) |
|
|
|
{ |
|
|
|
var sqlConnection = (SqlConnection)GetProperty(evt.Value, "Connection"); |
|
|
|
if (!TryGetSqlConnection(evt, out SqlConnection sqlConnection)) return; |
|
|
|
var transactionKey = sqlConnection.ClientConnectionId; |
|
|
|
if (_bufferList.TryRemove(transactionKey, out var msgList)) |
|
|
|
foreach (var message in msgList) |
|
|
@@ -48,15 +46,21 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics |
|
|
|
_dispatcher.EnqueueToPublish(message); |
|
|
|
} |
|
|
|
} |
|
|
|
else if (evt.Key == SqlErrorCommitTransaction || evt.Key == SqlErrorCommitTransactionMicrosoft) |
|
|
|
else if (evt.Key == SqlErrorCommitTransactionMicrosoft) |
|
|
|
{ |
|
|
|
var sqlConnection = (SqlConnection)GetProperty(evt.Value, "Connection"); |
|
|
|
if (!TryGetSqlConnection(evt, out SqlConnection sqlConnection)) return; |
|
|
|
var transactionKey = sqlConnection.ClientConnectionId; |
|
|
|
|
|
|
|
_bufferList.TryRemove(transactionKey, out _); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private static bool TryGetSqlConnection(KeyValuePair<string, object> evt, out SqlConnection sqlConnection) |
|
|
|
{ |
|
|
|
sqlConnection = GetProperty(evt.Value, "Connection") as SqlConnection; |
|
|
|
return sqlConnection != null; |
|
|
|
} |
|
|
|
|
|
|
|
private static object GetProperty(object _this, string propertyName) |
|
|
|
{ |
|
|
|
return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this); |
|
|
|