diff --git a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs index f4eb5b3..4db2b94 100644 --- a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs @@ -191,26 +191,23 @@ namespace DotNetCore.CAP.MySql private async Task> GetMessagesOfNeedRetryAsync(string sql) { - List result; - await using (var connection = new MySqlConnection(_options.Value.ConnectionString)) + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + var result = connection.ExecuteReader(sql, reader => { - result = connection.ExecuteReader(sql, reader => + var messages = new List(); + while (reader.Read()) { - var messages = new List(); - while (reader.Read()) + messages.Add(new MediumMessage { - messages.Add(new MediumMessage - { - DbId = reader.GetInt64(0).ToString(), - Origin = StringSerializer.DeSerialize(reader.GetString(1)), - Retries = reader.GetInt32(2), - Added = reader.GetDateTime(3) - }); - } - - return messages; - }); - } + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(1)), + Retries = reader.GetInt32(2), + Added = reader.GetDateTime(3) + }); + } + + return messages; + }); return result; } diff --git a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs index bd50445..31d6189 100644 --- a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs @@ -30,39 +30,35 @@ namespace DotNetCore.CAP.MySql public StatisticsDto GetStatistics() { var sql = $@" - set transaction isolation level read committed; - SELECT - ( - SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Succeeded' - ) AS PublishedSucceeded, - ( - SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Succeeded' - ) AS ReceivedSucceeded, - ( - SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Failed' - ) AS PublishedFailed, - ( - SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Failed' - ) AS ReceivedFailed;"; - - StatisticsDto statistics; - using (var connection = new MySqlConnection(_options.ConnectionString)) +SELECT +( + SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Succeeded' +) AS PublishedSucceeded, +( + SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Succeeded' +) AS ReceivedSucceeded, +( + SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Failed' +) AS PublishedFailed, +( + SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Failed' +) AS ReceivedFailed;"; + + using var connection = new MySqlConnection(_options.ConnectionString); + var statistics = connection.ExecuteReader(sql, reader => { - statistics = connection.ExecuteReader(sql, reader => - { - var statisticsDto = new StatisticsDto(); + var statisticsDto = new StatisticsDto(); - while (reader.Read()) - { - statisticsDto.PublishedSucceeded = reader.GetInt32(0); - statisticsDto.ReceivedSucceeded = reader.GetInt32(1); - statisticsDto.PublishedFailed = reader.GetInt32(2); - statisticsDto.ReceivedFailed = reader.GetInt32(3); - } + while (reader.Read()) + { + statisticsDto.PublishedSucceeded = reader.GetInt32(0); + statisticsDto.ReceivedSucceeded = reader.GetInt32(1); + statisticsDto.PublishedFailed = reader.GetInt32(2); + statisticsDto.ReceivedFailed = reader.GetInt32(3); + } - return statisticsDto; - }); - } + return statisticsDto; + }); return statistics; } @@ -189,15 +185,17 @@ namespace DotNetCore.CAP.MySql string statusName, IDictionary keyMaps) { - var sqlQuery = - $@" -select aggr.* from ( - select date_format(`Added`,'%Y-%m-%d-%H') as `Key`, - count(id) `Count` - from `{tableName}` - where StatusName = @statusName - group by date_format(`Added`,'%Y-%m-%d-%H') -) aggr where `Key` >= @minKey and `Key` <= @maxKey;"; + var sqlQuery = $@" +SELECT aggr.* +FROM ( + SELECT date_format(`Added`, '%Y-%m-%d-%H') AS `Key`, + count(id) `Count` + FROM `{tableName}` + WHERE StatusName = @statusName + GROUP BY date_format(`Added`, '%Y-%m-%d-%H') + ) aggr +WHERE `Key` >= @minKey + AND `Key` <= @maxKey;"; object[] sqlParams = { @@ -249,7 +247,7 @@ select aggr.* from ( var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};"; await using var connection = new MySqlConnection(_options.ConnectionString); - var mediumMessae = connection.ExecuteReader(sql, reader => + var mediumMessage = connection.ExecuteReader(sql, reader => { MediumMessage message = null; @@ -268,13 +266,7 @@ select aggr.* from ( return message; }); - return mediumMessae; + return mediumMessage; } } - - class TimelineCounter - { - public string Key { get; set; } - public int Count { get; set; } - } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index 3c06288..1cf0ce7 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -135,7 +135,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { - using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); var count = connection.ExecuteNonQuery( $"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", null, new NpgsqlParameter("@timeout", timeout), new NpgsqlParameter("@batchCount", batchCount)); @@ -179,7 +179,7 @@ namespace DotNetCore.CAP.PostgreSql new NpgsqlParameter("@StatusName", state.ToString("G")) }; - using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); connection.ExecuteNonQuery(sql, sqlParams: sqlParams); await Task.CompletedTask; @@ -197,28 +197,25 @@ namespace DotNetCore.CAP.PostgreSql private async Task> GetMessagesOfNeedRetryAsync(string sql) { - List result; - using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) + await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); + var result = connection.ExecuteReader(sql, reader => { - result = connection.ExecuteReader(sql, reader => + var messages = new List(); + while (reader.Read()) { - var messages = new List(); - while (reader.Read()) + messages.Add(new MediumMessage { - messages.Add(new MediumMessage - { - DbId = reader.GetInt64(0).ToString(), - Origin = StringSerializer.DeSerialize(reader.GetString(1)), - Retries = reader.GetInt32(2), - Added = reader.GetDateTime(3) - }); - } - - return messages; - }); - } + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(1)), + Retries = reader.GetInt32(2), + Added = reader.GetDateTime(3) + }); + } + + return messages; + }); - return await Task.FromResult(result); + return result; } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs index 8f6722f..9cae360 100644 --- a/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs @@ -238,8 +238,8 @@ select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxK { var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED"; - using var connection = new NpgsqlConnection(_options.ConnectionString); - var mediumMessae = connection.ExecuteReader(sql, reader => + await using var connection = new NpgsqlConnection(_options.ConnectionString); + var mediumMessage = connection.ExecuteReader(sql, reader => { MediumMessage message = null; @@ -258,13 +258,7 @@ select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxK return message; }); - return await Task.FromResult(mediumMessae); + return await Task.FromResult(mediumMessage); } - } - - internal class TimelineCounter - { - public string Key { get; set; } - public int Count { get; set; } - } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs index ddf6ef2..3e908d5 100644 --- a/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IMonitoringApi.SqlServer.cs @@ -30,39 +30,35 @@ namespace DotNetCore.CAP.SqlServer public StatisticsDto GetStatistics() { var sql = $@" - SET TRANSACTION ISOLATION LEVEL READ COMMITTED; - SELECT - ( - SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Succeeded' - ) AS PublishedSucceeded, - ( - SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Succeeded' - ) AS ReceivedSucceeded, - ( - SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Failed' - ) AS PublishedFailed, - ( - SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Failed' - ) AS ReceivedFailed;"; - - StatisticsDto statistics; - using (var connection = new SqlConnection(_options.ConnectionString)) +SELECT +( + SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Succeeded' +) AS PublishedSucceeded, +( + SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Succeeded' +) AS ReceivedSucceeded, +( + SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Failed' +) AS PublishedFailed, +( + SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Failed' +) AS ReceivedFailed;"; + + using var connection = new SqlConnection(_options.ConnectionString); + var statistics = connection.ExecuteReader(sql, reader => { - statistics = connection.ExecuteReader(sql, reader => - { - var statisticsDto = new StatisticsDto(); + var statisticsDto = new StatisticsDto(); - while (reader.Read()) - { - statisticsDto.PublishedSucceeded = reader.GetInt32(0); - statisticsDto.ReceivedSucceeded = reader.GetInt32(1); - statisticsDto.PublishedFailed = reader.GetInt32(2); - statisticsDto.ReceivedFailed = reader.GetInt32(3); - } + while (reader.Read()) + { + statisticsDto.PublishedSucceeded = reader.GetInt32(0); + statisticsDto.ReceivedSucceeded = reader.GetInt32(1); + statisticsDto.PublishedFailed = reader.GetInt32(2); + statisticsDto.ReceivedFailed = reader.GetInt32(3); + } - return statisticsDto; - }); - } + return statisticsDto; + }); return statistics; } @@ -253,7 +249,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] < var sql = $@"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (readpast) WHERE Id={id}"; using var connection = new SqlConnection(_options.ConnectionString); - var mediumMessae = connection.ExecuteReader(sql, reader => + var mediumMessage = connection.ExecuteReader(sql, reader => { MediumMessage message = null; @@ -272,13 +268,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] < return message; }); - return await Task.FromResult(mediumMessae); + return await Task.FromResult(mediumMessage); } - } - - internal class TimelineCounter - { - public string Key { get; set; } - public int Count { get; set; } - } + } } \ No newline at end of file