diff --git a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs
index fe0cfe0..c83d124 100644
--- a/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs
+++ b/src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs
@@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
-using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
diff --git a/src/DotNetCore.CAP/ICapPublisher.cs b/src/DotNetCore.CAP/ICapPublisher.cs
index 58f4181..158417d 100644
--- a/src/DotNetCore.CAP/ICapPublisher.cs
+++ b/src/DotNetCore.CAP/ICapPublisher.cs
@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
+using JetBrains.Annotations;
namespace DotNetCore.CAP
{
@@ -17,42 +18,42 @@ namespace DotNetCore.CAP
///
/// CAP transaction context object
- ///
+ ///
AsyncLocal Transaction { get; }
///
/// Asynchronous publish an object message.
///
/// the topic name or exchange router key.
- /// message body content, that will be serialized.
+ /// message body content, that will be serialized. (can be null)
/// callback subscriber name
///
- Task PublishAsync(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default);
+ Task PublishAsync(string name, [CanBeNull] T contentObj, string callbackName = null, CancellationToken cancellationToken = default);
///
/// Asynchronous publish an object message with custom headers
///
/// content object
/// the topic name or exchange router key.
- /// message body content, that will be serialized.
+ /// message body content, that will be serialized. (can be null)
/// message additional headers.
///
- Task PublishAsync(string name, T contentObj, IDictionary headers, CancellationToken cancellationToken = default);
+ Task PublishAsync(string name, [CanBeNull] T contentObj, IDictionary headers, CancellationToken cancellationToken = default);
///
/// Publish an object message.
///
/// the topic name or exchange router key.
- /// message body content, that will be serialized of json.
+ /// message body content, that will be serialized. (can be null)
/// callback subscriber name
- void Publish(string name, T contentObj, string callbackName = null);
+ void Publish(string name, [CanBeNull] T contentObj, string callbackName = null);
///
/// Publish an object message.
///
/// the topic name or exchange router key.
- /// message body content, that will be serialized of json.
+ /// message body content, that will be serialized. (can be null)
/// message additional headers.
- void Publish(string name, T contentObj, IDictionary headers);
+ void Publish(string name, [CanBeNull] T contentObj, IDictionary headers);
}
}
\ No newline at end of file
diff --git a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
index d50bb50..b87dd06 100644
--- a/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
+++ b/src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Internal
_logger.LogDebug("Executing subscriber method : {0}", context.ConsumerDescriptor.MethodInfo.Name);
var executor = ObjectMethodExecutor.Create(
- context.ConsumerDescriptor.MethodInfo,
+ context.ConsumerDescriptor.MethodInfo,
context.ConsumerDescriptor.ImplTypeInfo);
using (var scope = _serviceProvider.CreateScope())
diff --git a/src/DotNetCore.CAP/Messages/Message.cs b/src/DotNetCore.CAP/Messages/Message.cs
index 213193f..2b4bfbf 100644
--- a/src/DotNetCore.CAP/Messages/Message.cs
+++ b/src/DotNetCore.CAP/Messages/Message.cs
@@ -1,18 +1,20 @@
using System;
using System.Collections.Generic;
+using JetBrains.Annotations;
namespace DotNetCore.CAP.Messages
{
public class Message
{
- public Message(IDictionary headers, object value)
+ public Message(IDictionary headers, [CanBeNull] object value)
{
Headers = headers ?? throw new ArgumentNullException(nameof(headers));
- Value = value ?? throw new ArgumentNullException(nameof(value));
+ Value = value;
}
public IDictionary Headers { get; }
+ [CanBeNull]
public object Value { get; }
}
diff --git a/src/DotNetCore.CAP/Messages/TransportMessage.cs b/src/DotNetCore.CAP/Messages/TransportMessage.cs
index 365d823..8f0ae4f 100644
--- a/src/DotNetCore.CAP/Messages/TransportMessage.cs
+++ b/src/DotNetCore.CAP/Messages/TransportMessage.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using JetBrains.Annotations;
namespace DotNetCore.CAP.Messages
{
@@ -8,10 +9,10 @@ namespace DotNetCore.CAP.Messages
///
public class TransportMessage
{
- public TransportMessage(IDictionary headers, byte[] body)
+ public TransportMessage(IDictionary headers, [CanBeNull] byte[] body)
{
Headers = headers ?? throw new ArgumentNullException(nameof(headers));
- Body = body ?? throw new ArgumentNullException(nameof(body));
+ Body = body;
}
///
@@ -22,6 +23,7 @@ namespace DotNetCore.CAP.Messages
///
/// Gets the body object of this message
///
+ [CanBeNull]
public byte[] Body { get; }
public string GetId()
diff --git a/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs b/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs
index f638751..b3fdbd9 100644
--- a/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs
+++ b/src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs
@@ -13,13 +13,18 @@ namespace DotNetCore.CAP.Serialization
{
public Task SerializeAsync(Message message)
{
+ if (message.Value == null)
+ {
+ return Task.FromResult(new TransportMessage(message.Headers, null));
+ }
+
var json = JsonConvert.SerializeObject(message.Value);
return Task.FromResult(new TransportMessage(message.Headers, Encoding.UTF8.GetBytes(json)));
}
public Task DeserializeAsync(TransportMessage transportMessage, Type valueType)
{
- if (valueType == null)
+ if (valueType == null || transportMessage.Body == null)
{
return Task.FromResult(new Message(transportMessage.Headers, null));
}