From e65c3f2f2b84b47952f15824cfe961f897a21d43 Mon Sep 17 00:00:00 2001 From: JanEggers Date: Sat, 16 Jun 2018 17:30:08 +0200 Subject: [PATCH] moved files into the correct place --- .../MqttConnectionContext.cs | 114 ----------------- .../MqttClientConnectionContextFactory.cs | 0 .../Client/Tcp/BufferExtensions.cs | 0 .../Client/Tcp/DuplexPipe.cs | 0 .../Client/Tcp/SocketAwaitable.cs | 0 .../Client/Tcp/SocketReceiver.cs | 0 .../Client/Tcp/SocketSender.cs | 0 .../Client/Tcp/TcpConnection.cs | 1 + .../ConnectionBuilderExtensions.cs | 0 .../MqttConnectionContext.cs | 120 ++++++++++++++++++ .../MqttConnectionHandler.cs | 0 .../MQTTnet.AspnetCore/ReaderExtensions.cs | 0 12 files changed, 121 insertions(+), 114 deletions(-) delete mode 100644 Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs rename {Frameworks => Source}/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs (100%) rename {Frameworks => Source}/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs (100%) rename {Frameworks => Source}/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs (100%) rename {Frameworks => Source}/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs (100%) rename {Frameworks => Source}/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs (100%) rename {Frameworks => Source}/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs (100%) rename {Frameworks => Source}/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs (99%) rename {Frameworks => Source}/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs (100%) create mode 100644 Source/MQTTnet.AspnetCore/MqttConnectionContext.cs rename {Frameworks => Source}/MQTTnet.AspnetCore/MqttConnectionHandler.cs (100%) rename {Frameworks => Source}/MQTTnet.AspnetCore/ReaderExtensions.cs (100%) diff --git a/Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs deleted file mode 100644 index f7acfa4..0000000 --- a/Frameworks/MQTTnet.AspnetCore/MqttConnectionContext.cs +++ /dev/null @@ -1,114 +0,0 @@ -using Microsoft.AspNetCore.Connections; -using MQTTnet.Adapter; -using MQTTnet.AspNetCore.Client.Tcp; -using MQTTnet.Packets; -using MQTTnet.Serializer; -using System; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Threading; -using System.Threading.Tasks; - -namespace MQTTnet.AspNetCore -{ - public class MqttConnectionContext : IMqttChannelAdapter - { - public IMqttPacketSerializer PacketSerializer { get; } - public ConnectionContext Connection { get; } - - public string Endpoint => Connection.ConnectionId; - - public MqttConnectionContext( - IMqttPacketSerializer packetSerializer, - ConnectionContext connection) - { - PacketSerializer = packetSerializer; - Connection = connection; - } - - public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) - { - if (Connection is TcpConnection tcp && !tcp.IsConnected) - { - return tcp.StartAsync(); - } - return Task.CompletedTask; - } - - public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) - { - Connection.Transport.Input.Complete(); - Connection.Transport.Output.Complete(); - - return Task.CompletedTask; - } - - public void Dispose() - { - } - - public async Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) - { - var input = Connection.Transport.Input; - - while (!cancellationToken.IsCancellationRequested) - { - ReadResult readResult; - ReadingPacketStarted?.Invoke(this, EventArgs.Empty); - - var readTask = input.ReadAsync(cancellationToken); - if (readTask.IsCompleted) - { - readResult = readTask.Result; - } - else - { - readResult = await readTask; - } - - var buffer = readResult.Buffer; - - var consumed = buffer.Start; - var observed = buffer.Start; - - try - { - if (!buffer.IsEmpty) - { - if (PacketSerializer.TryDeserialize(buffer, out var packet, out consumed, out observed)) - { - return packet; - } - } - else if (readResult.IsCompleted) - { - break; - } - } - finally - { - // The buffer was sliced up to where it was consumed, so we can just advance to the start. - // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data - // before yielding the read again. - input.AdvanceTo(consumed, observed); - ReadingPacketCompleted?.Invoke(this, EventArgs.Empty); - } - } - - cancellationToken.ThrowIfCancellationRequested(); - return null; - } - - public async Task SendPacketsAsync(TimeSpan timeout, IEnumerable packets, CancellationToken cancellationToken) - { - foreach (var packet in packets) - { - var buffer = PacketSerializer.Serialize(packet); - await Connection.Transport.Output.WriteAsync(buffer.AsMemory()); - } - } - - public event EventHandler ReadingPacketStarted; - public event EventHandler ReadingPacketCompleted; - } -} diff --git a/Frameworks/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs b/Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs rename to Source/MQTTnet.AspnetCore/Client/MqttClientConnectionContextFactory.cs diff --git a/Frameworks/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/BufferExtensions.cs diff --git a/Frameworks/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/DuplexPipe.cs diff --git a/Frameworks/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/SocketAwaitable.cs diff --git a/Frameworks/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/SocketReceiver.cs diff --git a/Frameworks/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/SocketSender.cs diff --git a/Frameworks/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs b/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs similarity index 99% rename from Frameworks/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs rename to Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs index 4bf74fb..7417e74 100644 --- a/Frameworks/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs +++ b/Source/MQTTnet.AspnetCore/Client/Tcp/TcpConnection.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.IO.Pipelines; using System.Net; diff --git a/Frameworks/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs b/Source/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs rename to Source/MQTTnet.AspnetCore/ConnectionBuilderExtensions.cs diff --git a/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs new file mode 100644 index 0000000..3d42937 --- /dev/null +++ b/Source/MQTTnet.AspnetCore/MqttConnectionContext.cs @@ -0,0 +1,120 @@ +using Microsoft.AspNetCore.Connections; +using MQTTnet.Adapter; +using MQTTnet.AspNetCore.Client.Tcp; +using MQTTnet.Packets; +using MQTTnet.Serializer; +using System; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; + +namespace MQTTnet.AspNetCore +{ + public class MqttConnectionContext : IMqttChannelAdapter + { + public IMqttPacketSerializer PacketSerializer { get; } + public ConnectionContext Connection { get; } + + public string Endpoint => Connection.ConnectionId; + + public MqttConnectionContext( + IMqttPacketSerializer packetSerializer, + ConnectionContext connection) + { + PacketSerializer = packetSerializer; + Connection = connection; + } + + public Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + if (Connection is TcpConnection tcp && !tcp.IsConnected) + { + return tcp.StartAsync(); + } + return Task.CompletedTask; + } + + public Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + Connection.Transport.Input.Complete(); + Connection.Transport.Output.Complete(); + + return Task.CompletedTask; + } + + public void Dispose() + { + } + + public async Task ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken) + { + var input = Connection.Transport.Input; + + try + { + while (!cancellationToken.IsCancellationRequested) + { + ReadResult readResult; + var readTask = input.ReadAsync(cancellationToken); + if (readTask.IsCompleted) + { + readResult = readTask.Result; + } + else + { + readResult = await readTask; + } + + var buffer = readResult.Buffer; + + var consumed = buffer.Start; + var observed = buffer.Start; + + try + { + if (!buffer.IsEmpty) + { + if (PacketSerializer.TryDeserialize(buffer, out var packet, out consumed, out observed)) + { + return packet; + } + else + { + // we did receive something but the message is not yet complete + ReadingPacketStarted?.Invoke(this, EventArgs.Empty); + } + } + else if (readResult.IsCompleted) + { + break; + } + } + finally + { + // The buffer was sliced up to where it was consumed, so we can just advance to the start. + // We mark examined as buffer.End so that if we didn't receive a full frame, we'll wait for more data + // before yielding the read again. + input.AdvanceTo(consumed, observed); + } + } + } + finally + { + ReadingPacketCompleted?.Invoke(this, EventArgs.Empty); + } + + cancellationToken.ThrowIfCancellationRequested(); + return null; + } + + public Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken) + { + var buffer = PacketSerializer.Serialize(packet); + return Connection.Transport.Output.WriteAsync(buffer.AsMemory(), cancellationToken).AsTask(); + } + + public event EventHandler ReadingPacketStarted; + public event EventHandler ReadingPacketCompleted; + } +} diff --git a/Frameworks/MQTTnet.AspnetCore/MqttConnectionHandler.cs b/Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/MqttConnectionHandler.cs rename to Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs diff --git a/Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs b/Source/MQTTnet.AspnetCore/ReaderExtensions.cs similarity index 100% rename from Frameworks/MQTTnet.AspnetCore/ReaderExtensions.cs rename to Source/MQTTnet.AspnetCore/ReaderExtensions.cs