ソースを参照

Fix subscribe interceptor

release/3.x.x
Christian Kratky 7年前
コミット
30df7d4c9e
5個のファイルの変更50行の追加33行の削除
  1. +3
    -0
      Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs
  2. +40
    -26
      MQTTnet.Core/Server/MqttClientSession.cs
  3. +4
    -4
      MQTTnet.Core/Server/MqttClientSessionsManager.cs
  4. +2
    -2
      MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs
  5. +1
    -1
      MQTTnet.Core/Server/MqttServer.cs

+ 3
- 0
Frameworks/MQTTnet.UniversalWindows/Implementations/MqttServerAdapter.cs ファイルの表示

@@ -30,6 +30,7 @@ namespace MQTTnet.Implementations
if (options.DefaultEndpointOptions.IsEnabled)
{
_defaultEndpointSocket = new StreamSocketListener();
_defaultEndpointSocket.Control.NoDelay = true;
await _defaultEndpointSocket.BindServiceNameAsync(options.GetDefaultEndpointPort().ToString(), SocketProtectionLevel.PlainSocket);
_defaultEndpointSocket.ConnectionReceived += AcceptDefaultEndpointConnectionsAsync;
}
@@ -57,6 +58,8 @@ namespace MQTTnet.Implementations
{
try
{
args.Socket.Control.NoDelay = true;

var clientAdapter = _mqttCommunicationAdapterFactory.CreateServerMqttCommunicationAdapter(new MqttTcpChannel(args.Socket));
ClientAccepted?.Invoke(this, new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
}


+ 40
- 26
MQTTnet.Core/Server/MqttClientSession.cs ファイルの表示

@@ -142,51 +142,65 @@ namespace MQTTnet.Core.Server
}
}

private async Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
private Task ProcessReceivedPacketAsync(IMqttCommunicationAdapter adapter, MqttBasePacket packet, CancellationToken cancellationToken)
{
if (packet is MqttSubscribePacket subscribePacket)
if (packet is MqttPingReqPacket)
{
var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket);
EnqueueRetainedMessages(subscribePacket);

if (subscribeResult.CloseConnection)
{
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket());
Stop();
}
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket());
}
else if (packet is MqttUnsubscribePacket unsubscribePacket)

if (packet is MqttPublishPacket publishPacket)
{
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket));
return HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken);
}
else if (packet is MqttPublishPacket publishPacket)

if (packet is MqttPubRelPacket pubRelPacket)
{
await HandleIncomingPublishPacketAsync(adapter, publishPacket, cancellationToken);
return HandleIncomingPubRelPacketAsync(adapter, pubRelPacket, cancellationToken);
}
else if (packet is MqttPubRelPacket pubRelPacket)

if (packet is MqttPubRecPacket pubRecPacket)
{
await HandleIncomingPubRelPacketAsync(adapter, pubRelPacket, cancellationToken);
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse<MqttPubRelPacket>());
}
else if (packet is MqttPubRecPacket pubRecPacket)

if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)
{
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, pubRecPacket.CreateResponse<MqttPubRelPacket>());
// Discard message.
return Task.FromResult(0);
}
else if (packet is MqttPubAckPacket || packet is MqttPubCompPacket)

if (packet is MqttSubscribePacket subscribePacket)
{
// Discard message.
return HandleIncomingSubscribePacketAsync(adapter, subscribePacket, cancellationToken);
}
else if (packet is MqttPingReqPacket)

if (packet is MqttUnsubscribePacket unsubscribePacket)
{
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttPingRespPacket());
return adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, _subscriptionsManager.Unsubscribe(unsubscribePacket));
}
else if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
if (packet is MqttDisconnectPacket || packet is MqttConnectPacket)
{
Stop();
return Task.FromResult(0);
}
else

_logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
Stop();

return Task.FromResult(0);
}

private async Task HandleIncomingSubscribePacketAsync(IMqttCommunicationAdapter adapter, MqttSubscribePacket subscribePacket, CancellationToken cancellationToken)
{
var subscribeResult = _subscriptionsManager.Subscribe(subscribePacket, ClientId);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, subscribeResult.ResponsePacket);
EnqueueRetainedMessages(subscribePacket);

if (subscribeResult.CloseConnection)
{
_logger.LogWarning("Client '{0}': Received not supported packet ({1}). Closing connection.", ClientId, packet);
await adapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttDisconnectPacket());
Stop();
}
}


+ 4
- 4
MQTTnet.Core/Server/MqttClientSessionsManager.cs ファイルの表示

@@ -39,12 +39,12 @@ namespace MQTTnet.Core.Server

public MqttClientRetainedMessagesManager RetainedMessagesManager { get; }

public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter)
public async Task RunClientSessionAsync(IMqttCommunicationAdapter clientAdapter, CancellationToken cancellationToken)
{
var clientId = string.Empty;
try
{
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, CancellationToken.None).ConfigureAwait(false) is MqttConnectPacket connectPacket))
if (!(await clientAdapter.ReceivePacketAsync(_options.DefaultCommunicationTimeout, cancellationToken).ConfigureAwait(false) is MqttConnectPacket connectPacket))
{
throw new MqttProtocolViolationException("The first packet from a client must be a 'CONNECT' packet [MQTT-3.1.0-1].");
}
@@ -57,7 +57,7 @@ namespace MQTTnet.Core.Server
var connectReturnCode = ValidateConnection(connectPacket);
if (connectReturnCode != MqttConnectReturnCode.ConnectionAccepted)
{
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode
}).ConfigureAwait(false);
@@ -67,7 +67,7 @@ namespace MQTTnet.Core.Server

var clientSession = GetOrCreateClientSession(connectPacket);

await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, CancellationToken.None, new MqttConnAckPacket
await clientAdapter.SendPacketsAsync(_options.DefaultCommunicationTimeout, cancellationToken, new MqttConnAckPacket
{
ConnectReturnCode = connectReturnCode,
IsSessionPresent = clientSession.IsExistingSession


+ 2
- 2
MQTTnet.Core/Server/MqttClientSubscriptionsManager.cs ファイルの表示

@@ -16,7 +16,7 @@ namespace MQTTnet.Core.Server
_options = options?.Value ?? throw new ArgumentNullException(nameof(options));
}

public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket)
public MqttClientSubscribeResult Subscribe(MqttSubscribePacket subscribePacket, string clientId)
{
if (subscribePacket == null) throw new ArgumentNullException(nameof(subscribePacket));

@@ -27,7 +27,7 @@ namespace MQTTnet.Core.Server
{
foreach (var topicFilter in subscribePacket.TopicFilters)
{
var interceptorContext = new MqttSubscriptionInterceptorContext("", topicFilter);
var interceptorContext = new MqttSubscriptionInterceptorContext(clientId, topicFilter);
_options.SubscriptionsInterceptor?.Invoke(interceptorContext);
responsePacket.SubscribeReturnCodes.Add(interceptorContext.AcceptSubscription ? MqttSubscribeReturnCode.SuccessMaximumQoS1 : MqttSubscribeReturnCode.Failure);


+ 1
- 1
MQTTnet.Core/Server/MqttServer.cs ファイルの表示

@@ -109,7 +109,7 @@ namespace MQTTnet.Core.Server

private void OnClientAccepted(object sender, MqttServerAdapterClientAcceptedEventArgs eventArgs)
{
eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client), _cancellationTokenSource.Token);
eventArgs.SessionTask = Task.Run(async () => await _clientSessionsManager.RunClientSessionAsync(eventArgs.Client, _cancellationTokenSource.Token), _cancellationTokenSource.Token);
}

private void OnClientConnected(object sender, MqttClientConnectedEventArgs eventArgs)


読み込み中…
キャンセル
保存