Bladeren bron

Rename file

master
Savorboard 5 jaren geleden
bovenliggende
commit
c15ae3172d
2 gewijzigde bestanden met toevoegingen van 78 en 88 verwijderingen
  1. +75
    -85
      src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
  2. +3
    -3
      src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs

src/DotNetCore.CAP.MySql/MySqlDataStorage.cs → src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs Bestand weergeven

@@ -1,4 +1,7 @@
using System;
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Data;
using System.Threading;
@@ -28,34 +31,32 @@ namespace DotNetCore.CAP.MySql

public async Task ChangePublishStateAsync(MediumMessage message, StatusName state)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
await using var connection = new MySqlConnection(_options.Value.ConnectionString);

await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.published` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";

await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}

public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";
await using var connection = new MySqlConnection(_options.Value.ConnectionString);

await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
var sql = $"UPDATE `{_options.Value.TableNamePrefix}.received` SET `Retries` = @Retries,`ExpiresAt` = @ExpiresAt,`StatusName`=@StatusName WHERE `Id`=@Id;";

await connection.ExecuteAsync(sql, new
{
Id = message.DbId,
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}

public async Task<MediumMessage> StoreMessageAsync(string name, Message content, object dbTransaction = null, CancellationToken cancellationToken = default)
@@ -85,10 +86,8 @@ namespace DotNetCore.CAP.MySql

if (dbTransaction == null)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql, po);
}
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, po);
}
else
{
@@ -109,23 +108,21 @@ namespace DotNetCore.CAP.MySql
{
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using (var connection = new MySqlConnection(_options.Value.ConnectionString))
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{
await connection.ExecuteAsync(sql, new
{
Id = SnowflakeId.Default().NextId().ToString(),
Group = group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
}
Id = SnowflakeId.Default().NextId().ToString(),
Group = @group,
Name = name,
Content = content,
Retries = _capOptions.Value.FailedRetryCount,
Added = DateTime.Now,
ExpiresAt = DateTime.Now.AddDays(15),
StatusName = nameof(StatusName.Failed)
});
}

public Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
public async Task<MediumMessage> StoreReceivedMessageAsync(string name, string group, Message message)
{
var sql = $@"INSERT INTO `{_options.Value.TableNamePrefix}.received`(`Id`,`Version`,`Name`,`Group`,`Content`,`Retries`,`Added`,`ExpiresAt`,`StatusName`) VALUES(@Id,'{_options.Value.Version}',@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

@@ -138,33 +135,28 @@ namespace DotNetCore.CAP.MySql
Retries = 0
};
var content = StringSerializer.Serialize(mdMessage.Origin);
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
await connection.ExecuteAsync(sql, new
{

connection.Execute(sql, new
{
Id = mdMessage.DbId,
Group = group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
}

return Task.FromResult(mdMessage);
Id = mdMessage.DbId,
Group = @group,
Name = name,
Content = content,
mdMessage.Retries,
mdMessage.Added,
mdMessage.ExpiresAt,
StatusName = nameof(StatusName.Scheduled)
});
return mdMessage;
}

public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
{
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
return await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout limit @batchCount;",
new { timeout, batchCount });
}
await using var connection = new MySqlConnection(_options.Value.ConnectionString);

return await connection.ExecuteAsync(
$@"DELETE FROM `{table}` WHERE ExpiresAt < @timeout limit @batchCount;",
new { timeout, batchCount });
}

public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
@@ -173,19 +165,17 @@ namespace DotNetCore.CAP.MySql
var sql = $"SELECT * FROM `{_options.Value.TableNamePrefix}.published` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

var result = new List<MediumMessage>();
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
result.Add(new MediumMessage
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
return result;
}
@@ -197,20 +187,20 @@ namespace DotNetCore.CAP.MySql
$"SELECT * FROM `{_options.Value.TableNamePrefix}.received` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";

var result = new List<MediumMessage>();
using (var connection = new MySqlConnection(_options.Value.ConnectionString))

await using var connection = new MySqlConnection(_options.Value.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
result.Add(new MediumMessage
{
result.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
DbId = reader.GetInt64(0).ToString(),
Origin = StringSerializer.DeSerialize(reader.GetString(3)),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}

return result;
}


src/DotNetCore.CAP.MySql/IStorage.MySql.cs → src/DotNetCore.CAP.MySql/IStorageInitializer.MySql.cs Bestand weergeven

@@ -1,6 +1,7 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
@@ -18,8 +19,7 @@ namespace DotNetCore.CAP.MySql

public MySqlStorageInitializer(
ILogger<MySqlStorageInitializer> logger,
IOptions<MySqlOptions> options,
IOptions<CapOptions> capOptions)
IOptions<MySqlOptions> options)
{
_options = options;
_logger = logger;
@@ -43,7 +43,7 @@ namespace DotNetCore.CAP.MySql
}

var sql = CreateDbTablesScript(_options.Value.TableNamePrefix);
using (var connection = new MySqlConnection(_options.Value.ConnectionString))
await using (var connection = new MySqlConnection(_options.Value.ConnectionString))
{
await connection.ExecuteAsync(sql);
}

Laden…
Annuleren
Opslaan