Browse Source

Merge pull request #214 from JanEggers/Fix175

added tcp adapter
release/3.x.x
Christian 6 years ago
committed by GitHub
parent
commit
f90985a1f9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 12 deletions
  1. +4
    -1
      Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs
  2. +8
    -11
      Tests/MQTTnet.Core.Tests/MqttServerTests.cs
  3. +38
    -0
      Tests/MQTTnet.Core.Tests/TestServerExtensions.cs

+ 4
- 1
Frameworks/MQTTnet.AspnetCore/ServiceCollectionExtensions.cs View File

@@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting;
using MQTTnet.Adapter;
using MQTTnet.Diagnostics;
using MQTTnet.Server;
using MQTTnet.Implementations;

namespace MQTTnet.AspNetCore
{
@@ -20,7 +21,9 @@ namespace MQTTnet.AspNetCore
services.AddSingleton<IMqttServer>(s => s.GetService<MqttHostedServer>());
services.AddSingleton<MqttWebSocketServerAdapter>();
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttWebSocketServerAdapter>());
services.AddSingleton<MqttServerAdapter>();
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttWebSocketServerAdapter>());
services.AddSingleton<IMqttServerAdapter>(s => s.GetService<MqttServerAdapter>());

return services;
}


+ 8
- 11
Tests/MQTTnet.Core.Tests/MqttServerTests.cs View File

@@ -209,12 +209,9 @@ namespace MQTTnet.Core.Tests
await s.StartAsync(new MqttServerOptions());

var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.DisconnectAsync();

await Task.Delay(TimeSpan.FromSeconds(2));
// TODO: Find another way to wait for the retained components.

var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilterBuilder().WithTopic("retained").Build());
@@ -244,7 +241,7 @@ namespace MQTTnet.Core.Tests
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build());
await c1.DisconnectAsync();
var c2 = await serverAdapter.ConnectTestClient(s, "c2");
c2.ApplicationMessageReceived += (_, __) => receivedMessagesCount++;
await c2.SubscribeAsync(new TopicFilter("retained", MqttQualityOfServiceLevel.AtMostOnce));
@@ -274,7 +271,8 @@ namespace MQTTnet.Core.Tests
await s.StartAsync(options);

var c1 = await serverAdapter.ConnectTestClient(s, "c1");
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());

await c1.PublishAndWaitForAsync(s, new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build());
await c1.DisconnectAsync();
}
finally
@@ -282,8 +280,7 @@ namespace MQTTnet.Core.Tests
await s.StopAsync();
}

await Task.Delay(TimeSpan.FromSeconds(2));
// TODO: Find another way to wait for the retained components.
Assert.AreEqual(1, storage.Messages.Count);

s = new MqttFactory().CreateMqttServer(new[] { serverAdapter }, new MqttNetLogger());

@@ -385,17 +382,17 @@ namespace MQTTnet.Core.Tests

private class TestStorage : IMqttServerStorage
{
private IList<MqttApplicationMessage> _messages = new List<MqttApplicationMessage>();
public IList<MqttApplicationMessage> Messages = new List<MqttApplicationMessage>();

public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages)
{
_messages = messages;
Messages = messages;
return Task.CompletedTask;
}

public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync()
{
return Task.FromResult(_messages);
return Task.FromResult(Messages);
}
}



+ 38
- 0
Tests/MQTTnet.Core.Tests/TestServerExtensions.cs View File

@@ -0,0 +1,38 @@
using MQTTnet.Client;
using MQTTnet.Server;
using System;
using System.Threading.Tasks;

namespace MQTTnet.Core.Tests
{
public static class TestServerExtensions
{
/// <summary>
/// publishes a message with a client and waits in the server until a message with the same topic is received
/// </summary>
/// <returns></returns>
public static async Task PublishAndWaitForAsync(this IMqttClient client, IMqttServer server, MqttApplicationMessage message)
{
var tcs = new TaskCompletionSource<object>();

EventHandler<MqttApplicationMessageReceivedEventArgs> handler = (sender, args) =>
{
if (args.ApplicationMessage.Topic == message.Topic)
{
tcs.SetResult(true);
}
};
server.ApplicationMessageReceived += handler;
try
{
await client.PublishAsync(message).ConfigureAwait(false);
await tcs.Task.ConfigureAwait(false);
}
finally
{
server.ApplicationMessageReceived -= handler;
}
}
}
}

Loading…
Cancel
Save