|
@@ -30,20 +30,16 @@ namespace DotNetCore.CAP.PostgreSql |
|
|
var sql = |
|
|
var sql = |
|
|
$"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; |
|
|
$"SELECT * FROM \"{_options.Value.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; |
|
|
|
|
|
|
|
|
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) |
|
|
|
|
|
{ |
|
|
|
|
|
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); |
|
|
|
|
|
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public async Task<MediumMessage> GetReceivedMessageAsync(long id) |
|
|
public async Task<MediumMessage> GetReceivedMessageAsync(long id) |
|
|
{ |
|
|
{ |
|
|
var sql = |
|
|
var sql = |
|
|
$"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; |
|
|
$"SELECT * FROM \"{_options.Value.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED"; |
|
|
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) |
|
|
|
|
|
{ |
|
|
|
|
|
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); |
|
|
|
|
|
return await connection.QueryFirstOrDefaultAsync<MediumMessage>(sql); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public StatisticsDto GetStatistics() |
|
|
public StatisticsDto GetStatistics() |
|
@@ -138,7 +134,7 @@ select count(""Id"") from ""{0}"".""received"" where ""StatusName"" = N'Failed' |
|
|
var sqlQuery = |
|
|
var sqlQuery = |
|
|
$"select count(\"Id\") from \"{_options.Value.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; |
|
|
$"select count(\"Id\") from \"{_options.Value.Schema}\".\"{tableName}\" where Lower(\"StatusName\") = Lower(@state)"; |
|
|
|
|
|
|
|
|
var count = connection.ExecuteScalar<int>(sqlQuery, new {state = statusName}); |
|
|
|
|
|
|
|
|
var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName }); |
|
|
return count; |
|
|
return count; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@@ -180,7 +176,7 @@ with aggr as ( |
|
|
) |
|
|
) |
|
|
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; |
|
|
select ""Key"",""Count"" from aggr where ""Key""= Any(@keys);"; |
|
|
|
|
|
|
|
|
var valuesMap = connection.Query<TimelineCounter>(sqlQuery, new {keys = keyMaps.Keys.ToList(), statusName}) |
|
|
|
|
|
|
|
|
var valuesMap = connection.Query<TimelineCounter>(sqlQuery, new { keys = keyMaps.Keys.ToList(), statusName }) |
|
|
.ToList() |
|
|
.ToList() |
|
|
.ToDictionary(x => x.Key, x => x.Count); |
|
|
.ToDictionary(x => x.Key, x => x.Count); |
|
|
|
|
|
|
|
|