diff --git a/src/DotNetCore.CAP.MySql/MySqlDataStorage.cs b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs similarity index 56% rename from src/DotNetCore.CAP.MySql/MySqlDataStorage.cs rename to src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs index e3b119d..d4ba6da 100644 --- a/src/DotNetCore.CAP.MySql/MySqlDataStorage.cs +++ b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs @@ -1,4 +1,7 @@ -using System; +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; using System.Collections.Generic; using System.Data; using System.Threading; @@ -28,34 +31,32 @@ namespace DotNetCore.CAP.MySql public async Task ChangePublishStateAsync(MediumMessage message, StatusName state) { - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) - { - var sql = $"UPDATE `{_options.Value.TableNamePrefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + await using var connection = new MySqlConnection(_options.Value.ConnectionString); - await connection.ExecuteAsync(sql, new - { - Id = message.DbId, - message.Retries, - message.ExpiresAt, - StatusName = state.ToString("G") - }); - } + var sql = $"UPDATE `{_options.Value.TableNamePrefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + + await connection.ExecuteAsync(sql, new + { + Id = message.DbId, + message.Retries, + message.ExpiresAt, + StatusName = state.ToString("G") + }); } public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) { - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) - { - var sql = $"UPDATE `{_options.Value.TableNamePrefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + await using var connection = new MySqlConnection(_options.Value.ConnectionString); - await connection.ExecuteAsync(sql, new - { - Id = message.DbId, - message.Retries, - message.ExpiresAt, - StatusName = state.ToString("G") - }); - } + var sql = $"UPDATE `{_options.Value.TableNamePrefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;"; + + await connection.ExecuteAsync(sql, new + { + Id = message.DbId, + message.Retries, + message.ExpiresAt, + StatusName = state.ToString("G") + }); } public async Task StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default) @@ -85,10 +86,8 @@ namespace DotNetCore.CAP.MySql if (dbTransaction == null) { - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) - { - await connection.ExecuteAsync(sql, po); - } + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, po); } else { @@ -109,23 +108,21 @@ namespace DotNetCore.CAP.MySql { var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new { - await connection.ExecuteAsync(sql, new - { - Id = SnowflakeId.Default().NextId().ToString(), - Group = group, - Name = name, - Content = content, - Retries = _capOptions.Value.FailedRetryCount, - Added = DateTime.Now, - ExpiresAt = DateTime.Now.AddDays(15), - StatusName = nameof(StatusName.Failed) - }); - } + Id = SnowflakeId.Default().NextId().ToString(), + Group = @group, + Name = name, + Content = content, + Retries = _capOptions.Value.FailedRetryCount, + Added = DateTime.Now, + ExpiresAt = DateTime.Now.AddDays(15), + StatusName = nameof(StatusName.Failed) + }); } - public Task StoreReceivedMessageAsync(string name, string group, Message message) + public async Task StoreReceivedMessageAsync(string name, string group, Message message) { var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; @@ -138,33 +135,28 @@ namespace DotNetCore.CAP.MySql Retries = 0 }; var content = StringSerializer.Serialize(mdMessage.Origin); - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + await connection.ExecuteAsync(sql, new { - - connection.Execute(sql, new - { - Id = mdMessage.DbId, - Group = group, - Name = name, - Content = content, - mdMessage.Retries, - mdMessage.Added, - mdMessage.ExpiresAt, - StatusName = nameof(StatusName.Scheduled) - }); - } - - return Task.FromResult(mdMessage); + Id = mdMessage.DbId, + Group = @group, + Name = name, + Content = content, + mdMessage.Retries, + mdMessage.Added, + mdMessage.ExpiresAt, + StatusName = nameof(StatusName.Scheduled) + }); + return mdMessage; } public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) { - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) - { - return await connection.ExecuteAsync( - $@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout limit @batchCount;", - new { timeout, batchCount }); - } + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + + return await connection.ExecuteAsync( + $@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout limit @batchCount;", + new { timeout, batchCount }); } public async Task> GetPublishedMessagesOfNeedRetry() @@ -173,19 +165,17 @@ namespace DotNetCore.CAP.MySql var sql = $"SELECT * FROM `{_options.Value.TableNamePrefix}.published` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; var result = new List(); - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) { - var reader = await connection.ExecuteReaderAsync(sql); - while (reader.Read()) + result.Add(new MediumMessage { - result.Add(new MediumMessage - { - DbId = reader.GetInt64(0).ToString(), - Origin = StringSerializer.DeSerialize(reader.GetString(3)), - Retries = reader.GetInt32(4), - Added = reader.GetDateTime(5) - }); - } + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(3)), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5) + }); } return result; } @@ -197,20 +187,20 @@ namespace DotNetCore.CAP.MySql $"SELECT * FROM `{_options.Value.TableNamePrefix}.received` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; var result = new List(); - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) + + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) { - var reader = await connection.ExecuteReaderAsync(sql); - while (reader.Read()) + result.Add(new MediumMessage { - result.Add(new MediumMessage - { - DbId = reader.GetInt64(0).ToString(), - Origin = StringSerializer.DeSerialize(reader.GetString(3)), - Retries = reader.GetInt32(4), - Added = reader.GetDateTime(5) - }); - } + DbId = reader.GetInt64(0).ToString(), + Origin = StringSerializer.DeSerialize(reader.GetString(3)), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5) + }); } + return result; } diff --git a/src/DotNetCore.CAP.MySql/IStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs similarity index 92% rename from src/DotNetCore.CAP.MySql/IStorage.MySql.cs rename to src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs index be84cc9..e5b4d72 100644 --- a/src/DotNetCore.CAP.MySql/IStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs @@ -1,6 +1,7 @@ // Copyright (c) .NET Core Community. All rights reserved. // Licensed under the MIT License. See License.txt in the project root for license information. +using System; using System.Threading; using System.Threading.Tasks; using Dapper; @@ -18,8 +19,7 @@ namespace DotNetCore.CAP.MySql public MySqlStorageInitializer( ILogger logger, - IOptions options, - IOptions capOptions) + IOptions options) { _options = options; _logger = logger; @@ -43,7 +43,7 @@ namespace DotNetCore.CAP.MySql } var sql = CreateDbTablesScript(_options.Value.TableNamePrefix); - using (var connection = new MySqlConnection(_options.Value.ConnectionString)) + await using (var connection = new MySqlConnection(_options.Value.ConnectionString)) { await connection.ExecuteAsync(sql); }