Browse Source

implements dashboard interface.

master
Savorboard 7 years ago
parent
commit
ffc3399014
3 changed files with 303 additions and 39 deletions
  1. +208
    -0
      src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs
  2. +46
    -2
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs
  3. +49
    -37
      src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs

+ 208
- 0
src/DotNetCore.CAP.PostgreSql/PostgreSqlMonitoringApi.cs View File

@@ -0,0 +1,208 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using Dapper;
using DotNetCore.CAP.Dashboard;
using DotNetCore.CAP.Dashboard.Monitoring;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlMonitoringApi: IMonitoringApi
{
private readonly PostgreSqlStorage _storage;
private readonly PostgreSqlOptions _options;

public PostgreSqlMonitoringApi(IStorage storage, PostgreSqlOptions options)
{
_options = options ?? throw new ArgumentNullException(nameof(options));
_storage = storage as PostgreSqlStorage ?? throw new ArgumentNullException(nameof(storage));
}

public StatisticsDto GetStatistics()
{
string sql = String.Format(@"
select count(Id) from ""{0}"".""published"" where ""StatusName"" = N'Succeeded';
select count(Id) from ""{0}"".""received"" where ""StatusName"" = N'Succeeded';
select count(Id) from ""{0}"".""published"" where ""StatusName"" = N'Failed';
select count(Id) from ""{0}"".""received"" where ""StatusName"" = N'Failed';
select count(Id) from ""{0}"".""published"" where ""StatusName"" in (N'Processing',N'Scheduled',N'Enqueued');
select count(Id) from ""{0}"".""received"" where ""StatusName"" = N'Processing';",
_options.Schema);

var statistics = UseConnection(connection =>
{
var stats = new StatisticsDto();
using (var multi = connection.QueryMultiple(sql))
{
stats.PublishedSucceeded = multi.ReadSingle<int>();
stats.ReceivedSucceeded = multi.ReadSingle<int>();

stats.PublishedFailed = multi.ReadSingle<int>();
stats.ReceivedFailed = multi.ReadSingle<int>();

stats.PublishedProcessing = multi.ReadSingle<int>();
stats.ReceivedProcessing = multi.ReadSingle<int>();
}
return stats;
});
return statistics;
}

public IList<MessageDto> Messages(MessageQueryDto queryDto)
{
var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received";
var where = string.Empty;
if (!string.IsNullOrEmpty(queryDto.StatusName))
{
if (string.Equals(queryDto.StatusName, StatusName.Processing, StringComparison.CurrentCultureIgnoreCase))
{
where += " and \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')";
}
else
{
where += " and \"StatusName\" = @StatusName";
}
}
if (!string.IsNullOrEmpty(queryDto.Name))
{
where += " and \"Name\" = @Name";
}
if (!string.IsNullOrEmpty(queryDto.Group))
{
where += " and \"Group\" = @Group";
}
if (!string.IsNullOrEmpty(queryDto.Content))
{
where += " and \"Content\" like '%@Content%'";
}

var sqlQuery = $"select * from \"{_options.Schema}\".\"{tableName}\" where 1=1 {where} order by \"Added\" desc offset @Offset limit @Limit";

return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new
{
queryDto.StatusName,
queryDto.Group,
queryDto.Name,
queryDto.Content,
Offset = queryDto.CurrentPage * queryDto.PageSize,
Limit = queryDto.PageSize,
}).ToList());
}

public int PublishedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Failed));
}

public int PublishedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Processing));
}

public int PublishedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "published", StatusName.Succeeded));
}

public int ReceivedFailedCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Failed));
}

public int ReceivedProcessingCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Processing));
}

public int ReceivedSucceededCount()
{
return UseConnection(conn => GetNumberOfMessage(conn, "received", StatusName.Succeeded));
}

public IDictionary<DateTime, int> HourlySucceededJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Succeeded));
}

public IDictionary<DateTime, int> HourlyFailedJobs(MessageType type)
{
var tableName = type == MessageType.Publish ? "published" : "received";
return UseConnection(connection =>
GetHourlyTimelineStats(connection, tableName, StatusName.Failed));
}

private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName)
{
var sqlQuery = statusName == StatusName.Processing
? $"select count(Id) from \"{_options.Schema}\".\"{tableName}\" where \"StatusName\" in (N'Processing',N'Scheduled',N'Enqueued')"
: $"select count(Id) from \"{_options.Schema}\".\"{tableName}\" where \"StatusName\" = @state";

var count = connection.ExecuteScalar<int>(sqlQuery, new { state = statusName });
return count;
}

private T UseConnection<T>(Func<IDbConnection, T> action)
{
return _storage.UseConnection(action);
}

private Dictionary<DateTime, int> GetHourlyTimelineStats(IDbConnection connection, string tableName, string statusName)
{
var endDate = DateTime.Now;
var dates = new List<DateTime>();
for (var i = 0; i < 24; i++)
{
dates.Add(endDate);
endDate = endDate.AddHours(-1);
}

var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x);

return GetTimelineStats(connection, tableName, statusName, keyMaps);
}

private Dictionary<DateTime, int> GetTimelineStats(
IDbConnection connection,
string tableName,
string statusName,
IDictionary<string, DateTime> keyMaps)
{
//SQL Server 2012+
string sqlQuery =
$@"
with aggr as (
select to_char(""Added"",'yyyy-MM-dd-HH') as ""Key"",
count(""Id"") as ""Count""
from ""{_options.Schema}"".""{tableName}""
where ""StatusName"" = N'Processing'
group by to_char(""Added"", 'yyyy-MM-dd-HH')
)
select ""Key"",""Count"" from aggr where ""Key"" in @keys;";

var valuesMap = connection.Query(
sqlQuery,
new { keys = keyMaps.Keys, statusName })
.ToDictionary(x => (string)x.Key, x => (int)x.Count);

foreach (var key in keyMaps.Keys)
{
if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0);
}

var result = new Dictionary<DateTime, int>();
for (var i = 0; i < keyMaps.Count; i++)
{
var value = valuesMap[keyMaps.ElementAt(i).Key];
result.Add(keyMaps.ElementAt(i).Value, value);
}

return result;
}
}
}

+ 46
- 2
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorage.cs View File

@@ -1,3 +1,6 @@
using System;
using System.Data;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
@@ -11,6 +14,7 @@ namespace DotNetCore.CAP.PostgreSql
{
private readonly PostgreSqlOptions _options;
private readonly ILogger _logger;
private readonly IDbConnection _existingConnection = null;

public PostgreSqlStorage(ILogger<PostgreSqlStorage> logger, PostgreSqlOptions options)
{
@@ -20,12 +24,12 @@ namespace DotNetCore.CAP.PostgreSql

public IStorageConnection GetConnection()
{
throw new System.NotImplementedException();
return new PostgreSqlStorageConnection(_options);
}

public IMonitoringApi GetMonitoringApi()
{
throw new System.NotImplementedException();
return new PostgreSqlMonitoringApi(this, _options);
}

public async Task InitializeAsync(CancellationToken cancellationToken)
@@ -41,6 +45,46 @@ namespace DotNetCore.CAP.PostgreSql
_logger.LogDebug("Ensuring all create database tables script are applied.");
}

internal T UseConnection<T>(Func<IDbConnection, T> func)
{
IDbConnection connection = null;

try
{
connection = CreateAndOpenConnection();
return func(connection);
}
finally
{
ReleaseConnection(connection);
}
}

internal IDbConnection CreateAndOpenConnection()
{
var connection = _existingConnection ?? new SqlConnection(_options.ConnectionString);

if (connection.State == ConnectionState.Closed)
{
connection.Open();
}

return connection;
}

internal bool IsExistingConnection(IDbConnection connection)
{
return connection != null && ReferenceEquals(connection, _existingConnection);
}

internal void ReleaseConnection(IDbConnection connection)
{
if (connection != null && !IsExistingConnection(connection))
{
connection.Dispose();
}
}

protected virtual string CreateDbTablesScript(string schema)
{
var batchSql = $@"


+ 49
- 37
src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs View File

@@ -5,21 +5,18 @@ using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models;
using DotNetCore.CAP.Processor.States;
using Npgsql;

namespace DotNetCore.CAP.PostgreSql
{
public class PostgreSqlStorageConnection : IStorageConnection
{
private readonly PostgreSqlOptions _options;

public PostgreSqlStorageConnection(PostgreSqlOptions options)
{
_options = options;
Options = options;
}

public PostgreSqlOptions Options => _options;
public PostgreSqlOptions Options { get; }

public IStorageTransaction CreateTransaction()
{
@@ -28,9 +25,9 @@ namespace DotNetCore.CAP.PostgreSql

public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"Id\"={id}";
var sql = $"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
@@ -38,15 +35,18 @@ namespace DotNetCore.CAP.PostgreSql

public Task<IFetchedMessage> FetchNextMessageAsync()
{
var sql = $@"DELETE FROM ""{_options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{_options.Schema}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
var sql = $@"DELETE FROM ""{Options.Schema}"".""queue"" WHERE ""MessageId"" = (SELECT ""MessageId"" FROM ""{
Options.Schema
}"".""queue"" FOR UPDATE SKIP LOCKED LIMIT 1) RETURNING *;";
return FetchNextMessageCoreAsync(sql);
}

public async Task<CapPublishedMessage> GetNextPublishedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapPublishedMessage>(sql);
}
@@ -54,9 +54,10 @@ namespace DotNetCore.CAP.PostgreSql

public async Task<IEnumerable<CapPublishedMessage>> GetFailedPublishedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"published\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapPublishedMessage>(sql);
}
@@ -68,9 +69,10 @@ namespace DotNetCore.CAP.PostgreSql
{
if (message == null) throw new ArgumentNullException(nameof(message));

var sql = $"INSERT INTO \"{_options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
var sql =
$"INSERT INTO \"{Options.Schema}\".\"received\"(\"Name\",\"Group\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";

using (var connection = new NpgsqlConnection(_options.ConnectionString))
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
await connection.ExecuteAsync(sql, message);
}
@@ -78,8 +80,8 @@ namespace DotNetCore.CAP.PostgreSql

public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"Id\"={id}";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
var sql = $"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"Id\"={id} FOR UPDATE SKIP LOCKED";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
@@ -87,8 +89,9 @@ namespace DotNetCore.CAP.PostgreSql

public async Task<CapReceivedMessage> GetNextReceviedMessageToBeEnqueuedAsync()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\" = '{StatusName.Scheduled}' FOR UPDATE SKIP LOCKED LIMIT 1;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryFirstOrDefaultAsync<CapReceivedMessage>(sql);
}
@@ -96,8 +99,9 @@ namespace DotNetCore.CAP.PostgreSql

public async Task<IEnumerable<CapReceivedMessage>> GetFailedReceviedMessages()
{
var sql = $"SELECT * FROM \"{_options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
using (var connection = new NpgsqlConnection(_options.ConnectionString))
var sql =
$"SELECT * FROM \"{Options.Schema}\".\"received\" WHERE \"StatusName\"='{StatusName.Failed}' LIMIT 1000;";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return await connection.QueryAsync<CapReceivedMessage>(sql);
}
@@ -107,10 +111,32 @@ namespace DotNetCore.CAP.PostgreSql
{
}

public bool ChangePublishedState(int messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";

using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}

public bool ChangeReceivedState(int messageId, string state)
{
var sql =
$"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";

using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
return connection.Execute(sql) > 0;
}
}

private async Task<IFetchedMessage> FetchNextMessageCoreAsync(string sql, object args = null)
{
//here don't use `using` to dispose
var connection = new NpgsqlConnection(_options.ConnectionString);
var connection = new NpgsqlConnection(Options.ConnectionString);
await connection.OpenAsync();
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
FetchedMessage fetchedMessage;
@@ -132,22 +158,8 @@ namespace DotNetCore.CAP.PostgreSql
return null;
}

return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction);
}

public List<string> GetRangeFromSet(string key, int startingFrom, int endingAt)
{
throw new NotImplementedException();
}

public bool ChangePublishedState(int messageId, IState state)
{
throw new NotImplementedException();
}

public bool ChangeReceivedState(int messageId, IState state)
{
throw new NotImplementedException();
return new PostgreSqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
transaction);
}
}
}

Loading…
Cancel
Save