diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs index dfa1a70..3ddd3ab 100644 --- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs +++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs @@ -37,13 +37,13 @@ namespace DotNetCore.CAP.PostgreSql public Task FetchNextMessageAsync() { - var sql = $@"DELETE FROM ""{_options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{_options.Schema}"".""queue"" LIMIT 1) RETURNING *;"; + var sql = $@"DELETE FROM ""{_options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{_options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;"; return FetchNextMessageCoreAsync(sql); } public async Task GetNextPublishedMessageToBeEnqueuedAsync() { - var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;"; + var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;"; using (var connection = new NpgsqlConnection(_options.ConnectionString)) { @@ -53,7 +53,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task> GetFailedPublishedMessages() { - var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}'"; + var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;"; using (var connection = new NpgsqlConnection(_options.ConnectionString)) { @@ -86,7 +86,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task GetNextReceviedMessageToBeEnqueuedAsync() { - var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' LIMIT 1;"; + var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;"; using (var connection = new NpgsqlConnection(_options.ConnectionString)) { return await connection.QueryFirstOrDefaultAsync(sql); @@ -95,7 +95,7 @@ namespace DotNetCore.CAP.PostgreSql public async Task> GetFailedReceviedMessages() { - var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}'"; + var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;"; using (var connection = new NpgsqlConnection(_options.ConnectionString)) { return await connection.QueryAsync(sql);