diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs index 13adb95..cd42410 100644 --- a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs +++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs @@ -13,6 +13,8 @@ namespace DotNetCore.CAP.MySql { private readonly string _prefix; + private const string DateTimeMaxValue = "9999-12-31 23:59:59"; + public MySqlStorageConnection(MySqlOptions options) { Options = options; @@ -38,24 +40,18 @@ namespace DotNetCore.CAP.MySql public Task FetchNextMessageAsync() { - //Last execute statement(FOR UPDATE to fix dirty read) : - - //SET TRANSACTION ISOLATION LEVEL READ COMMITTED; - //START TRANSACTION; - //SELECT MessageId,MessageType FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; - //DELETE FROM `{_prefix}.queue` LIMIT 1; - //COMMIT; - var sql = $@" -SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE; -DELETE FROM `{_prefix}.queue` LIMIT 1;"; +SELECT @MessageId:=`MessageId` as MessageId, @MessageType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1; +DELETE FROM `{_prefix}.queue` where `MessageId` = @MessageId AND `MessageType`=@MessageType;"; return FetchNextMessageCoreAsync(sql); } public async Task GetNextPublishedMessageToBeEnqueuedAsync() { - var sql = $"SELECT * FROM `{_prefix}.published` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; + var sql = $@" +UPDATE `{_prefix}.published` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1; +SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();"; using (var connection = new MySqlConnection(Options.ConnectionString)) { @@ -98,7 +94,10 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; public async Task GetNextReceivedMessageToBeEnqueuedAsync() { - var sql = $"SELECT * FROM `{_prefix}.received` WHERE `StatusName` = '{StatusName.Scheduled}' LIMIT 1;"; + var sql = $@" +UPDATE `{_prefix}.received` SET Id=LAST_INSERT_ID(Id),ExpiresAt='{DateTimeMaxValue}' WHERE ExpiresAt IS NULL AND `StatusName` = '{StatusName.Scheduled}' LIMIT 1; +SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();"; + using (var connection = new MySqlConnection(Options.ConnectionString)) { return await connection.QueryFirstOrDefaultAsync(sql); @@ -147,10 +146,19 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; var connection = new MySqlConnection(Options.ConnectionString); await connection.OpenAsync(); var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); - FetchedMessage fetchedMessage; + FetchedMessage fetchedMessage = null; try { - fetchedMessage = await connection.QueryFirstOrDefaultAsync(sql, args, transaction); + //fetchedMessage = await connection.QuerySingleOrDefaultAsync(sql, args, transaction); + var reader = connection.ExecuteReader(sql, args, transaction); + while (reader.Read()) + { + fetchedMessage = new FetchedMessage + { + MessageId = (int)reader.GetInt64(0), + MessageType = (MessageType)reader.GetInt64(1) + }; + } } catch (MySqlException) {