diff --git a/build.cake b/build.cake
index 02b52c4..1b68025 100644
--- a/build.cake
+++ b/build.cake
@@ -68,6 +68,7 @@ Task("Pack")
{
Configuration = build.Configuration,
VersionSuffix = build.Version.Suffix,
+ IncludeSymbols = true,
OutputDirectory = "./artifacts/packages"
};
foreach (var project in build.ProjectFiles)
diff --git a/build/version.props b/build/version.props
index 7dc91a1..4ba57d7 100644
--- a/build/version.props
+++ b/build/version.props
@@ -2,7 +2,7 @@
2
1
- 0
+ 3
$(VersionMajor).$(VersionMinor).$(VersionPatch)
diff --git a/samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj b/samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj
index a1e44b6..1441491 100644
--- a/samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj
+++ b/samples/Sample.Kafka.SqlServer/Sample.Kafka.SqlServer.csproj
@@ -8,7 +8,7 @@
-
+
diff --git a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
index 4d5358c..053e875 100644
--- a/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
+++ b/samples/Sample.RabbitMQ.MySql/Sample.RabbitMQ.MySql.csproj
@@ -10,8 +10,8 @@
-
-
+
+
diff --git a/samples/Sample.RabbitMQ.PostgreSql/Sample.RabbitMQ.PostgreSql.csproj b/samples/Sample.RabbitMQ.PostgreSql/Sample.RabbitMQ.PostgreSql.csproj
index 7dba0be..f2fe0c5 100644
--- a/samples/Sample.RabbitMQ.PostgreSql/Sample.RabbitMQ.PostgreSql.csproj
+++ b/samples/Sample.RabbitMQ.PostgreSql/Sample.RabbitMQ.PostgreSql.csproj
@@ -5,8 +5,8 @@
-
-
+
+
diff --git a/samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj b/samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj
index e5c69ab..a32da56 100644
--- a/samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj
+++ b/samples/Sample.RabbitMQ.SqlServer/Sample.RabbitMQ.SqlServer.csproj
@@ -6,7 +6,7 @@
-
+
diff --git a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
index 724ffe7..3161a38 100644
--- a/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
+++ b/src/DotNetCore.CAP.Kafka/DotNetCore.CAP.Kafka.csproj
@@ -8,13 +8,14 @@
$(PackageTags);Kafka
-
+
NU1605;NU1701
- NU1701
+ NU1701;CS1591
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.Kafka.xml
-
+
diff --git a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
index 2568536..2affaa6 100644
--- a/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
+++ b/src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.Kafka
public event EventHandler OnMessageReceived;
- public event EventHandler OnError;
+ public event EventHandler OnLog;
public void Subscribe(IEnumerable topics)
{
@@ -34,7 +34,6 @@ namespace DotNetCore.CAP.Kafka
if (_consumerClient == null)
InitKafkaClient();
- //_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0)));
_consumerClient.Subscribe(topics);
}
@@ -55,7 +54,7 @@ namespace DotNetCore.CAP.Kafka
public void Reject()
{
- // Ignore, Kafka will not commit offset when not commit.
+ _consumerClient.Assign(_consumerClient.Assignment);
}
public void Dispose()
@@ -76,12 +75,17 @@ namespace DotNetCore.CAP.Kafka
_consumerClient.OnError += ConsumerClient_OnError;
}
+
private void ConsumerClient_OnConsumeError(object sender, Message e)
{
var message = e.Deserialize(null, StringDeserializer);
-
- OnError?.Invoke(sender, $"An error occurred during consume the message; Topic:'{e.Topic}'," +
- $"Message:'{message.Value}', Reason:'{e.Error}'.");
+ var logArgs = new LogMessageEventArgs
+ {
+ LogType = MqLogType.ConsumeError,
+ Reason = $"An error occurred during consume the message; Topic:'{e.Topic}'," +
+ $"Message:'{message.Value}', Reason:'{e.Error}'."
+ };
+ OnLog?.Invoke(sender, logArgs);
}
private void ConsumerClient_OnMessage(object sender, Message e)
@@ -98,7 +102,12 @@ namespace DotNetCore.CAP.Kafka
private void ConsumerClient_OnError(object sender, Error e)
{
- OnError?.Invoke(sender, e.ToString());
+ var logArgs = new LogMessageEventArgs
+ {
+ LogType = MqLogType.ServerConnError,
+ Reason = e.ToString()
+ };
+ OnLog?.Invoke(sender, logArgs);
}
#endregion private methods
diff --git a/src/DotNetCore.CAP.MySql/CapPublisher.cs b/src/DotNetCore.CAP.MySql/CapPublisher.cs
index 276335b..25f5127 100644
--- a/src/DotNetCore.CAP.MySql/CapPublisher.cs
+++ b/src/DotNetCore.CAP.MySql/CapPublisher.cs
@@ -62,14 +62,12 @@ namespace DotNetCore.CAP.MySql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
- protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
+ protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
- dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
+ await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
-
- return Task.CompletedTask;
}
#region private methods
diff --git a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
index 011148b..d64ff32 100644
--- a/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
+++ b/src/DotNetCore.CAP.MySql/DotNetCore.CAP.MySql.csproj
@@ -8,11 +8,16 @@
$(PackageTags);MySQL
+
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.MySql.xml
+ 1701;1702;1705;CS1591
+
+
-
-
-
-
+
+
+
+
diff --git a/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs
index 8e4934e..c4ab741 100644
--- a/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs
+++ b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs
@@ -1,29 +1,21 @@
-using System;
-using System.Data;
-using System.Threading;
-using Dapper;
+using Dapper;
using DotNetCore.CAP.Models;
+using MySql.Data.MySqlClient;
namespace DotNetCore.CAP.MySql
{
public class MySqlFetchedMessage : IFetchedMessage
{
- private static readonly TimeSpan KeepAliveInterval = TimeSpan.FromMinutes(1);
- private readonly IDbConnection _connection;
- private readonly object _lockObject = new object();
- private readonly Timer _timer;
- private readonly IDbTransaction _transaction;
+ private readonly MySqlOptions _options;
+ private readonly string _processId;
- public MySqlFetchedMessage(int messageId,
- MessageType type,
- IDbConnection connection,
- IDbTransaction transaction)
+ public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options)
{
MessageId = messageId;
MessageType = type;
- _connection = connection;
- _transaction = transaction;
- _timer = new Timer(ExecuteKeepAliveQuery, null, KeepAliveInterval, KeepAliveInterval);
+
+ _processId = processId;
+ _options = options;
}
public int MessageId { get; }
@@ -32,43 +24,25 @@ namespace DotNetCore.CAP.MySql
public void RemoveFromQueue()
{
- lock (_lockObject)
+ using (var connection = new MySqlConnection(_options.ConnectionString))
{
- _transaction.Commit();
- }
+ connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId"
+ , new { ProcessId = _processId });
+ }
}
public void Requeue()
{
- lock (_lockObject)
+ using (var connection = new MySqlConnection(_options.ConnectionString))
{
- _transaction.Rollback();
+ connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId"
+ , new { ProcessId = _processId });
}
}
public void Dispose()
{
- lock (_lockObject)
- {
- _timer?.Dispose();
- _transaction.Dispose();
- _connection.Dispose();
- }
- }
-
- private void ExecuteKeepAliveQuery(object obj)
- {
- lock (_lockObject)
- {
- try
- {
- _connection?.Execute("SELECT 1", _transaction);
- }
- catch
- {
- // ignored
- }
- }
+ // ignored
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.MySql/MySqlStorage.cs b/src/DotNetCore.CAP.MySql/MySqlStorage.cs
index 6e9c56e..3755b0d 100644
--- a/src/DotNetCore.CAP.MySql/MySqlStorage.cs
+++ b/src/DotNetCore.CAP.MySql/MySqlStorage.cs
@@ -53,7 +53,8 @@ namespace DotNetCore.CAP.MySql
$@"
CREATE TABLE IF NOT EXISTS `{prefix}.queue` (
`MessageId` int(11) NOT NULL,
- `MessageType` tinyint(4) NOT NULL
+ `MessageType` tinyint(4) NOT NULL,
+ `ProcessId` varchar(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE IF NOT EXISTS `{prefix}.received` (
@@ -62,8 +63,8 @@ CREATE TABLE IF NOT EXISTS `{prefix}.received` (
`Group` varchar(200) DEFAULT NULL,
`Content` longtext,
`Retries` int(11) DEFAULT NULL,
- `Added` datetime(6) NOT NULL,
- `ExpiresAt` datetime(6) DEFAULT NULL,
+ `Added` datetime NOT NULL,
+ `ExpiresAt` datetime DEFAULT NULL,
`StatusName` varchar(50) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
@@ -73,8 +74,8 @@ CREATE TABLE IF NOT EXISTS `{prefix}.published` (
`Name` varchar(200) NOT NULL,
`Content` longtext,
`Retries` int(11) DEFAULT NULL,
- `Added` datetime(6) NOT NULL,
- `ExpiresAt` datetime(6) DEFAULT NULL,
+ `Added` datetime NOT NULL,
+ `ExpiresAt` datetime DEFAULT NULL,
`StatusName` varchar(40) NOT NULL,
PRIMARY KEY (`Id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
index 23d1abd..1ed4e0e 100644
--- a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
+++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs
@@ -1,6 +1,5 @@
using System;
using System.Collections.Generic;
-using System.Data;
using System.Threading.Tasks;
using Dapper;
using DotNetCore.CAP.Infrastructure;
@@ -42,14 +41,12 @@ namespace DotNetCore.CAP.MySql
public Task FetchNextMessageAsync()
{
+ var processId = ObjectId.GenerateNewStringId();
var sql = $@"
-SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` LIMIT 1 FOR UPDATE;
-DELETE FROM `{_prefix}.queue` LIMIT 1;";
- // var sql = $@"
- //SELECT @MId:=`MessageId` as MessageId, @MType:=`MessageType` as MessageType FROM `{_prefix}.queue` LIMIT 1;
- //DELETE FROM `{_prefix}.queue` where `MessageId` = @MId AND `MessageType`=@MType;";
+UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1;
+SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;";
- return FetchNextMessageCoreAsync(sql);
+ return FetchNextMessageCoreAsync(sql, processId);
}
public async Task GetNextPublishedMessageToBeEnqueuedAsync()
@@ -60,6 +57,7 @@ SELECT * FROM `{_prefix}.published` WHERE Id=LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
+ connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync(sql);
}
}
@@ -105,6 +103,7 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
+ connection.Execute("SELECT LAST_INSERT_ID(0)");
return await connection.QueryFirstOrDefaultAsync(sql);
}
}
@@ -118,15 +117,10 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}
-
- public void Dispose()
- {
- }
-
public bool ChangePublishedState(int messageId, string state)
{
var sql =
- $"UPDATE `{_prefix}.published` SET `Retries`=`Retries`+1,`StatusName` = '{state}' WHERE `Id`={messageId}";
+ $"UPDATE `{_prefix}.published` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
@@ -137,7 +131,7 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
public bool ChangeReceivedState(int messageId, string state)
{
var sql =
- $"UPDATE `{_prefix}.received` SET `Retries`=`Retries`+1,`StatusName` = '{state}' WHERE `Id`={messageId}";
+ $"UPDATE `{_prefix}.received` SET `Retries`=`Retries`+1,`ExpiresAt`=NULL,`StatusName` = '{state}' WHERE `Id`={messageId}";
using (var connection = new MySqlConnection(Options.ConnectionString))
{
@@ -145,45 +139,22 @@ SELECT * FROM `{_prefix}.received` WHERE Id=LAST_INSERT_ID();";
}
}
- private async Task FetchNextMessageCoreAsync(string sql, object args = null)
+ private async Task FetchNextMessageCoreAsync(string sql, string processId)
{
- //here don't use `using` to dispose
- var connection = new MySqlConnection(Options.ConnectionString);
- await connection.OpenAsync();
- var transaction = connection.BeginTransaction(IsolationLevel.ReadCommitted);
- FetchedMessage fetchedMessage = null;
- try
- {
- //fetchedMessage = await connection.QuerySingleOrDefaultAsync(sql, args, transaction);
- // An anomaly with unknown causes, sometimes QuerySingleOrDefaultAsync can't return expected result.
- using (var reader = connection.ExecuteReader(sql, args, transaction))
- {
- while (reader.Read())
- {
- fetchedMessage = new FetchedMessage
- {
- MessageId = (int)reader.GetInt64(0),
- MessageType = (MessageType)reader.GetInt64(1)
- };
- }
- }
- }
- catch (MySqlException)
+ FetchedMessage fetchedMessage;
+ using (var connection = new MySqlConnection(Options.ConnectionString))
{
- transaction.Dispose();
- throw;
+ fetchedMessage = await connection.QuerySingleOrDefaultAsync(sql, new { ProcessId = processId });
}
if (fetchedMessage == null)
- {
- transaction.Rollback();
- transaction.Dispose();
- connection.Dispose();
return null;
- }
- return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, connection,
- transaction);
+ return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options);
+ }
+
+ public void Dispose()
+ {
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs b/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs
index dd8472e..0dbb424 100644
--- a/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs
+++ b/src/DotNetCore.CAP.MySql/MySqlStorageTransaction.cs
@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.MySql
{
if (message == null) throw new ArgumentNullException(nameof(message));
- var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
+ var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Publish},
_dbTransaction);
}
@@ -55,7 +55,7 @@ namespace DotNetCore.CAP.MySql
{
if (message == null) throw new ArgumentNullException(nameof(message));
- var sql = $"INSERT INTO `{_prefix}.queue` values(@MessageId,@MessageType);";
+ var sql = $"INSERT INTO `{_prefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);";
_dbConnection.Execute(sql, new CapQueue {MessageId = message.Id, MessageType = MessageType.Subscribe},
_dbTransaction);
}
diff --git a/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs b/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs
index 2c62532..bbaa86e 100644
--- a/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs
+++ b/src/DotNetCore.CAP.PostgreSql/CapPublisher.cs
@@ -64,14 +64,12 @@ namespace DotNetCore.CAP.PostgreSql
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
}
- protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
+ protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
- dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
+ await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
_logger.LogInformation("Published Message has been persisted in the database. name:" + message);
-
- return Task.CompletedTask;
}
#region private methods
diff --git a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
index d3884ce..96dca04 100644
--- a/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
+++ b/src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
@@ -8,11 +8,16 @@
$(PackageTags);PostgreSQL
+
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.PostgreSql.xml
+ 1701;1702;1705;CS1591
+
+
-
-
-
-
+
+
+
+
diff --git a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
index e7be1a3..4874f8a 100644
--- a/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
+++ b/src/DotNetCore.CAP.PostgreSql/PostgreSqlStorageConnection.cs
@@ -113,7 +113,7 @@ namespace DotNetCore.CAP.PostgreSql
public bool ChangePublishedState(int messageId, string state)
{
var sql =
- $"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
+ $"UPDATE \"{Options.Schema}\".\"published\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
@@ -124,7 +124,7 @@ namespace DotNetCore.CAP.PostgreSql
public bool ChangeReceivedState(int messageId, string state)
{
var sql =
- $"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
+ $"UPDATE \"{Options.Schema}\".\"received\" SET \"Retries\"=\"Retries\"+1,\"ExpiresAt\"=NULL,\"StatusName\" = '{state}' WHERE \"Id\"={messageId}";
using (var connection = new NpgsqlConnection(Options.ConnectionString))
{
@@ -146,6 +146,7 @@ namespace DotNetCore.CAP.PostgreSql
catch (NpgsqlException)
{
transaction.Dispose();
+ connection.Dispose();
throw;
}
diff --git a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
index 149c1f3..d76ab91 100644
--- a/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/CAP.RabbiMQOptions.cs
@@ -28,9 +28,9 @@ namespace DotNetCore.CAP
public const string DefaultVHost = "/";
///
- /// Default exchange name (value: "cap.default.topic").
+ /// Default exchange name (value: "cap.default.router").
///
- public const string DefaultExchangeName = "cap.default.topic";
+ public const string DefaultExchangeName = "cap.default.router";
/// The topic exchange type.
public const string ExchangeType = "topic";
diff --git a/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj b/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
index 55a0deb..6a68173 100644
--- a/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
+++ b/src/DotNetCore.CAP.RabbitMQ/DotNetCore.CAP.RabbitMQ.csproj
@@ -7,6 +7,11 @@
DotNetCore.CAP.RabbitMQ
$(PackageTags);RabbitMQ
+
+
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.RabbitMQ.xml
+ 1701;1702;1705;CS1591
+
diff --git a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
index 596d1ec..da483f0 100644
--- a/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
+++ b/src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs
@@ -2,7 +2,6 @@
using System.Collections.Generic;
using System.Text;
using System.Threading;
-using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
@@ -15,6 +14,7 @@ namespace DotNetCore.CAP.RabbitMQ
private readonly string _queueName;
private readonly RabbitMQOptions _rabbitMQOptions;
+ private IConnection _connection;
private IModel _channel;
private ulong _deliveryTag;
@@ -32,7 +32,7 @@ namespace DotNetCore.CAP.RabbitMQ
public event EventHandler OnMessageReceived;
- public event EventHandler OnError;
+ public event EventHandler OnLog;
public void Subscribe(IEnumerable topics)
{
@@ -47,9 +47,18 @@ namespace DotNetCore.CAP.RabbitMQ
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
consumer.Shutdown += OnConsumerShutdown;
+ consumer.Registered += OnConsumerRegistered;
+ consumer.Unregistered += OnConsumerUnregistered;
+ consumer.ConsumerCancelled += OnConsumerConsumerCancelled;
+
_channel.BasicConsume(_queueName, false, consumer);
+
while (true)
- Task.Delay(timeout, cancellationToken).GetAwaiter().GetResult();
+ {
+ cancellationToken.ThrowIfCancellationRequested();
+ cancellationToken.WaitHandle.WaitOne(timeout);
+ }
+ // ReSharper disable once FunctionNeverReturns
}
public void Commit()
@@ -65,13 +74,14 @@ namespace DotNetCore.CAP.RabbitMQ
public void Dispose()
{
_channel.Dispose();
+ _connection.Dispose();
}
private void InitClient()
{
- var connection = _connectionChannelPool.GetConnection();
+ _connection = _connectionChannelPool.GetConnection();
- _channel = connection.CreateModel();
+ _channel = _connection.CreateModel();
_channel.ExchangeDeclare(
_exchageName,
@@ -84,6 +94,38 @@ namespace DotNetCore.CAP.RabbitMQ
_channel.QueueDeclare(_queueName, true, false, false, arguments);
}
+ #region events
+
+ private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
+ {
+ var args = new LogMessageEventArgs
+ {
+ LogType = MqLogType.ConsumerCancelled,
+ Reason = e.ConsumerTag
+ };
+ OnLog?.Invoke(sender, args);
+ }
+
+ private void OnConsumerUnregistered(object sender, ConsumerEventArgs e)
+ {
+ var args = new LogMessageEventArgs
+ {
+ LogType = MqLogType.ConsumerUnregistered,
+ Reason = e.ConsumerTag
+ };
+ OnLog?.Invoke(sender, args);
+ }
+
+ private void OnConsumerRegistered(object sender, ConsumerEventArgs e)
+ {
+ var args = new LogMessageEventArgs
+ {
+ LogType = MqLogType.ConsumerRegistered,
+ Reason = e.ConsumerTag
+ };
+ OnLog?.Invoke(sender, args);
+ }
+
private void OnConsumerReceived(object sender, BasicDeliverEventArgs e)
{
_deliveryTag = e.DeliveryTag;
@@ -98,7 +140,14 @@ namespace DotNetCore.CAP.RabbitMQ
private void OnConsumerShutdown(object sender, ShutdownEventArgs e)
{
- OnError?.Invoke(sender, e.Cause?.ToString());
+ var args = new LogMessageEventArgs
+ {
+ LogType = MqLogType.ConsumerShutdown,
+ Reason = e.ToString()
+ };
+ OnLog?.Invoke(sender, args);
}
+
+ #endregion
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs
index 9410a4c..f362d12 100644
--- a/src/DotNetCore.CAP.SqlServer/CapPublisher.cs
+++ b/src/DotNetCore.CAP.SqlServer/CapPublisher.cs
@@ -60,17 +60,15 @@ namespace DotNetCore.CAP.SqlServer
{
dbConnection.Execute(PrepareSql(), message, dbTransaction);
- _logger.LogInformation("Published Message has been persisted in the database. name:" + message);
+ _logger.LogInformation("published message has been persisted to the database. name:" + message);
}
- protected override Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
+ protected override async Task ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message)
{
- dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
+ await dbConnection.ExecuteAsync(PrepareSql(), message, dbTransaction);
- _logger.LogInformation("Published Message has been persisted in the database. name:" + message);
-
- return Task.CompletedTask;
+ _logger.LogInformation("published message has been persisted to the database. name:" + message);
}
#region private methods
diff --git a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj
index f2c293b..7436359 100644
--- a/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj
+++ b/src/DotNetCore.CAP.SqlServer/DotNetCore.CAP.SqlServer.csproj
@@ -8,11 +8,16 @@
$(PackageTags);SQL Server
+
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.SqlServer.xml
+ 1701;1702;1705;CS1591
+
+
-
-
-
-
+
+
+
+
diff --git a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
index 2e66d97..424c43d 100644
--- a/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
+++ b/src/DotNetCore.CAP.SqlServer/SqlServerStorageConnection.cs
@@ -68,19 +68,6 @@ OUTPUT DELETED.MessageId,DELETED.[MessageType];";
}
}
- public bool ChangePublishedState(int messageId, string state)
- {
- var sql =
- $"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}";
-
- using (var connection = new SqlConnection(Options.ConnectionString))
- {
- return connection.Execute(sql) > 0;
- }
- }
-
- // CapReceivedMessage
-
public async Task StoreReceivedMessageAsync(CapReceivedMessage message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
@@ -124,10 +111,21 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
}
}
+ public bool ChangePublishedState(int messageId, string state)
+ {
+ var sql =
+ $"UPDATE [{Options.Schema}].[Published] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
+
+ using (var connection = new SqlConnection(Options.ConnectionString))
+ {
+ return connection.Execute(sql) > 0;
+ }
+ }
+
public bool ChangeReceivedState(int messageId, string state)
{
var sql =
- $"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,StatusName = '{state}' WHERE Id={messageId}";
+ $"UPDATE [{Options.Schema}].[Received] SET Retries=Retries+1,ExpiresAt=NULL,StatusName = '{state}' WHERE Id={messageId}";
using (var connection = new SqlConnection(Options.ConnectionString))
{
@@ -153,6 +151,7 @@ VALUES(@Name,@Group,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
catch (SqlException)
{
transaction.Dispose();
+ connection.Dispose();
throw;
}
diff --git a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
index f821824..37e9808 100644
--- a/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
+++ b/src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs
@@ -123,7 +123,7 @@ namespace DotNetCore.CAP.Abstractions
"If you are using the EntityFramework, you do not need to use this overloaded.");
}
- private Task PublishWithTransAsync(string name, string content)
+ private async Task PublishWithTransAsync(string name, string content)
{
var message = new CapPublishedMessage
{
@@ -132,13 +132,11 @@ namespace DotNetCore.CAP.Abstractions
StatusName = StatusName.Scheduled
};
- ExecuteAsync(DbConnection, DbTransaction, message);
+ await ExecuteAsync(DbConnection, DbTransaction, message);
ClosedCap();
PublishQueuer.PulseEvent.Set();
-
- return Task.CompletedTask;
}
private void PublishWithTrans(string name, string content)
diff --git a/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs b/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs
index 454572f..59dd897 100644
--- a/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs
+++ b/src/DotNetCore.CAP/Abstractions/TopicAttribute.cs
@@ -20,9 +20,10 @@ namespace DotNetCore.CAP.Abstractions
public string Name { get; }
///
+ /// Default group name is CapOptions setting.(Assembly name)
/// kafka --> groups.id
/// rabbit MQ --> queue.name
///
- public string Group { get; set; } = "cap.default.group";
+ public string Group { get; set; }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs
index b80d0e0..a7f6198 100644
--- a/src/DotNetCore.CAP/CAP.Options.cs
+++ b/src/DotNetCore.CAP/CAP.Options.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Reflection;
using DotNetCore.CAP.Models;
namespace DotNetCore.CAP
@@ -34,6 +35,7 @@ namespace DotNetCore.CAP
///
public const int DefaultFailedRetryCount = 100;
+
public CapOptions()
{
PollingDelay = DefaultPollingDelay;
@@ -42,10 +44,16 @@ namespace DotNetCore.CAP
FailedRetryInterval = DefaultFailedMessageWaitingInterval;
FailedRetryCount = DefaultFailedRetryCount;
Extensions = new List();
+ DefaultGroup = "cap.queue." + Assembly.GetEntryAssembly().GetName().Name.ToLower();
}
internal IList Extensions { get; }
+ ///
+ /// Subscriber default group name. kafka-->group name. rabbitmq --> queue name.
+ ///
+ public string DefaultGroup { get; set; }
+
///
/// Producer job polling delay time.
/// Default is 15 sec.
diff --git a/src/DotNetCore.CAP/Dashboard/Content/js/cap.js b/src/DotNetCore.CAP/Dashboard/Content/js/cap.js
index be6a83d..9b5a98c 100644
--- a/src/DotNetCore.CAP/Dashboard/Content/js/cap.js
+++ b/src/DotNetCore.CAP/Dashboard/Content/js/cap.js
@@ -371,7 +371,7 @@
receivedSucceeded,
receivedFailed,
receivedSucceededStr,
- receivedFailedStr,
+ receivedFailedStr
);
$(window).resize(function() {
diff --git a/src/DotNetCore.CAP/DotNetCore.CAP.csproj b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
index 5b22347..e4fb3ea 100644
--- a/src/DotNetCore.CAP/DotNetCore.CAP.csproj
+++ b/src/DotNetCore.CAP/DotNetCore.CAP.csproj
@@ -5,6 +5,10 @@
DotNetCore.CAP
$(PackageTags);
+
+ bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.xml
+ 1701;1702;1705;CS1591
+
diff --git a/src/DotNetCore.CAP/IConsumerClient.cs b/src/DotNetCore.CAP/IConsumerClient.cs
index f6adb66..4dd3143 100644
--- a/src/DotNetCore.CAP/IConsumerClient.cs
+++ b/src/DotNetCore.CAP/IConsumerClient.cs
@@ -33,6 +33,6 @@ namespace DotNetCore.CAP
event EventHandler OnMessageReceived;
- event EventHandler OnError;
+ event EventHandler OnLog;
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/IConsumerHandler.Default.cs b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
index e727702..645a121 100644
--- a/src/DotNetCore.CAP/IConsumerHandler.Default.cs
+++ b/src/DotNetCore.CAP/IConsumerHandler.Default.cs
@@ -23,6 +23,7 @@ namespace DotNetCore.CAP
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
+
private bool _disposed;
public ConsumerHandler(
@@ -44,17 +45,18 @@ namespace DotNetCore.CAP
foreach (var matchGroup in groupingMatches)
Task.Factory.StartNew(() =>
- {
- using (var client = _consumerClientFactory.Create(matchGroup.Key))
- {
- RegisterMessageProcessor(client);
+ {
+ using (var client = _consumerClientFactory.Create(matchGroup.Key))
+ {
+ RegisterMessageProcessor(client);
- client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
+ client.Subscribe(matchGroup.Value.Select(x => x.Attribute.Name));
- client.Listening(_pollingDelay, _cts.Token);
- }
- }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
- _compositeTask = Task.CompletedTask;
+ client.Listening(_pollingDelay, _cts.Token);
+ }
+ }, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
+
+ _compositeTask = Task.CompletedTask;
}
public void Dispose()
@@ -62,13 +64,10 @@ namespace DotNetCore.CAP
if (_disposed)
return;
_disposed = true;
-
- _logger.ServerShuttingDown();
_cts.Cancel();
-
try
{
- _compositeTask.Wait(TimeSpan.FromSeconds(10));
+ _compositeTask.Wait(TimeSpan.FromSeconds(2));
}
catch (AggregateException ex)
{
@@ -105,7 +104,34 @@ namespace DotNetCore.CAP
Pulse();
};
- client.OnError += (sender, reason) => { _logger.MessageQueueError(reason); };
+ client.OnLog += WriteLog;
+ }
+
+ private void WriteLog(object sender, LogMessageEventArgs logmsg)
+ {
+ switch (logmsg.LogType)
+ {
+ case MqLogType.ConsumerCancelled:
+ _logger.LogWarning("RabbitMQ consumer cancelled. reason: " + logmsg.Reason);
+ break;
+ case MqLogType.ConsumerRegistered:
+ _logger.LogInformation("RabbitMQ consumer registered. " + logmsg.Reason);
+ break;
+ case MqLogType.ConsumerUnregistered:
+ _logger.LogWarning("RabbitMQ consumer unregistered. reason: " + logmsg.Reason);
+ break;
+ case MqLogType.ConsumerShutdown:
+ _logger.LogWarning("RabbitMQ consumer shutdown. reason:" + logmsg.Reason);
+ break;
+ case MqLogType.ConsumeError:
+ _logger.LogError("Kakfa client consume error. reason:" + logmsg.Reason);
+ break;
+ case MqLogType.ServerConnError:
+ _logger.LogCritical("Kafka server connection error. reason:" + logmsg.Reason);
+ break;
+ default:
+ throw new ArgumentOutOfRangeException();
+ }
}
private static void StoreMessage(IServiceScope serviceScope, MessageContext messageContext)
diff --git a/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs b/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs
index 9322026..7435b4b 100644
--- a/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs
+++ b/src/DotNetCore.CAP/IQueueExecutor.Subscribe.cs
@@ -76,7 +76,7 @@ namespace DotNetCore.CAP
}
catch (Exception ex)
{
- _logger.ExceptionOccuredWhileExecuting(message?.Name, ex);
+ _logger.ExceptionOccuredWhileExecuting(message.Name, ex);
fetched.Requeue();
@@ -89,7 +89,7 @@ namespace DotNetCore.CAP
IState newState;
if (!result.Succeeded)
{
- var shouldRetry = UpdateMessageForRetryAsync(message);
+ var shouldRetry = UpdateMessageForRetry(message);
if (shouldRetry)
{
newState = new ScheduledState();
@@ -109,7 +109,7 @@ namespace DotNetCore.CAP
return newState;
}
- private static bool UpdateMessageForRetryAsync(CapReceivedMessage message)
+ private static bool UpdateMessageForRetry(CapReceivedMessage message)
{
var retryBehavior = RetryBehavior.DefaultRetry;
diff --git a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
index 1d23740..e20d306 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
@@ -10,23 +10,26 @@ namespace DotNetCore.CAP.Internal
{
///
///
- /// A default implementation.
+ /// A default implementation.
///
internal class DefaultConsumerServiceSelector : IConsumerServiceSelector
{
+ private readonly CapOptions _capOptions;
private readonly IServiceProvider _serviceProvider;
///
- /// Creates a new .
+ /// Creates a new .
///
- public DefaultConsumerServiceSelector(IServiceProvider serviceProvider)
+ public DefaultConsumerServiceSelector(IServiceProvider serviceProvider, CapOptions capOptions)
{
_serviceProvider = serviceProvider;
+ _capOptions = capOptions;
}
///
- /// Selects the best candidate from for the
- /// current message associated.
+ /// Selects the best candidate from for
+ /// the
+ /// current message associated.
///
public ConsumerExecutorDescriptor SelectBestCandidate(string key,
IReadOnlyList executeDescriptor)
@@ -45,7 +48,7 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}
- private static IEnumerable FindConsumersFromInterfaceTypes(
+ private IEnumerable FindConsumersFromInterfaceTypes(
IServiceProvider provider)
{
var executorDescriptorList = new List();
@@ -66,7 +69,7 @@ namespace DotNetCore.CAP.Internal
}
}
- private static IEnumerable FindConsumersFromControllerTypes()
+ private IEnumerable FindConsumersFromControllerTypes()
{
var executorDescriptorList = new List();
@@ -81,18 +84,21 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}
- private static IEnumerable GetTopicAttributesDescription(TypeInfo typeInfo)
+ private IEnumerable GetTopicAttributesDescription(TypeInfo typeInfo)
{
foreach (var method in typeInfo.DeclaredMethods)
{
var topicAttr = method.GetCustomAttributes(true);
-
var topicAttributes = topicAttr as IList ?? topicAttr.ToList();
if (!topicAttributes.Any()) continue;
foreach (var attr in topicAttributes)
+ {
+ if (attr.Group == null)
+ attr.Group = _capOptions.DefaultGroup;
yield return InitDescriptor(attr, method, typeInfo);
+ }
}
}
diff --git a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs b/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs
index 24d3e00..ca13a7f 100644
--- a/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs
+++ b/src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs
@@ -28,22 +28,18 @@ namespace DotNetCore.CAP.Internal
public async Task ExecuteAsync(CapReceivedMessage receivedMessage)
{
+ if (!_selector.TryGetTopicExector(receivedMessage.Name, receivedMessage.Group,
+ out var executor))
+ {
+ var error = "message can not be found subscriber. Message:" + receivedMessage;
+ error += "\r\n see: https://github.com/dotnetcore/CAP/issues/63";
+ throw new SubscriberNotFoundException(error);
+ }
+
+ var consumerContext = new ConsumerContext(executor, receivedMessage.ToMessageContext());
try
{
- var executeDescriptorGroup = _selector.GetTopicExector(receivedMessage.Name);
-
- if (!executeDescriptorGroup.ContainsKey(receivedMessage.Group))
- {
- var error = $"Topic:{receivedMessage.Name}, can not be found subscriber method.";
- throw new SubscriberNotFoundException(error);
- }
-
- // If there are multiple consumers in the same group, we will take the first
- var executeDescriptor = executeDescriptorGroup[receivedMessage.Group][0];
- var consumerContext = new ConsumerContext(executeDescriptor, receivedMessage.ToMessageContext());
-
var ret = await Invoker.InvokeAsync(consumerContext);
-
if (!string.IsNullOrEmpty(ret.CallbackName))
await _callbackMessageSender.SendAsync(ret.MessageId, ret.CallbackName, ret.Result);
diff --git a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
index 4e861a7..831a082 100644
--- a/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
+++ b/src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
@@ -2,13 +2,13 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
-using DotNetCore.CAP.Abstractions;
namespace DotNetCore.CAP.Internal
{
internal class MethodMatcherCache
{
private readonly IConsumerServiceSelector _selector;
+ private List _allTopics;
public MethodMatcherCache(IConsumerServiceSelector selector)
{
@@ -54,5 +54,50 @@ namespace DotNetCore.CAP.Internal
}
return dic;
}
+
+ ///
+ /// Attempts to get the topic exector associated with the specified topic name and group name from the .
+ ///
+ /// The topic name of the value to get.
+ /// The group name of the value to get.
+ /// topic exector of the value.
+ /// true if the key was found, otherwise false.
+ public bool TryGetTopicExector(string topicName, string groupName,
+ out ConsumerExecutorDescriptor matchTopic)
+ {
+ if (Entries == null)
+ throw new ArgumentNullException(nameof(Entries));
+
+ matchTopic = null;
+
+ if (Entries.TryGetValue(groupName, out var groupMatchTopics))
+ {
+ matchTopic = groupMatchTopics.FirstOrDefault(x => x.Attribute.Name == topicName);
+ return matchTopic != null;
+ }
+ return false;
+ }
+
+ ///
+ /// Get all subscribe topics name.
+ ///
+ public IEnumerable GetSubscribeTopics()
+ {
+ if (_allTopics != null)
+ {
+ return _allTopics;
+ }
+
+ if (Entries == null)
+ throw new ArgumentNullException(nameof(Entries));
+
+ _allTopics = new List();
+
+ foreach (var descriptors in Entries.Values)
+ {
+ _allTopics.AddRange(descriptors.Select(x => x.Attribute.Name));
+ }
+ return _allTopics;
+ }
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/LoggerExtensions.cs b/src/DotNetCore.CAP/LoggerExtensions.cs
index 1352fda..99cccba 100644
--- a/src/DotNetCore.CAP/LoggerExtensions.cs
+++ b/src/DotNetCore.CAP/LoggerExtensions.cs
@@ -37,7 +37,7 @@ namespace DotNetCore.CAP
"Starting the processors throw an exception.");
_serverShuttingDown = LoggerMessage.Define(
- LogLevel.Debug,
+ LogLevel.Information,
2,
"Shutting down the processing server...");
diff --git a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs
index 8849a87..eae744d 100644
--- a/src/DotNetCore.CAP/Models/CapReceivedMessage.cs
+++ b/src/DotNetCore.CAP/Models/CapReceivedMessage.cs
@@ -47,7 +47,7 @@ namespace DotNetCore.CAP.Models
public override string ToString()
{
- return "name:" + Name + ", content:" + Content;
+ return "name:" + Name + ", group:" + Group + ", content:" + Content;
}
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/MqLogType.cs b/src/DotNetCore.CAP/MqLogType.cs
new file mode 100644
index 0000000..1a392cb
--- /dev/null
+++ b/src/DotNetCore.CAP/MqLogType.cs
@@ -0,0 +1,24 @@
+using System;
+
+namespace DotNetCore.CAP
+{
+ public enum MqLogType
+ {
+ //RabbitMQ
+ ConsumerCancelled,
+ ConsumerRegistered,
+ ConsumerUnregistered,
+ ConsumerShutdown,
+
+ //Kafka
+ ConsumeError,
+ ServerConnError
+ }
+
+ public class LogMessageEventArgs : EventArgs
+ {
+ public string Reason { get; set; }
+
+ public MqLogType LogType { get; set; }
+ }
+}
diff --git a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
index 3bee533..5f22d0a 100644
--- a/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
+++ b/test/DotNetCore.CAP.MySql.Test/DotNetCore.CAP.MySql.Test.csproj
@@ -14,14 +14,14 @@
-
-
-
-
-
-
+
+
+
+
+
+
-
+
diff --git a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj
index 42c4471..2c34952 100644
--- a/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj
+++ b/test/DotNetCore.CAP.PostgreSql.Test/DotNetCore.CAP.PostgreSql.Test.csproj
@@ -7,11 +7,11 @@
-
-
-
-
-
+
+
+
+
+
diff --git a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj
index 301ada2..677adf2 100644
--- a/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj
+++ b/test/DotNetCore.CAP.SqlServer.Test/DotNetCore.CAP.SqlServer.Test.csproj
@@ -11,14 +11,14 @@
-
-
-
-
-
-
+
+
+
+
+
+
-
+
diff --git a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
index 7d96d98..66c7673 100644
--- a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
+++ b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
@@ -8,13 +8,13 @@
-
+
-
-
-
+
+
+
-
+