@@ -9,7 +9,6 @@ | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.PostgreSql.xml</DocumentationFile> | <DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.PostgreSql.xml</DocumentationFile> | ||||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | <NoWarn>1701;1702;1705;CS1591</NoWarn> | ||||
<LangVersion>8</LangVersion> | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
{ | { | ||||
var sql = | var sql = | ||||
$"UPDATE {_pubName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; | $"UPDATE {_pubName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; | ||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
await connection.ExecuteAsync(sql, new | await connection.ExecuteAsync(sql, new | ||||
{ | { | ||||
Id = long.Parse(message.DbId), | Id = long.Parse(message.DbId), | ||||
@@ -56,7 +56,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
{ | { | ||||
var sql = | var sql = | ||||
$"UPDATE {_recName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; | $"UPDATE {_recName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; | ||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
await connection.ExecuteAsync(sql, new | await connection.ExecuteAsync(sql, new | ||||
{ | { | ||||
Id = long.Parse(message.DbId), | Id = long.Parse(message.DbId), | ||||
@@ -164,7 +164,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, | public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, | ||||
CancellationToken token = default) | CancellationToken token = default) | ||||
{ | { | ||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
return await connection.ExecuteAsync( | return await connection.ExecuteAsync( | ||||
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", | $"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", | ||||
@@ -178,7 +178,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
$"SELECT * FROM {_pubName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; | $"SELECT * FROM {_pubName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; | ||||
var result = new List<MediumMessage>(); | var result = new List<MediumMessage>(); | ||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
var reader = await connection.ExecuteReaderAsync(sql); | var reader = await connection.ExecuteReaderAsync(sql); | ||||
while (reader.Read()) | while (reader.Read()) | ||||
{ | { | ||||
@@ -202,7 +202,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
var result = new List<MediumMessage>(); | var result = new List<MediumMessage>(); | ||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
var reader = await connection.ExecuteReaderAsync(sql); | var reader = await connection.ExecuteReaderAsync(sql); | ||||
while (reader.Read()) | while (reader.Read()) | ||||
{ | { | ||||
@@ -34,7 +34,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
var sql = | var sql = | ||||
$"SELECT * FROM {_pubName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; | $"SELECT * FROM {_pubName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; | ||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | ||||
} | } | ||||
@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
{ | { | ||||
var sql = | var sql = | ||||
$"SELECT * FROM {_recName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; | $"SELECT * FROM {_recName} WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; | ||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | ||||
} | } | ||||
@@ -39,7 +39,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
if (cancellationToken.IsCancellationRequested) return; | if (cancellationToken.IsCancellationRequested) return; | ||||
var sql = CreateDbTablesScript(_options.Value.Schema); | var sql = CreateDbTablesScript(_options.Value.Schema); | ||||
await using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) | |||||
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) | |||||
{ | { | ||||
await connection.ExecuteAsync(sql); | await connection.ExecuteAsync(sql); | ||||
} | } | ||||
@@ -10,7 +10,6 @@ | |||||
<PropertyGroup> | <PropertyGroup> | ||||
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml</DocumentationFile> | <DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml</DocumentationFile> | ||||
<NoWarn>1701;1702;1705;CS1591</NoWarn> | <NoWarn>1701;1702;1705;CS1591</NoWarn> | ||||
<LangVersion>8</LangVersion> | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
{ | { | ||||
var sql = | var sql = | ||||
$"UPDATE {_pubName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; | $"UPDATE {_pubName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; | ||||
await using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
await connection.ExecuteAsync(sql, new | await connection.ExecuteAsync(sql, new | ||||
{ | { | ||||
Id = message.DbId, | Id = message.DbId, | ||||
@@ -56,7 +56,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
{ | { | ||||
var sql = | var sql = | ||||
$"UPDATE {_recName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; | $"UPDATE {_recName} SET Retries=@Retries,ExpiresAt=@ExpiresAt,StatusName=@StatusName WHERE Id=@Id"; | ||||
await using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
await connection.ExecuteAsync(sql, new | await connection.ExecuteAsync(sql, new | ||||
{ | { | ||||
Id = message.DbId, | Id = message.DbId, | ||||
@@ -163,7 +163,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, | public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, | ||||
CancellationToken token = default) | CancellationToken token = default) | ||||
{ | { | ||||
await using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
return await connection.ExecuteAsync( | return await connection.ExecuteAsync( | ||||
$"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout;", | $"DELETE TOP (@batchCount) FROM {table} WITH (readpast) WHERE ExpiresAt < @timeout;", | ||||
new { timeout, batchCount }); | new { timeout, batchCount }); | ||||
@@ -176,7 +176,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; | $"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; | ||||
var result = new List<MediumMessage>(); | var result = new List<MediumMessage>(); | ||||
await using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
var reader = await connection.ExecuteReaderAsync(sql); | var reader = await connection.ExecuteReaderAsync(sql); | ||||
while (reader.Read()) | while (reader.Read()) | ||||
{ | { | ||||
@@ -201,7 +201,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
var result = new List<MediumMessage>(); | var result = new List<MediumMessage>(); | ||||
await using var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
var connection = new SqlConnection(_options.Value.ConnectionString); | |||||
var reader = await connection.ExecuteReaderAsync(sql); | var reader = await connection.ExecuteReaderAsync(sql); | ||||
while (reader.Read()) | while (reader.Read()) | ||||
{ | { | ||||
@@ -125,14 +125,14 @@ select count(Id) from {_recName} with (nolock) where StatusName = N'Failed';"; | |||||
public async Task<MediumMessage> GetPublishedMessageAsync(long id) | public async Task<MediumMessage> GetPublishedMessageAsync(long id) | ||||
{ | { | ||||
var sql = $@"SELECT * FROM {_pubName} WITH (readpast) WHERE Id={id}"; | var sql = $@"SELECT * FROM {_pubName} WITH (readpast) WHERE Id={id}"; | ||||
await using var connection = new SqlConnection(_options.ConnectionString); | |||||
using var connection = new SqlConnection(_options.ConnectionString); | |||||
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | ||||
} | } | ||||
public async Task<MediumMessage> GetReceivedMessageAsync(long id) | public async Task<MediumMessage> GetReceivedMessageAsync(long id) | ||||
{ | { | ||||
var sql = $@"SELECT * FROM {_recName} WITH (readpast) WHERE Id={id}"; | var sql = $@"SELECT * FROM {_recName} WITH (readpast) WHERE Id={id}"; | ||||
await using var connection = new SqlConnection(_options.ConnectionString); | |||||
using var connection = new SqlConnection(_options.ConnectionString); | |||||
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); | ||||
} | } | ||||
@@ -39,7 +39,7 @@ namespace DotNetCore.CAP.SqlServer | |||||
if (cancellationToken.IsCancellationRequested) return; | if (cancellationToken.IsCancellationRequested) return; | ||||
var sql = CreateDbTablesScript(_options.Value.Schema); | var sql = CreateDbTablesScript(_options.Value.Schema); | ||||
await using (var connection = new SqlConnection(_options.Value.ConnectionString)) | |||||
using (var connection = new SqlConnection(_options.Value.ConnectionString)) | |||||
{ | { | ||||
await connection.ExecuteAsync(sql); | await connection.ExecuteAsync(sql); | ||||
} | } | ||||