From 71df4dfdd9d95c8419f7af55601fe9ed352d7caf Mon Sep 17 00:00:00 2001 From: Savorboard Date: Thu, 21 Nov 2019 15:14:04 +0800 Subject: [PATCH] Introduced rewrite table name option --- .../Diagnostics/DiagnosticObserver.cs | 15 ++--- .../IDataStorage.SqlServer.cs | 29 ++++++---- .../IMonitoringApi.SqlServer.cs | 56 ++++++++++--------- .../IStorageInitializer.SqlServer.cs | 8 +-- 4 files changed, 59 insertions(+), 49 deletions(-) diff --git a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs index 1f43bd7..e13f533 100644 --- a/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs +++ b/src/DotNetCore.CAP.SqlServer/Diagnostics/DiagnosticObserver.cs @@ -13,10 +13,11 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics { internal class DiagnosticObserver : IObserver> { - private const string SqlClientPrefix = "System.Data.SqlClient."; + public const string SqlAfterCommitTransaction = "System.Data.SqlClient.WriteTransactionCommitAfter"; + public const string SqlAfterCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitAfter"; + public const string SqlErrorCommitTransaction = "System.Data.SqlClient.WriteTransactionCommitError"; + public const string SqlErrorCommitTransactionMicrosoft = "Microsoft.Data.SqlClient.WriteTransactionCommitError"; - public const string SqlAfterCommitTransaction = SqlClientPrefix + "WriteTransactionCommitAfter"; - public const string SqlErrorCommitTransaction = SqlClientPrefix + "WriteTransactionCommitError"; private readonly ConcurrentDictionary> _bufferList; private readonly IDispatcher _dispatcher; @@ -37,9 +38,9 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics public void OnNext(KeyValuePair evt) { - if (evt.Key == SqlAfterCommitTransaction) + if (evt.Key == SqlAfterCommitTransaction || evt.Key == SqlAfterCommitTransactionMicrosoft) { - var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection"); + var sqlConnection = (SqlConnection)GetProperty(evt.Value, "Connection"); var transactionKey = sqlConnection.ClientConnectionId; if (_bufferList.TryRemove(transactionKey, out var msgList)) foreach (var message in msgList) @@ -47,9 +48,9 @@ namespace DotNetCore.CAP.SqlServer.Diagnostics _dispatcher.EnqueueToPublish(message); } } - else if (evt.Key == SqlErrorCommitTransaction) + else if (evt.Key == SqlErrorCommitTransaction || evt.Key == SqlErrorCommitTransactionMicrosoft) { - var sqlConnection = (SqlConnection) GetProperty(evt.Value, "Connection"); + var sqlConnection = (SqlConnection)GetProperty(evt.Value, "Connection"); var transactionKey = sqlConnection.ClientConnectionId; _bufferList.TryRemove(transactionKey, out _); diff --git a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs index cdc9406..cd4c29f 100644 --- a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs @@ -22,19 +22,26 @@ namespace DotNetCore.CAP.SqlServer { private readonly IOptions _capOptions; private readonly IOptions _options; + private readonly IStorageInitializer _initializer; + private readonly string _pubName; + private readonly string _recName; public SqlServerDataStorage( IOptions capOptions, - IOptions options) + IOptions options, + IStorageInitializer initializer) { _options = options; + _initializer = initializer; _capOptions = capOptions; + _pubName = initializer.GetPublishedTableName(); + _recName = initializer.GetReceivedTableName(); } public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) { var sql = - $"UPDATE [{_options.Value.Schema}].[Published] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; + $"UPDATE {_pubName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; await using var connection = new SqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { @@ -48,7 +55,7 @@ namespace DotNetCore.CAP.SqlServer public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) { var sql = - $"UPDATE [{_options.Value.Schema}].[Received] SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; + $"UPDATE {_recName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; await using var connection = new SqlConnection(_options.Value.ConnectionString); await connection.ExecuteAsync(sql, new { @@ -62,7 +69,7 @@ namespace DotNetCore.CAP.SqlServer public async Task StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default) { - var sql = $"INSERT INTO {_options.Value.Schema}.[Published] ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + + var sql = $"INSERT INTO {_pubName} ([Id],[Version],[Name],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + $"VALUES(@Id,'{_options.Value}',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var message = new MediumMessage @@ -107,7 +114,7 @@ namespace DotNetCore.CAP.SqlServer public async Task StoreReceivedExceptionMessageAsync(string name, string group, string content) { var sql = - $"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + + $"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; await using var connection = new SqlConnection(_options.Value.ConnectionString); @@ -127,7 +134,7 @@ namespace DotNetCore.CAP.SqlServer public async Task StoreReceivedMessageAsync(string name, string group, Message message) { var sql = - $"INSERT INTO [{_options.Value.Schema}].[Received]([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + + $"INSERT INTO {_recName}([Id],[Version],[Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])" + $"VALUES(@Id,'{_capOptions.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var mdMessage = new MediumMessage @@ -159,14 +166,14 @@ namespace DotNetCore.CAP.SqlServer { await using var connection = new SqlConnection(_options.Value.ConnectionString); return await connection.ExecuteAsync( - $"DELETE TOP (@batchCount) FROM [{_options.Value.Schema}].[{table}] WITH (readpast) WHERE ExpiresAt < @timeout;", - new {timeout, batchCount}); + $"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout;", + new { timeout, batchCount }); } public async Task> GetPublishedMessagesOfNeedRetry() { var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); - var sql = $"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Published] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + + var sql = $"SELECT TOP (200) * FROM {_pubName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + $"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; var result = new List(); @@ -190,7 +197,7 @@ namespace DotNetCore.CAP.SqlServer { var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); var sql = - $"SELECT TOP (200) * FROM [{_options.Value.Schema}].[Received] WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + + $"SELECT TOP (200) * FROM {_recName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + $"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; var result = new List(); @@ -213,7 +220,7 @@ namespace DotNetCore.CAP.SqlServer public IMonitoringApi GetMonitoringApi() { - return new SqlServerMonitoringApi(_options); + return new SqlServerMonitoringApi(_options, _initializer); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs index 6172944..9daf00d 100644 --- a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs @@ -19,21 +19,24 @@ namespace DotNetCore.CAP.SqlServer internal class SqlServerMonitoringApi : IMonitoringApi { private readonly SqlServerOptions _options; + private readonly string _pubName; + private readonly string _recName; - public SqlServerMonitoringApi(IOptions options) + public SqlServerMonitoringApi(IOptions options, IStorageInitializer initializer) { _options = options.Value ?? throw new ArgumentNullException(nameof(options)); + _pubName = initializer.GetPublishedTableName(); + _recName = initializer.GetReceivedTableName(); } public StatisticsDto GetStatistics() { - var sql = string.Format(@" + var sql = $@" set transaction isolation level read committed; -select count(Id) from [{0}].Published with (nolock) where StatusName = N'Succeeded'; -select count(Id) from [{0}].Received with (nolock) where StatusName = N'Succeeded'; -select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed'; -select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed';", - _options.Schema); +select count(Id) from {_pubName} with (nolock) where StatusName = N'Succeeded'; +select count(Id) from {_recName} with (nolock) where StatusName = N'Succeeded'; +select count(Id) from {_pubName} with (nolock) where StatusName = N'Failed'; +select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';"; var statistics = UseConnection(connection => { @@ -54,21 +57,21 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; public IDictionary HourlyFailedJobs(MessageType type) { - var tableName = type == MessageType.Publish ? "Published" : "Received"; + var tableName = type == MessageType.Publish ? _pubName : _recName; return UseConnection(connection => GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Failed))); } public IDictionary HourlySucceededJobs(MessageType type) { - var tableName = type == MessageType.Publish ? "Published" : "Received"; + var tableName = type == MessageType.Publish ? _pubName : _recName; return UseConnection(connection => GetHourlyTimelineStats(connection, tableName, nameof(StatusName.Succeeded))); } public IList Messages(MessageQueryDto queryDto) { - var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received"; + var tableName = queryDto.MessageType == MessageType.Publish ? _pubName : _recName; var where = string.Empty; if (!string.IsNullOrEmpty(queryDto.StatusName)) where += " and statusname=@StatusName"; @@ -80,13 +83,13 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; var sqlQuery2008 = $@"select * from - (SELECT t.*, ROW_NUMBER() OVER(order by t.Added desc) AS rownumber - from [{_options.Schema}].{tableName} as t + (SELECT t.*, ROW_NUMBER() OVER(order by t.Added desc) AS row_number + from {tableName} as t where 1=1 {where}) as tbl - where tbl.rownumber between @offset and @offset + @limit"; + where tbl.row_number between @offset and @offset + @limit"; var sqlQuery = - $"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only"; + $"select * from {tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only"; return UseConnection(conn => conn.Query(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, new { @@ -101,34 +104,34 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; public int PublishedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Failed))); + return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Failed))); } public int PublishedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Published", nameof(StatusName.Succeeded))); + return UseConnection(conn => GetNumberOfMessage(conn, _pubName, nameof(StatusName.Succeeded))); } public int ReceivedFailedCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Failed))); + return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Failed))); } public int ReceivedSucceededCount() { - return UseConnection(conn => GetNumberOfMessage(conn, "Received", nameof(StatusName.Succeeded))); + return UseConnection(conn => GetNumberOfMessage(conn, _recName, nameof(StatusName.Succeeded))); } public async Task GetPublishedMessageAsync(long id) { - var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}"; + var sql = $@"SELECT * FROM {_pubName} WITH (readpast) WHERE Id={id}"; await using var connection = new SqlConnection(_options.ConnectionString); return await connection.QueryFirstOrDefaultAsync(sql); } public async Task GetReceivedMessageAsync(long id) { - var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}"; + var sql = $@"SELECT * FROM {_recName} WITH (readpast) WHERE Id={id}"; await using var connection = new SqlConnection(_options.ConnectionString); return await connection.QueryFirstOrDefaultAsync(sql); } @@ -136,9 +139,9 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName) { var sqlQuery = - $"select count(Id) from [{_options.Schema}].{tableName} with (nolock) where StatusName = @state"; + $"select count(Id) from {tableName} with (nolock) where StatusName = @state"; - var count = connection.ExecuteScalar(sqlQuery, new {state = statusName}); + var count = connection.ExecuteScalar(sqlQuery, new { state = statusName }); return count; } @@ -173,7 +176,7 @@ select count(Id) from [{0}].Received with (nolock) where StatusName = N'Failed'; with aggr as ( select replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) as [Key], count(id) [Count] - from [{_options.Schema}].{tableName} + from {tableName} where StatusName = @statusName group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) ) @@ -184,15 +187,14 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; with aggr as ( select FORMAT(Added,'yyyy-MM-dd-HH') as [Key], count(id) [Count] - from [{_options.Schema}].{tableName} + from {tableName} where StatusName = @statusName group by FORMAT(Added,'yyyy-MM-dd-HH') ) select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; - var valuesMap = connection.Query( - _options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, - new {keys = keyMaps.Keys, statusName}) + var valuesMap = connection + .Query(_options.IsSqlServer2008 ? sqlQuery2008 : sqlQuery, new { keys = keyMaps.Keys, statusName }) .ToDictionary(x => x.Key, x => x.Count); foreach (var key in keyMaps.Keys) diff --git a/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs index 66a3bb3..9d4c911 100644 --- a/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IStorageInitializer.SqlServer.cs @@ -29,12 +29,12 @@ namespace DotNetCore.CAP.SqlServer _logger = logger; } - public string GetPublishedTableName() + public virtual string GetPublishedTableName() { return $"[{_options.Value.Schema}].[Published]"; } - public string GetReceivedTableName() + public virtual string GetReceivedTableName() { return $"[{_options.Value.Schema}].[Received]"; } @@ -64,7 +64,7 @@ BEGIN EXEC('CREATE SCHEMA [{schema}]') END; -IF OBJECT_ID(N'[{schema}].[Received]',N'U') IS NULL +IF OBJECT_ID(N'{GetReceivedTableName()}',N'U') IS NULL BEGIN CREATE TABLE [{schema}].[Received]( [Id] [bigint] NOT NULL, @@ -83,7 +83,7 @@ CREATE TABLE [{schema}].[Received]( ) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY] END; -IF OBJECT_ID(N'[{schema}].[Published]',N'U') IS NULL +IF OBJECT_ID(N'{GetPublishedTableName()}',N'U') IS NULL BEGIN CREATE TABLE [{schema}].[Published]( [Id] [bigint] NOT NULL,