Przeglądaj źródła

Add sync publish method with additional header

master
Savorboard 5 lat temu
rodzic
commit
772a509ce6
3 zmienionych plików z 44 dodań i 21 usunięć
  1. +18
    -3
      src/DotNetCore.CAP/ICapPublisher.cs
  2. +21
    -18
      src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs
  3. +5
    -0
      test/DotNetCore.CAP.Test/CAP.BuilderTest.cs

+ 18
- 3
src/DotNetCore.CAP/ICapPublisher.cs Wyświetl plik

@@ -24,13 +24,20 @@ namespace DotNetCore.CAP
/// Asynchronous publish an object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="contentObj">message body content, that will be serialized.</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, T contentObj, string callbackName = null, CancellationToken cancellationToken = default);


Task PublishAsync<T>(string name, T contentObj, IDictionary<string, string> optionHeaders, CancellationToken cancellationToken = default);
/// <summary>
/// Asynchronous publish an object message with custom headers
/// </summary>
/// <typeparam name="T">content object</typeparam>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized.</param>
/// <param name="headers">message additional headers.</param>
/// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, T contentObj, IDictionary<string, string> headers, CancellationToken cancellationToken = default);

/// <summary>
/// Publish an object message.
@@ -39,5 +46,13 @@ namespace DotNetCore.CAP
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, T contentObj, string callbackName = null);

/// <summary>
/// Publish an object message.
/// </summary>
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized of json.</param>
/// <param name="headers">message additional headers.</param>
void Publish<T>(string name, T contentObj, IDictionary<string, string> headers);
}
}

+ 21
- 18
src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs Wyświetl plik

@@ -35,32 +35,30 @@ namespace DotNetCore.CAP.Internal

public AsyncLocal<ICapTransaction> Transaction { get; }

public async Task PublishAsync<T>(string name, T value,
IDictionary<string, string> optionHeaders,
CancellationToken cancellationToken = default)
public async Task PublishAsync<T>(string name, T value, IDictionary<string, string> headers, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(name))
{
throw new ArgumentNullException(nameof(name));
}

if (optionHeaders == null)
if (headers == null)
{
optionHeaders = new Dictionary<string, string>();
headers = new Dictionary<string, string>();
}

var messageId = SnowflakeId.Default().NextId().ToString();
optionHeaders.Add(Headers.MessageId, messageId);
optionHeaders.Add(Headers.MessageName, name);
optionHeaders.Add(Headers.Type, typeof(T).FullName);
optionHeaders.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
if (!optionHeaders.ContainsKey(Headers.CorrelationId))
headers.Add(Headers.MessageId, messageId);
headers.Add(Headers.MessageName, name);
headers.Add(Headers.Type, typeof(T).FullName);
headers.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
if (!headers.ContainsKey(Headers.CorrelationId))
{
optionHeaders.Add(Headers.CorrelationId, messageId);
optionHeaders.Add(Headers.CorrelationSequence, 0.ToString());
headers.Add(Headers.CorrelationId, messageId);
headers.Add(Headers.CorrelationSequence, 0.ToString());
}

var message = new Message(optionHeaders, value);
var message = new Message(headers, value);

long? tracingTimestamp = null;
try
@@ -99,11 +97,6 @@ namespace DotNetCore.CAP.Internal
}
}

public void Publish<T>(string name, T value, string callbackName = null)
{
PublishAsync(name, value, callbackName).GetAwaiter().GetResult();
}

public Task PublishAsync<T>(string name, T value, string callbackName = null,
CancellationToken cancellationToken = default)
{
@@ -115,6 +108,16 @@ namespace DotNetCore.CAP.Internal
return PublishAsync(name, value, header, cancellationToken);
}

public void Publish<T>(string name, T value, string callbackName = null)
{
PublishAsync(name, value, callbackName).GetAwaiter().GetResult();
}

public void Publish<T>(string name, T value, IDictionary<string, string> headers)
{
PublishAsync(name, value, headers).GetAwaiter().GetResult();
}

#region tracing

private long? TracingBefore(Message message)


+ 5
- 0
test/DotNetCore.CAP.Test/CAP.BuilderTest.cs Wyświetl plik

@@ -84,6 +84,11 @@ namespace DotNetCore.CAP.Test
{
throw new NotImplementedException();
}

public void Publish<T>(string name, T contentObj, IDictionary<string, string> headers)
{
throw new NotImplementedException();
}
}
}
}

Ładowanie…
Anuluj
Zapisz