diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs b/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs new file mode 100644 index 0000000..0111aae --- /dev/null +++ b/src/DotNetCore.CAP.SqlServer/SqlServerMonitoringApi.cs @@ -0,0 +1,237 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using Dapper; +using DotNetCore.CAP.Dashboard; +using DotNetCore.CAP.Dashboard.Monitoring; + +namespace DotNetCore.CAP.SqlServer +{ + internal class SqlServerMonitoringApi : IMonitoringApi + { + private readonly SqlServerStorage _storage; + private readonly SqlServerOptions _options; + + public SqlServerMonitoringApi(IStorage storage, SqlServerOptions options) + { + if (storage == null) throw new ArgumentNullException(nameof(storage)); + if (options == null) throw new ArgumentNullException(nameof(options)); + + _options = options; + _storage = storage as SqlServerStorage; + } + + public JobList DeletedJobs(int from, int count) + { + return new JobList(new Dictionary()); + } + + public long DeletedListCount() + { + return 11; + } + + public long EnqueuedCount(string queue) + { + return 11; + } + + public JobList EnqueuedJobs(string queue, int from, int perPage) + { + return new JobList(new Dictionary()); + } + + public IDictionary FailedByDatesCount() + { + return new Dictionary(); + } + + public long FailedCount() + { + return 11; + } + + public JobList FailedJobs(int from, int count) + { + return new JobList(new Dictionary()); + } + + public long FetchedCount(string queue) + { + return 11; + } + + public JobList FetchedJobs(string queue, int from, int perPage) + { + return new JobList(new Dictionary()); + } + + public StatisticsDto GetStatistics() + { + string sql = String.Format(@" +set transaction isolation level read committed; +select count(Id) from [{0}].Published with (nolock) where StatusName = N'Enqueued'; +select count(Id) from [{0}].Published with (nolock) where StatusName = N'Failed'; +select count(Id) from [{0}].Published with (nolock) where StatusName = N'Processing'; +select count(Id) from [{0}].Published with (nolock) where StatusName = N'Scheduled'; +select 11; +select count(Id) from [{0}].Published with (nolock) where StatusName = N'Successed'; +select count(Id) from [{0}].Published with (nolock) where StatusName = N'Deleted'; +select 14; + ", _options.Schema); + + var statistics = UseConnection(connection => + { + var stats = new StatisticsDto(); + using (var multi = connection.QueryMultiple(sql)) + { + stats.Enqueued = multi.ReadSingle(); + stats.Failed = multi.ReadSingle(); + stats.Processing = multi.ReadSingle(); + stats.Scheduled = multi.ReadSingle(); + + stats.Servers = multi.ReadSingle(); + + stats.Succeeded = multi.ReadSingleOrDefault() ?? 0; + stats.Deleted = multi.ReadSingleOrDefault() ?? 0; + + stats.Recurring = multi.ReadSingle(); + } + return stats; + }); + + //statistics.Queues = _storage.QueueProviders + // .SelectMany(x => x.GetJobQueueMonitoringApi().GetQueues()) + // .Count(); + statistics.Queues = 3; + return statistics; + } + + public IDictionary HourlyFailedJobs() + { + return UseConnection(connection => + GetHourlyTimelineStats(connection, "failed")); + } + + public IDictionary HourlySucceededJobs() + { + return UseConnection(connection => + GetHourlyTimelineStats(connection, "succeeded")); + } + + public JobDetailsDto JobDetails(string jobId) + { + return new JobDetailsDto(); + } + + public long ProcessingCount() + { + return 11; + } + + public JobList ProcessingJobs(int from, int count) + { + return new JobList(new Dictionary()); + } + + public IList Queues() + { + return new List(); + } + + public long ScheduledCount() + { + return 0; + } + + public JobList ScheduledJobs(int from, int count) + { + return new JobList(new Dictionary()); + } + + public IList Servers() + { + return new List(); + } + + public IDictionary SucceededByDatesCount() + { + return new Dictionary(); + } + + public JobList SucceededJobs(int from, int count) + { + return new JobList(new Dictionary()); + } + + public long SucceededListCount() + { + return 11; + } + + private T UseConnection(Func action) + { + return _storage.UseConnection(action); + } + + private Dictionary GetHourlyTimelineStats(IDbConnection connection, string type) + { + var endDate = DateTime.UtcNow; + var dates = new List(); + for (var i = 0; i < 24; i++) + { + dates.Add(endDate); + endDate = endDate.AddHours(-1); + } + + var keyMaps = dates.ToDictionary(x => $"stats:{type}:{x.ToString("yyyy-MM-dd-HH")}", x => x); + + return GetTimelineStats(connection, keyMaps); + } + + private Dictionary GetTimelineStats(IDbConnection connection, string type) + { + var endDate = DateTime.UtcNow.Date; + var dates = new List(); + for (var i = 0; i < 7; i++) + { + dates.Add(endDate); + endDate = endDate.AddDays(-1); + } + + var keyMaps = dates.ToDictionary(x => $"stats:{type}:{x.ToString("yyyy-MM-dd")}", x => x); + + return GetTimelineStats(connection, keyMaps); + } + + private Dictionary GetTimelineStats( + IDbConnection connection, + IDictionary keyMaps) + { + string sqlQuery = +$@"select [Key], [Value] as [Count] from [{_options.Schema}].AggregatedCounter with (nolock) +where [Key] in @keys"; + + var valuesMap = connection.Query( + sqlQuery, + new { keys = keyMaps.Keys }) + .ToDictionary(x => (string)x.Key, x => (long)x.Count); + + foreach (var key in keyMaps.Keys) + { + if (!valuesMap.ContainsKey(key)) valuesMap.Add(key, 0); + } + + var result = new Dictionary(); + for (var i = 0; i < keyMaps.Count; i++) + { + var value = valuesMap[keyMaps.ElementAt(i).Key]; + result.Add(keyMaps.ElementAt(i).Value, value); + } + + return result; + } + } +} diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs index 1d008cd..8d4fd26 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorage.cs @@ -1,3 +1,5 @@ +using System; +using System.Data; using System.Data.SqlClient; using System.Threading; using System.Threading.Tasks; @@ -11,6 +13,7 @@ namespace DotNetCore.CAP.SqlServer { private readonly SqlServerOptions _options; private readonly ILogger _logger; + private readonly IDbConnection _existingConnection; public SqlServerStorage(ILogger logger, SqlServerOptions options) { @@ -20,12 +23,12 @@ namespace DotNetCore.CAP.SqlServer public IStorageConnection GetConnection() { - throw new System.NotImplementedException(); + return new SqlServerStorageConnection(_options); } public IMonitoringApi GetMonitoringApi() { - throw new System.NotImplementedException(); + return new SqlServerMonitoringApi(this, _options); } public async Task InitializeAsync(CancellationToken cancellationToken) @@ -94,5 +97,46 @@ CREATE TABLE [{schema}].[Published]( END;"; return batchSql; } + + internal T UseConnection(Func func) + { + IDbConnection connection = null; + + try + { + connection = CreateAndOpenConnection(); + return func(connection); + } + finally + { + ReleaseConnection(connection); + } + } + + internal IDbConnection CreateAndOpenConnection() + { + var connection = _existingConnection ?? new SqlConnection(_options.ConnectionString); + + if (connection.State == ConnectionState.Closed) + { + connection.Open(); + } + + return connection; + } + + internal bool IsExistingConnection(IDbConnection connection) + { + return connection != null && ReferenceEquals(connection, _existingConnection); + } + + internal void ReleaseConnection(IDbConnection connection) + { + if (connection != null && !IsExistingConnection(connection)) + { + connection.Dispose(); + } + } + } } \ No newline at end of file diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs index c9cbdd9..13b0aba 100644 --- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs +++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs @@ -140,24 +140,26 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; return new SqlServerFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection, transaction); } + // ------------------------------------------ + public long GetSetCount(string key) { - throw new NotImplementedException(); + return 11; } public List GetRangeFromSet(string key, int startingFrom, int endingAt) { - throw new NotImplementedException(); + return new List { "11", "22", "33" }; } public MessageData GetJobData(string jobId) { - throw new NotImplementedException(); + return new MessageData(); } public StateData GetStateData(string jobId) { - throw new NotImplementedException(); + return new StateData(); } } } \ No newline at end of file