Non puoi selezionare più di 25 argomenti. Gli argomenti devono iniziare con una lettera o un numero, possono includere trattini ('-') e possono essere lunghi fino a 35 caratteri.
 
 
 
 

68 righe
2.4 KiB

  1. using Microsoft.AspNetCore.Connections;
  2. using Microsoft.VisualStudio.TestTools.UnitTesting;
  3. using MQTTnet.AspNetCore.Tests.Mockups;
  4. using MQTTnet.Exceptions;
  5. using MQTTnet.Packets;
  6. using System;
  7. using System.Linq;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using MQTTnet.Formatter;
  11. namespace MQTTnet.AspNetCore.Tests
  12. {
  /// <summary>
  /// Tests for <see cref="MqttConnectionContext"/> running on top of an in-memory
  /// <see cref="DuplexPipeMockup"/> transport.
  /// </summary>
  [TestClass]
  public class MqttConnectionContextTest
  {
      /// <summary>
      /// Completes the writer of the receive pipe and expects
      /// <c>ReceivePacketAsync</c> to throw a <see cref="MqttCommunicationException"/>.
      /// </summary>
      [TestMethod]
      public async Task TestReceivePacketAsyncThrowsWhenReaderCompleted()
      {
          var formatter = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
          var transport = new DuplexPipeMockup();
          var connectionContext = new DefaultConnectionContext { Transport = transport };
          var mqttContext = new MqttConnectionContext(formatter, connectionContext);

          // Nothing more will ever be written to the receive side.
          transport.Receive.Writer.Complete();

          await Assert.ThrowsExceptionAsync<MqttCommunicationException>(
              () => mqttContext.ReceivePacketAsync(TimeSpan.Zero, CancellationToken.None));
      }

      /// <summary>
      /// Starts 10 tasks that each send 100 publish packets through the same context
      /// and awaits all of them; the test passes when none of the sends throws.
      /// </summary>
      [TestMethod]
      public async Task TestParallelWrites()
      {
          var formatter = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
          var transport = new DuplexPipeMockup();
          var connectionContext = new DefaultConnectionContext { Transport = transport };
          var mqttContext = new MqttConnectionContext(formatter, connectionContext);

          // One writer: sends 100 empty publish packets back to back.
          async Task SendBurstAsync()
          {
              for (var remaining = 100; remaining > 0; remaining--)
              {
                  await mqttContext
                      .SendPacketAsync(new MqttPublishPacket(), TimeSpan.Zero, CancellationToken.None)
                      .ConfigureAwait(false);
              }
          }

          var writers = Enumerable.Range(1, 10).Select(_ => Task.Run(() => SendBurstAsync()));

          await Task.WhenAll(writers).ConfigureAwait(false);
      }

      /// <summary>
      /// Sends a publish packet carrying a 20,000 byte payload and checks that the
      /// first read from the send pipe yields more than 20,000 bytes.
      /// </summary>
      [TestMethod]
      public async Task TestLargePacket()
      {
          var formatter = new MqttPacketFormatterAdapter(MqttProtocolVersion.V311);
          var transport = new DuplexPipeMockup();
          var connectionContext = new DefaultConnectionContext { Transport = transport };
          var mqttContext = new MqttConnectionContext(formatter, connectionContext);

          var largePacket = new MqttPublishPacket { Payload = new byte[20_000] };
          await mqttContext
              .SendPacketAsync(largePacket, TimeSpan.Zero, CancellationToken.None)
              .ConfigureAwait(false);

          var written = await transport.Send.Reader.ReadAsync();
          Assert.IsTrue(written.Buffer.Length > 20000);
      }
  }
  57. }