@@ -19,7 +19,9 @@ namespace MQTTnet | |||||
if (publisher == null) throw new ArgumentNullException(nameof(publisher)); | if (publisher == null) throw new ArgumentNullException(nameof(publisher)); | ||||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | if (topic == null) throw new ArgumentNullException(nameof(topic)); | ||||
return publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).Build()); | |||||
return publisher.PublishAsync(builder => builder | |||||
.WithTopic(topic) | |||||
); | |||||
} | } | ||||
public static Task PublishAsync(this IApplicationMessagePublisher publisher, string topic, string payload) | public static Task PublishAsync(this IApplicationMessagePublisher publisher, string topic, string payload) | ||||
@@ -27,7 +29,10 @@ namespace MQTTnet | |||||
if (publisher == null) throw new ArgumentNullException(nameof(publisher)); | if (publisher == null) throw new ArgumentNullException(nameof(publisher)); | ||||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | if (topic == null) throw new ArgumentNullException(nameof(topic)); | ||||
return publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).Build()); | |||||
return publisher.PublishAsync(builder => builder | |||||
.WithTopic(topic) | |||||
.WithPayload(payload) | |||||
); | |||||
} | } | ||||
public static Task PublishAsync(this IApplicationMessagePublisher publisher, string topic, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel) | public static Task PublishAsync(this IApplicationMessagePublisher publisher, string topic, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel) | ||||
@@ -35,7 +40,17 @@ namespace MQTTnet | |||||
if (publisher == null) throw new ArgumentNullException(nameof(publisher)); | if (publisher == null) throw new ArgumentNullException(nameof(publisher)); | ||||
if (topic == null) throw new ArgumentNullException(nameof(topic)); | if (topic == null) throw new ArgumentNullException(nameof(topic)); | ||||
return publisher.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(payload).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); | |||||
return publisher.PublishAsync(builder => builder | |||||
.WithTopic(topic) | |||||
.WithPayload(payload) | |||||
.WithQualityOfServiceLevel(qualityOfServiceLevel) | |||||
); | |||||
} | |||||
public static Task PublishAsync(this IApplicationMessagePublisher publisher, Func<MqttApplicationMessageBuilder, MqttApplicationMessageBuilder> builder) | |||||
{ | |||||
var message = builder(new MqttApplicationMessageBuilder()).Build(); | |||||
return publisher.PublishAsync(message); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -214,7 +214,7 @@ namespace MQTTnet.Core.Tests | |||||
await s.StartAsync(new MqttServerOptions()); | await s.StartAsync(new MqttServerOptions()); | ||||
var c1 = await serverAdapter.ConnectTestClient("c1"); | var c1 = await serverAdapter.ConnectTestClient("c1"); | ||||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).Build()); | |||||
await c1.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[3])); | |||||
await c1.DisconnectAsync(); | await c1.DisconnectAsync(); | ||||
var c2 = await serverAdapter.ConnectTestClient("c2"); | var c2 = await serverAdapter.ConnectTestClient("c2"); | ||||
@@ -272,8 +272,8 @@ namespace MQTTnet.Core.Tests | |||||
await s.StartAsync(new MqttServerOptions()); | await s.StartAsync(new MqttServerOptions()); | ||||
var c1 = await serverAdapter.ConnectTestClient("c1"); | var c1 = await serverAdapter.ConnectTestClient("c1"); | ||||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag().Build()); | |||||
await c1.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag().Build()); | |||||
await c1.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[3]).WithRetainFlag()); | |||||
await c1.PublishAsync(builder => builder.WithTopic("retained").WithPayload(new byte[0]).WithRetainFlag()); | |||||
await c1.DisconnectAsync(); | await c1.DisconnectAsync(); | ||||
var c2 = await serverAdapter.ConnectTestClient("c2"); | var c2 = await serverAdapter.ConnectTestClient("c2"); | ||||
@@ -366,8 +366,7 @@ namespace MQTTnet.Core.Tests | |||||
isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; | isIntercepted = string.Compare("extended", Encoding.UTF8.GetString(args.ApplicationMessage.Payload), StringComparison.Ordinal) == 0; | ||||
}; | }; | ||||
var m = new MqttApplicationMessageBuilder().WithTopic("test").Build(); | |||||
await c1.PublishAsync(m); | |||||
await c1.PublishAsync(builder => builder.WithTopic("test")); | |||||
await c1.DisconnectAsync(); | await c1.DisconnectAsync(); | ||||
await Task.Delay(500); | await Task.Delay(500); | ||||
@@ -403,7 +402,7 @@ namespace MQTTnet.Core.Tests | |||||
}; | }; | ||||
await c1.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce); | await c1.SubscribeAsync("A", MqttQualityOfServiceLevel.AtMostOnce); | ||||
await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("A").WithPayload(Encoding.UTF8.GetBytes("The body")).Build()); | |||||
await c2.PublishAsync(builder => builder.WithTopic("A").WithPayload(Encoding.UTF8.GetBytes("The body"))); | |||||
await Task.Delay(1000); | await Task.Delay(1000); | ||||
} | } | ||||
@@ -452,7 +451,7 @@ namespace MQTTnet.Core.Tests | |||||
c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; | c1.ApplicationMessageReceived += (_, __) => receivedMessagesCount++; | ||||
await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); | await c1.SubscribeAsync(new TopicFilterBuilder().WithTopic(topicFilter).WithQualityOfServiceLevel(filterQualityOfServiceLevel).Build()); | ||||
await c2.PublishAsync(new MqttApplicationMessageBuilder().WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel).Build()); | |||||
await c2.PublishAsync(builder => builder.WithTopic(topic).WithPayload(new byte[0]).WithQualityOfServiceLevel(qualityOfServiceLevel)); | |||||
await Task.Delay(500); | await Task.Delay(500); | ||||
await c1.UnsubscribeAsync(topicFilter); | await c1.UnsubscribeAsync(topicFilter); | ||||
@@ -39,15 +39,15 @@ namespace MQTTnet.TestApp.NetCore | |||||
Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); | Console.WriteLine(">> RECEIVED: " + e.ApplicationMessage.Topic); | ||||
}; | }; | ||||
await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("1").Build()); | |||||
await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS().Build()); | |||||
await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("1")); | |||||
await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("2").WithAtLeastOnceQoS()); | |||||
await managedClient.StartAsync(options); | await managedClient.StartAsync(options); | ||||
await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce)); | await managedClient.SubscribeAsync(new TopicFilter("xyz", MqttQualityOfServiceLevel.AtMostOnce)); | ||||
await managedClient.SubscribeAsync(new TopicFilter("abc", MqttQualityOfServiceLevel.AtMostOnce)); | await managedClient.SubscribeAsync(new TopicFilter("abc", MqttQualityOfServiceLevel.AtMostOnce)); | ||||
await managedClient.PublishAsync(new MqttApplicationMessageBuilder().WithTopic("Step").WithPayload("3").Build()); | |||||
await managedClient.PublishAsync(builder => builder.WithTopic("Step").WithPayload("3")); | |||||
Console.WriteLine("Managed client started."); | Console.WriteLine("Managed client started."); | ||||
Console.ReadLine(); | Console.ReadLine(); | ||||