@@ -52,14 +52,14 @@ namespace DotNetCore.CAP.MySql | |||
{ | |||
dbConnection.Execute(PrepareSql(), message, dbTransaction); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message); | |||
} | |||
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) | |||
{ | |||
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message); | |||
} | |||
public async Task PublishAsync(CapPublishedMessage message) | |||
@@ -9,7 +9,6 @@ namespace DotNetCore.CAP.MySql | |||
{ | |||
internal class DefaultAdditionalProcessor : IAdditionalProcessor | |||
{ | |||
private readonly IServiceProvider _provider; | |||
private readonly ILogger _logger; | |||
private readonly MySqlOptions _options; | |||
@@ -17,13 +16,10 @@ namespace DotNetCore.CAP.MySql | |||
private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); | |||
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); | |||
public DefaultAdditionalProcessor( | |||
IServiceProvider provider, | |||
ILogger<DefaultAdditionalProcessor> logger, | |||
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, | |||
MySqlOptions mysqlOptions) | |||
{ | |||
_logger = logger; | |||
_provider = provider; | |||
_options = mysqlOptions; | |||
} | |||
@@ -31,14 +27,14 @@ namespace DotNetCore.CAP.MySql | |||
{ | |||
_logger.LogDebug("Collecting expired entities."); | |||
var tables = new string[]{ | |||
var tables = new[]{ | |||
$"{_options.TableNamePrefix}.published", | |||
$"{_options.TableNamePrefix}.received" | |||
}; | |||
foreach (var table in tables) | |||
{ | |||
var removedCount = 0; | |||
int removedCount; | |||
do | |||
{ | |||
using (var connection = new MySqlConnection(_options.ConnectionString)) | |||
@@ -129,7 +129,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||
var connection = new MySqlConnection(_options.ConnectionString); | |||
await connection.OpenAsync(); | |||
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); | |||
FetchedMessage fetchedMessage = null; | |||
FetchedMessage fetchedMessage; | |||
try | |||
{ | |||
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction); | |||
@@ -7,7 +7,7 @@ using MySql.Data.MySqlClient; | |||
namespace DotNetCore.CAP.MySql | |||
{ | |||
public class MySqlStorageTransaction : IStorageTransaction, IDisposable | |||
public class MySqlStorageTransaction : IStorageTransaction | |||
{ | |||
private readonly string _prefix; | |||
@@ -52,14 +52,14 @@ namespace DotNetCore.CAP.PostgreSql | |||
{ | |||
dbConnection.Execute(PrepareSql(), message, dbTransaction); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message); | |||
} | |||
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) | |||
{ | |||
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message); | |||
} | |||
public async Task PublishAsync(CapPublishedMessage message) | |||
@@ -2,7 +2,6 @@ using System.Threading; | |||
using System.Threading.Tasks; | |||
using Dapper; | |||
using DotNetCore.CAP.Dashboard; | |||
using Microsoft.EntityFrameworkCore; | |||
using Microsoft.Extensions.Logging; | |||
using Npgsql; | |||
@@ -113,7 +113,7 @@ namespace DotNetCore.CAP.PostgreSql | |||
var connection = new NpgsqlConnection(_options.ConnectionString); | |||
await connection.OpenAsync(); | |||
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); | |||
FetchedMessage fetchedMessage = null; | |||
FetchedMessage fetchedMessage; | |||
try | |||
{ | |||
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction); | |||
@@ -7,7 +7,7 @@ using Npgsql; | |||
namespace DotNetCore.CAP.PostgreSql | |||
{ | |||
public class PostgreSqlStorageTransaction : IStorageTransaction, IDisposable | |||
public class PostgreSqlStorageTransaction : IStorageTransaction | |||
{ | |||
private readonly string _schema; | |||
@@ -1,6 +1,4 @@ | |||
using System; | |||
// ReSharper disable once CheckNamespace | |||
// ReSharper disable once CheckNamespace | |||
namespace DotNetCore.CAP | |||
{ | |||
public class RabbitMQOptions | |||
@@ -1,7 +1,4 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using RabbitMQ.Client; | |||
using RabbitMQ.Client; | |||
namespace DotNetCore.CAP.RabbitMQ | |||
{ | |||
@@ -13,8 +13,8 @@ namespace DotNetCore.CAP.RabbitMQ | |||
private readonly string _exchageName; | |||
private readonly string _queueName; | |||
private readonly RabbitMQOptions _rabbitMQOptions; | |||
private readonly ConnectionPool _connectionPool; | |||
private ConnectionPool _connectionPool; | |||
private IModel _channel; | |||
private ulong _deliveryTag; | |||
@@ -45,7 +45,7 @@ namespace DotNetCore.CAP.RabbitMQ | |||
type: RabbitMQOptions.ExchangeType, | |||
durable: true); | |||
var arguments = new Dictionary<string, object> { { "x-message-ttl", (int)_rabbitMQOptions.QueueMessageExpires } }; | |||
var arguments = new Dictionary<string, object> { { "x-message-ttl", _rabbitMQOptions.QueueMessageExpires } }; | |||
_channel.QueueDeclare(_queueName, | |||
durable: true, | |||
exclusive: false, | |||
@@ -1,7 +1,4 @@ | |||
using Microsoft.Extensions.Options; | |||
using RabbitMQ.Client; | |||
namespace DotNetCore.CAP.RabbitMQ | |||
namespace DotNetCore.CAP.RabbitMQ | |||
{ | |||
internal sealed class RabbitMQConsumerClientFactory : IConsumerClientFactory | |||
{ | |||
@@ -52,14 +52,14 @@ namespace DotNetCore.CAP.SqlServer | |||
{ | |||
dbConnection.Execute(PrepareSql(), message, dbTransaction); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message); | |||
} | |||
protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction, CapPublishedMessage message) | |||
{ | |||
await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message.ToString()); | |||
_logger.LogInformation("Published Message has been persisted in the database. name:" + message); | |||
} | |||
public async Task PublishAsync(CapPublishedMessage message) | |||
@@ -9,7 +9,6 @@ namespace DotNetCore.CAP.SqlServer | |||
{ | |||
public class DefaultAdditionalProcessor : IAdditionalProcessor | |||
{ | |||
private readonly IServiceProvider _provider; | |||
private readonly ILogger _logger; | |||
private readonly SqlServerOptions _options; | |||
@@ -22,13 +21,10 @@ namespace DotNetCore.CAP.SqlServer | |||
"Published","Received" | |||
}; | |||
public DefaultAdditionalProcessor( | |||
IServiceProvider provider, | |||
ILogger<DefaultAdditionalProcessor> logger, | |||
public DefaultAdditionalProcessor(ILogger<DefaultAdditionalProcessor> logger, | |||
SqlServerOptions sqlServerOptions) | |||
{ | |||
_logger = logger; | |||
_provider = provider; | |||
_options = sqlServerOptions; | |||
} | |||
@@ -38,7 +34,7 @@ namespace DotNetCore.CAP.SqlServer | |||
foreach (var table in Tables) | |||
{ | |||
var removedCount = 0; | |||
int removedCount; | |||
do | |||
{ | |||
using (var connection = new SqlConnection(_options.ConnectionString)) | |||
@@ -18,11 +18,8 @@ namespace DotNetCore.CAP.SqlServer | |||
public SqlServerMonitoringApi(IStorage storage, SqlServerOptions options) | |||
{ | |||
if (storage == null) throw new ArgumentNullException(nameof(storage)); | |||
if (options == null) throw new ArgumentNullException(nameof(options)); | |||
_options = options; | |||
_storage = storage as SqlServerStorage; | |||
_options = options ?? throw new ArgumentNullException(nameof(options)); | |||
_storage = storage as SqlServerStorage ?? throw new ArgumentNullException(nameof(storage)); | |||
} | |||
@@ -74,7 +71,7 @@ _options.Schema); | |||
public IList<MessageDto> Messages(MessageQueryDto queryDto) | |||
{ | |||
var tableName = queryDto.MessageType == Models.MessageType.Publish ? "Published" : "Received"; | |||
var tableName = queryDto.MessageType == MessageType.Publish ? "Published" : "Received"; | |||
var where = string.Empty; | |||
if (!string.IsNullOrEmpty(queryDto.StatusName)) | |||
{ | |||
@@ -95,66 +92,45 @@ _options.Schema); | |||
var sqlQuery = $"select * from [{_options.Schema}].{tableName} where 1=1 {where} order by Added desc offset @Offset rows fetch next @Limit rows only"; | |||
return UseConnection(conn => | |||
return UseConnection(conn => conn.Query<MessageDto>(sqlQuery, new | |||
{ | |||
return conn.Query<MessageDto>(sqlQuery, new | |||
{ | |||
StatusName = queryDto.StatusName, | |||
Group = queryDto.Group, | |||
Name = queryDto.Name, | |||
Content = queryDto.Content, | |||
Offset = queryDto.CurrentPage * queryDto.PageSize, | |||
Limit = queryDto.PageSize, | |||
}).ToList(); | |||
}); | |||
StatusName = queryDto.StatusName, | |||
Group = queryDto.Group, | |||
Name = queryDto.Name, | |||
Content = queryDto.Content, | |||
Offset = queryDto.CurrentPage * queryDto.PageSize, | |||
Limit = queryDto.PageSize, | |||
}).ToList()); | |||
} | |||
public int PublishedFailedCount() | |||
{ | |||
return UseConnection(conn => | |||
{ | |||
return GetNumberOfMessage(conn, "Published", StatusName.Failed); | |||
}); | |||
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Failed)); | |||
} | |||
public int PublishedProcessingCount() | |||
{ | |||
return UseConnection(conn => | |||
{ | |||
return GetNumberOfMessage(conn, "Published", StatusName.Processing); | |||
}); | |||
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Processing)); | |||
} | |||
public int PublishedSucceededCount() | |||
{ | |||
return UseConnection(conn => | |||
{ | |||
return GetNumberOfMessage(conn, "Published", StatusName.Succeeded); | |||
}); | |||
return UseConnection(conn => GetNumberOfMessage(conn, "Published", StatusName.Succeeded)); | |||
} | |||
public int ReceivedFailedCount() | |||
{ | |||
return UseConnection(conn => | |||
{ | |||
return GetNumberOfMessage(conn, "Received", StatusName.Failed); | |||
}); | |||
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Failed)); | |||
} | |||
public int ReceivedProcessingCount() | |||
{ | |||
return UseConnection(conn => | |||
{ | |||
return GetNumberOfMessage(conn, "Received", StatusName.Processing); | |||
}); | |||
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Processing)); | |||
} | |||
public int ReceivedSucceededCount() | |||
{ | |||
return UseConnection(conn => | |||
{ | |||
return GetNumberOfMessage(conn, "Received", StatusName.Succeeded); | |||
}); | |||
return UseConnection(conn => GetNumberOfMessage(conn, "Received", StatusName.Succeeded)); | |||
} | |||
private int GetNumberOfMessage(IDbConnection connection, string tableName, string statusName) | |||
@@ -139,7 +139,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; | |||
var connection = new SqlConnection(_options.ConnectionString); | |||
await connection.OpenAsync(); | |||
var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted); | |||
FetchedMessage fetchedMessage = null; | |||
FetchedMessage fetchedMessage; | |||
try | |||
{ | |||
fetchedMessage = await connection.QueryFirstOrDefaultAsync<FetchedMessage>(sql, args, transaction); | |||
@@ -7,7 +7,7 @@ using DotNetCore.CAP.Models; | |||
namespace DotNetCore.CAP.SqlServer | |||
{ | |||
public class SqlServerStorageTransaction : IStorageTransaction, IDisposable | |||
public class SqlServerStorageTransaction : IStorageTransaction | |||
{ | |||
private readonly string _schema; | |||
@@ -1,5 +1,4 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Collections.Generic; | |||
namespace DotNetCore.CAP.Abstractions | |||
{ | |||
@@ -10,7 +9,6 @@ namespace DotNetCore.CAP.Abstractions | |||
{ | |||
/// <summary> | |||
/// Selects a set of <see cref="ConsumerExecutorDescriptor"/> candidates for the current message associated with | |||
/// <paramref name="provider"/>. | |||
/// </summary> | |||
/// <returns>A set of <see cref="ConsumerExecutorDescriptor"/> candidates or <c>null</c>.</returns> | |||
IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates(); | |||
@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.Abstractions.ModelBinding | |||
{ | |||
return | |||
IsSuccess == other.IsSuccess && | |||
object.Equals(Model, other.Model); | |||
Equals(Model, other.Model); | |||
} | |||
/// <summary> | |||
@@ -2,6 +2,7 @@ | |||
namespace DotNetCore.CAP.Abstractions | |||
{ | |||
/// <inheritdoc /> | |||
/// <summary> | |||
/// An abstract attribute that for kafka attribute or rabbitmq attribute | |||
/// </summary> | |||
@@ -23,10 +24,5 @@ namespace DotNetCore.CAP.Abstractions | |||
/// rabbitmq --> queue.name | |||
/// </summary> | |||
public string Group { get; set; } = "cap.default.group"; | |||
/// <summary> | |||
/// unused now | |||
/// </summary> | |||
public bool IsOneWay { get; set; } | |||
} | |||
} |
@@ -3,6 +3,7 @@ using DotNetCore.CAP; | |||
using DotNetCore.CAP.Dashboard.GatewayProxy; | |||
using Microsoft.Extensions.DependencyInjection; | |||
// ReSharper disable once CheckNamespace | |||
namespace Microsoft.AspNetCore.Builder | |||
{ | |||
/// <summary> | |||
@@ -52,7 +53,7 @@ namespace Microsoft.AspNetCore.Builder | |||
var marker = app.ApplicationServices.GetService<CapMarkerService>(); | |||
if (marker == null) | |||
{ | |||
throw new InvalidOperationException("AddCap must be called on the service collection. eg: services.AddCap(...)"); | |||
throw new InvalidOperationException("AddCap() must be called on the service collection. eg: services.AddCap(...)"); | |||
} | |||
var messageQueuemarker = app.ApplicationServices.GetService<CapMessageQueueMakerService>(); | |||
@@ -39,7 +39,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Gets the <see cref="IServiceCollection"/> where MVC services are configured. | |||
/// </summary> | |||
public IServiceCollection Services { get; private set; } | |||
public IServiceCollection Services { get; } | |||
/// <summary> | |||
/// Adds a scoped service of the type specified in serviceType with an implementation | |||
@@ -8,7 +8,7 @@ namespace DotNetCore.CAP | |||
/// </summary> | |||
public class CapOptions | |||
{ | |||
internal IList<ICapOptionsExtension> Extensions { get; private set; } | |||
internal IList<ICapOptionsExtension> Extensions { get; } | |||
/// <summary> | |||
/// Default value for polling delay timeout, in seconds. | |||
@@ -1,14 +1,13 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Reflection; | |||
using DotNetCore.CAP; | |||
using DotNetCore.CAP.Abstractions; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Processor; | |||
using DotNetCore.CAP.Processor.States; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
// ReSharper disable once CheckNamespace | |||
namespace Microsoft.Extensions.DependencyInjection | |||
{ | |||
/// <summary> | |||
@@ -19,9 +19,9 @@ namespace DotNetCore.CAP | |||
/// </summary> | |||
public Cache() { } | |||
private Dictionary<K, T> cache = new Dictionary<K, T>(); | |||
private Dictionary<K, Timer> timers = new Dictionary<K, Timer>(); | |||
private ReaderWriterLockSlim locker = new ReaderWriterLockSlim(); | |||
private Dictionary<K, T> _cache = new Dictionary<K, T>(); | |||
private Dictionary<K, Timer> _timers = new Dictionary<K, Timer>(); | |||
private ReaderWriterLockSlim _locker = new ReaderWriterLockSlim(); | |||
#endregion | |||
#region IDisposable implementation & Clear | |||
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP | |||
{ | |||
// Dispose managed resources. | |||
Clear(); | |||
locker.Dispose(); | |||
_locker.Dispose(); | |||
} | |||
// Dispose unmanaged resources | |||
} | |||
@@ -62,21 +62,21 @@ namespace DotNetCore.CAP | |||
/// </summary> | |||
public void Clear() | |||
{ | |||
locker.EnterWriteLock(); | |||
_locker.EnterWriteLock(); | |||
try | |||
{ | |||
try | |||
{ | |||
foreach (Timer t in timers.Values) | |||
foreach (Timer t in _timers.Values) | |||
t.Dispose(); | |||
} | |||
catch | |||
{ } | |||
timers.Clear(); | |||
cache.Clear(); | |||
_timers.Clear(); | |||
_cache.Clear(); | |||
} | |||
finally { locker.ExitWriteLock(); } | |||
finally { _locker.ExitWriteLock(); } | |||
} | |||
#endregion | |||
@@ -86,22 +86,22 @@ namespace DotNetCore.CAP | |||
{ | |||
Timer timer; | |||
if (timers.TryGetValue(key, out timer)) | |||
if (_timers.TryGetValue(key, out timer)) | |||
{ | |||
if (restartTimerIfExists) | |||
{ | |||
timer.Change( | |||
(cacheTimeout == null ? Timeout.InfiniteTimeSpan : cacheTimeout.Value), | |||
cacheTimeout ?? Timeout.InfiniteTimeSpan, | |||
Timeout.InfiniteTimeSpan); | |||
} | |||
} | |||
else | |||
timers.Add( | |||
_timers.Add( | |||
key, | |||
new Timer( | |||
new TimerCallback(RemoveByTimer), | |||
key, | |||
(cacheTimeout == null ? Timeout.InfiniteTimeSpan : cacheTimeout.Value), | |||
cacheTimeout ?? Timeout.InfiniteTimeSpan, | |||
Timeout.InfiniteTimeSpan)); | |||
} | |||
@@ -125,17 +125,17 @@ namespace DotNetCore.CAP | |||
{ | |||
if (disposed) return; | |||
locker.EnterWriteLock(); | |||
_locker.EnterWriteLock(); | |||
try | |||
{ | |||
CheckTimer(key, cacheTimeout, restartTimerIfExists); | |||
if (!cache.ContainsKey(key)) | |||
cache.Add(key, cacheObject); | |||
if (!_cache.ContainsKey(key)) | |||
_cache.Add(key, cacheObject); | |||
else | |||
cache[key] = cacheObject; | |||
_cache[key] = cacheObject; | |||
} | |||
finally { locker.ExitWriteLock(); } | |||
finally { _locker.ExitWriteLock(); } | |||
} | |||
/// <summary> | |||
@@ -164,13 +164,13 @@ namespace DotNetCore.CAP | |||
{ | |||
if (disposed) return default(T); | |||
locker.EnterReadLock(); | |||
_locker.EnterReadLock(); | |||
try | |||
{ | |||
T rv; | |||
return (cache.TryGetValue(key, out rv) ? rv : default(T)); | |||
return (_cache.TryGetValue(key, out rv) ? rv : default(T)); | |||
} | |||
finally { locker.ExitReadLock(); } | |||
finally { _locker.ExitReadLock(); } | |||
} | |||
/// <summary> | |||
@@ -187,12 +187,12 @@ namespace DotNetCore.CAP | |||
return false; | |||
} | |||
locker.EnterReadLock(); | |||
_locker.EnterReadLock(); | |||
try | |||
{ | |||
return cache.TryGetValue(key, out value); | |||
return _cache.TryGetValue(key, out value); | |||
} | |||
finally { locker.ExitReadLock(); } | |||
finally { _locker.ExitReadLock(); } | |||
} | |||
/// <summary> | |||
@@ -203,22 +203,22 @@ namespace DotNetCore.CAP | |||
{ | |||
if (disposed) return; | |||
locker.EnterWriteLock(); | |||
_locker.EnterWriteLock(); | |||
try | |||
{ | |||
var removers = (from k in cache.Keys.Cast<K>() | |||
var removers = (from k in _cache.Keys.Cast<K>() | |||
where keyPattern(k) | |||
select k).ToList(); | |||
foreach (K workKey in removers) | |||
{ | |||
try { timers[workKey].Dispose(); } | |||
try { _timers[workKey].Dispose(); } | |||
catch { } | |||
timers.Remove(workKey); | |||
cache.Remove(workKey); | |||
_timers.Remove(workKey); | |||
_cache.Remove(workKey); | |||
} | |||
} | |||
finally { locker.ExitWriteLock(); } | |||
finally { _locker.ExitWriteLock(); } | |||
} | |||
/// <summary> | |||
@@ -230,18 +230,18 @@ namespace DotNetCore.CAP | |||
{ | |||
if (disposed) return; | |||
locker.EnterWriteLock(); | |||
_locker.EnterWriteLock(); | |||
try | |||
{ | |||
if (cache.ContainsKey(key)) | |||
if (_cache.ContainsKey(key)) | |||
{ | |||
try { timers[key].Dispose(); } | |||
try { _timers[key].Dispose(); } | |||
catch { } | |||
timers.Remove(key); | |||
cache.Remove(key); | |||
_timers.Remove(key); | |||
_cache.Remove(key); | |||
} | |||
} | |||
finally { locker.ExitWriteLock(); } | |||
finally { _locker.ExitWriteLock(); } | |||
} | |||
/// <summary> | |||
@@ -253,12 +253,12 @@ namespace DotNetCore.CAP | |||
{ | |||
if (disposed) return false; | |||
locker.EnterReadLock(); | |||
_locker.EnterReadLock(); | |||
try | |||
{ | |||
return cache.ContainsKey(key); | |||
return _cache.ContainsKey(key); | |||
} | |||
finally { locker.ExitReadLock(); } | |||
finally { _locker.ExitReadLock(); } | |||
} | |||
#endregion | |||
} | |||
@@ -1,9 +1,11 @@ | |||
using System; | |||
using System.Linq; | |||
using System.Net; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Dashboard; | |||
using Microsoft.AspNetCore.Http; | |||
// ReSharper disable once CheckNamespace | |||
namespace DotNetCore.CAP | |||
{ | |||
public class DashboardMiddleware | |||
@@ -23,10 +25,8 @@ namespace DotNetCore.CAP | |||
public Task Invoke(HttpContext context) | |||
{ | |||
PathString matchedPath; | |||
PathString remainingPath; | |||
if (context.Request.Path.StartsWithSegments(_options.PathMatch, out matchedPath, out remainingPath)) | |||
if (context.Request.Path.StartsWithSegments(_options.PathMatch, | |||
out var matchedPath, out var remainingPath)) | |||
{ | |||
// Update the path | |||
var path = context.Request.Path; | |||
@@ -44,18 +44,15 @@ namespace DotNetCore.CAP | |||
return _next.Invoke(context); | |||
} | |||
foreach (var filter in _options.Authorization) | |||
if (_options.Authorization.Any(filter => !filter.Authorize(dashboardContext))) | |||
{ | |||
if (!filter.Authorize(dashboardContext)) | |||
{ | |||
var isAuthenticated = context.User?.Identity?.IsAuthenticated; | |||
var isAuthenticated = context.User?.Identity?.IsAuthenticated; | |||
context.Response.StatusCode = isAuthenticated == true | |||
? (int)HttpStatusCode.Forbidden | |||
: (int)HttpStatusCode.Unauthorized; | |||
context.Response.StatusCode = isAuthenticated == true | |||
? (int)HttpStatusCode.Forbidden | |||
: (int)HttpStatusCode.Unauthorized; | |||
return Task.CompletedTask; | |||
} | |||
return Task.CompletedTask; | |||
} | |||
dashboardContext.UriMatch = findResult.Item2; | |||
@@ -1,8 +1,7 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Collections.Generic; | |||
using DotNetCore.CAP.Dashboard; | |||
// ReSharper disable once CheckNamespace | |||
namespace DotNetCore.CAP | |||
{ | |||
public class DashboardOptions | |||
@@ -2,9 +2,9 @@ | |||
namespace DotNetCore.CAP | |||
{ | |||
using DotNetCore.CAP.Dashboard; | |||
using DotNetCore.CAP.Dashboard.GatewayProxy; | |||
using DotNetCore.CAP.Dashboard.GatewayProxy.Requester; | |||
using Dashboard; | |||
using Dashboard.GatewayProxy; | |||
using Dashboard.GatewayProxy.Requester; | |||
using Microsoft.Extensions.DependencyInjection; | |||
internal sealed class DashboardOptionsExtension : ICapOptionsExtension | |||
@@ -25,8 +25,6 @@ namespace DotNetCore.CAP | |||
services.AddSingleton<IHttpRequester, HttpClientHttpRequester>(); | |||
services.AddSingleton<IHttpClientCache, MemoryHttpClientCache>(); | |||
services.AddSingleton<IRequestMapper, RequestMapper>(); | |||
//services.AddScoped<IRequestScopedDataRepository, ScopedDataRepository>(); | |||
//services.AddScoped<IRequestScopedDataRepository, HttpDataRepository>(); | |||
} | |||
} | |||
} | |||
@@ -310,6 +310,7 @@ a:hover .label-hover { | |||
padding: 12px; | |||
background-color: #fff; | |||
border: 1px solid #e5e5e5; | |||
-ms-border-radius: 3px; | |||
border-radius: 3px; | |||
} | |||
@@ -333,12 +334,14 @@ a:hover .label-hover { | |||
.state-card-body { | |||
padding: 10px; | |||
margin: 10px -12px -12px -12px; | |||
-ms-border-bottom-left-radius: 3px; | |||
border-bottom-left-radius: 3px; | |||
-ms-border-bottom-right-radius: 3px; | |||
border-bottom-right-radius: 3px; | |||
background-color: #f5f5f5; | |||
} | |||
.state-card-body dl { | |||
.state-card-body dl { | |||
margin-top: 5px; | |||
margin-bottom: 0; | |||
} | |||
@@ -353,7 +356,7 @@ a:hover .label-hover { | |||
.state-card-body .stack-trace { | |||
background-color: transparent; | |||
padding: 0 20px; | |||
margin-bottom: 0px; | |||
margin-bottom: 0; | |||
} | |||
.state-card-body .exception-type { | |||
@@ -453,6 +456,7 @@ span.metric-default { | |||
div.metric { | |||
border: solid 1px transparent; | |||
-ms-border-radius: 4px; | |||
border-radius: 4px; | |||
-webkit-box-shadow: 0 1px 1px rgba(0,0,0,.05); | |||
box-shadow: 0 1px 1px rgba(0,0,0,.05); | |||
@@ -460,7 +464,7 @@ div.metric { | |||
transition: color .1s ease-out, background .1s ease-out, border .1s ease-out; | |||
} | |||
div.metric .metric-body { | |||
div.metric .metric-body { | |||
padding: 15px 15px 0; | |||
font-size: 26px; | |||
text-align: center; | |||
@@ -398,10 +398,10 @@ | |||
updateRelativeDates(); | |||
setInterval(updateRelativeDates, 30 * 1000); | |||
$('*[title]').tooltip(); | |||
$("*[title]").tooltip(); | |||
var self = this; | |||
$('*[data-metric]').each(function () { | |||
$("*[data-metric]").each(function () { | |||
var name = $(this).data('metric'); | |||
self._metrics.addElement(name, this); | |||
}); | |||
@@ -24,20 +24,19 @@ namespace DotNetCore.CAP.Dashboard | |||
public CapDashboardResponse(HttpContext context) | |||
{ | |||
if (context == null) throw new ArgumentNullException(nameof(context)); | |||
_context = context; | |||
_context = context ?? throw new ArgumentNullException(nameof(context)); | |||
} | |||
public override string ContentType | |||
{ | |||
get { return _context.Response.ContentType; } | |||
set { _context.Response.ContentType = value; } | |||
get => _context.Response.ContentType; | |||
set => _context.Response.ContentType = value; | |||
} | |||
public override int StatusCode | |||
{ | |||
get { return _context.Response.StatusCode; } | |||
set { _context.Response.StatusCode = value; } | |||
get => _context.Response.StatusCode; | |||
set => _context.Response.StatusCode = value; | |||
} | |||
public override Stream Body => _context.Response.Body; | |||
@@ -125,11 +125,6 @@ namespace DotNetCore.CAP.Dashboard | |||
return $"{GetContentFolderNamespace(contentFolder)}.{resourceName}"; | |||
} | |||
private static EnqueuedState CreateEnqueuedState() | |||
{ | |||
return new EnqueuedState(); | |||
} | |||
private static Assembly GetExecutingAssembly() | |||
{ | |||
return typeof(DashboardRoutes).GetTypeInfo().Assembly; | |||
@@ -15,12 +15,16 @@ namespace DotNetCore.CAP.Dashboard | |||
Assembly assembly, | |||
string resourceName) | |||
{ | |||
if (contentType == null) throw new ArgumentNullException(nameof(contentType)); | |||
if (assembly == null) throw new ArgumentNullException(nameof(assembly)); | |||
_assembly = assembly; | |||
_resourceName = resourceName; | |||
_contentType = contentType; | |||
if (assembly != null) | |||
{ | |||
_assembly = assembly; | |||
_resourceName = resourceName; | |||
_contentType = contentType ?? throw new ArgumentNullException(nameof(contentType)); | |||
} | |||
else | |||
{ | |||
throw new ArgumentNullException(nameof(assembly)); | |||
} | |||
} | |||
public Task Dispatch(DashboardContext context) | |||
@@ -7,6 +7,6 @@ | |||
Value = value; | |||
} | |||
public string Value { get; private set; } | |||
public string Value { get; } | |||
} | |||
} |
@@ -15,7 +15,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy | |||
{ | |||
public class GatewayProxyMiddleware | |||
{ | |||
private const string NODE_COOKIE_NAME = "cap.node"; | |||
public const string NodeCookieName = "cap.node"; | |||
private readonly RequestDelegate _next; | |||
private readonly ILogger _logger; | |||
@@ -45,12 +45,9 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy | |||
var request = context.Request; | |||
var pathMatch = discoveryOptions.MatchPath; | |||
var isCapRequest = request.Path.StartsWithSegments( | |||
new PathString(pathMatch), | |||
out PathString matchedPath, | |||
out PathString remainingPath); | |||
var isCapRequest = request.Path.StartsWithSegments(new PathString(pathMatch)); | |||
var isSwitchNode = request.Cookies.TryGetValue(NODE_COOKIE_NAME, out string requestNodeId); | |||
var isSwitchNode = request.Cookies.TryGetValue(NodeCookieName, out string requestNodeId); | |||
var isCurrentNode = discoveryOptions.NodeId.ToString() == requestNodeId; | |||
if (!isCapRequest || !isSwitchNode || isCurrentNode) | |||
@@ -80,7 +77,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy | |||
} | |||
else | |||
{ | |||
context.Response.Cookies.Delete(NODE_COOKIE_NAME); | |||
context.Response.Cookies.Delete(NodeCookieName); | |||
await _next.Invoke(context); | |||
} | |||
} | |||
@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester | |||
catch (Exception exception) | |||
{ | |||
_logger.LogError("Error making http request, exception:" + exception.Message); | |||
throw exception; | |||
throw; | |||
} | |||
finally | |||
{ | |||
@@ -1,4 +1,6 @@ | |||
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester | |||
using System.Net.Http; | |||
namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester | |||
{ | |||
public interface IHttpClientBuilder | |||
{ | |||
@@ -9,8 +9,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester | |||
public void Set(string id, IHttpClient client, TimeSpan expirationTime) | |||
{ | |||
ConcurrentQueue<IHttpClient> connectionQueue; | |||
if (_httpClientsCache.TryGetValue(id, out connectionQueue)) | |||
if (_httpClientsCache.TryGetValue(id, out var connectionQueue)) | |||
{ | |||
connectionQueue.Enqueue(client); | |||
} | |||
@@ -24,15 +23,13 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester | |||
public bool Exists(string id) | |||
{ | |||
ConcurrentQueue<IHttpClient> connectionQueue; | |||
return _httpClientsCache.TryGetValue(id, out connectionQueue); | |||
return _httpClientsCache.TryGetValue(id, out _); | |||
} | |||
public IHttpClient Get(string id) | |||
{ | |||
IHttpClient client = null; | |||
ConcurrentQueue<IHttpClient> connectionQueue; | |||
if (_httpClientsCache.TryGetValue(id, out connectionQueue)) | |||
if (_httpClientsCache.TryGetValue(id, out var connectionQueue)) | |||
{ | |||
connectionQueue.TryDequeue(out client); | |||
} | |||
@@ -41,8 +38,7 @@ namespace DotNetCore.CAP.Dashboard.GatewayProxy.Requester | |||
public void Remove(string id) | |||
{ | |||
ConcurrentQueue<IHttpClient> connectionQueue; | |||
_httpClientsCache.TryRemove(id, out connectionQueue); | |||
_httpClientsCache.TryRemove(id, out _); | |||
} | |||
} | |||
} |
@@ -19,8 +19,7 @@ namespace DotNetCore.CAP.Dashboard | |||
public HtmlHelper(RazorPage page) | |||
{ | |||
if (page == null) throw new ArgumentNullException(nameof(page)); | |||
_page = page; | |||
_page = page ?? throw new ArgumentNullException(nameof(page)); | |||
} | |||
public NonEscapedString Breadcrumbs(string title, IDictionary<string, string> items) | |||
@@ -35,10 +34,7 @@ namespace DotNetCore.CAP.Dashboard | |||
{ | |||
return SidebarMenu(MessagesSidebarMenu.PublishedItems); | |||
} | |||
else | |||
{ | |||
return SidebarMenu(MessagesSidebarMenu.ReceivedItems); | |||
} | |||
return SidebarMenu(MessagesSidebarMenu.ReceivedItems); | |||
} | |||
public NonEscapedString SidebarMenu(IEnumerable<Func<RazorPage, MenuItem>> items) | |||
@@ -198,11 +194,9 @@ namespace DotNetCore.CAP.Dashboard | |||
#region MethodEscaped | |||
public NonEscapedString MethodEscaped(MethodInfo method) | |||
{ | |||
var outputString = string.Empty; | |||
var @public = WrapKeyword("public"); | |||
var @async = string.Empty; | |||
var @return = string.Empty; | |||
string @return; | |||
var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(method.ReturnType, out var coercedAwaitableInfo); | |||
if (isAwaitable) | |||
@@ -221,7 +215,6 @@ namespace DotNetCore.CAP.Dashboard | |||
string paramType = null; | |||
string paramName = null; | |||
string paramString = string.Empty; | |||
var @params = method.GetParameters(); | |||
if (@params.Length == 1) | |||
@@ -232,16 +225,9 @@ namespace DotNetCore.CAP.Dashboard | |||
paramName = firstParam.Name; | |||
} | |||
if (paramType == null) | |||
{ | |||
paramString = "();"; | |||
} | |||
else | |||
{ | |||
paramString = $"({paramType} {paramName});"; | |||
} | |||
var paramString = paramType == null ? "();" : $"({paramType} {paramName});"; | |||
outputString = @public + " " + (string.IsNullOrEmpty(@async) ? "" : @async + " ") + @return + " " + @name + paramString; | |||
var outputString = @public + " " + (string.IsNullOrEmpty(@async) ? "" : @async + " ") + @return + " " + @name + paramString; | |||
return new NonEscapedString(outputString); | |||
} | |||
@@ -261,7 +247,7 @@ namespace DotNetCore.CAP.Dashboard | |||
{ | |||
return WrapType(type.Name); | |||
} | |||
if (type.IsPrimitive || type.Equals(typeof(string)) || type.Equals(typeof(decimal))) | |||
if (type.IsPrimitive || type == typeof(string) || type == typeof(decimal)) | |||
{ | |||
return WrapKeyword(type.Name.ToLower()); | |||
} | |||
@@ -286,11 +272,6 @@ namespace DotNetCore.CAP.Dashboard | |||
return Span("type", value); | |||
} | |||
private string WrapString(string value) | |||
{ | |||
return Span("string", value); | |||
} | |||
private string Span(string @class, string value) | |||
{ | |||
return $"<span class=\"{@class}\">{value}</span>"; | |||
@@ -1,9 +1,7 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using DotNetCore.CAP.Infrastructure; | |||
using DotNetCore.CAP.Processor.States; | |||
using Newtonsoft.Json; | |||
namespace DotNetCore.CAP.Dashboard | |||
{ | |||
@@ -23,9 +23,6 @@ namespace DotNetCore.CAP.Dashboard | |||
public async Task Dispatch(DashboardContext context) | |||
{ | |||
var request = context.Request; | |||
var response = context.Response; | |||
string serialized = null; | |||
if (_command != null) | |||
{ | |||
@@ -1,6 +1,6 @@ | |||
namespace DotNetCore.CAP.Dashboard.Pages | |||
{ | |||
partial class BlockMetric | |||
internal partial class BlockMetric | |||
{ | |||
public BlockMetric(DashboardMetric dashboardMetric) | |||
{ | |||
@@ -2,7 +2,7 @@ | |||
namespace DotNetCore.CAP.Dashboard.Pages | |||
{ | |||
partial class Breadcrumbs | |||
internal partial class Breadcrumbs | |||
{ | |||
public Breadcrumbs(string title, IDictionary<string, string> items) | |||
{ | |||
@@ -1,13 +1,12 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Collections.Generic; | |||
using DotNetCore.CAP.NodeDiscovery; | |||
using Microsoft.Extensions.DependencyInjection; | |||
namespace DotNetCore.CAP.Dashboard.Pages | |||
{ | |||
partial class NodePage | |||
internal partial class NodePage | |||
{ | |||
private IList<Node> _nodes = null; | |||
private IList<Node> _nodes; | |||
private INodeDiscoveryProvider _discoveryProvider; | |||
public NodePage() | |||
@@ -14,18 +14,15 @@ namespace DotNetCore.CAP.Dashboard.Pages | |||
public int GetTotal(IMonitoringApi api) | |||
{ | |||
if (String.Compare(StatusName, SucceededState.StateName, true) == 0) | |||
if (string.Compare(StatusName, SucceededState.StateName, StringComparison.OrdinalIgnoreCase) == 0) | |||
{ | |||
return api.PublishedSucceededCount(); | |||
} | |||
else if (String.Compare(StatusName, ProcessingState.StateName, true) == 0) | |||
if (string.Compare(StatusName, ProcessingState.StateName, StringComparison.OrdinalIgnoreCase) == 0) | |||
{ | |||
return api.PublishedProcessingCount(); | |||
} | |||
else | |||
{ | |||
return api.PublishedFailedCount(); | |||
} | |||
return api.PublishedFailedCount(); | |||
} | |||
} | |||
} |
@@ -14,18 +14,15 @@ namespace DotNetCore.CAP.Dashboard.Pages | |||
public int GetTotal(IMonitoringApi api) | |||
{ | |||
if (String.Compare(StatusName, SucceededState.StateName, true) == 0) | |||
if (string.Compare(StatusName, SucceededState.StateName, StringComparison.OrdinalIgnoreCase) == 0) | |||
{ | |||
return api.ReceivedSucceededCount(); | |||
} | |||
else if (String.Compare(StatusName, ProcessingState.StateName, true) == 0) | |||
if (string.Compare(StatusName, ProcessingState.StateName, StringComparison.OrdinalIgnoreCase) == 0) | |||
{ | |||
return api.ReceivedProcessingCount(); | |||
} | |||
else | |||
{ | |||
return api.ReceivedFailedCount(); | |||
} | |||
return api.ReceivedFailedCount(); | |||
} | |||
} | |||
} |
@@ -3,12 +3,11 @@ using System.Collections.Generic; | |||
namespace DotNetCore.CAP.Dashboard.Pages | |||
{ | |||
partial class SidebarMenu | |||
internal partial class SidebarMenu | |||
{ | |||
public SidebarMenu(IEnumerable<Func<RazorPage, MenuItem>> items) | |||
{ | |||
if (items == null) throw new ArgumentNullException(nameof(items)); | |||
Items = items; | |||
Items = items ?? throw new ArgumentNullException(nameof(items)); | |||
} | |||
public IEnumerable<Func<RazorPage, MenuItem>> Items { get; } | |||
@@ -39,7 +39,7 @@ | |||
{ | |||
var i = 0; | |||
var rowCount = subscriber.Value.Count; | |||
@foreach (var column in subscriber.Value) | |||
foreach (var column in subscriber.Value) | |||
{ | |||
<tr> | |||
@if (i == 0) | |||
@@ -44,13 +44,13 @@ namespace DotNetCore.CAP.Dashboard | |||
public static void AddJsonResult( | |||
this RouteCollection routes, | |||
string pathTemplate, | |||
Func<DashboardContext, string> Jsonfunc) | |||
Func<DashboardContext, string> jsonfunc) | |||
{ | |||
if (routes == null) throw new ArgumentNullException(nameof(routes)); | |||
if (pathTemplate == null) throw new ArgumentNullException(nameof(pathTemplate)); | |||
if (Jsonfunc == null) throw new ArgumentNullException(nameof(Jsonfunc)); | |||
if (jsonfunc == null) throw new ArgumentNullException(nameof(jsonfunc)); | |||
routes.Add(pathTemplate, new JsonDispatcher(Jsonfunc)); | |||
routes.Add(pathTemplate, new JsonDispatcher(jsonfunc)); | |||
} | |||
public static void AddPublishBatchCommand( | |||
@@ -9,8 +9,7 @@ namespace DotNetCore.CAP.Dashboard | |||
public UrlHelper(DashboardContext context) | |||
{ | |||
if (context == null) throw new ArgumentNullException(nameof(context)); | |||
_context = context; | |||
_context = context ?? throw new ArgumentNullException(nameof(context)); | |||
} | |||
public string To(string relativePath) | |||
@@ -3,7 +3,6 @@ using System.Collections.Generic; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using Microsoft.AspNetCore.Hosting; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
@@ -12,7 +11,7 @@ namespace DotNetCore.CAP | |||
/// <summary> | |||
/// Default implement of <see cref="IBootstrapper"/>. | |||
/// </summary> | |||
public class DefaultBootstrapper : IBootstrapper | |||
internal class DefaultBootstrapper : IBootstrapper | |||
{ | |||
private readonly ILogger<DefaultBootstrapper> _logger; | |||
private readonly IApplicationLifetime _appLifetime; | |||
@@ -8,20 +8,17 @@ using DotNetCore.CAP.Models; | |||
using DotNetCore.CAP.Processor; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
namespace DotNetCore.CAP | |||
{ | |||
public class ConsumerHandler : IConsumerHandler, IDisposable | |||
internal class ConsumerHandler : IConsumerHandler | |||
{ | |||
private readonly IServiceProvider _serviceProvider; | |||
private readonly IConsumerInvokerFactory _consumerInvokerFactory; | |||
private readonly IConsumerClientFactory _consumerClientFactory; | |||
private readonly ILogger _logger; | |||
private readonly CancellationTokenSource _cts; | |||
private readonly MethodMatcherCache _selector; | |||
private readonly CapOptions _options; | |||
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1); | |||
@@ -30,18 +27,14 @@ namespace DotNetCore.CAP | |||
public ConsumerHandler( | |||
IServiceProvider serviceProvider, | |||
IConsumerInvokerFactory consumerInvokerFactory, | |||
IConsumerClientFactory consumerClientFactory, | |||
ILogger<ConsumerHandler> logger, | |||
MethodMatcherCache selector, | |||
IOptions<CapOptions> options) | |||
MethodMatcherCache selector) | |||
{ | |||
_selector = selector; | |||
_logger = logger; | |||
_serviceProvider = serviceProvider; | |||
_consumerInvokerFactory = consumerInvokerFactory; | |||
_consumerClientFactory = consumerClientFactory; | |||
_options = options.Value; | |||
_cts = new CancellationTokenSource(); | |||
} | |||
@@ -99,7 +92,8 @@ namespace DotNetCore.CAP | |||
using (var scope = _serviceProvider.CreateScope()) | |||
{ | |||
var receviedMessage = StoreMessage(scope, message); | |||
StoreMessage(scope, message); | |||
client.Commit(); | |||
} | |||
Pulse(); | |||
@@ -111,7 +105,7 @@ namespace DotNetCore.CAP | |||
}; | |||
} | |||
private CapReceivedMessage StoreMessage(IServiceScope serviceScope, MessageContext messageContext) | |||
private static void StoreMessage(IServiceScope serviceScope, MessageContext messageContext) | |||
{ | |||
var provider = serviceScope.ServiceProvider; | |||
var messageStore = provider.GetRequiredService<IStorageConnection>(); | |||
@@ -120,7 +114,6 @@ namespace DotNetCore.CAP | |||
StatusName = StatusName.Scheduled, | |||
}; | |||
messageStore.StoreReceivedMessageAsync(receivedMessage).GetAwaiter().GetResult(); | |||
return receivedMessage; | |||
} | |||
public void Pulse() | |||
@@ -2,6 +2,7 @@ | |||
namespace DotNetCore.CAP | |||
{ | |||
/// <inheritdoc /> | |||
/// <summary> | |||
/// A process thread abstract of job process. | |||
/// </summary> | |||
@@ -41,7 +41,7 @@ namespace DotNetCore.CAP | |||
var result = await PublishAsync(message.Name, message.Content); | |||
sp.Stop(); | |||
var newState = default(IState); | |||
IState newState; | |||
if (!result.Succeeded) | |||
{ | |||
var shouldRetry = await UpdateMessageForRetryAsync(message, connection); | |||
@@ -78,11 +78,10 @@ namespace DotNetCore.CAP | |||
} | |||
} | |||
private async Task<bool> UpdateMessageForRetryAsync(CapPublishedMessage message, IStorageConnection connection) | |||
private static async Task<bool> UpdateMessageForRetryAsync(CapPublishedMessage message, IStorageConnection connection) | |||
{ | |||
var retryBehavior = RetryBehavior.DefaultRetry; | |||
var now = DateTime.Now; | |||
var retries = ++message.Retries; | |||
if (retries >= retryBehavior.RetryCount) | |||
{ | |||
@@ -47,7 +47,7 @@ namespace DotNetCore.CAP | |||
var result = await ExecuteSubscribeAsync(message); | |||
sp.Stop(); | |||
var newState = default(IState); | |||
IState newState; | |||
if (!result.Succeeded) | |||
{ | |||
var shouldRetry = await UpdateMessageForRetryAsync(message, connection); | |||
@@ -57,8 +57,7 @@ namespace DotNetCore.CAP.Infrastructure | |||
public static DateTime DeserializeDateTime(string value) | |||
{ | |||
long timestamp; | |||
if (long.TryParse(value, out timestamp)) | |||
if (long.TryParse(value, out var timestamp)) | |||
{ | |||
return FromTimestamp(timestamp); | |||
} | |||
@@ -102,13 +101,13 @@ namespace DotNetCore.CAP.Infrastructure | |||
private static bool IsSimpleType(Type type) | |||
{ | |||
return type.GetTypeInfo().IsPrimitive || | |||
type.Equals(typeof(decimal)) || | |||
type.Equals(typeof(string)) || | |||
type.Equals(typeof(DateTime)) || | |||
type.Equals(typeof(Guid)) || | |||
type.Equals(typeof(DateTimeOffset)) || | |||
type.Equals(typeof(TimeSpan)) || | |||
type.Equals(typeof(Uri)); | |||
type == typeof(decimal) || | |||
type == typeof(string) || | |||
type == typeof(DateTime) || | |||
type == typeof(Guid) || | |||
type == typeof(DateTimeOffset) || | |||
type == typeof(TimeSpan) || | |||
type == typeof(Uri); | |||
} | |||
} | |||
} |
@@ -45,7 +45,7 @@ namespace DotNetCore.CAP.Internal | |||
var jsonConent = _consumerContext.DeliverMessage.Content; | |||
var message = serializer.DeSerialize<CapMessageDto>(jsonConent); | |||
object result = null; | |||
object result; | |||
if (_executor.MethodParameters.Length > 0) | |||
{ | |||
result = await ExecuteWithParameterAsync(obj, message.Content.ToString()); | |||
@@ -68,10 +68,7 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
return await _executor.ExecuteAsync(@class); | |||
} | |||
else | |||
{ | |||
return _executor.Execute(@class); | |||
} | |||
return _executor.Execute(@class); | |||
} | |||
private async Task<object> ExecuteWithParameterAsync(object @class, string parameterString) | |||
@@ -87,15 +84,9 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
return await _executor.ExecuteAsync(@class, bindResult.Model); | |||
} | |||
else | |||
{ | |||
return _executor.Execute(@class, bindResult.Model); | |||
} | |||
} | |||
else | |||
{ | |||
throw new MethodBindException($"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} "); | |||
return _executor.Execute(@class, bindResult.Model); | |||
} | |||
throw new MethodBindException($"Parameters:{firstParameter.Name} bind failed! ParameterString is: {parameterString} "); | |||
} | |||
catch (FormatException ex) | |||
{ | |||
@@ -8,8 +8,9 @@ using Microsoft.Extensions.DependencyInjection; | |||
namespace DotNetCore.CAP.Internal | |||
{ | |||
/// <inheritdoc /> | |||
/// <summary> | |||
/// A default <see cref="IConsumerServiceSelector"/> implementation. | |||
/// A default <see cref="T:DotNetCore.CAP.Abstractions.IConsumerServiceSelector" /> implementation. | |||
/// </summary> | |||
public class DefaultConsumerServiceSelector : IConsumerServiceSelector | |||
{ | |||
@@ -39,7 +40,7 @@ namespace DotNetCore.CAP.Internal | |||
executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(_serviceProvider)); | |||
executorDescriptorList.AddRange(FindConsumersFromControllerTypes(_serviceProvider)); | |||
executorDescriptorList.AddRange(FindConsumersFromControllerTypes()); | |||
return executorDescriptorList; | |||
} | |||
@@ -67,8 +68,7 @@ namespace DotNetCore.CAP.Internal | |||
} | |||
} | |||
private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes( | |||
IServiceProvider provider) | |||
private static IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromControllerTypes() | |||
{ | |||
var executorDescriptorList = new List<ConsumerExecutorDescriptor>(); | |||
@@ -91,7 +91,7 @@ namespace DotNetCore.CAP.Internal | |||
{ | |||
var topicAttrs = method.GetCustomAttributes<TopicAttribute>(true); | |||
if (topicAttrs.Count() == 0) continue; | |||
if (!topicAttrs.Any()) continue; | |||
foreach (var attr in topicAttrs) | |||
{ | |||
@@ -73,7 +73,7 @@ namespace DotNetCore.CAP.Internal | |||
// so we capture the inner exception. | |||
exception = ExceptionDispatchInfo.Capture(exception.InnerException).SourceException; | |||
} | |||
throw exception; | |||
throw; | |||
} | |||
} | |||
@@ -88,7 +88,7 @@ namespace DotNetCore.CAP.Internal | |||
public bool Equals(Key other) | |||
{ | |||
return _metadata.Equals(other._metadata) && object.ReferenceEquals(_token, other._token); | |||
return _metadata.Equals(other._metadata) && ReferenceEquals(_token, other._token); | |||
} | |||
public override bool Equals(object obj) | |||
@@ -16,11 +16,11 @@ namespace DotNetCore.CAP | |||
private static readonly Action<ILogger, string, Exception> _receivedMessageRetryExecuting; | |||
private static readonly Action<ILogger, string, string, string, Exception> _modelBinderFormattingException; | |||
private static Action<ILogger, Exception> _jobFailed; | |||
private static Action<ILogger, Exception> _jobFailedWillRetry; | |||
private static Action<ILogger, double, Exception> _jobExecuted; | |||
private static Action<ILogger, int, Exception> _jobRetrying; | |||
private static Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob; | |||
private static readonly Action<ILogger, Exception> _jobFailed; | |||
private static readonly Action<ILogger, Exception> _jobFailedWillRetry; | |||
private static readonly Action<ILogger, double, Exception> _jobExecuted; | |||
private static readonly Action<ILogger, int, Exception> _jobRetrying; | |||
private static readonly Action<ILogger, string, Exception> _exceptionOccuredWhileExecutingJob; | |||
static LoggerExtensions() | |||
{ | |||
@@ -1,4 +1,5 @@ | |||
namespace DotNetCore.CAP | |||
// ReSharper disable once CheckNamespace | |||
namespace DotNetCore.CAP | |||
{ | |||
public class DiscoveryOptions | |||
{ | |||
@@ -32,4 +33,4 @@ | |||
public string MatchPath { get; set; } | |||
} | |||
} | |||
} |
@@ -1,10 +1,8 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace DotNetCore.CAP | |||
{ | |||
using DotNetCore.CAP.NodeDiscovery; | |||
using NodeDiscovery; | |||
using Microsoft.Extensions.DependencyInjection; | |||
internal sealed class DiscoveryOptionsExtension : ICapOptionsExtension | |||
@@ -1,10 +1,8 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
{ | |||
class DiscoveryProviderFactory : IDiscoveryProviderFactory | |||
internal class DiscoveryProviderFactory : IDiscoveryProviderFactory | |||
{ | |||
public INodeDiscoveryProvider Create(DiscoveryOptions options) | |||
{ | |||
@@ -1,10 +1,6 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
{ | |||
interface IDiscoveryProviderFactory | |||
internal interface IDiscoveryProviderFactory | |||
{ | |||
INodeDiscoveryProvider Create(DiscoveryOptions options); | |||
} | |||
@@ -39,10 +39,11 @@ namespace DotNetCore.CAP.NodeDiscovery | |||
Port = x.Value.Port, | |||
Tags = string.Join(", ", x.Value.Tags) | |||
}); | |||
var nodeList = nodes.ToList(); | |||
CapCache.Global.AddOrUpdate("cap.nodes.count", nodes.Count(), TimeSpan.FromSeconds(30),true); | |||
CapCache.Global.AddOrUpdate("cap.nodes.count", nodeList.Count, TimeSpan.FromSeconds(30),true); | |||
return nodes.ToList(); | |||
return nodeList; | |||
} | |||
catch (Exception) { | |||
return null; | |||
@@ -57,7 +58,7 @@ namespace DotNetCore.CAP.NodeDiscovery | |||
Name = _options.NodeName, | |||
Address = _options.CurrentNodeHostName, | |||
Port = _options.CurrentNodePort, | |||
Tags = new string[] { "CAP", "Client", "Dashboard" }, | |||
Tags = new[] { "CAP", "Client", "Dashboard" }, | |||
Check = new AgentServiceCheck | |||
{ | |||
DeregisterCriticalServiceAfter = TimeSpan.FromSeconds(30), | |||
@@ -1,6 +1,4 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Collections.Generic; | |||
using System.Threading.Tasks; | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
@@ -1,25 +1,21 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
{ | |||
class ConsulProcessingNodeServer : IProcessingServer | |||
internal class ConsulProcessingNodeServer : IProcessingServer | |||
{ | |||
private readonly DiscoveryOptions dashboardOptions; | |||
private readonly IDiscoveryProviderFactory discoveryProviderFactory; | |||
private readonly DiscoveryOptions _dashboardOptions; | |||
private readonly IDiscoveryProviderFactory _discoveryProviderFactory; | |||
public ConsulProcessingNodeServer( | |||
DiscoveryOptions dashboardOptions, | |||
IDiscoveryProviderFactory discoveryProviderFactory) | |||
{ | |||
this.dashboardOptions = dashboardOptions; | |||
this.discoveryProviderFactory = discoveryProviderFactory; | |||
_dashboardOptions = dashboardOptions; | |||
_discoveryProviderFactory = discoveryProviderFactory; | |||
} | |||
public void Start() | |||
{ | |||
var discoveryProvider = discoveryProviderFactory.Create(dashboardOptions); | |||
var discoveryProvider = _discoveryProviderFactory.Create(_dashboardOptions); | |||
discoveryProvider.RegisterNode(); | |||
} | |||
@@ -1,8 +1,4 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
namespace DotNetCore.CAP.NodeDiscovery | |||
{ | |||
public class Node | |||
{ | |||
@@ -51,8 +51,11 @@ namespace DotNetCore.CAP | |||
public static OperateResult Failed(Exception ex, params OperateError[] errors) | |||
{ | |||
var result = new OperateResult { Succeeded = false }; | |||
result.Exception = ex; | |||
var result = new OperateResult | |||
{ | |||
Succeeded = false, | |||
Exception = ex | |||
}; | |||
if (errors != null) | |||
{ | |||
result._errors.AddRange(errors); | |||
@@ -3,7 +3,6 @@ using System.Threading; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Infrastructure; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
namespace DotNetCore.CAP.Processor | |||
@@ -11,24 +10,15 @@ namespace DotNetCore.CAP.Processor | |||
public class DefaultDispatcher : IDispatcher | |||
{ | |||
private readonly IQueueExecutorFactory _queueExecutorFactory; | |||
private readonly IServiceProvider _provider; | |||
private readonly ILogger _logger; | |||
private readonly CancellationTokenSource _cts; | |||
private readonly TimeSpan _pollingDelay; | |||
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); | |||
public DefaultDispatcher( | |||
IServiceProvider provider, | |||
IQueueExecutorFactory queueExecutorFactory, | |||
IOptions<CapOptions> capOptions, | |||
ILogger<DefaultDispatcher> logger) | |||
public DefaultDispatcher(IQueueExecutorFactory queueExecutorFactory, | |||
IOptions<CapOptions> capOptions) | |||
{ | |||
_logger = logger; | |||
_queueExecutorFactory = queueExecutorFactory; | |||
_provider = provider; | |||
_cts = new CancellationTokenSource(); | |||
_pollingDelay = TimeSpan.FromSeconds(capOptions.Value.PollingDelay); | |||
} | |||
@@ -73,7 +63,7 @@ namespace DotNetCore.CAP.Processor | |||
private async Task<bool> Step(ProcessingContext context) | |||
{ | |||
var fetched = default(IFetchedMessage); | |||
IFetchedMessage fetched; | |||
using (var scopedContext = context.CreateScope()) | |||
{ | |||
var provider = scopedContext.Provider; | |||
@@ -9,16 +9,16 @@ using Microsoft.Extensions.Options; | |||
namespace DotNetCore.CAP.Processor | |||
{ | |||
public class CapProcessingServer : IProcessingServer, IDisposable | |||
public class CapProcessingServer : IProcessingServer | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly ILoggerFactory _loggerFactory; | |||
private readonly IServiceProvider _provider; | |||
private readonly CancellationTokenSource _cts; | |||
private readonly CapOptions _options; | |||
private readonly IList<IDispatcher> _messageDispatchers; | |||
private IProcessor[] _processors; | |||
private IList<IDispatcher> _messageDispatchers; | |||
private ProcessingContext _context; | |||
private Task _compositeTask; | |||
private bool _disposed; | |||
@@ -109,7 +109,7 @@ namespace DotNetCore.CAP.Processor | |||
private IProcessor[] GetProcessors(int processorCount) | |||
{ | |||
var returnedProcessors = new List<IProcessor>(); | |||
for (int i = 0; i < processorCount; i++) | |||
for (var i = 0; i < processorCount; i++) | |||
{ | |||
var messageProcessors = _provider.GetRequiredService<IDispatcher>(); | |||
_messageDispatchers.Add(messageProcessors); | |||
@@ -13,7 +13,6 @@ namespace DotNetCore.CAP.Processor | |||
public class PublishQueuer : IProcessor | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly CapOptions _options; | |||
private readonly IStateChanger _stateChanger; | |||
private readonly IServiceProvider _provider; | |||
private readonly TimeSpan _pollingDelay; | |||
@@ -27,15 +26,16 @@ namespace DotNetCore.CAP.Processor | |||
IServiceProvider provider) | |||
{ | |||
_logger = logger; | |||
_options = options.Value; | |||
_stateChanger = stateChanger; | |||
_provider = provider; | |||
_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay); | |||
var capOptions = options.Value; | |||
_pollingDelay = TimeSpan.FromSeconds(capOptions.PollingDelay); | |||
} | |||
public async Task ProcessAsync(ProcessingContext context) | |||
{ | |||
_logger.LogDebug("Publish Queuer start calling."); | |||
using (var scope = _provider.CreateScope()) | |||
{ | |||
CapPublishedMessage sentMessage; | |||
@@ -12,11 +12,10 @@ namespace DotNetCore.CAP.Processor | |||
{ | |||
public class SubscribeQueuer : IProcessor | |||
{ | |||
private ILogger _logger; | |||
private CapOptions _options; | |||
private IStateChanger _stateChanger; | |||
private IServiceProvider _provider; | |||
private TimeSpan _pollingDelay; | |||
private readonly ILogger _logger; | |||
private readonly IStateChanger _stateChanger; | |||
private readonly IServiceProvider _provider; | |||
private readonly TimeSpan _pollingDelay; | |||
internal static readonly AutoResetEvent PulseEvent = new AutoResetEvent(true); | |||
@@ -27,15 +26,16 @@ namespace DotNetCore.CAP.Processor | |||
IServiceProvider provider) | |||
{ | |||
_logger = logger; | |||
_options = options.Value; | |||
_stateChanger = stateChanger; | |||
_provider = provider; | |||
_pollingDelay = TimeSpan.FromSeconds(_options.PollingDelay); | |||
var capOptions = options.Value; | |||
_pollingDelay = TimeSpan.FromSeconds(capOptions.PollingDelay); | |||
} | |||
public async Task ProcessAsync(ProcessingContext context) | |||
{ | |||
_logger.LogDebug("SubscribeQueuer start calling."); | |||
using (var scope = _provider.CreateScope()) | |||
{ | |||
CapReceivedMessage message; | |||
@@ -7,7 +7,7 @@ namespace DotNetCore.CAP.Processor.States | |||
{ | |||
public const string StateName = "Succeeded"; | |||
public TimeSpan? ExpiresAfter { get; private set; } | |||
public TimeSpan? ExpiresAfter { get; } | |||
public string Name => StateName; | |||
@@ -1,13 +0,0 @@ | |||
using System.Collections.Generic; | |||
namespace DotNetCore.CAP | |||
{ | |||
public class StateData | |||
{ | |||
public string Name { get; set; } | |||
public string Reason { get; set; } | |||
public IDictionary<string, string> Data { get; set; } | |||
} | |||
} |