Parcourir la source

modify fetched message method without delete queue message when get queue message.

master
Savorboard il y a 6 ans
Parent
révision
3f4f349536
2 fichiers modifiés avec 16 ajouts et 10 suppressions
  1. +10
    -4
      src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs
  2. +6
    -6
      src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs

+ 10
- 4
src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs Voir le fichier

@@ -7,12 +7,14 @@ namespace DotNetCore.CAP.MySql
public class MySqlFetchedMessage : IFetchedMessage
{
private readonly MySqlOptions _options;
private readonly string _processId;

public MySqlFetchedMessage(int messageId, MessageType type, MySqlOptions options)
public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options)
{
MessageId = messageId;
MessageType = type;

_processId = processId;
_options = options;
}

@@ -22,15 +24,19 @@ namespace DotNetCore.CAP.MySql

public void RemoveFromQueue()
{
// ignored
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}

public void Requeue()
{
using (var connection = new MySqlConnection(_options.ConnectionString))
{
connection.Execute($"insert into `{_options.TableNamePrefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);"
, new {MessageId, MessageType });
connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId"
, new { ProcessId = _processId });
}
}



+ 6
- 6
src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs Voir le fichier

@@ -41,12 +41,12 @@ namespace DotNetCore.CAP.MySql

public Task<IFetchedMessage> FetchNextMessageAsync()
{
var processId = ObjectId.GenerateNewStringId();
var sql = $@"
UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;
DELETE FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId";
SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;";

return FetchNextMessageCoreAsync(sql, new { ProcessId = Guid.NewGuid().ToString() });
return FetchNextMessageCoreAsync(sql, processId);
}

public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
@@ -139,18 +139,18 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}

private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, string processId)
{
FetchedMessage fetchedMessage;
using (var connection = new MySqlConnection(Options.ConnectionString))
{
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, args);
fetchedMessage = await connection.QuerySingleOrDefaultAsync<FetchedMessage>(sql, new { ProcessId = processId });
}

if (fetchedMessage == null)
return null;

return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, Options);
return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options);
}

public void Dispose()


Chargement…
Annuler
Enregistrer