Browse Source

refactor

master
Savorboard 7 years ago
parent
commit
2a1a908b8e
4 changed files with 53 additions and 56 deletions
  1. +8
    -11
      src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs
  2. +32
    -35
      src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
  3. +1
    -1
      src/DotNetCore.CAP/Dashboard/HtmlHelper.cs
  4. +12
    -9
      src/DotNetCore.CAP/IStorageConnection.cs

+ 8
- 11
src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs View File

@@ -7,7 +7,6 @@ using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring; using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;


namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
{ {
@@ -22,8 +21,6 @@ namespace DotNetCore.CAP.SqlServer
_storage = storage as SqlServerStorage ?? throw new ArgumentNullException(nameof(storage)); _storage = storage as SqlServerStorage ?? throw new ArgumentNullException(nameof(storage));
} }




public StatisticsDto GetStatistics() public StatisticsDto GetStatistics()
{ {
string sql = String.Format(@" string sql = String.Format(@"
@@ -59,14 +56,14 @@ _options.Schema);
{ {
var tableName = type == MessageType.Publish ? "Published" : "Received"; var tableName = type == MessageType.Publish ? "Published" : "Received";
return UseConnection(connection => return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, FailedState.StateName));
GetHourlyTimelineStats(connection, tableName, StatusName.Failed));
} }


public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type) public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{ {
var tableName = type == MessageType.Publish ? "Published" : "Received"; var tableName = type == MessageType.Publish ? "Published" : "Received";
return UseConnection(connection => return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, SucceededState.StateName));
GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded));
} }


public IList<MessageDto> Messages(MessageQueryDto queryDto) public IList<MessageDto> Messages(MessageQueryDto queryDto)
@@ -75,7 +72,7 @@ _options.Schema);
var where = string.Empty; var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName)) if (!string.IsNullOrEmpty(queryDto.StatusName))
{ {
if (string.Equals(queryDto.StatusName, ProcessingState.StateName, StringComparison.CurrentCultureIgnoreCase))
if (string.Equals(queryDto.StatusName, StatusName.Processing, StringComparison.CurrentCultureIgnoreCase))
{ {
where += " and statusname in (N'Processing',N'Scheduled',N'Enqueued')"; where += " and statusname in (N'Processing',N'Scheduled',N'Enqueued')";
} }
@@ -101,10 +98,10 @@ _options.Schema);


return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{ {
StatusName = queryDto.StatusName,
Group = queryDto.Group,
Name = queryDto.Name,
Content = queryDto.Content,
queryDto.StatusName,
queryDto.Group,
queryDto.Name,
queryDto.Content,
Offset = queryDto.CurrentPage * queryDto.PageSize, Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize, Limit = queryDto.PageSize,
}).ToList()); }).ToList());
@@ -190,7 +187,7 @@ select [Key], [Count] from aggr with (nolock) where [Key] in @keys;";


var valuesMap = connection.Query( var valuesMap = connection.Query(
sqlQuery, sqlQuery,
new { keys = keyMaps.Keys, statusName = statusName })
new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => (string)x.Key, x => (int)x.Count); .ToDictionary(x => (string)x.Key, x => (int)x.Count);


foreach (var key in keyMaps.Keys) foreach (var key in keyMaps.Keys)


+ 32
- 35
src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs View File

@@ -6,20 +6,17 @@ using System.Threading.Tasks;
using Dapper; using Dapper;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;


namespace DotNetCore.CAP.SqlServer namespace DotNetCore.CAP.SqlServer
{ {
public class SqlServerStorageConnection : IStorageConnection public class SqlServerStorageConnection : IStorageConnection
{ {
private readonly SqlServerOptions _options;

public SqlServerStorageConnection(SqlServerOptions options) public SqlServerStorageConnection(SqlServerOptions options)
{ {
_options = options;
Options = options;
} }


public SqlServerOptions Options => _options;
public SqlServerOptions Options { get; }


public IStorageTransaction CreateTransaction() public IStorageTransaction CreateTransaction()
{ {
@@ -28,9 +25,9 @@ namespace DotNetCore.CAP.SqlServer


public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id) public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{ {
var sql = $@"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE Id={id}";
var sql = $@"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE Id={id}";


using (var connection = new SqlConnection(_options.ConnectionString))
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql); return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
} }
@@ -40,7 +37,7 @@ namespace DotNetCore.CAP.SqlServer
{ {
var sql = $@" var sql = $@"
DELETE TOP (1) DELETE TOP (1)
FROM [{_options.Schema}].[Queue] WITH (readpast, updlock, rowlock)
FROM [{Options.Schema}].[Queue] WITH (readpast, updlock, rowlock)
OUTPUT DELETED.MessageId,DELETED.[MessageType];"; OUTPUT DELETED.MessageId,DELETED.[MessageType];";


return FetchNextMessageCoreAsync(sql); return FetchNextMessageCoreAsync(sql);
@@ -48,9 +45,10 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";


public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync() public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{ {
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";


using (var connection = new SqlConnection(_options.ConnectionString))
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql); return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
} }
@@ -58,19 +56,21 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";


public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages() public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{ {
var sql = $"SELECT * FROM [{_options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
var sql =
$"SELECT * FROM [{Options.Schema}].[Published] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";


using (var connection = new SqlConnection(_options.ConnectionString))
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapPublishedMessage>(sql); return await connection.QueryAsync<CapPublishedMessage>(sql);
} }
} }


public bool ChangePublishedState(int messageId, IState state)
public bool ChangePublishedState(int messageId, string state)
{ {
var sql = $"UPDATE [{_options.Schema}].[Published] SET Retries=Retries+1,StatusName = '{state.Name}' WHERE Id={messageId}";
var sql =
$"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}";


using (var connection = new SqlConnection(_options.ConnectionString))
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return connection.Execute(sql) > 0; return connection.Execute(sql) > 0;
} }
@@ -83,10 +83,10 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";
if (message == null) throw new ArgumentNullException(nameof(message)); if (message == null) throw new ArgumentNullException(nameof(message));


var sql = $@" var sql = $@"
INSERT INTO [{_options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
INSERT INTO [{Options.Schema}].[Received]([Name],[Group],[Content],[Retries],[Added],[ExpiresAt],[StatusName])
VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";


using (var connection = new SqlConnection(_options.ConnectionString))
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
await connection.ExecuteAsync(sql, message); await connection.ExecuteAsync(sql, message);
} }
@@ -94,8 +94,8 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";


public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id) public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{ {
var sql = $@"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(_options.ConnectionString))
var sql = $@"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE Id={id}";
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql); return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
} }
@@ -103,8 +103,9 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";


public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync() public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{ {
var sql = $"SELECT TOP (1) * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(_options.ConnectionString))
var sql =
$"SELECT TOP (1) * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Scheduled}'";
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql); return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
} }
@@ -112,18 +113,20 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";


public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages() public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{ {
var sql = $"SELECT * FROM [{_options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(_options.ConnectionString))
var sql =
$"SELECT * FROM [{Options.Schema}].[Received] WITH (readpast) WHERE StatusName = '{StatusName.Failed}'";
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return await connection.QueryAsync<CapReceivedMessage>(sql); return await connection.QueryAsync<CapReceivedMessage>(sql);
} }
} }


public bool ChangeReceivedState(int messageId, IState state)
public bool ChangeReceivedState(int messageId, string state)
{ {
var sql = $"UPDATE [{_options.Schema}].[Received] SET Retries=Retries+1,StatusName = '{state.Name}' WHERE Id={messageId}";
var sql =
$"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}";


using (var connection = new SqlConnection(_options.ConnectionString))
using (var connection = new SqlConnection(Options.ConnectionString))
{ {
return connection.Execute(sql) > 0; return connection.Execute(sql) > 0;
} }
@@ -136,7 +139,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null) private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{ {
//here don't use `using` to dispose //here don't use `using` to dispose
var connection = new SqlConnection(_options.ConnectionString);
var connection = new SqlConnection(Options.ConnectionString);
await connection.OpenAsync(); await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage; FetchedMessage fetchedMessage;
@@ -158,14 +161,8 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
return null; return null;
} }


return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}

// ------------------------------------------

public List<string> GetRangeFromSet(string key, int startingFrom, int endingAt)
{
return new List<string> { "11", "22", "33" };
return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
} }
} }
} }

+ 1
- 1
src/DotNetCore.CAP/Dashboard/HtmlHelper.cs View File

@@ -85,7 +85,7 @@ namespace DotNetCore.CAP.Dashboard
return Raw($"<em>{Strings.Common_NoState}</em>"); return Raw($"<em>{Strings.Common_NoState}</em>");
} }


return Raw($"<span class=\"label label-default\" style=\"background-color: {JobHistoryRenderer.GetForegroundStateColor(stateName)};\">{stateName}</span>");
return Raw($"<span class=\"label label-default\" style=\"background-color: {MessageHistoryRenderer.GetForegroundStateColor(stateName)};\">{stateName}</span>");
} }


public NonEscapedString RelativeTime(DateTime value) public NonEscapedString RelativeTime(DateTime value)


+ 12
- 9
src/DotNetCore.CAP/IStorageConnection.cs View File

@@ -2,7 +2,6 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;


namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
@@ -58,19 +57,23 @@ namespace DotNetCore.CAP
/// </summary> /// </summary>
Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages(); Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages();


//-----------------------------------------

/// <summary> /// <summary>
/// Creates and returns an <see cref="IStorageTransaction"/>. /// Creates and returns an <see cref="IStorageTransaction"/>.
/// </summary> /// </summary>
IStorageTransaction CreateTransaction(); IStorageTransaction CreateTransaction();


//-------------------------------------------
bool ChangePublishedState(int messageId, IState state);
bool ChangeReceivedState(int messageId, IState state);
List<string> GetRangeFromSet(string key, int startingFrom, int endingAt);
/// <summary>
/// Change specified message's state of published message
/// </summary>
/// <param name="messageId">Message id</param>
/// <param name="state">State name</param>
bool ChangePublishedState(int messageId, string state);


/// <summary>
/// Change specified message's state of received message
/// </summary>
/// <param name="messageId">Message id</param>
/// <param name="state">State name</param>
bool ChangeReceivedState(int messageId, string state);
} }
} }

Loading…
Cancel
Save