Преглед на файлове

NATS upgrade client to support header (#960)

* replace BinaryFormatter with System.Text.Json.JsonSerializer

* Using heads and data of nats

* Remove unused namespaces.
master
changyin.han преди 3 години
committed by GitHub
родител
ревизия
4c3a36306e
No known key found for this signature in database GPG ключ ID: 4AEE18F83AFDEB23
променени са 4 файла, в които са добавени 47 реда и са изтрити 51 реда
  1. +1
    -1
      src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj
  2. +8
    -9
      src/DotNetCore.CAP.NATS/ITransport.NATS.cs
  3. +10
    -13
      src/DotNetCore.CAP.NATS/NATSConsumerClient.cs
  4. +28
    -28
      test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj

+ 1
- 1
src/DotNetCore.CAP.NATS/DotNetCore.CAP.NATS.csproj Целия файл

@@ -13,7 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NATS.Client" Version="0.12.0" />
<PackageReference Include="NATS.Client" Version="0.14.0-pre1" />
</ItemGroup>

<ItemGroup>


+ 8
- 9
src/DotNetCore.CAP.NATS/ITransport.NATS.cs Целия файл

@@ -2,13 +2,12 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using Microsoft.Extensions.Logging;
using NATS.Client;

namespace DotNetCore.CAP.NATS
{
@@ -31,14 +30,14 @@ namespace DotNetCore.CAP.NATS

try
{
var binFormatter = new BinaryFormatter();
using var mStream = new MemoryStream();
binFormatter.Serialize(mStream, message);

//connection.Publish(message.GetName(), mStream.ToArray());
//return Task.FromResult(OperateResult.Success);
var msg = new Msg(message.GetName(), message.Body);
foreach (var header in message.Headers)
{
msg.Header[header.Key] = header.Value;
}
var reply= connection.Request(msg);

var reply = connection.Request(message.GetName(), mStream.ToArray(), 2000);
if (reply.Data != null && reply.Data[0] == 1)
{
_logger.LogDebug($"NATS subject message [{message.GetName()}] has been consumed.");


+ 10
- 13
src/DotNetCore.CAP.NATS/NATSConsumerClient.cs Целия файл

@@ -3,8 +3,6 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
@@ -71,22 +69,21 @@ namespace DotNetCore.CAP.NATS

private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e)
{
using var mStream = new MemoryStream();
var binFormatter = new BinaryFormatter();

mStream.Write(e.Message.Data, 0, e.Message.Data.Length);
mStream.Position = 0;

var message = (TransportMessage)binFormatter.Deserialize(mStream);
message.Headers.Add(Headers.Group, _groupId);
OnMessageReceived?.Invoke(e.Message.Reply, message);
var headers = new Dictionary<string, string>();
foreach (string h in e.Message.Header.Keys)
{
headers.Add(h, e.Message.Header[h]);
}
var tranmessage = new TransportMessage(headers, e.Message.Data);
tranmessage.Headers.Add(Headers.Group, _groupId);
OnMessageReceived?.Invoke(e.Message.Reply, tranmessage);
}

public void Commit(object sender)
{
if (sender is string reply)
{
_consumerClient.Publish(reply, new byte[] { 1 });
_consumerClient.Publish(reply, new byte[] {1});
}
}

@@ -94,7 +91,7 @@ namespace DotNetCore.CAP.NATS
{
if (sender is string reply)
{
_consumerClient.Publish(reply, new byte[] { 0 });
_consumerClient.Publish(reply, new byte[] {0});
}
}



+ 28
- 28
test/DotNetCore.CAP.Test/DotNetCore.CAP.Test.csproj Целия файл

@@ -1,35 +1,35 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702;CS0067</NoWarn>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<NoWarn>1701;1702;CS0067</NoWarn>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Moq" Version="4.16.1" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.9.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="3.0.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Moq" Version="4.16.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
<ProjectReference Include="..\DotNetCore.CAP.MultiModuleSubscriberTests\DotNetCore.CAP.MultiModuleSubscriberTests.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" />
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" />
<ProjectReference Include="..\DotNetCore.CAP.MultiModuleSubscriberTests\DotNetCore.CAP.MultiModuleSubscriberTests.csproj" />
</ItemGroup>

</Project>

Зареждане…
Отказ
Запис