From 2a1a908b8e4d949e530590f5ba5ba10646acb22b Mon Sep 17 00:00:00 2001 From: Savorboard Date: Fri, 29 Sep 2017 15:04:48 +0800 Subject: [PATCH] refactor --- .../SqlServerMonitoringApi.cs | 19 +++--- .../SqlServerStorageConnection.cs | 67 +++++++++---------- src/DotNetCore.CAP/Dashboard/HtmlHelper.cs | 2 +- src/DotNetCore.CAP/IStorageConnection.cs | 21 +++--- 4 files changed, 53 insertions(+), 56 deletions(-) diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs b/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs index 421bb11..0836988 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs @@ -7,7 +7,6 @@ using DotNetCore.CAP.Dashboard; using DotNetCore.CAP.Dashboard.Monitoring; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; -using DotNetCore.CAP.Processor.States; namespace DotNetCore.CAP.SqlServer { @@ -22,8 +21,6 @@ namespace DotNetCore.CAP.SqlServer _storage = storage as SqlServerStorage ?? throw new ArgumentNullException(nameof(storage)); } - - public StatisticsDto GetStatistics() { string sql = String.Format(@" @@ -59,14 +56,14 @@ _options.Schema); { var tableName = type == MessageType.Publish ? "Published" : "Received"; return UseConnection(connection => - GetHourlyTimelineStats(connection, tableName, FailedState.StateName)); + GetHourlyTimelineStats(connection, tableName, StatusName.Failed)); } public IDictionary HourlySucceededJobs(MessageType type) { var tableName = type == MessageType.Publish ? "Published" : "Received"; return UseConnection(connection => - GetHourlyTimelineStats(connection, tableName, SucceededState.StateName)); + GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded)); } public IList Messages(MessageQueryDto queryDto) @@ -75,7 +72,7 @@ _options.Schema); var where = string.Empty; if (!string.IsNullOrEmpty(queryDto.StatusName)) { - if (string.Equals(queryDto.StatusName, ProcessingState.StateName, StringComparison.CurrentCultureIgnoreCase)) + if (string.Equals(queryDto.StatusName, StatusName.Processing, StringComparison.CurrentCultureIgnoreCase)) { where += " and statusname in (N'Processing',N'Scheduled',N'Enqueued')"; } @@ -101,10 +98,10 @@ _options.Schema); return UseConnection(conn => conn.Query(sqlQuery, new { - StatusName = queryDto.StatusName, - Group = queryDto.Group, - Name = queryDto.Name, - Content = queryDto.Content, + queryDto.StatusName, + queryDto.Group, + queryDto.Name, + queryDto.Content, Offset = queryDto.CurrentPage * queryDto.PageSize, Limit = queryDto.PageSize, }).ToList()); @@ -190,7 +187,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;"; var valuesMap = connection.Query( sqlQuery, - new { keys = keyMaps.Keys, statusName = statusName }) + new { keys = keyMaps.Keys, statusName }) .ToDictionary(x => (string)x.Key, x => (int)x.Count); foreach (var key in keyMaps.Keys) diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index daebe5a..50ca30a 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -6,20 +6,17 @@ using System.Threading.Tasks; using Dapper; using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Models; -using DotNetCore.CAP.Processor.States; namespace DotNetCore.CAP.SqlServer { public class SqlServerStorageConnection : IStorageConnection { - private readonly SqlServerOptions _options; - public SqlServerStorageConnection(SqlServerOptions options) { - _options = options; + Options = options; } - public SqlServerOptions Options => _options; + public SqlServerOptions Options { get; } public IStorageTransaction CreateTransaction() { @@ -28,9 +25,9 @@ namespace DotNetCore.CAP.SqlServer public async Task GetPublishedMessageAsync(int id) { - var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}"; + var sql = $@"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Id={id}"; - using (var connection = new SqlConnection(_options.ConnectionString)) + using (var connection = new SqlConnection(Options.ConnectionString)) { return await connection.QueryFirstOrDefaultAsync(sql); } @@ -40,7 +37,7 @@ namespace DotNetCore.CAP.SqlServer { var sql = $@" DELETE TOP (1) -FROM [{_options.Schema}].[Queue] WITH (readpast, updlock, rowlock) +FROM [{Options.Schema}].[Queue] WITH (readpast, updlock, rowlock) OUTPUT DELETED.MessageId,DELETED.[MessageType];"; return FetchNextMessageCoreAsync(sql); @@ -48,9 +45,10 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; public async Task GetNextPublishedMessageToBeEnqueuedAsync() { - var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'"; + var sql = + $"SELECT TOP (1) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'"; - using (var connection = new SqlConnection(_options.ConnectionString)) + using (var connection = new SqlConnection(Options.ConnectionString)) { return await connection.QueryFirstOrDefaultAsync(sql); } @@ -58,19 +56,21 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; public async Task> GetFailedPublishedMessages() { - var sql = $"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + var sql = + $"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; - using (var connection = new SqlConnection(_options.ConnectionString)) + using (var connection = new SqlConnection(Options.ConnectionString)) { return await connection.QueryAsync(sql); } } - public bool ChangePublishedState(int messageId, IState state) + public bool ChangePublishedState(int messageId, string state) { - var sql = $"UPDATE [{_options.Schema}].[Published] SET Retries=Retries+1,StatusName = '{state.Name}' WHERE Id={messageId}"; + var sql = + $"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}"; - using (var connection = new SqlConnection(_options.ConnectionString)) + using (var connection = new SqlConnection(Options.ConnectionString)) { return connection.Execute(sql) > 0; } @@ -83,10 +83,10 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];"; if (message == null) throw new ArgumentNullException(nameof(message)); var sql = $@" -INSERT INTO [{_options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) +INSERT INTO [{Options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName]) VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; - using (var connection = new SqlConnection(_options.ConnectionString)) + using (var connection = new SqlConnection(Options.ConnectionString)) { await connection.ExecuteAsync(sql, message); } @@ -94,8 +94,8 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; public async Task GetReceivedMessageAsync(int id) { - var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}"; - using (var connection = new SqlConnection(_options.ConnectionString)) + var sql = $@"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Id={id}"; + using (var connection = new SqlConnection(Options.ConnectionString)) { return await connection.QueryFirstOrDefaultAsync(sql); } @@ -103,8 +103,9 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; public async Task GetNextReceviedMessageToBeEnqueuedAsync() { - var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'"; - using (var connection = new SqlConnection(_options.ConnectionString)) + var sql = + $"SELECT TOP (1) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'"; + using (var connection = new SqlConnection(Options.ConnectionString)) { return await connection.QueryFirstOrDefaultAsync(sql); } @@ -112,18 +113,20 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; public async Task> GetFailedReceviedMessages() { - var sql = $"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; - using (var connection = new SqlConnection(_options.ConnectionString)) + var sql = + $"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'"; + using (var connection = new SqlConnection(Options.ConnectionString)) { return await connection.QueryAsync(sql); } } - public bool ChangeReceivedState(int messageId, IState state) + public bool ChangeReceivedState(int messageId, string state) { - var sql = $"UPDATE [{_options.Schema}].[Received] SET Retries=Retries+1,StatusName = '{state.Name}' WHERE Id={messageId}"; + var sql = + $"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}"; - using (var connection = new SqlConnection(_options.ConnectionString)) + using (var connection = new SqlConnection(Options.ConnectionString)) { return connection.Execute(sql) > 0; } @@ -136,7 +139,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; private async Task FetchNextMessageCoreAsync(string sql, object args = null) { //here don't use `using` to dispose - var connection = new SqlConnection(_options.ConnectionString); + var connection = new SqlConnection(Options.ConnectionString); await connection.OpenAsync(); var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); FetchedMessage fetchedMessage; @@ -158,14 +161,8 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; return null; } - return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction); - } - - // ------------------------------------------ - - public List GetRangeFromSet(string key, int startingFrom, int endingAt) - { - return new List { "11", "22", "33" }; + return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, + transaction); } } } \ No newline at end of file diff --git a/src/DotNetCore.CAP/Dashboard/HtmlHelper.cs b/src/DotNetCore.CAP/Dashboard/HtmlHelper.cs index b9e9777..91bbcde 100644 --- a/src/DotNetCore.CAP/Dashboard/HtmlHelper.cs +++ b/src/DotNetCore.CAP/Dashboard/HtmlHelper.cs @@ -85,7 +85,7 @@ namespace DotNetCore.CAP.Dashboard return Raw($"{Strings.Common_NoState}"); } - return Raw($"{stateName}"); + return Raw($"{stateName}"); } public NonEscapedString RelativeTime(DateTime value) diff --git a/src/DotNetCore.CAP/IStorageConnection.cs b/src/DotNetCore.CAP/IStorageConnection.cs index 0fe80fe..2a832a1 100644 --- a/src/DotNetCore.CAP/IStorageConnection.cs +++ b/src/DotNetCore.CAP/IStorageConnection.cs @@ -2,7 +2,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; using DotNetCore.CAP.Models; -using DotNetCore.CAP.Processor.States; namespace DotNetCore.CAP { @@ -58,19 +57,23 @@ namespace DotNetCore.CAP /// Task> GetFailedReceviedMessages(); - //----------------------------------------- - /// /// Creates and returns an . /// IStorageTransaction CreateTransaction(); - //------------------------------------------- - - bool ChangePublishedState(int messageId, IState state); - bool ChangeReceivedState(int messageId, IState state); - - List GetRangeFromSet(string key, int startingFrom, int endingAt); + /// + /// Change specified message's state of published message + /// + /// Message id + /// State name + bool ChangePublishedState(int messageId, string state); + /// + /// Change specified message's state of received message + /// + /// Message id + /// State name + bool ChangeReceivedState(int messageId, string state); } } \ No newline at end of file