diff --git a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
index 8d22151..7d6a5c5 100644
--- a/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
+++ b/src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
@@ -13,7 +13,7 @@
-
+
diff --git a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs
index ce8d564..0fe2057 100644
--- a/src/DotNetCore.CAP.NATS/ITransport.NATS.cs
+++ b/src/DotNetCore.CAP.NATS/ITransport.NATS.cs
@@ -2,13 +2,12 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
-using System.IO;
-using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
+using NATS.Client;
namespace DotNetCore.CAP.NATS
{
@@ -31,14 +30,14 @@ namespace DotNetCore.CAP.NATS
try
{
- var binFormatter = new BinaryFormatter();
- using var mStream = new MemoryStream();
- binFormatter.Serialize(mStream, message);
-
- //connection.Publish(message.GetName(), mStream.ToArray());
- //return Task.FromResult(OperateResult.Success);
+ var msg = new Msg(message.GetName(), message.Body);
+ foreach (var header in message.Headers)
+ {
+ msg.Header[header.Key] = header.Value;
+ }
+
+ var reply= connection.Request(msg);
- var reply = connection.Request(message.GetName(), mStream.ToArray(), 2000);
if (reply.Data != null && reply.Data[0] == 1)
{
_logger.LogDebug($"NATS subject message [{message.GetName()}] has been consumed.");
diff --git a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
index 1e04854..6c37b82 100644
--- a/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
+++ b/src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
@@ -3,8 +3,6 @@
using System;
using System.Collections.Generic;
-using System.IO;
-using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
@@ -71,22 +69,21 @@ namespace DotNetCore.CAP.NATS
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e)
{
- using var mStream = new MemoryStream();
- var binFormatter = new BinaryFormatter();
-
- mStream.Write(e.Message.Data, 0, e.Message.Data.Length);
- mStream.Position = 0;
-
- var message = (TransportMessage)binFormatter.Deserialize(mStream);
- message.Headers.Add(Headers.Group, _groupId);
- OnMessageReceived?.Invoke(e.Message.Reply, message);
+ var headers = new Dictionary();
+ foreach (string h in e.Message.Header.Keys)
+ {
+ headers.Add(h, e.Message.Header[h]);
+ }
+ var tranmessage = new TransportMessage(headers, e.Message.Data);
+ tranmessage.Headers.Add(Headers.Group, _groupId);
+ OnMessageReceived?.Invoke(e.Message.Reply, tranmessage);
}
public void Commit(object sender)
{
if (sender is string reply)
{
- _consumerClient.Publish(reply, new byte[] { 1 });
+ _consumerClient.Publish(reply, new byte[] {1});
}
}
@@ -94,7 +91,7 @@ namespace DotNetCore.CAP.NATS
{
if (sender is string reply)
{
- _consumerClient.Publish(reply, new byte[] { 0 });
+ _consumerClient.Publish(reply, new byte[] {0});
}
}
diff --git a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
index d646506..5abdbe1 100644
--- a/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
+++ b/test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj
@@ -1,35 +1,35 @@
-
- net5.0
- false
-
+
+ net5.0
+ false
+
-
- 1701;1702;CS0067
-
+
+ 1701;1702;CS0067
+
-
-
-
-
-
-
-
- all
- runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
- all
- runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
-
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
-
-
-
-
-
+
+
+
+
+