Browse Source

Fix broken package reader

release/3.x.x
Christian 7 years ago
parent
commit
8856a7aa34
3 changed files with 38 additions and 4 deletions
  1. +1
    -1
      Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs
  2. +1
    -2
      Tests/MQTTnet.Core.Tests/MqttClientTests.cs
  3. +36
    -1
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs

+ 1
- 1
Frameworks/MQTTnet.NetStandard/Adapter/MqttChannelAdapter.cs View File

@@ -137,7 +137,7 @@ namespace MQTTnet.Adapter
return new ReceivedMqttPacket(header, new MemoryStream(new byte[0], false));
}

var body = header.BodyLength <= ReadBufferSize ? new MemoryStream(new byte[header.BodyLength]) : new MemoryStream();
var body = header.BodyLength <= ReadBufferSize ? new MemoryStream(header.BodyLength) : new MemoryStream();
var buffer = new byte[ReadBufferSize];
while (body.Length < header.BodyLength)


+ 1
- 2
Tests/MQTTnet.Core.Tests/MqttClientTests.cs View File

@@ -1,5 +1,4 @@
using System;
using System.Net.Sockets;
using System.Net.Sockets;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using MQTTnet.Client;


+ 36
- 1
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -299,7 +299,7 @@ namespace MQTTnet.Core.Tests

var serverAdapter = new TestMqttServerAdapter();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());
try
{
var options = new MqttServerOptions { ApplicationMessageInterceptor = Interceptor };
@@ -330,6 +330,41 @@ namespace MQTTnet.Core.Tests
}
}

[TestMethod]
public async Task MqttServer_Body()
{
var serverAdapter = new TestMqttServerAdapter();
var s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

var bodyIsMatching = false;
try
{
await s.StartAsync(new MqttServerOptions());

var c1 = await serverAdapter.ConnectTestClient(s, "c1");
var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c1.ApplicationMessageReceived += (_, e) =>
{
if (Encoding.UTF8.GetString(e.ApplicationMessage.Payload) == "The body")
{
bodyIsMatching = true;
}
};

await c1.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce);
await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("A").WithPayload(Encoding.UTF8.GetBytes("The body")).Build());

await Task.Delay(500);
}
finally
{
await s.StopAsync();
}

Assert.IsTrue(bodyIsMatching);
}

private class TestStorage : IMqttServerStorage
{
private IList<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();


Loading…
Cancel
Save