|
|
@@ -108,14 +108,14 @@ WHERE StateName = '{StatusName.Enqueued}'"; |
|
|
|
|
|
|
|
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null) |
|
|
|
{ |
|
|
|
FetchedMessage fetchedJob = null; |
|
|
|
FetchedMessage fetchedMessage = null; |
|
|
|
var connection = _context.GetDbConnection(); |
|
|
|
var transaction = _context.Database.CurrentTransaction; |
|
|
|
transaction = transaction ?? await _context.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted); |
|
|
|
|
|
|
|
try |
|
|
|
{ |
|
|
|
fetchedJob = |
|
|
|
fetchedMessage = |
|
|
|
(await connection.QueryAsync<FetchedMessage>(sql, args, transaction.GetDbTransaction())) |
|
|
|
.FirstOrDefault(); |
|
|
|
} |
|
|
@@ -125,7 +125,7 @@ WHERE StateName = '{StatusName.Enqueued}'"; |
|
|
|
throw; |
|
|
|
} |
|
|
|
|
|
|
|
if (fetchedJob == null) |
|
|
|
if (fetchedMessage == null) |
|
|
|
{ |
|
|
|
transaction.Rollback(); |
|
|
|
transaction.Dispose(); |
|
|
@@ -133,8 +133,8 @@ WHERE StateName = '{StatusName.Enqueued}'"; |
|
|
|
} |
|
|
|
|
|
|
|
return new EFFetchedMessage( |
|
|
|
fetchedJob.MessageId, |
|
|
|
fetchedJob.Type, |
|
|
|
fetchedMessage.MessageId, |
|
|
|
fetchedMessage.Type, |
|
|
|
connection, |
|
|
|
transaction); |
|
|
|
} |
|
|
|