|
|
@@ -24,7 +24,7 @@ namespace DotNetCore.CAP.Kafka |
|
|
|
|
|
|
|
public event EventHandler<MessageContext> OnMessageReceived; |
|
|
|
|
|
|
|
public event EventHandler<string> OnError; |
|
|
|
public event EventHandler<LogMessageEventArgs> OnLog; |
|
|
|
|
|
|
|
public void Subscribe(IEnumerable<string> topics) |
|
|
|
{ |
|
|
@@ -34,7 +34,6 @@ namespace DotNetCore.CAP.Kafka |
|
|
|
if (_consumerClient == null) |
|
|
|
InitKafkaClient(); |
|
|
|
|
|
|
|
//_consumerClient.Assign(topics.Select(x=> new TopicPartition(x, 0))); |
|
|
|
_consumerClient.Subscribe(topics); |
|
|
|
} |
|
|
|
|
|
|
@@ -76,12 +75,17 @@ namespace DotNetCore.CAP.Kafka |
|
|
|
_consumerClient.OnError += ConsumerClient_OnError; |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void ConsumerClient_OnConsumeError(object sender, Message e) |
|
|
|
{ |
|
|
|
var message = e.Deserialize<Null, string>(null, StringDeserializer); |
|
|
|
|
|
|
|
OnError?.Invoke(sender, $"An error occurred during consume the message; Topic:'{e.Topic}'," + |
|
|
|
$"Message:'{message.Value}', Reason:'{e.Error}'."); |
|
|
|
var logArgs = new LogMessageEventArgs |
|
|
|
{ |
|
|
|
LogType = MqLogType.ConsumeError, |
|
|
|
Reason = $"An error occurred during consume the message; Topic:'{e.Topic}'," + |
|
|
|
$"Message:'{message.Value}', Reason:'{e.Error}'." |
|
|
|
}; |
|
|
|
OnLog?.Invoke(sender, logArgs); |
|
|
|
} |
|
|
|
|
|
|
|
private void ConsumerClient_OnMessage(object sender, Message<Null, string> e) |
|
|
@@ -98,7 +102,12 @@ namespace DotNetCore.CAP.Kafka |
|
|
|
|
|
|
|
private void ConsumerClient_OnError(object sender, Error e) |
|
|
|
{ |
|
|
|
OnError?.Invoke(sender, e.ToString()); |
|
|
|
var logArgs = new LogMessageEventArgs |
|
|
|
{ |
|
|
|
LogType = MqLogType.ServerConnError, |
|
|
|
Reason = e.ToString() |
|
|
|
}; |
|
|
|
OnLog?.Invoke(sender, logArgs); |
|
|
|
} |
|
|
|
|
|
|
|
#endregion private methods |
|
|
|