Browse Source

Support null value of message body.

master
Savorboard 5 years ago
parent
commit
cb86c9522b
6 changed files with 25 additions and 16 deletions
  1. +0
    -1
      src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs
  2. +10
    -9
      src/DotNetCore.CAP/ICapPublisher.cs
  3. +1
    -1
      src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
  4. +4
    -2
      src/DotNetCore.CAP/Messages/Message.cs
  5. +4
    -2
      src/DotNetCore.CAP/Messages/TransportMessage.cs
  6. +6
    -1
      src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs

+ 0
- 1
src/DotNetCore.CAP.Kafka/ITransport.Kafka.cs View File

@@ -2,7 +2,6 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using System.Linq;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Confluent.Kafka; using Confluent.Kafka;


+ 10
- 9
src/DotNetCore.CAP/ICapPublisher.cs View File

@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using JetBrains.Annotations;


namespace DotNetCore.CAP namespace DotNetCore.CAP
{ {
@@ -17,42 +18,42 @@ namespace DotNetCore.CAP


/// <summary> /// <summary>
/// CAP transaction context object /// CAP transaction context object
/// </summary>
/// </summary>
AsyncLocal<ICapTransaction> Transaction { get; } AsyncLocal<ICapTransaction> Transaction { get; }


/// <summary> /// <summary>
/// Asynchronous publish an object message. /// Asynchronous publish an object message.
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized.</param>
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="callbackName">callback subscriber name</param> /// <param name="callbackName">callback subscriber name</param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default);
Task PublishAsync<T>(string name, [CanBeNull] T contentObj, string callbackName = null, CancellationToken cancellationToken = default);


/// <summary> /// <summary>
/// Asynchronous publish an object message with custom headers /// Asynchronous publish an object message with custom headers
/// </summary> /// </summary>
/// <typeparam name="T">content object</typeparam> /// <typeparam name="T">content object</typeparam>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized.</param>
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="headers">message additional headers.</param> /// <param name="headers">message additional headers.</param>
/// <param name="cancellationToken"></param> /// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, T contentObj, IDictionary<string, string> headers, CancellationToken cancellationToken = default);
Task PublishAsync<T>(string name, [CanBeNull] T contentObj, IDictionary<string, string> headers, CancellationToken cancellationToken = default);


/// <summary> /// <summary>
/// Publish an object message. /// Publish an object message.
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="callbackName">callback subscriber name</param> /// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, string callbackName = null);
void Publish<T>(string name, [CanBeNull] T contentObj, string callbackName = null);


/// <summary> /// <summary>
/// Publish an object message. /// Publish an object message.
/// </summary> /// </summary>
/// <param name="name">the topic name or exchange router key.</param> /// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="headers">message additional headers.</param> /// <param name="headers">message additional headers.</param>
void Publish<T>(string name, T contentObj, IDictionary<string, string> headers);
void Publish<T>(string name, [CanBeNull] T contentObj, IDictionary<string, string> headers);
} }
} }

+ 1
- 1
src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs View File

@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Internal
_logger.LogDebug("Executing subscriber method : {0}", context.ConsumerDescriptor.MethodInfo.Name); _logger.LogDebug("Executing subscriber method : {0}", context.ConsumerDescriptor.MethodInfo.Name);
var executor = ObjectMethodExecutor.Create( var executor = ObjectMethodExecutor.Create(
context.ConsumerDescriptor.MethodInfo,
context.ConsumerDescriptor.MethodInfo,
context.ConsumerDescriptor.ImplTypeInfo); context.ConsumerDescriptor.ImplTypeInfo);
using (var scope = _serviceProvider.CreateScope()) using (var scope = _serviceProvider.CreateScope())


+ 4
- 2
src/DotNetCore.CAP/Messages/Message.cs View File

@@ -1,18 +1,20 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using JetBrains.Annotations;


namespace DotNetCore.CAP.Messages namespace DotNetCore.CAP.Messages
{ {
public class Message public class Message
{ {
public Message(IDictionary<string, string> headers, object value)
public Message(IDictionary<string, string> headers, [CanBeNull] object value)
{ {
Headers = headers ?? throw new ArgumentNullException(nameof(headers)); Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Value = value ?? throw new ArgumentNullException(nameof(value));
Value = value;
} }


public IDictionary<string, string> Headers { get; } public IDictionary<string, string> Headers { get; }


[CanBeNull]
public object Value { get; } public object Value { get; }
} }




+ 4
- 2
src/DotNetCore.CAP/Messages/TransportMessage.cs View File

@@ -1,5 +1,6 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using JetBrains.Annotations;


namespace DotNetCore.CAP.Messages namespace DotNetCore.CAP.Messages
{ {
@@ -8,10 +9,10 @@ namespace DotNetCore.CAP.Messages
/// </summary> /// </summary>
public class TransportMessage public class TransportMessage
{ {
public TransportMessage(IDictionary<string, string> headers, byte[] body)
public TransportMessage(IDictionary<string, string> headers, [CanBeNull] byte[] body)
{ {
Headers = headers ?? throw new ArgumentNullException(nameof(headers)); Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Body = body ?? throw new ArgumentNullException(nameof(body));
Body = body;
} }


/// <summary> /// <summary>
@@ -22,6 +23,7 @@ namespace DotNetCore.CAP.Messages
/// <summary> /// <summary>
/// Gets the body object of this message /// Gets the body object of this message
/// </summary> /// </summary>
[CanBeNull]
public byte[] Body { get; } public byte[] Body { get; }


public string GetId() public string GetId()


+ 6
- 1
src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs View File

@@ -13,13 +13,18 @@ namespace DotNetCore.CAP.Serialization
{ {
public Task<TransportMessage> SerializeAsync(Message message) public Task<TransportMessage> SerializeAsync(Message message)
{ {
if (message.Value == null)
{
return Task.FromResult(new TransportMessage(message.Headers, null));
}

var json = JsonConvert.SerializeObject(message.Value); var json = JsonConvert.SerializeObject(message.Value);
return Task.FromResult(new TransportMessage(message.Headers, Encoding.UTF8.GetBytes(json))); return Task.FromResult(new TransportMessage(message.Headers, Encoding.UTF8.GetBytes(json)));
} }


public Task<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType) public Task<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType)
{ {
if (valueType == null)
if (valueType == null || transportMessage.Body == null)
{ {
return Task.FromResult(new Message(transportMessage.Headers, null)); return Task.FromResult(new Message(transportMessage.Headers, null));
} }


Loading…
Cancel
Save