From 1d67da713a36c5fc42f430431ec70863ce68a7e3 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Tue, 5 Nov 2019 18:21:54 +0800 Subject: [PATCH] Add dashboard api interface --- .../IMonitoringApi.MySql.cs | 217 ++++++++++++++++++ .../Monitoring/IMonitoringApi.cs | 34 +++ src/DotNetCore.CAP/Monitoring/MessageDto.cs | 28 +++ .../Monitoring/MessageQueryDto.cs | 23 ++ .../Monitoring/StatisticsDto.cs | 16 ++ 5 files changed, 318 insertions(+) create mode 100644 src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs create mode 100644 src/DotNetCore.CAP/Monitoring/IMonitoringApi.cs create mode 100644 src/DotNetCore.CAP/Monitoring/MessageDto.cs create mode 100644 src/DotNetCore.CAP/Monitoring/MessageQueryDto.cs create mode 100644 src/DotNetCore.CAP/Monitoring/StatisticsDto.cs diff --git a/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs new file mode 100644 index 0000000..d626599 --- /dev/null +++ b/src/DotNetCore.CAP.MySql/IMonitoringApi.MySql.cs @@ -0,0 +1,217 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Threading.Tasks; +using Dapper; +using DotNetCore.CAP.Internal; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Monitoring; +using DotNetCore.CAP.Persistence; +using Microsoft.Extensions.Options; +using MySql.Data.MySqlClient; + +namespace DotNetCore.CAP.MySql +{ + internal class MySqlMonitoringApi : IMonitoringApi + { + private readonly IOptions _options; + private readonly string _prefix; + + public MySqlMonitoringApi(IOptions options) + { + _options = options; + _prefix = options.Value.TableNamePrefix ?? throw new ArgumentNullException(nameof(options)); + } + + public StatisticsDto GetStatistics() + { + var sql = string.Format(@" +set transaction isolation level read committed; +select count(Id) from `{0}.published` where StatusName = N'Succeeded'; +select count(Id) from `{0}.received` where StatusName = N'Succeeded'; +select count(Id) from `{0}.published` where StatusName = N'Failed'; +select count(Id) from `{0}.received` where StatusName = N'Failed';", _prefix); + + var statistics = UseConnection(connection => + { + var stats = new StatisticsDto(); + using (var multi = connection.QueryMultiple(sql)) + { + stats.PublishedSucceeded = multi.ReadSingle(); + stats.ReceivedSucceeded = multi.ReadSingle(); + + stats.PublishedFailed = multi.ReadSingle(); + stats.ReceivedFailed = multi.ReadSingle(); + } + + return stats; + }); + return statistics; + } + + public IDictionary HourlyFailedJobs(MessageType type) + { + var tableName = type == MessageType.Publish ? "published" : "received"; + return UseConnection(connection => + GetHourlyTimelineStats(connection, tableName,nameof(StatusName.Failed))); + } + + public IDictionary HourlySucceededJobs(MessageType type) + { + var tableName = type == MessageType.Publish ? "published" : "received"; + return UseConnection(connection => + GetHourlyTimelineStats(connection, tableName, nameof( StatusName.Succeeded))); + } + + public IList Messages(MessageQueryDto queryDto) + { + var tableName = queryDto.MessageType == MessageType.Publish ? "published" : "received"; + var where = string.Empty; + if (!string.IsNullOrEmpty(queryDto.StatusName)) + { + where += " and StatusName=@StatusName"; + } + + if (!string.IsNullOrEmpty(queryDto.Name)) + { + where += " and Name=@Name"; + } + + if (!string.IsNullOrEmpty(queryDto.Group)) + { + where += " and `Group`=@Group"; + } + + if (!string.IsNullOrEmpty(queryDto.Content)) + { + where += " and Content like '%@Content%'"; + } + + var sqlQuery = + $"select * from `{_prefix}.{tableName}` where 1=1 {where} order by Added desc limit @Limit offset @Offset"; + + return UseConnection(conn => conn.Query(sqlQuery, new + { + queryDto.StatusName, + queryDto.Group, + queryDto.Name, + queryDto.Content, + Offset = queryDto.CurrentPage * queryDto.PageSize, + Limit = queryDto.PageSize + }).ToList()); + } + + public int PublishedFailedCount() + { + return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof( StatusName.Failed))); + } + + public int PublishedSucceededCount() + { + return UseConnection(conn => GetNumberOfMessage(conn, "published", nameof(StatusName.Succeeded))); + } + + public int ReceivedFailedCount() + { + return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Failed))); + } + + public int ReceivedSucceededCount() + { + return UseConnection(conn => GetNumberOfMessage(conn, "received", nameof(StatusName.Succeeded))); + } + + private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName) + { + var sqlQuery = $"select count(Id) from `{_prefix}.{tableName}` where StatusName = @state"; + + var count = connection.ExecuteScalar(sqlQuery, new { state = statusName }); + return count; + } + + private T UseConnection(Func action) + { + return action(new MySqlConnection(_options.Value.ConnectionString)); + } + + private Dictionary GetHourlyTimelineStats(IDbConnection connection, string tableName, + string statusName) + { + var endDate = DateTime.Now; + var dates = new List(); + for (var i = 0; i < 24; i++) + { + dates.Add(endDate); + endDate = endDate.AddHours(-1); + } + + var keyMaps = dates.ToDictionary(x => x.ToString("yyyy-MM-dd-HH"), x => x); + + return GetTimelineStats(connection, tableName, statusName, keyMaps); + } + + private Dictionary GetTimelineStats( + IDbConnection connection, + string tableName, + string statusName, + IDictionary keyMaps) + { + var sqlQuery = + $@" +select aggr.* from ( + select date_format(`Added`,'%Y-%m-%d-%H') as `Key`, + count(id) `Count` + from `{_prefix}.{tableName}` + where StatusName = @statusName + group by date_format(`Added`,'%Y-%m-%d-%H') +) aggr where `Key` in @keys;"; + + var valuesMap = connection.Query( + sqlQuery, + new { keys = keyMaps.Keys, statusName }) + .ToDictionary(x => x.Key, x => x.Count); + + foreach (var key in keyMaps.Keys) + { + if (!valuesMap.ContainsKey(key)) + { + valuesMap.Add(key, 0); + } + } + + var result = new Dictionary(); + for (var i = 0; i < keyMaps.Count; i++) + { + var value = valuesMap[keyMaps.ElementAt(i).Key]; + result.Add(keyMaps.ElementAt(i).Value, value); + } + + return result; + } + + public async Task GetPublishedMessageAsync(long id) + { + var sql = $@"SELECT * FROM `{_prefix}.published` WHERE `Id`={id};"; + + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + return await connection.QueryFirstOrDefaultAsync(sql); + } + + public async Task GetReceivedMessageAsync(long id) + { + var sql = $@"SELECT * FROM `{_prefix}.received` WHERE Id={id};"; + await using var connection = new MySqlConnection(_options.Value.ConnectionString); + return await connection.QueryFirstOrDefaultAsync(sql); + } + } + + class TimelineCounter + { + public string Key { get; set; } + public int Count { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Monitoring/IMonitoringApi.cs b/src/DotNetCore.CAP/Monitoring/IMonitoringApi.cs new file mode 100644 index 0000000..8ebb8cb --- /dev/null +++ b/src/DotNetCore.CAP/Monitoring/IMonitoringApi.cs @@ -0,0 +1,34 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using DotNetCore.CAP.Messages; +using DotNetCore.CAP.Persistence; + +namespace DotNetCore.CAP.Monitoring +{ + public interface IMonitoringApi + { + Task GetPublishedMessageAsync(long id); + + Task GetReceivedMessageAsync(long id); + + StatisticsDto GetStatistics(); + + IList Messages(MessageQueryDto queryDto); + + int PublishedFailedCount(); + + int PublishedSucceededCount(); + + int ReceivedFailedCount(); + + int ReceivedSucceededCount(); + + IDictionary HourlySucceededJobs(MessageType type); + + IDictionary HourlyFailedJobs(MessageType type); + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Monitoring/MessageDto.cs b/src/DotNetCore.CAP/Monitoring/MessageDto.cs new file mode 100644 index 0000000..3151193 --- /dev/null +++ b/src/DotNetCore.CAP/Monitoring/MessageDto.cs @@ -0,0 +1,28 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; + +namespace DotNetCore.CAP.Monitoring +{ + public class MessageDto + { + public long Id { get; set; } + + public string Version { get; set; } + + public string Group { get; set; } + + public string Name { get; set; } + + public string Content { get; set; } + + public DateTime Added { get; set; } + + public DateTime? ExpiresAt { get; set; } + + public int Retries { get; set; } + + public string StatusName { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Monitoring/MessageQueryDto.cs b/src/DotNetCore.CAP/Monitoring/MessageQueryDto.cs new file mode 100644 index 0000000..39b4a56 --- /dev/null +++ b/src/DotNetCore.CAP/Monitoring/MessageQueryDto.cs @@ -0,0 +1,23 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using DotNetCore.CAP.Messages; + +namespace DotNetCore.CAP.Monitoring +{ + public class MessageQueryDto + { + public MessageType MessageType { get; set; } + + public string Group { get; set; } + public string Name { get; set; } + + public string Content { get; set; } + + public string StatusName { get; set; } + + public int CurrentPage { get; set; } + + public int PageSize { get; set; } + } +} \ No newline at end of file diff --git a/src/DotNetCore.CAP/Monitoring/StatisticsDto.cs b/src/DotNetCore.CAP/Monitoring/StatisticsDto.cs new file mode 100644 index 0000000..d698f85 --- /dev/null +++ b/src/DotNetCore.CAP/Monitoring/StatisticsDto.cs @@ -0,0 +1,16 @@ +// Copyright (c) .NET Core Community. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +namespace DotNetCore.CAP.Monitoring +{ + public class StatisticsDto + { + public int Servers { get; set; } + + public int PublishedSucceeded { get; set; } + public int ReceivedSucceeded { get; set; } + + public int PublishedFailed { get; set; } + public int ReceivedFailed { get; set; } + } +} \ No newline at end of file