* fix bug * fix bug Co-authored-by: wandone\xlw <123456>master
@@ -128,7 +128,7 @@ namespace DotNetCore.CAP.Dashboard | |||
Routes.AddRazorPage("/nodes", x => | |||
{ | |||
var id = x.Request.Cookies["cap.node"]; | |||
var id = x.Request.Cookies.ContainsKey("cap.node") ? x.Request.Cookies["cap.node"] : string.Empty; | |||
return new NodePage(id); | |||
}); | |||
@@ -141,22 +141,11 @@ namespace DotNetCore.CAP.MySql | |||
new MySqlParameter("@timeout", timeout), new MySqlParameter("@batchCount", batchCount)); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = $"SELECT * FROM `{_pubName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; | |||
return await GetMessagesOfNeedRetryAsync(sql); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = | |||
$"SELECT * FROM `{_recName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; | |||
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() => | |||
await GetMessagesOfNeedRetryAsync(_pubName); | |||
return await GetMessagesOfNeedRetryAsync(sql); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() => | |||
await GetMessagesOfNeedRetryAsync(_recName); | |||
public IMonitoringApi GetMonitoringApi() | |||
{ | |||
@@ -189,8 +178,13 @@ namespace DotNetCore.CAP.MySql | |||
connection.ExecuteNonQuery(sql, sqlParams: sqlParams); | |||
} | |||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql) | |||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName) | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = | |||
$"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<{_capOptions.Value.FailedRetryCount} " + | |||
$"AND `Version`='{_capOptions.Value.Version}' AND `Added`<'{fourMinAgo}' AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; | |||
await using var connection = new MySqlConnection(_options.Value.ConnectionString); | |||
var result = connection.ExecuteReader(sql, reader => | |||
{ | |||
@@ -143,23 +143,11 @@ namespace DotNetCore.CAP.PostgreSql | |||
return await Task.FromResult(count); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = | |||
$"SELECT * FROM {_pubName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; | |||
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() => | |||
await GetMessagesOfNeedRetryAsync(_pubName); | |||
return await GetMessagesOfNeedRetryAsync(sql); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = | |||
$"SELECT * FROM {_recName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; | |||
return await GetMessagesOfNeedRetryAsync(sql); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() => | |||
await GetMessagesOfNeedRetryAsync(_recName); | |||
public IMonitoringApi GetMonitoringApi() | |||
{ | |||
@@ -195,8 +183,13 @@ namespace DotNetCore.CAP.PostgreSql | |||
connection.ExecuteNonQuery(sql, sqlParams: sqlParams); | |||
} | |||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql) | |||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName) | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = | |||
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<{_capOptions.Value.FailedRetryCount} " + | |||
$"AND \"Version\"='{_capOptions.Value.Version}' AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; | |||
await using var connection = new NpgsqlConnection(_options.Value.ConnectionString); | |||
var result = connection.ExecuteReader(sql, reader => | |||
{ | |||
@@ -142,24 +142,11 @@ namespace DotNetCore.CAP.SqlServer | |||
return await Task.FromResult(count); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = $"SELECT TOP (200) Id, Content, Retries, Added FROM {_pubName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + | |||
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; | |||
return await GetMessagesOfNeedRetryAsync(sql); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = | |||
$"SELECT TOP (200) Id, Content, Retries, Added FROM {_recName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + | |||
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; | |||
public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry() => | |||
await GetMessagesOfNeedRetryAsync(_pubName); | |||
return await GetMessagesOfNeedRetryAsync(sql); | |||
} | |||
public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry() => | |||
await GetMessagesOfNeedRetryAsync(_recName); | |||
public IMonitoringApi GetMonitoringApi() | |||
{ | |||
@@ -195,8 +182,13 @@ namespace DotNetCore.CAP.SqlServer | |||
connection.ExecuteNonQuery(sql, sqlParams: sqlParams); | |||
} | |||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string sql) | |||
private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName) | |||
{ | |||
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); | |||
var sql = | |||
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<{_capOptions.Value.FailedRetryCount} " + | |||
$"AND Version='{_capOptions.Value.Version}' AND Added<'{fourMinAgo}' AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; | |||
List<MediumMessage> result; | |||
using (var connection = new SqlConnection(_options.Value.ConnectionString)) | |||
{ | |||
@@ -193,7 +193,7 @@ with aggr as ( | |||
where StatusName = @statusName | |||
group by replace(convert(varchar, Added, 111), '/','-') + '-' + CONVERT(varchar, DATEPART(hh, Added)) | |||
) | |||
select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; | |||
select [Key], [Count] from aggr with (nolock) where [Key] >= @minKey and [Key] <= @maxKey;"; | |||
//SQL Server 2012+ | |||
var sqlQuery = $@" | |||