@@ -191,26 +191,23 @@ namespace DotNetCore.CAP.MySql | |||||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql) | private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql) | ||||
{ | { | ||||
List<MediumMessage> result; | |||||
await using (var connection = new MySqlConnection(_options.Value.ConnectionString)) | |||||
await using var connection = new MySqlConnection(_options.Value.ConnectionString); | |||||
var result = connection.ExecuteReader(sql, reader => | |||||
{ | { | ||||
result = connection.ExecuteReader(sql, reader => | |||||
var messages = new List<MediumMessage>(); | |||||
while (reader.Read()) | |||||
{ | { | ||||
var messages = new List<MediumMessage>(); | |||||
while (reader.Read()) | |||||
messages.Add(new MediumMessage | |||||
{ | { | ||||
messages.Add(new MediumMessage | |||||
{ | |||||
DbId = reader.GetInt64(0).ToString(), | |||||
Origin = StringSerializer.DeSerialize(reader.GetString(1)), | |||||
Retries = reader.GetInt32(2), | |||||
Added = reader.GetDateTime(3) | |||||
}); | |||||
} | |||||
return messages; | |||||
}); | |||||
} | |||||
DbId = reader.GetInt64(0).ToString(), | |||||
Origin = StringSerializer.DeSerialize(reader.GetString(1)), | |||||
Retries = reader.GetInt32(2), | |||||
Added = reader.GetDateTime(3) | |||||
}); | |||||
} | |||||
return messages; | |||||
}); | |||||
return result; | return result; | ||||
} | } | ||||
@@ -30,39 +30,35 @@ namespace DotNetCore.CAP.MySql | |||||
public StatisticsDto GetStatistics() | public StatisticsDto GetStatistics() | ||||
{ | { | ||||
var sql = $@" | var sql = $@" | ||||
set transaction isolation level read committed; | |||||
SELECT | |||||
( | |||||
SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Succeeded' | |||||
) AS PublishedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Succeeded' | |||||
) AS ReceivedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Failed' | |||||
) AS PublishedFailed, | |||||
( | |||||
SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Failed' | |||||
) AS ReceivedFailed;"; | |||||
StatisticsDto statistics; | |||||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||||
SELECT | |||||
( | |||||
SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Succeeded' | |||||
) AS PublishedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Succeeded' | |||||
) AS ReceivedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM `{_pubName}` WHERE StatusName = N'Failed' | |||||
) AS PublishedFailed, | |||||
( | |||||
SELECT COUNT(Id) FROM `{_recName}` WHERE StatusName = N'Failed' | |||||
) AS ReceivedFailed;"; | |||||
using var connection = new MySqlConnection(_options.ConnectionString); | |||||
var statistics = connection.ExecuteReader(sql, reader => | |||||
{ | { | ||||
statistics = connection.ExecuteReader(sql, reader => | |||||
{ | |||||
var statisticsDto = new StatisticsDto(); | |||||
var statisticsDto = new StatisticsDto(); | |||||
while (reader.Read()) | |||||
{ | |||||
statisticsDto.PublishedSucceeded = reader.GetInt32(0); | |||||
statisticsDto.ReceivedSucceeded = reader.GetInt32(1); | |||||
statisticsDto.PublishedFailed = reader.GetInt32(2); | |||||
statisticsDto.ReceivedFailed = reader.GetInt32(3); | |||||
} | |||||
while (reader.Read()) | |||||
{ | |||||
statisticsDto.PublishedSucceeded = reader.GetInt32(0); | |||||
statisticsDto.ReceivedSucceeded = reader.GetInt32(1); | |||||
statisticsDto.PublishedFailed = reader.GetInt32(2); | |||||
statisticsDto.ReceivedFailed = reader.GetInt32(3); | |||||
} | |||||
return statisticsDto; | |||||
}); | |||||
} | |||||
return statisticsDto; | |||||
}); | |||||
return statistics; | return statistics; | ||||
} | } | ||||
@@ -189,15 +185,17 @@ namespace DotNetCore.CAP.MySql | |||||
string statusName, | string statusName, | ||||
IDictionary<string, DateTime> keyMaps) | IDictionary<string, DateTime> keyMaps) | ||||
{ | { | ||||
var sqlQuery = | |||||
$@" | |||||
select aggr.* from ( | |||||
select date_format(`Added`,'%Y-%m-%d-%H') as `Key`, | |||||
count(id) `Count` | |||||
from `{tableName}` | |||||
where StatusName = @statusName | |||||
group by date_format(`Added`,'%Y-%m-%d-%H') | |||||
) aggr where `Key` >= @minKey and `Key` <= @maxKey;"; | |||||
var sqlQuery = $@" | |||||
SELECT aggr.* | |||||
FROM ( | |||||
SELECT date_format(`Added`, '%Y-%m-%d-%H') AS `Key`, | |||||
count(id) `Count` | |||||
FROM `{tableName}` | |||||
WHERE StatusName = @statusName | |||||
GROUP BY date_format(`Added`, '%Y-%m-%d-%H') | |||||
) aggr | |||||
WHERE `Key` >= @minKey | |||||
AND `Key` <= @maxKey;"; | |||||
object[] sqlParams = | object[] sqlParams = | ||||
{ | { | ||||
@@ -249,7 +247,7 @@ select aggr.* from ( | |||||
var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};"; | var sql = $@"SELECT `Id` as DbId, `Content`,`Added`,`ExpiresAt`,`Retries` FROM `{tableName}` WHERE Id={id};"; | ||||
await using var connection = new MySqlConnection(_options.ConnectionString); | await using var connection = new MySqlConnection(_options.ConnectionString); | ||||
var mediumMessae = connection.ExecuteReader(sql, reader => | |||||
var mediumMessage = connection.ExecuteReader(sql, reader => | |||||
{ | { | ||||
MediumMessage message = null; | MediumMessage message = null; | ||||
@@ -268,13 +266,7 @@ select aggr.* from ( | |||||
return message; | return message; | ||||
}); | }); | ||||
return mediumMessae; | |||||
return mediumMessage; | |||||
} | } | ||||
} | } | ||||
class TimelineCounter | |||||
{ | |||||
public string Key { get; set; } | |||||
public int Count { get; set; } | |||||
} | |||||
} | } |
@@ -135,7 +135,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, | public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, | ||||
CancellationToken token = default) | CancellationToken token = default) | ||||
{ | { | ||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
var count = connection.ExecuteNonQuery( | var count = connection.ExecuteNonQuery( | ||||
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", null, | $"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", null, | ||||
new NpgsqlParameter("@timeout", timeout), new NpgsqlParameter("@batchCount", batchCount)); | new NpgsqlParameter("@timeout", timeout), new NpgsqlParameter("@batchCount", batchCount)); | ||||
@@ -179,7 +179,7 @@ namespace DotNetCore.CAP.PostgreSql | |||||
new NpgsqlParameter("@StatusName", state.ToString("G")) | new NpgsqlParameter("@StatusName", state.ToString("G")) | ||||
}; | }; | ||||
using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
connection.ExecuteNonQuery(sql, sqlParams: sqlParams); | connection.ExecuteNonQuery(sql, sqlParams: sqlParams); | ||||
await Task.CompletedTask; | await Task.CompletedTask; | ||||
@@ -197,28 +197,25 @@ namespace DotNetCore.CAP.PostgreSql | |||||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql) | private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql) | ||||
{ | { | ||||
List<MediumMessage> result; | |||||
using (var connection = new NpgsqlConnection(_options.Value.ConnectionString)) | |||||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||||
var result = connection.ExecuteReader(sql, reader => | |||||
{ | { | ||||
result = connection.ExecuteReader(sql, reader => | |||||
var messages = new List<MediumMessage>(); | |||||
while (reader.Read()) | |||||
{ | { | ||||
var messages = new List<MediumMessage>(); | |||||
while (reader.Read()) | |||||
messages.Add(new MediumMessage | |||||
{ | { | ||||
messages.Add(new MediumMessage | |||||
{ | |||||
DbId = reader.GetInt64(0).ToString(), | |||||
Origin = StringSerializer.DeSerialize(reader.GetString(1)), | |||||
Retries = reader.GetInt32(2), | |||||
Added = reader.GetDateTime(3) | |||||
}); | |||||
} | |||||
return messages; | |||||
}); | |||||
} | |||||
DbId = reader.GetInt64(0).ToString(), | |||||
Origin = StringSerializer.DeSerialize(reader.GetString(1)), | |||||
Retries = reader.GetInt32(2), | |||||
Added = reader.GetDateTime(3) | |||||
}); | |||||
} | |||||
return messages; | |||||
}); | |||||
return await Task.FromResult(result); | |||||
return result; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -238,8 +238,8 @@ select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxK | |||||
{ | { | ||||
var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED"; | var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED"; | ||||
using var connection = new NpgsqlConnection(_options.ConnectionString); | |||||
var mediumMessae = connection.ExecuteReader(sql, reader => | |||||
await using var connection = new NpgsqlConnection(_options.ConnectionString); | |||||
var mediumMessage = connection.ExecuteReader(sql, reader => | |||||
{ | { | ||||
MediumMessage message = null; | MediumMessage message = null; | ||||
@@ -258,13 +258,7 @@ select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxK | |||||
return message; | return message; | ||||
}); | }); | ||||
return await Task.FromResult(mediumMessae); | |||||
return await Task.FromResult(mediumMessage); | |||||
} | } | ||||
} | |||||
internal class TimelineCounter | |||||
{ | |||||
public string Key { get; set; } | |||||
public int Count { get; set; } | |||||
} | |||||
} | |||||
} | } |
@@ -30,39 +30,35 @@ namespace DotNetCore.CAP.SqlServer | |||||
public StatisticsDto GetStatistics() | public StatisticsDto GetStatistics() | ||||
{ | { | ||||
var sql = $@" | var sql = $@" | ||||
SET TRANSACTION ISOLATION LEVEL READ COMMITTED; | |||||
SELECT | |||||
( | |||||
SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Succeeded' | |||||
) AS PublishedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Succeeded' | |||||
) AS ReceivedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Failed' | |||||
) AS PublishedFailed, | |||||
( | |||||
SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Failed' | |||||
) AS ReceivedFailed;"; | |||||
StatisticsDto statistics; | |||||
using (var connection = new SqlConnection(_options.ConnectionString)) | |||||
SELECT | |||||
( | |||||
SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Succeeded' | |||||
) AS PublishedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Succeeded' | |||||
) AS ReceivedSucceeded, | |||||
( | |||||
SELECT COUNT(Id) FROM {_pubName} WHERE StatusName = N'Failed' | |||||
) AS PublishedFailed, | |||||
( | |||||
SELECT COUNT(Id) FROM {_recName} WHERE StatusName = N'Failed' | |||||
) AS ReceivedFailed;"; | |||||
using var connection = new SqlConnection(_options.ConnectionString); | |||||
var statistics = connection.ExecuteReader(sql, reader => | |||||
{ | { | ||||
statistics = connection.ExecuteReader(sql, reader => | |||||
{ | |||||
var statisticsDto = new StatisticsDto(); | |||||
var statisticsDto = new StatisticsDto(); | |||||
while (reader.Read()) | |||||
{ | |||||
statisticsDto.PublishedSucceeded = reader.GetInt32(0); | |||||
statisticsDto.ReceivedSucceeded = reader.GetInt32(1); | |||||
statisticsDto.PublishedFailed = reader.GetInt32(2); | |||||
statisticsDto.ReceivedFailed = reader.GetInt32(3); | |||||
} | |||||
while (reader.Read()) | |||||
{ | |||||
statisticsDto.PublishedSucceeded = reader.GetInt32(0); | |||||
statisticsDto.ReceivedSucceeded = reader.GetInt32(1); | |||||
statisticsDto.PublishedFailed = reader.GetInt32(2); | |||||
statisticsDto.ReceivedFailed = reader.GetInt32(3); | |||||
} | |||||
return statisticsDto; | |||||
}); | |||||
} | |||||
return statisticsDto; | |||||
}); | |||||
return statistics; | return statistics; | ||||
} | } | ||||
@@ -253,7 +249,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] < | |||||
var sql = $@"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (readpast) WHERE Id={id}"; | var sql = $@"SELECT TOP 1 Id AS DbId, Content, Added, ExpiresAt, Retries FROM {tableName} WITH (readpast) WHERE Id={id}"; | ||||
using var connection = new SqlConnection(_options.ConnectionString); | using var connection = new SqlConnection(_options.ConnectionString); | ||||
var mediumMessae = connection.ExecuteReader(sql, reader => | |||||
var mediumMessage = connection.ExecuteReader(sql, reader => | |||||
{ | { | ||||
MediumMessage message = null; | MediumMessage message = null; | ||||
@@ -272,13 +268,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] < | |||||
return message; | return message; | ||||
}); | }); | ||||
return await Task.FromResult(mediumMessae); | |||||
return await Task.FromResult(mediumMessage); | |||||
} | } | ||||
} | |||||
internal class TimelineCounter | |||||
{ | |||||
public string Key { get; set; } | |||||
public int Count { get; set; } | |||||
} | |||||
} | |||||
} | } |