@@ -19,14 +19,14 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// EF DbContext | |||
/// </summary> | |||
internal Type DbContextType { get; set; } | |||
internal Type? DbContextType { get; set; } | |||
internal bool IsSqlServer2008 { get; set; } | |||
/// <summary> | |||
/// Data version | |||
/// </summary> | |||
internal string Version { get; set; } | |||
internal string Version { get; set; } = default!; | |||
public EFOptions UseSqlServer2008() | |||
{ | |||
@@ -13,7 +13,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Gets or sets the database's connection string that will be used to store database entities. | |||
/// </summary> | |||
public string ConnectionString { get; set; } | |||
public string ConnectionString { get; set; } = default!; | |||
} | |||
@@ -4,6 +4,7 @@ | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Diagnostics.CodeAnalysis; | |||
using System.Reflection; | |||
using DotNetCore.CAP.Persistence; | |||
using DotNetCore.CAP.Transport; | |||
@@ -11,7 +12,7 @@ using Microsoft.Data.SqlClient; | |||
namespace DotNetCore.CAP.SqlServer.Diagnostics | |||
{ | |||
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object>> | |||
internal class DiagnosticObserver : IObserver<KeyValuePair<string, object?>> | |||
{ | |||
public const string SqlAfterCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitAfter"; | |||
public const string SqlErrorCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitError"; | |||
@@ -36,41 +37,48 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics | |||
{ | |||
} | |||
public void OnNext(KeyValuePair<string, object> evt) | |||
public void OnNext(KeyValuePair<string, object?> evt) | |||
{ | |||
if (evt.Key == SqlAfterCommitTransactionMicrosoft) | |||
switch (evt.Key) | |||
{ | |||
if (!TryGetSqlConnection(evt, out SqlConnection sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
if (_bufferList.TryRemove(transactionKey, out var msgList)) | |||
case SqlAfterCommitTransactionMicrosoft: | |||
{ | |||
foreach (var message in msgList) | |||
if (!TryGetSqlConnection(evt, out SqlConnection? sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
if (_bufferList.TryRemove(transactionKey, out var msgList)) | |||
{ | |||
_dispatcher.EnqueueToPublish(message); | |||
foreach (var message in msgList) | |||
{ | |||
_dispatcher.EnqueueToPublish(message); | |||
} | |||
} | |||
break; | |||
} | |||
} | |||
else if (evt.Key == SqlErrorCommitTransactionMicrosoft || evt.Key == SqlAfterRollbackTransactionMicrosoft || evt.Key == SqlBeforeCloseConnectionMicrosoft) | |||
{ | |||
if (!_bufferList.IsEmpty) | |||
case SqlErrorCommitTransactionMicrosoft or SqlAfterRollbackTransactionMicrosoft or SqlBeforeCloseConnectionMicrosoft: | |||
{ | |||
if (!TryGetSqlConnection(evt, out SqlConnection sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
if (!_bufferList.IsEmpty) | |||
{ | |||
if (!TryGetSqlConnection(evt, out SqlConnection? sqlConnection)) return; | |||
var transactionKey = sqlConnection.ClientConnectionId; | |||
_bufferList.TryRemove(transactionKey, out _); | |||
} | |||
_bufferList.TryRemove(transactionKey, out _); | |||
break; | |||
} | |||
} | |||
} | |||
private static bool TryGetSqlConnection(KeyValuePair<string, object> evt, out SqlConnection sqlConnection) | |||
private static bool TryGetSqlConnection(KeyValuePair<string, object?> evt, [NotNullWhen(true)] out SqlConnection? sqlConnection) | |||
{ | |||
sqlConnection = GetProperty(evt.Value, "Connection") as SqlConnection; | |||
return sqlConnection != null; | |||
} | |||
private static object GetProperty(object _this, string propertyName) | |||
private static object? GetProperty(object? @this, string propertyName) | |||
{ | |||
return _this.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(_this); | |||
return @this?.GetType().GetTypeInfo().GetDeclaredProperty(propertyName)?.GetValue(@this); | |||
} | |||
} | |||
} |
@@ -34,7 +34,9 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics | |||
public void OnNext(DiagnosticListener listener) | |||
{ | |||
if (listener.Name == DiagnosticListenerName) | |||
{ | |||
listener.Subscribe(new DiagnosticObserver(_dispatcher, BufferList)); | |||
} | |||
} | |||
} | |||
} |
@@ -2,14 +2,8 @@ | |||
<PropertyGroup> | |||
<TargetFrameworks>net6.0;netstandard2.1</TargetFrameworks> | |||
<AssemblyName>DotNetCore.CAP.SqlServer</AssemblyName> | |||
<Nullable>enable</Nullable> | |||
<PackageTags>$(PackageTags);SQL Server</PackageTags> | |||
</PropertyGroup> | |||
<PropertyGroup> | |||
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.SqlServer.xml</DocumentationFile> | |||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | |||
</PropertyGroup> | |||
<ItemGroup Condition=" '$(TargetFramework)' == 'net6.0' "> | |||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP | |||
if (dbTransaction == null) throw new ArgumentNullException(nameof(DbTransaction)); | |||
} | |||
var transactionKey = ((SqlConnection)dbTransaction.Connection).ClientConnectionId; | |||
var transactionKey = ((SqlConnection)dbTransaction.Connection!).ClientConnectionId; | |||
if (_diagnosticProcessor.BufferList.TryGetValue(transactionKey, out var list)) | |||
{ | |||
list.Add(msg); | |||
@@ -166,7 +166,7 @@ namespace DotNetCore.CAP | |||
var dbTransaction = dbConnection.BeginTransaction(); | |||
publisher.Transaction.Value = ActivatorUtilities.CreateInstance<SqlServerCapTransaction>(publisher.ServiceProvider); | |||
var capTransaction = publisher.Transaction.Value.Begin(dbTransaction, autoCommit); | |||
return (IDbTransaction)capTransaction.DbTransaction; | |||
return (IDbTransaction)capTransaction.DbTransaction!; | |||
} | |||
/// <summary> | |||
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.SqlServer | |||
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) => | |||
await ChangeMessageStateAsync(_recName, message, state); | |||
public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null) | |||
public MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null) | |||
{ | |||
var sql = $"INSERT INTO {_pubName} ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + | |||
$"VALUES(@Id,'{_options.Value.Version}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||
@@ -84,7 +84,7 @@ namespace DotNetCore.CAP.SqlServer | |||
dbTrans = dbContextTrans.GetDbTransaction(); | |||
var conn = dbTrans?.Connection; | |||
conn.ExecuteNonQuery(sql, dbTrans, sqlParams); | |||
conn!.ExecuteNonQuery(sql, dbTrans, sqlParams); | |||
} | |||
return message; | |||
@@ -126,7 +126,7 @@ namespace DotNetCore.CAP.SqlServer | |||
new SqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)), | |||
new SqlParameter("@Retries", mdMessage.Retries), | |||
new SqlParameter("@Added", mdMessage.Added), | |||
new SqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new SqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? mdMessage.ExpiresAt.Value : DBNull.Value), | |||
new SqlParameter("@StatusName", nameof(StatusName.Scheduled)) | |||
}; | |||
@@ -200,7 +200,7 @@ namespace DotNetCore.CAP.SqlServer | |||
messages.Add(new MediumMessage | |||
{ | |||
DbId = reader.GetInt64(0).ToString(), | |||
Origin = _serializer.Deserialize(reader.GetString(1)), | |||
Origin = _serializer.Deserialize(reader.GetString(1))!, | |||
Retries = reader.GetInt32(2), | |||
Added = reader.GetDateTime(3) | |||
}); | |||
@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.SqlServer | |||
{ | |||
internal static class DbConnectionExtensions | |||
{ | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null, | |||
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction? transaction = null, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.SqlServer | |||
return command.ExecuteNonQuery(); | |||
} | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc, | |||
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T>? readerFunc, | |||
params object[] sqlParams) | |||
{ | |||
if (connection.State == ConnectionState.Closed) | |||
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.SqlServer | |||
var reader = command.ExecuteReader(); | |||
T result = default; | |||
T result = default!; | |||
if (readerFunc != null) | |||
{ | |||
result = readerFunc(reader); | |||
@@ -77,14 +77,14 @@ namespace DotNetCore.CAP.SqlServer | |||
var objValue = command.ExecuteScalar(); | |||
T result = default; | |||
T result = default!; | |||
if (objValue != null) | |||
{ | |||
var returnType = typeof(T); | |||
var converter = TypeDescriptor.GetConverter(returnType); | |||
if (converter.CanConvertFrom(objValue.GetType())) | |||
{ | |||
result = (T)converter.ConvertFrom(objValue); | |||
result = (T)converter.ConvertFrom(objValue)!; | |||
} | |||
else | |||
{ | |||
@@ -18,7 +18,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
public CapEFDbTransaction(ICapTransaction transaction) | |||
{ | |||
_transaction = transaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!; | |||
TransactionId = dbContextTransaction.TransactionId; | |||
} | |||
@@ -58,7 +58,7 @@ namespace Microsoft.EntityFrameworkCore.Storage | |||
{ | |||
get | |||
{ | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction; | |||
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!; | |||
return dbContextTransaction.GetDbTransaction(); | |||
} | |||
} | |||
@@ -131,7 +131,7 @@ SELECT | |||
Content = reader.GetString(index++), | |||
Retries = reader.GetInt32(index++), | |||
Added = reader.GetDateTime(index++), | |||
ExpiresAt = reader.IsDBNull(index++) ? (DateTime?)null : reader.GetDateTime(index - 1), | |||
ExpiresAt = reader.IsDBNull(index++) ? null : reader.GetDateTime(index - 1), | |||
StatusName = reader.GetString(index) | |||
}); | |||
} | |||
@@ -162,9 +162,9 @@ SELECT | |||
return GetNumberOfMessage(_recName, nameof(StatusName.Succeeded)); | |||
} | |||
public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage?> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id); | |||
public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
public async Task<MediumMessage?> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id); | |||
private int GetNumberOfMessage(string tableName, string statusName) | |||
{ | |||
@@ -254,14 +254,14 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] < | |||
return result; | |||
} | |||
private async Task<MediumMessage> GetMessageAsync(string tableName, long id) | |||
private async Task<MediumMessage?> GetMessageAsync(string tableName, long id) | |||
{ | |||
var sql = $@"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (readpast) WHERE Id={id}"; | |||
await using var connection = new SqlConnection(_options.ConnectionString); | |||
var mediumMessage = connection.ExecuteReader(sql, reader => | |||
{ | |||
MediumMessage message = null; | |||
MediumMessage? message = null; | |||
while (reader.Read()) | |||
{ | |||