Procházet zdrojové kódy

Improved query performance without lock table. (#36)

undefined
Savorboard před 7 roky
rodič
revize
0a94921ec3
1 změnil soubory, kde provedl 22 přidání a 14 odebrání
  1. +22
    -14
      src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs

+ 22
- 14
src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs Zobrazit soubor

@@ -13,6 +13,8 @@ namespace DotNetCore.CAP.MySql
{
private readonly string _prefix;

private const string DateTimeMaxValue = "9999-12-31 23:59:59";

public MySqlStorageConnection(MySqlOptions options)
{
Options = options;
@@ -38,24 +40,18 @@ namespace DotNetCore.CAP.MySql

public Task<IFetchedMessage> FetchNextMessageAsync()
{
//Last execute statement(FOR UPDATE to fix dirty read) :

//SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
//START TRANSACTION;
//SELECT MessageId,MessageType FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
//DELETE FROM `{_prefix}.queue` LIMIT 1;
//COMMIT;

var sql = $@"
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
DELETE FROM `{_prefix}.queue` LIMIT 1;";
SELECT @MessageId:=`MessageId` as MessageId, @MessageType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1;
DELETE FROM `{_prefix}.queue` where `MessageId` = @MessageId AND `MessageType`=@MessageType;";

return FetchNextMessageCoreAsync(sql);
}

public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;";
var sql = $@"
UPDATE `{_prefix}.published` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();";

using (var connection = new MySqlConnection(Options.ConnectionString))
{
@@ -98,7 +94,10 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

public async Task<CapReceivedMessage> GetNextReceivedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;";
var sql = $@"
UPDATE `{_prefix}.received` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1;
SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";

using (var connection = new MySqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
@@ -147,10 +146,19 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var connection = new MySqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
FetchedMessage fetchedMessage = null;
try
{
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction);
//fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args, transaction);
var reader = connection.ExecuteReader(sql, args, transaction);
while (reader.Read())
{
fetchedMessage = new FetchedMessage
{
MessageId = (int)reader.GetInt64(0),
MessageType = (MessageType)reader.GetInt64(1)
};
}
}
catch (MySqlException)
{


Načítá se…
Zrušit
Uložit