Procházet zdrojové kódy

Enable #nullable default

master
Savorboard před 3 roky
rodič
revize
9512640bc7
53 změnil soubory, kde provedl 826 přidání a 836 odebrání
  1. +2
    -2
      CAP.sln
  2. +2
    -2
      src/DotNetCore.CAP/CAP.Attribute.cs
  3. +4
    -4
      src/DotNetCore.CAP/CAP.Options.cs
  4. +6
    -6
      src/DotNetCore.CAP/Diagnostics/EventData.Cap.P.cs
  5. +7
    -9
      src/DotNetCore.CAP/Diagnostics/EventData.Cap.S.cs
  6. +1
    -0
      src/DotNetCore.CAP/DotNetCore.CAP.csproj
  7. +4
    -5
      src/DotNetCore.CAP/ICapPublisher.cs
  8. +1
    -1
      src/DotNetCore.CAP/ICapTransaction.Base.cs
  9. +1
    -1
      src/DotNetCore.CAP/ICapTransaction.cs
  10. +3
    -3
      src/DotNetCore.CAP/Internal/ConsumerExecutedResult.cs
  11. +12
    -12
      src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs
  12. +1
    -1
      src/DotNetCore.CAP/Internal/Filter/ExceptionContext.cs
  13. +2
    -2
      src/DotNetCore.CAP/Internal/Filter/ExecutedContext.cs
  14. +2
    -2
      src/DotNetCore.CAP/Internal/Filter/ExecutingContext.cs
  15. +5
    -6
      src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs
  16. +7
    -10
      src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs
  17. +35
    -21
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  18. +10
    -10
      src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs
  19. +1
    -1
      src/DotNetCore.CAP/Internal/IConsumerServiceSelector.cs
  20. +7
    -8
      src/DotNetCore.CAP/Internal/IMessageSender.Default.cs
  21. +1
    -2
      src/DotNetCore.CAP/Internal/IMessageSender.cs
  22. +6
    -7
      src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs
  23. +8
    -11
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
  24. +1
    -2
      src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs
  25. +1
    -1
      src/DotNetCore.CAP/Internal/LoggerExtensions.cs
  26. +2
    -2
      src/DotNetCore.CAP/Internal/MethodMatcherCache.cs
  27. +99
    -105
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs
  28. +38
    -39
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs
  29. +291
    -285
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutor.cs
  30. +85
    -89
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs
  31. +119
    -115
      src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs
  32. +1
    -1
      src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs
  33. +1
    -1
      src/DotNetCore.CAP/Internal/SnowflakeId.cs
  34. +1
    -1
      src/DotNetCore.CAP/Internal/TopicAttribute.cs
  35. +2
    -2
      src/DotNetCore.CAP/Messages/FailedInfo.cs
  36. +12
    -13
      src/DotNetCore.CAP/Messages/Message.cs
  37. +7
    -9
      src/DotNetCore.CAP/Messages/TransportMessage.cs
  38. +6
    -6
      src/DotNetCore.CAP/Monitoring/MessageDto.cs
  39. +4
    -4
      src/DotNetCore.CAP/Monitoring/MessageQueryDto.cs
  40. +1
    -1
      src/DotNetCore.CAP/Monitoring/PagedQueryResult.cs
  41. +1
    -1
      src/DotNetCore.CAP/OperateResult.cs
  42. +1
    -1
      src/DotNetCore.CAP/Persistence/IDataStorage.cs
  43. +3
    -3
      src/DotNetCore.CAP/Persistence/MediumMessage.cs
  44. +4
    -4
      src/DotNetCore.CAP/Processor/IDispatcher.Default.cs
  45. +3
    -4
      src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs
  46. +2
    -2
      src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs
  47. +1
    -1
      src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
  48. +1
    -5
      src/DotNetCore.CAP/Processor/ProcessingContext.cs
  49. +3
    -3
      src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs
  50. +3
    -4
      src/DotNetCore.CAP/Serialization/ISerializer.cs
  51. +3
    -4
      src/DotNetCore.CAP/Transport/BrokerAddress.cs
  52. +1
    -1
      src/DotNetCore.CAP/Transport/IConsumerClient.cs
  53. +1
    -1
      src/DotNetCore.CAP/Transport/MqLogType.cs

+ 2
- 2
CAP.sln Zobrazit soubor

@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29025.244
# Visual Studio Version 17
VisualStudioVersion = 17.0.31919.166
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{9B2AE124-6636-4DE9-83A3-70360DABD0C4}"
EndProject


+ 2
- 2
src/DotNetCore.CAP/CAP.Attribute.cs Zobrazit soubor

@@ -29,9 +29,9 @@ namespace DotNetCore.CAP
}
public class CapHeader : ReadOnlyDictionary<string, string>
public class CapHeader : ReadOnlyDictionary<string, string?>
{
public CapHeader(IDictionary<string, string> dictionary) : base(dictionary)
public CapHeader(IDictionary<string, string?> dictionary) : base(dictionary)
{
}


+ 4
- 4
src/DotNetCore.CAP/CAP.Options.cs Zobrazit soubor

@@ -40,12 +40,12 @@ namespace DotNetCore.CAP
/// <summary>
/// Subscriber group prefix.
/// </summary>
public string GroupNamePrefix { get; set; }
public string? GroupNamePrefix { get; set; }
/// <summary>
/// Topic prefix.
/// </summary>
public string TopicNamePrefix { get; set; }
public string? TopicNamePrefix { get; set; }

/// <summary>
/// The default version of the message, configured to isolate data in the same instance. The length must not exceed 20
@@ -67,7 +67,7 @@ namespace DotNetCore.CAP
/// <summary>
/// We’ll invoke this call-back with message type,name,content when retry failed (send or executed) messages equals <see cref="FailedRetryCount"/> times.
/// </summary>
public Action<FailedInfo> FailedThresholdCallback { get; set; }
public Action<FailedInfo>? FailedThresholdCallback { get; set; }

/// <summary>
/// The number of message retries, the retry will stop when the threshold is reached.
@@ -116,6 +116,6 @@ namespace DotNetCore.CAP
/// <summary>
/// Configure JSON serialization settings
/// </summary>
public JsonSerializerOptions JsonSerializerOptions { get; } = new JsonSerializerOptions();
public JsonSerializerOptions JsonSerializerOptions { get; } = new ();
}
}

+ 6
- 6
src/DotNetCore.CAP/Diagnostics/EventData.Cap.P.cs Zobrazit soubor

@@ -8,27 +8,27 @@ namespace DotNetCore.CAP.Diagnostics
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }
public string Operation { get; set; } = default!;

public Message Message { get; set; }
public Message Message { get; set; } = default!;

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
public Exception? Exception { get; set; }
}

public class CapEventDataPubSend
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }
public string Operation { get; set; } = default!;

public TransportMessage TransportMessage { get; set; }
public TransportMessage TransportMessage { get; set; } = default!;

public BrokerAddress BrokerAddress { get; set; }

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
public Exception? Exception { get; set; }
}
}

+ 7
- 9
src/DotNetCore.CAP/Diagnostics/EventData.Cap.S.cs Zobrazit soubor

@@ -5,7 +5,6 @@ using System;
using System.Reflection;
using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Transport;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Diagnostics
{
@@ -13,30 +12,29 @@ namespace DotNetCore.CAP.Diagnostics
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }
public string Operation { get; set; } = default!;

public TransportMessage TransportMessage { get; set; }
public TransportMessage TransportMessage { get; set; } = default!;

public BrokerAddress BrokerAddress { get; set; }

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
public Exception? Exception { get; set; }
}

public class CapEventDataSubExecute
{
public long? OperationTimestamp { get; set; }

public string Operation { get; set; }
public string Operation { get; set; } = default!;

public Message Message { get; set; }
public Message Message { get; set; } = default!;

[CanBeNull]
public MethodInfo MethodInfo { get; set; }
public MethodInfo? MethodInfo { get; set; }

public long? ElapsedTimeMs { get; set; }

public Exception Exception { get; set; }
public Exception? Exception { get; set; }
}
}

+ 1
- 0
src/DotNetCore.CAP/DotNetCore.CAP.csproj Zobrazit soubor

@@ -2,6 +2,7 @@
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<PropertyGroup>


+ 4
- 5
src/DotNetCore.CAP/ICapPublisher.cs Zobrazit soubor

@@ -5,7 +5,6 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace DotNetCore.CAP
{
@@ -28,7 +27,7 @@ namespace DotNetCore.CAP
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="callbackName">callback subscriber name</param>
/// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, [CanBeNull] T contentObj, string callbackName = null, CancellationToken cancellationToken = default);
Task PublishAsync<T>(string name, T? contentObj, string? callbackName = null, CancellationToken cancellationToken = default);

/// <summary>
/// Asynchronous publish an object message with custom headers
@@ -38,7 +37,7 @@ namespace DotNetCore.CAP
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="headers">message additional headers.</param>
/// <param name="cancellationToken"></param>
Task PublishAsync<T>(string name, [CanBeNull] T contentObj, IDictionary<string, string> headers, CancellationToken cancellationToken = default);
Task PublishAsync<T>(string name, T? contentObj, IDictionary<string, string?> headers, CancellationToken cancellationToken = default);

/// <summary>
/// Publish an object message.
@@ -46,7 +45,7 @@ namespace DotNetCore.CAP
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="callbackName">callback subscriber name</param>
void Publish<T>(string name, [CanBeNull] T contentObj, string callbackName = null);
void Publish<T>(string name, T? contentObj, string? callbackName = null);

/// <summary>
/// Publish an object message.
@@ -54,6 +53,6 @@ namespace DotNetCore.CAP
/// <param name="name">the topic name or exchange router key.</param>
/// <param name="contentObj">message body content, that will be serialized. (can be null)</param>
/// <param name="headers">message additional headers.</param>
void Publish<T>(string name, [CanBeNull] T contentObj, IDictionary<string, string> headers);
void Publish<T>(string name, T? contentObj, IDictionary<string, string?> headers);
}
}

+ 1
- 1
src/DotNetCore.CAP/ICapTransaction.Base.cs Zobrazit soubor

@@ -23,7 +23,7 @@ namespace DotNetCore.CAP

public bool AutoCommit { get; set; }

public object DbTransaction { get; set; }
public object? DbTransaction { get; set; }

protected internal virtual void AddToSent(MediumMessage msg)
{


+ 1
- 1
src/DotNetCore.CAP/ICapTransaction.cs Zobrazit soubor

@@ -20,7 +20,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Database transaction object, can be converted to a specific database transaction object or IDBTransaction when used
/// </summary>
object DbTransaction { get; set; }
object? DbTransaction { get; set; }

/// <summary>
/// Submit the transaction context of the CAP, we will send the message to the message queue at the time of submission


+ 3
- 3
src/DotNetCore.CAP/Internal/ConsumerExecutedResult.cs Zobrazit soubor

@@ -5,17 +5,17 @@ namespace DotNetCore.CAP.Internal
{
public class ConsumerExecutedResult
{
public ConsumerExecutedResult(object result, string msgId, string callbackName)
public ConsumerExecutedResult(object? result, string msgId, string? callbackName)
{
Result = result;
MessageId = msgId;
CallbackName = callbackName;
}

public object Result { get; set; }
public object? Result { get; set; }

public string MessageId { get; set; }

public string CallbackName { get; set; }
public string? CallbackName { get; set; }
}
}

+ 12
- 12
src/DotNetCore.CAP/Internal/ConsumerExecutorDescriptor.cs Zobrazit soubor

@@ -12,21 +12,21 @@ namespace DotNetCore.CAP.Internal
/// </summary>
public class ConsumerExecutorDescriptor
{
public TypeInfo ServiceTypeInfo { get; set; }
public TypeInfo? ServiceTypeInfo { get; set; }

public MethodInfo MethodInfo { get; set; }
public MethodInfo MethodInfo { get; set; } = default!;

public TypeInfo ImplTypeInfo { get; set; }
public TypeInfo ImplTypeInfo { get; set; } = default!;

public TopicAttribute Attribute { get; set; }
public TopicAttribute Attribute { get; set; } = default!;

public TopicAttribute ClassAttribute { get; set; }
public TopicAttribute? ClassAttribute { get; set; }

public IList<ParameterDescriptor> Parameters { get; set; }
public IList<ParameterDescriptor> Parameters { get; set; } = new List<ParameterDescriptor>();

public string TopicNamePrefix { get; set; }
public string? TopicNamePrefix { get; set; }

private string _topicName;
private string? _topicName;
/// <summary>
/// Topic name based on both <see cref="Attribute"/> and <see cref="ClassAttribute"/>.
/// </summary>
@@ -58,7 +58,7 @@ namespace DotNetCore.CAP.Internal

public class ConsumerExecutorDescriptorComparer : IEqualityComparer<ConsumerExecutorDescriptor>
{
public bool Equals(ConsumerExecutorDescriptor x, ConsumerExecutorDescriptor y)
public bool Equals(ConsumerExecutorDescriptor? x, ConsumerExecutorDescriptor? y)
{
//Check whether the compared objects reference the same data.
if (ReferenceEquals(x, y))
@@ -77,7 +77,7 @@ namespace DotNetCore.CAP.Internal
x.Attribute.Group.Equals(y.Attribute.Group, StringComparison.OrdinalIgnoreCase);
}

public int GetHashCode(ConsumerExecutorDescriptor obj)
public int GetHashCode(ConsumerExecutorDescriptor? obj)
{
//Check whether the object is null
if (obj is null) return 0;
@@ -95,9 +95,9 @@ namespace DotNetCore.CAP.Internal

public class ParameterDescriptor
{
public string Name { get; set; }
public string Name { get; set; } = default!;

public Type ParameterType { get; set; }
public Type ParameterType { get; set; } = default!;

public bool IsFromCap { get; set; }
}

+ 1
- 1
src/DotNetCore.CAP/Internal/Filter/ExceptionContext.cs Zobrazit soubor

@@ -19,6 +19,6 @@ namespace DotNetCore.CAP.Filter

public bool ExceptionHandled { get; set; }

public object Result { get; set; }
public object? Result { get; set; }
}
}

+ 2
- 2
src/DotNetCore.CAP/Internal/Filter/ExecutedContext.cs Zobrazit soubor

@@ -8,11 +8,11 @@ namespace DotNetCore.CAP.Filter
{
public class ExecutedContext : FilterContext
{
public ExecutedContext(ConsumerContext context, object result) : base(context)
public ExecutedContext(ConsumerContext context, object? result) : base(context)
{
Result = result;
}

public object Result { get; set; }
public object? Result { get; set; }
}
}

+ 2
- 2
src/DotNetCore.CAP/Internal/Filter/ExecutingContext.cs Zobrazit soubor

@@ -8,11 +8,11 @@ namespace DotNetCore.CAP.Filter
{
public class ExecutingContext : FilterContext
{
public ExecutingContext(ConsumerContext context, object[] arguments) : base(context)
public ExecutingContext(ConsumerContext context, object?[] arguments) : base(context)
{
Arguments = arguments;
}

public object[] Arguments { get; set; }
public object?[] Arguments { get; set; }
}
}

+ 5
- 6
src/DotNetCore.CAP/Internal/IBootstrapper.Default.cs Zobrazit soubor

@@ -19,8 +19,8 @@ namespace DotNetCore.CAP.Internal
{
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<Bootstrapper> _logger;
private IEnumerable<IProcessingServer> _processors;
private CancellationTokenSource _cts = new CancellationTokenSource();
private readonly CancellationTokenSource _cts = new ();
private IEnumerable<IProcessingServer> _processors = default!;

public Bootstrapper(IServiceProvider serviceProvider, ILogger<Bootstrapper> logger)
{
@@ -93,9 +93,8 @@ namespace DotNetCore.CAP.Internal

public override void Dispose()
{
_cts?.Cancel();
_cts?.Dispose();
_cts = null;
_cts.Cancel();
_cts.Dispose();
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@@ -105,7 +104,7 @@ namespace DotNetCore.CAP.Internal

public override async Task StopAsync(CancellationToken cancellationToken)
{
_cts?.Cancel();
_cts.Cancel();

await base.StopAsync(cancellationToken);
}


+ 7
- 10
src/DotNetCore.CAP/Internal/ICapPublisher.Default.cs Zobrazit soubor

@@ -22,8 +22,7 @@ namespace DotNetCore.CAP.Internal
private readonly CapOptions _capOptions;

// ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);
protected static readonly DiagnosticListener s_diagnosticListener = new(CapDiagnosticListenerNames.DiagnosticListenerName);

public CapPublisher(IServiceProvider service)
{
@@ -38,20 +37,20 @@ namespace DotNetCore.CAP.Internal

public AsyncLocal<ICapTransaction> Transaction { get; }

public Task PublishAsync<T>(string name, T value, IDictionary<string, string> headers, CancellationToken cancellationToken = default)
public Task PublishAsync<T>(string name, T? value, IDictionary<string, string?> headers, CancellationToken cancellationToken = default)
{
return Task.Run(() => Publish(name, value, headers), cancellationToken);
}

public Task PublishAsync<T>(string name, T value, string callbackName = null,
public Task PublishAsync<T>(string name, T? value, string? callbackName = null,
CancellationToken cancellationToken = default)
{
return Task.Run(() => Publish(name, value, callbackName), cancellationToken);
}

public void Publish<T>(string name, T value, string callbackName = null)
public void Publish<T>(string name, T? value, string? callbackName = null)
{
var header = new Dictionary<string, string>
var header = new Dictionary<string, string?>
{
{Headers.CallbackName, callbackName}
};
@@ -59,7 +58,7 @@ namespace DotNetCore.CAP.Internal
Publish(name, value, header);
}

public void Publish<T>(string name, T value, IDictionary<string, string> headers)
public void Publish<T>(string name, T? value, IDictionary<string, string?> headers)
{
if (string.IsNullOrEmpty(name))
{
@@ -71,14 +70,12 @@ namespace DotNetCore.CAP.Internal
name = $"{_capOptions.TopicNamePrefix}.{name}";
}

headers ??= new Dictionary<string, string>();

if (!headers.ContainsKey(Headers.MessageId))
{
var messageId = SnowflakeId.Default().NextId().ToString();
headers.Add(Headers.MessageId, messageId);
}
if (!headers.ContainsKey(Headers.CorrelationId))
{
headers.Add(Headers.CorrelationId, headers[Headers.MessageId]);


+ 35
- 21
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs Zobrazit soubor

@@ -25,15 +25,15 @@ namespace DotNetCore.CAP.Internal
private readonly TimeSpan _pollingDelay = TimeSpan.FromSeconds(1);
private readonly CapOptions _options;

private IConsumerClientFactory _consumerClientFactory;
private IDispatcher _dispatcher;
private ISerializer _serializer;
private IDataStorage _storage;
private IConsumerClientFactory _consumerClientFactory = default!;
private IDispatcher _dispatcher = default!;
private ISerializer _serializer = default!;
private IDataStorage _storage = default!;

private MethodMatcherCache _selector;
private MethodMatcherCache _selector = default!;
private CancellationTokenSource _cts;
private BrokerAddress _serverAddress;
private Task _compositeTask;
private Task? _compositeTask;
private bool _disposed;
private bool _isHealthy = true;

@@ -57,13 +57,13 @@ namespace DotNetCore.CAP.Internal

public void Start(CancellationToken stoppingToken)
{
_selector = _serviceProvider.GetService<MethodMatcherCache>();
_dispatcher = _serviceProvider.GetService<IDispatcher>();
_serializer = _serviceProvider.GetService<ISerializer>();
_storage = _serviceProvider.GetService<IDataStorage>();
_consumerClientFactory = _serviceProvider.GetService<IConsumerClientFactory>();
_selector = _serviceProvider.GetRequiredService<MethodMatcherCache>();
_dispatcher = _serviceProvider.GetRequiredService<IDispatcher>();
_serializer = _serviceProvider.GetRequiredService<ISerializer>();
_storage = _serviceProvider.GetRequiredService<IDataStorage>();
_consumerClientFactory = _serviceProvider.GetRequiredService<IConsumerClientFactory>();

stoppingToken.Register(() => _cts?.Cancel());
stoppingToken.Register(() => _cts.Cancel());

Execute();
}
@@ -131,7 +131,7 @@ namespace DotNetCore.CAP.Internal
if (!IsHealthy() || force)
{
Pulse();
_cts = new CancellationTokenSource();
_isHealthy = true;

@@ -166,9 +166,8 @@ namespace DotNetCore.CAP.Internal

public void Pulse()
{
_cts?.Cancel();
_cts?.Dispose();
_cts = null;
_cts.Cancel();
_cts.Dispose();
}

private void RegisterMessageProcessor(IConsumerClient client)
@@ -184,7 +183,7 @@ namespace DotNetCore.CAP.Internal
tracingTimestamp = TracingBefore(transportMessage, _serverAddress);

var name = transportMessage.GetName();
var group = transportMessage.GetGroup();
var group = transportMessage.GetGroup()!;

Message message;

@@ -201,21 +200,36 @@ namespace DotNetCore.CAP.Internal
throw ex;
}

var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
var type = executor!.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;
message = _serializer.DeserializeAsync(transportMessage, type).GetAwaiter().GetResult();
message.RemoveException();
}
catch (Exception e)
{
transportMessage.Headers[Headers.Exception] = e.GetType().Name + "-->" + e.Message;
string? dataUri;
if (transportMessage.Headers.TryGetValue(Headers.Type, out var val))
{
var dataUri = $"data:{val};base64," + Convert.ToBase64String(transportMessage.Body);
if (transportMessage.Body != null)
{
dataUri = $"data:{val};base64," + Convert.ToBase64String(transportMessage.Body);
}
else
{
dataUri = null;
}
message = new Message(transportMessage.Headers, dataUri);
}
else
{
var dataUri = "data:UnknownType;base64," + Convert.ToBase64String(transportMessage.Body);
if (transportMessage.Body != null)
{
dataUri = "data:UnknownType;base64," + Convert.ToBase64String(transportMessage.Body);
}
else
{
dataUri = null;
}
message = new Message(transportMessage.Headers, dataUri);
}
}
@@ -255,7 +269,7 @@ namespace DotNetCore.CAP.Internal

TracingAfter(tracingTimestamp, transportMessage, _serverAddress);

_dispatcher.EnqueueToExecute(mediumMessage, executor);
_dispatcher.EnqueueToExecute(mediumMessage, executor!);
}
}
catch (Exception e)


+ 10
- 10
src/DotNetCore.CAP/Internal/IConsumerServiceSelector.Default.cs Zobrazit soubor

@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.Internal
public ConsumerServiceSelector(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
_capOptions = serviceProvider.GetService<IOptions<CapOptions>>().Value;
_capOptions = serviceProvider.GetRequiredService<IOptions<CapOptions>>().Value;

_cacheList = new ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>>();
}
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}

public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
public ConsumerExecutorDescriptor? SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
if (executeDescriptor.Count == 0)
{
@@ -107,7 +107,7 @@ namespace DotNetCore.CAP.Internal
{
var executorDescriptorList = new List<ConsumerExecutorDescriptor>();

var types = Assembly.GetEntryAssembly().ExportedTypes;
var types = Assembly.GetEntryAssembly()!.ExportedTypes;
foreach (var type in types)
{
var typeInfo = type.GetTypeInfo();
@@ -120,7 +120,7 @@ namespace DotNetCore.CAP.Internal
return executorDescriptorList;
}

protected IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo serviceTypeInfo = null)
protected IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo, TypeInfo? serviceTypeInfo = null)
{
var topicClassAttribute = typeInfo.GetCustomAttribute<TopicAttribute>(true);

@@ -169,9 +169,9 @@ namespace DotNetCore.CAP.Internal
TopicAttribute attr,
MethodInfo methodInfo,
TypeInfo implType,
TypeInfo serviceTypeInfo,
TypeInfo? serviceTypeInfo,
IList<ParameterDescriptor> parameters,
TopicAttribute classAttr = null)
TopicAttribute? classAttr = null)
{
var descriptor = new ConsumerExecutorDescriptor
{
@@ -187,7 +187,7 @@ namespace DotNetCore.CAP.Internal
return descriptor;
}

private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
private ConsumerExecutorDescriptor? MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
if (key == null)
{
@@ -197,7 +197,7 @@ namespace DotNetCore.CAP.Internal
return executeDescriptor.FirstOrDefault(x => x.TopicName.Equals(key, StringComparison.InvariantCultureIgnoreCase));
}

private ConsumerExecutorDescriptor MatchWildcardUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
private ConsumerExecutorDescriptor? MatchWildcardUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
{
var group = executeDescriptor.First().Attribute.Group;
if (!_cacheList.TryGetValue(group, out var tmpList))
@@ -223,9 +223,9 @@ namespace DotNetCore.CAP.Internal

private class RegexExecuteDescriptor<T>
{
public string Name { get; set; }
public string Name { get; set; } = default!;

public T Descriptor { get; set; }
public T Descriptor { get; set; } = default!;
}
}
}

+ 1
- 1
src/DotNetCore.CAP/Internal/IConsumerServiceSelector.cs Zobrazit soubor

@@ -22,6 +22,6 @@ namespace DotNetCore.CAP.Internal
/// </summary>
/// <param name="key">topic or exchange router key.</param>
/// <param name="candidates">the set of <see cref="ConsumerExecutorDescriptor" /> candidates.</param>
ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates);
ConsumerExecutorDescriptor? SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> candidates);
}
}

+ 7
- 8
src/DotNetCore.CAP/Internal/IMessageSender.Default.cs Zobrazit soubor

@@ -26,8 +26,7 @@ namespace DotNetCore.CAP.Internal
private readonly IOptions<CapOptions> _options;

// ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);
protected static readonly DiagnosticListener s_diagnosticListener = new(CapDiagnosticListenerNames.DiagnosticListenerName);

public MessageSender(
ILogger<MessageSender> logger,
@@ -36,10 +35,10 @@ namespace DotNetCore.CAP.Internal
_logger = logger;
_serviceProvider = serviceProvider;

_options = serviceProvider.GetService<IOptions<CapOptions>>();
_dataStorage = serviceProvider.GetService<IDataStorage>();
_serializer = serviceProvider.GetService<ISerializer>();
_transport = serviceProvider.GetService<ITransport>();
_options = serviceProvider.GetRequiredService<IOptions<CapOptions>>();
_dataStorage = serviceProvider.GetRequiredService<IDataStorage>();
_serializer = serviceProvider.GetRequiredService<ISerializer>();
_transport = serviceProvider.GetRequiredService<ITransport>();
}

public async Task<OperateResult> SendAsync(MediumMessage message)
@@ -80,9 +79,9 @@ namespace DotNetCore.CAP.Internal
{
TracingError(tracingTimestamp, transportMsg, _transport.BrokerAddress, result);

var needRetry = await SetFailedState(message, result.Exception);
var needRetry = await SetFailedState(message, result.Exception!);

return (needRetry, OperateResult.Failed(result.Exception));
return (needRetry, OperateResult.Failed(result.Exception!));
}
}



+ 1
- 2
src/DotNetCore.CAP/Internal/IMessageSender.cs Zobrazit soubor

@@ -3,12 +3,11 @@

using System.Threading.Tasks;
using DotNetCore.CAP.Persistence;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Internal
{
public interface IMessageSender
{
Task<OperateResult> SendAsync([NotNull] MediumMessage message);
Task<OperateResult> SendAsync(MediumMessage message);
}
}

+ 6
- 7
src/DotNetCore.CAP/Internal/ISubscribeDispatcher.Default.cs Zobrazit soubor

@@ -25,8 +25,7 @@ namespace DotNetCore.CAP.Internal

// diagnostics listener
// ReSharper disable once InconsistentNaming
private static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);
private static readonly DiagnosticListener s_diagnosticListener = new (CapDiagnosticListenerNames.DiagnosticListenerName);

public SubscribeDispatcher(
ILogger<SubscribeDispatcher> logger,
@@ -37,8 +36,8 @@ namespace DotNetCore.CAP.Internal
_logger = logger;
_options = options.Value;

_dataStorage = _provider.GetService<IDataStorage>();
Invoker = _provider.GetService<ISubscribeInvoker>();
_dataStorage = _provider.GetRequiredService<IDataStorage>();
Invoker = _provider.GetRequiredService<ISubscribeInvoker>();
}

private ISubscribeInvoker Invoker { get; }
@@ -46,7 +45,7 @@ namespace DotNetCore.CAP.Internal
public Task<OperateResult> DispatchAsync(MediumMessage message, CancellationToken cancellationToken)
{
var selector = _provider.GetRequiredService<MethodMatcherCache>();
if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup(), out var executor))
if (!selector.TryGetTopicExecutor(message.Origin.GetName(), message.Origin.GetGroup()!, out var executor))
{
var error = $"Message (Name:{message.Origin.GetName()},Group:{message.Origin.GetGroup()}) can not be found subscriber." +
$"{Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
@@ -186,7 +185,7 @@ namespace DotNetCore.CAP.Internal

if (!string.IsNullOrEmpty(ret.CallbackName))
{
var header = new Dictionary<string, string>()
var header = new Dictionary<string, string?>()
{
[Headers.CorrelationId] = message.Origin.GetId(),
[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString()
@@ -249,7 +248,7 @@ namespace DotNetCore.CAP.Internal
}
}

private void TracingError(long? tracingTimestamp, Message message, MethodInfo method, Exception ex)
private void TracingError(long? tracingTimestamp, Message message, MethodInfo? method, Exception ex)
{
if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorSubscriberInvoke))
{


+ 8
- 11
src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs Zobrazit soubor

@@ -12,22 +12,19 @@ using DotNetCore.CAP.Messages;
using DotNetCore.CAP.Serialization;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Internal;
using Microsoft.Extensions.Logging;

namespace DotNetCore.CAP.Internal
{
public class SubscribeInvoker : ISubscribeInvoker
{
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
private readonly ISerializer _serializer;
private readonly ConcurrentDictionary<string, ObjectMethodExecutor> _executors;

public SubscribeInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider, ISerializer serializer)
public SubscribeInvoker(IServiceProvider serviceProvider, ISerializer serializer)
{
_serviceProvider = serviceProvider;
_serializer = serializer;
_logger = loggerFactory.CreateLogger<SubscribeInvoker>();
_executors = new ConcurrentDictionary<string, ObjectMethodExecutor>();
}

@@ -36,11 +33,11 @@ namespace DotNetCore.CAP.Internal
cancellationToken.ThrowIfCancellationRequested();

var methodInfo = context.ConsumerDescriptor.MethodInfo;
var reflectedTypeHandle = methodInfo.ReflectedType.TypeHandle.Value;
var reflectedTypeHandle = methodInfo.ReflectedType!.TypeHandle.Value;
var methodHandle = methodInfo.MethodHandle.Value;
var key = $"{reflectedTypeHandle}_{methodHandle}";

var executor = _executors.GetOrAdd(key, x => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));
var executor = _executors.GetOrAdd(key, _ => ObjectMethodExecutor.Create(methodInfo, context.ConsumerDescriptor.ImplTypeInfo));

using var scope = _serviceProvider.CreateScope();

@@ -50,7 +47,7 @@ namespace DotNetCore.CAP.Internal

var message = context.DeliverMessage;
var parameterDescriptors = context.ConsumerDescriptor.Parameters;
var executeParameters = new object[parameterDescriptors.Count];
var executeParameters = new object?[parameterDescriptors.Count];
for (var i = 0; i < parameterDescriptors.Count; i++)
{
var parameterDescriptor = parameterDescriptors[i];
@@ -90,7 +87,7 @@ namespace DotNetCore.CAP.Internal
}

var filter = provider.GetService<ISubscribeFilter>();
object resultObj = null;
object? resultObj = null;
try
{
if (filter != null)
@@ -155,10 +152,10 @@ namespace DotNetCore.CAP.Internal
var srvType = context.ConsumerDescriptor.ServiceTypeInfo?.AsType();
var implType = context.ConsumerDescriptor.ImplTypeInfo.AsType();

object obj = null;
object? obj = null;
if (srvType != null)
{
obj = provider.GetServices(srvType).FirstOrDefault(o => o.GetType() == implType);
obj = provider.GetServices(srvType).FirstOrDefault(o => o?.GetType() == implType);
}

if (obj == null)
@@ -169,7 +166,7 @@ namespace DotNetCore.CAP.Internal
return obj;
}

private async Task<object> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object[] parameter)
private async Task<object?> ExecuteWithParameterAsync(ObjectMethodExecutor executor, object @class, object?[] parameter)
{
if (executor.IsMethodAsync)
{


+ 1
- 2
src/DotNetCore.CAP/Internal/ISubscribeInvoker.cs Zobrazit soubor

@@ -3,7 +3,6 @@

using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Internal
{
@@ -17,6 +16,6 @@ namespace DotNetCore.CAP.Internal
/// </summary>
/// <param name="context">consumer execute context</param>
/// <param name="cancellationToken">The object of <see cref="CancellationToken"/>.</param>
Task<ConsumerExecutedResult> InvokeAsync([NotNull] ConsumerContext context, CancellationToken cancellationToken = default);
Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context, CancellationToken cancellationToken = default);
}
}

+ 1
- 1
src/DotNetCore.CAP/Internal/LoggerExtensions.cs Zobrazit soubor

@@ -40,7 +40,7 @@ namespace DotNetCore.CAP.Internal
logger.LogDebug($"Received message. id:{messageId}, name: {name}");
}

public static void MessagePublishException(this ILogger logger, string messageId, string reason, Exception ex)
public static void MessagePublishException(this ILogger logger, string? messageId, string reason, Exception? ex)
{
logger.LogError(ex, $"An exception occured while publishing a message, reason:{reason}. message id:{messageId}");
}


+ 2
- 2
src/DotNetCore.CAP/Internal/MethodMatcherCache.cs Zobrazit soubor

@@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;

namespace DotNetCore.CAP.Internal
@@ -66,8 +67,7 @@ namespace DotNetCore.CAP.Internal
/// <param name="groupName">The group name of the value to get.</param>
/// <param name="matchTopic">topic executor of the value.</param>
/// <returns>true if the key was found, otherwise false. </returns>
public bool TryGetTopicExecutor(string topicName, string groupName,
out ConsumerExecutorDescriptor matchTopic)
public bool TryGetTopicExecutor(string topicName, string groupName, [NotNullWhen(true)] out ConsumerExecutorDescriptor? matchTopic)
{
if (Entries == null)
{


+ 99
- 105
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/AwaitableInfo.cs Zobrazit soubor

@@ -1,128 +1,122 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable disable

using System;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
namespace Microsoft.Extensions.Internal;

internal readonly struct AwaitableInfo
{
internal readonly struct AwaitableInfo
public Type AwaiterType { get; }
public PropertyInfo AwaiterIsCompletedProperty { get; }
public MethodInfo AwaiterGetResultMethod { get; }
public MethodInfo AwaiterOnCompletedMethod { get; }
public MethodInfo AwaiterUnsafeOnCompletedMethod { get; }
public Type ResultType { get; }
public MethodInfo GetAwaiterMethod { get; }

public AwaitableInfo(
Type awaiterType,
PropertyInfo awaiterIsCompletedProperty,
MethodInfo awaiterGetResultMethod,
MethodInfo awaiterOnCompletedMethod,
MethodInfo awaiterUnsafeOnCompletedMethod,
Type resultType,
MethodInfo getAwaiterMethod)
{
public Type AwaiterType { get; }
public PropertyInfo AwaiterIsCompletedProperty { get; }
public MethodInfo AwaiterGetResultMethod { get; }
public MethodInfo AwaiterOnCompletedMethod { get; }
public MethodInfo AwaiterUnsafeOnCompletedMethod { get; }
public Type ResultType { get; }
public MethodInfo GetAwaiterMethod { get; }
AwaiterType = awaiterType;
AwaiterIsCompletedProperty = awaiterIsCompletedProperty;
AwaiterGetResultMethod = awaiterGetResultMethod;
AwaiterOnCompletedMethod = awaiterOnCompletedMethod;
AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod;
ResultType = resultType;
GetAwaiterMethod = getAwaiterMethod;
}

public AwaitableInfo(
Type awaiterType,
PropertyInfo awaiterIsCompletedProperty,
MethodInfo awaiterGetResultMethod,
MethodInfo awaiterOnCompletedMethod,
MethodInfo awaiterUnsafeOnCompletedMethod,
Type resultType,
MethodInfo getAwaiterMethod)
{
AwaiterType = awaiterType;
AwaiterIsCompletedProperty = awaiterIsCompletedProperty;
AwaiterGetResultMethod = awaiterGetResultMethod;
AwaiterOnCompletedMethod = awaiterOnCompletedMethod;
AwaiterUnsafeOnCompletedMethod = awaiterUnsafeOnCompletedMethod;
ResultType = resultType;
GetAwaiterMethod = getAwaiterMethod;
}
public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo)
{
// Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347

public static bool IsTypeAwaitable(Type type, out AwaitableInfo awaitableInfo)
// Awaitable must have method matching "object GetAwaiter()"
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0
&& m.ReturnType != null);
if (getAwaiterMethod == null)
{
// Based on Roslyn code: http://source.roslyn.io/#Microsoft.CodeAnalysis.Workspaces/Shared/Extensions/ISymbolExtensions.cs,db4d48ba694b9347
awaitableInfo = default(AwaitableInfo);
return false;
}

// Awaitable must have method matching "object GetAwaiter()"
var getAwaiterMethod = type.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetAwaiter", StringComparison.OrdinalIgnoreCase)
&& m.GetParameters().Length == 0);
if (getAwaiterMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
var awaiterType = getAwaiterMethod.ReturnType;

var awaiterType = getAwaiterMethod.ReturnType;
// Awaiter must have property matching "bool IsCompleted { get; }"
var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p =>
p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase)
&& p.PropertyType == typeof(bool)
&& p.GetMethod != null);
if (isCompletedProperty == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

// Awaiter must have property matching "bool IsCompleted { get; }"
var isCompletedProperty = awaiterType.GetRuntimeProperties().FirstOrDefault(p =>
p.Name.Equals("IsCompleted", StringComparison.OrdinalIgnoreCase)
&& p.PropertyType == typeof(bool)
&& p.GetMethod != null);
if (isCompletedProperty == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
// Awaiter must implement INotifyCompletion
var awaiterInterfaces = awaiterType.GetInterfaces();
var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion));
if (!implementsINotifyCompletion)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

// Awaiter must implement INotifyCompletion
var awaiterInterfaces = awaiterType.GetInterfaces();
var implementsINotifyCompletion = awaiterInterfaces.Any(t => t == typeof(INotifyCompletion));
if (!implementsINotifyCompletion)
{
awaitableInfo = default(AwaitableInfo);
return false;
}
// INotifyCompletion supplies a method matching "void OnCompleted(Action action)"
var onCompletedMethod = typeof(INotifyCompletion).GetRuntimeMethods().Single(m =>
m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));

// INotifyCompletion supplies a method matching "void OnCompleted(Action action)"
var iNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(INotifyCompletion));
var onCompletedMethod = iNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("OnCompleted", StringComparison.OrdinalIgnoreCase)
// Awaiter optionally implements ICriticalNotifyCompletion
var implementsICriticalNotifyCompletion = awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion));
MethodInfo unsafeOnCompletedMethod;
if (implementsICriticalNotifyCompletion)
{
// ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)"
unsafeOnCompletedMethod = typeof(ICriticalNotifyCompletion).GetRuntimeMethods().Single(m =>
m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
}
else
{
unsafeOnCompletedMethod = null;
}

// Awaiter optionally implements ICriticalNotifyCompletion
var implementsICriticalNotifyCompletion =
awaiterInterfaces.Any(t => t == typeof(ICriticalNotifyCompletion));
MethodInfo unsafeOnCompletedMethod;
if (implementsICriticalNotifyCompletion)
{
// ICriticalNotifyCompletion supplies a method matching "void UnsafeOnCompleted(Action action)"
var iCriticalNotifyCompletionMap = awaiterType
.GetTypeInfo()
.GetRuntimeInterfaceMap(typeof(ICriticalNotifyCompletion));
unsafeOnCompletedMethod = iCriticalNotifyCompletionMap.InterfaceMethods.Single(m =>
m.Name.Equals("UnsafeOnCompleted", StringComparison.OrdinalIgnoreCase)
&& m.ReturnType == typeof(void)
&& m.GetParameters().Length == 1
&& m.GetParameters()[0].ParameterType == typeof(Action));
}
else
{
unsafeOnCompletedMethod = null;
}

// Awaiter must have method matching "void GetResult" or "T GetResult()"
var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetResult")
&& m.GetParameters().Length == 0);
if (getResultMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

awaitableInfo = new AwaitableInfo(
awaiterType,
isCompletedProperty,
getResultMethod,
onCompletedMethod,
unsafeOnCompletedMethod,
getResultMethod.ReturnType,
getAwaiterMethod);
return true;
// Awaiter must have method matching "void GetResult" or "T GetResult()"
var getResultMethod = awaiterType.GetRuntimeMethods().FirstOrDefault(m =>
m.Name.Equals("GetResult")
&& m.GetParameters().Length == 0);
if (getResultMethod == null)
{
awaitableInfo = default(AwaitableInfo);
return false;
}

awaitableInfo = new AwaitableInfo(
awaiterType,
isCompletedProperty,
getResultMethod,
onCompletedMethod,
unsafeOnCompletedMethod,
getResultMethod.ReturnType,
getAwaiterMethod);
return true;
}
}

+ 38
- 39
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/CoercedAwaitableInfo.cs Zobrazit soubor

@@ -1,57 +1,56 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable disable

using System;
using System.Linq.Expressions;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
namespace Microsoft.Extensions.Internal;

internal readonly struct CoercedAwaitableInfo
{
internal readonly struct CoercedAwaitableInfo
public AwaitableInfo AwaitableInfo { get; }
public Expression CoercerExpression { get; }
public Type CoercerResultType { get; }
public bool RequiresCoercion => CoercerExpression != null;

public CoercedAwaitableInfo(AwaitableInfo awaitableInfo)
{
public AwaitableInfo AwaitableInfo { get; }
public Expression CoercerExpression { get; }
public Type CoercerResultType { get; }
public bool RequiresCoercion => CoercerExpression != null;
AwaitableInfo = awaitableInfo;
CoercerExpression = null;
CoercerResultType = null;
}

public CoercedAwaitableInfo(AwaitableInfo awaitableInfo)
{
AwaitableInfo = awaitableInfo;
CoercerExpression = null;
CoercerResultType = null;
}
public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType, AwaitableInfo coercedAwaitableInfo)
{
CoercerExpression = coercerExpression;
CoercerResultType = coercerResultType;
AwaitableInfo = coercedAwaitableInfo;
}

public CoercedAwaitableInfo(Expression coercerExpression, Type coercerResultType,
AwaitableInfo coercedAwaitableInfo)
public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info)
{
if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo))
{
CoercerExpression = coercerExpression;
CoercerResultType = coercerResultType;
AwaitableInfo = coercedAwaitableInfo;
info = new CoercedAwaitableInfo(directlyAwaitableInfo);
return true;
}

public static bool IsTypeAwaitable(Type type, out CoercedAwaitableInfo info)
{
if (AwaitableInfo.IsTypeAwaitable(type, out var directlyAwaitableInfo))
{
info = new CoercedAwaitableInfo(directlyAwaitableInfo);
return true;
}

// It's not directly awaitable, but maybe we can coerce it.
// Currently we support coercing FSharpAsync<T>.
if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type,
// It's not directly awaitable, but maybe we can coerce it.
// Currently we support coercing FSharpAsync<T>.
if (ObjectMethodExecutorFSharpSupport.TryBuildCoercerFromFSharpAsyncToAwaitable(type,
out var coercerExpression,
out var coercerResultType))
{
if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo))
{
if (AwaitableInfo.IsTypeAwaitable(coercerResultType, out var coercedAwaitableInfo))
{
info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo);
return true;
}
info = new CoercedAwaitableInfo(coercerExpression, coercerResultType, coercedAwaitableInfo);
return true;
}

info = default(CoercedAwaitableInfo);
return false;
}

info = default(CoercedAwaitableInfo);
return false;
}
}

+ 291
- 285
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutor.cs Zobrazit soubor

@@ -1,338 +1,344 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Reflection;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
namespace Microsoft.Extensions.Internal;

internal class ObjectMethodExecutor
{
internal class ObjectMethodExecutor
{
// ReSharper disable once InconsistentNaming
private static readonly ConstructorInfo _objectMethodExecutorAwaitableConstructor =
typeof(ObjectMethodExecutorAwaitable).GetConstructor(new[]
{
typeof(object), // customAwaitable
typeof(Func<object, object>), // getAwaiterMethod
typeof(Func<object, bool>), // isCompletedMethod
typeof(Func<object, object>), // getResultMethod
private readonly object?[]? _parameterDefaultValues;
private readonly MethodExecutorAsync? _executorAsync;
private readonly MethodExecutor? _executor;
private static readonly ConstructorInfo _objectMethodExecutorAwaitableConstructor =
typeof(ObjectMethodExecutorAwaitable).GetConstructor(new[] {
typeof(object), // customAwaitable
typeof(Func<object, object>), // getAwaiterMethod
typeof(Func<object, bool>), // isCompletedMethod
typeof(Func<object, object>), // getResultMethod
typeof(Action<object, Action>), // onCompletedMethod
typeof(Action<object, Action>) // unsafeOnCompletedMethod
});

private readonly MethodExecutor _executor;
private readonly MethodExecutorAsync _executorAsync;
private readonly object[] _parameterDefaultValues;
typeof(Action<object, Action>) // unsafeOnCompletedMethod
})!;

private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo, object[] parameterDefaultValues)
private ObjectMethodExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo, object?[]? parameterDefaultValues)
{
if (methodInfo == null)
{
if (methodInfo == null)
{
throw new ArgumentNullException(nameof(methodInfo));
}

MethodInfo = methodInfo;
MethodParameters = methodInfo.GetParameters();
TargetTypeInfo = targetTypeInfo;
MethodReturnType = methodInfo.ReturnType;
throw new ArgumentNullException(nameof(methodInfo));
}

var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(MethodReturnType, out var coercedAwaitableInfo);
MethodInfo = methodInfo;
MethodParameters = methodInfo.GetParameters();
TargetTypeInfo = targetTypeInfo;
MethodReturnType = methodInfo.ReturnType;

IsMethodAsync = isAwaitable;
AsyncResultType = isAwaitable ? coercedAwaitableInfo.AwaitableInfo.ResultType : null;
var isAwaitable = CoercedAwaitableInfo.IsTypeAwaitable(MethodReturnType, out var coercedAwaitableInfo);

// Upstream code may prefer to use the sync-executor even for async methods, because if it knows
// that the result is a specific Task<T> where T is known, then it can directly cast to that type
// and await it without the extra heap allocations involved in the _executorAsync code path.
_executor = GetExecutor(methodInfo, targetTypeInfo);
IsMethodAsync = isAwaitable;
AsyncResultType = isAwaitable ? coercedAwaitableInfo.AwaitableInfo.ResultType : null;

if (IsMethodAsync)
{
_executorAsync = GetExecutorAsync(methodInfo, targetTypeInfo, coercedAwaitableInfo);
}
// Upstream code may prefer to use the sync-executor even for async methods, because if it knows
// that the result is a specific Task<T> where T is known, then it can directly cast to that type
// and await it without the extra heap allocations involved in the _executorAsync code path.
_executor = GetExecutor(methodInfo, targetTypeInfo);

_parameterDefaultValues = parameterDefaultValues;
if (IsMethodAsync)
{
_executorAsync = GetExecutorAsync(methodInfo, targetTypeInfo, coercedAwaitableInfo);
}

public MethodInfo MethodInfo { get; }
_parameterDefaultValues = parameterDefaultValues;
}

private delegate ObjectMethodExecutorAwaitable MethodExecutorAsync(object target, object?[]? parameters);

private delegate object? MethodExecutor(object target, object?[]? parameters);

public ParameterInfo[] MethodParameters { get; }
private delegate void VoidMethodExecutor(object target, object?[]? parameters);

public TypeInfo TargetTypeInfo { get; }
public MethodInfo MethodInfo { get; }

public Type AsyncResultType { get; }
public ParameterInfo[] MethodParameters { get; }

// This field is made internal set because it is set in unit tests.
public Type MethodReturnType { get; internal set; }
public TypeInfo TargetTypeInfo { get; }

public bool IsMethodAsync { get; }
public Type? AsyncResultType { get; }

public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo)
// This field is made internal set because it is set in unit tests.
public Type MethodReturnType { get; internal set; }

public bool IsMethodAsync { get; }

public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
return new ObjectMethodExecutor(methodInfo, targetTypeInfo, null);
}

public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo, object?[] parameterDefaultValues)
{
if (parameterDefaultValues == null)
{
return new ObjectMethodExecutor(methodInfo, targetTypeInfo, null);
throw new ArgumentNullException(nameof(parameterDefaultValues));
}

public static ObjectMethodExecutor Create(MethodInfo methodInfo, TypeInfo targetTypeInfo,
object[] parameterDefaultValues)
{
if (parameterDefaultValues == null)
{
throw new ArgumentNullException(nameof(parameterDefaultValues));
}
return new ObjectMethodExecutor(methodInfo, targetTypeInfo, parameterDefaultValues);
}

return new ObjectMethodExecutor(methodInfo, targetTypeInfo, parameterDefaultValues);
}
/// <summary>
/// Executes the configured method on <paramref name="target"/>. This can be used whether or not
/// the configured method is asynchronous.
/// </summary>
/// <remarks>
/// Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than
/// ExecuteAsync if you know at compile time what the return type is, because then you can directly
/// "await" that value (via a cast), and then the generated code will be able to reference the
/// resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated
/// code will have to treat the resulting awaitable as a boxed object, because it doesn't know at
/// compile time what type it would be.
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>The method return value.</returns>
public object? Execute(object target, object?[]? parameters)
{
Debug.Assert(_executor != null, "Sync execution is not supported.");
return _executor(target, parameters);
}

/// <summary>
/// Executes the configured method on <paramref name="target"/>. This can only be used if the configured
/// method is asynchronous.
/// </summary>
/// <remarks>
/// If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync,
/// which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations
/// as compared with using Execute and then using "await" on the result value typecasted to the known
/// awaitable type. The possible extra heap allocations are for:
///
/// 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally
/// it's a reference type, and you normally create a new instance per call).
/// 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance
/// of it, and if it is, it will have to be boxed so the calling code can reference it as an object).
/// 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling
/// code doesn't know what type it's going to be).
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>An object that you can "await" to get the method return value.</returns>
public ObjectMethodExecutorAwaitable ExecuteAsync(object target, object?[]? parameters)
{
Debug.Assert(_executorAsync != null, "Async execution is not supported.");
return _executorAsync(target, parameters);
}

/// <summary>
/// Executes the configured method on <paramref name="target" />. This can be used whether or not
/// the configured method is asynchronous.
/// </summary>
/// <remarks>
/// Even if the target method is asynchronous, it's desirable to invoke it using Execute rather than
/// ExecuteAsync if you know at compile time what the return type is, because then you can directly
/// "await" that value (via a cast), and then the generated code will be able to reference the
/// resulting awaitable as a value-typed variable. If you use ExecuteAsync instead, the generated
/// code will have to treat the resulting awaitable as a boxed object, because it doesn't know at
/// compile time what type it would be.
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>The method return value.</returns>
public object Execute(object target, params object[] parameters)
public object? GetDefaultValueForParameter(int index)
{
if (_parameterDefaultValues == null)
{
return _executor(target, parameters);
throw new InvalidOperationException($"Cannot call {nameof(GetDefaultValueForParameter)}, because no parameter default values were supplied.");
}

/// <summary>
/// Executes the configured method on <paramref name="target" />. This can only be used if the configured
/// method is asynchronous.
/// </summary>
/// <remarks>
/// If you don't know at compile time the type of the method's returned awaitable, you can use ExecuteAsync,
/// which supplies an awaitable-of-object. This always works, but can incur several extra heap allocations
/// as compared with using Execute and then using "await" on the result value typecasted to the known
/// awaitable type. The possible extra heap allocations are for:
/// 1. The custom awaitable (though usually there's a heap allocation for this anyway, since normally
/// it's a reference type, and you normally create a new instance per call).
/// 2. The custom awaiter (whether or not it's a value type, since if it's not, you need a new instance
/// of it, and if it is, it will have to be boxed so the calling code can reference it as an object).
/// 3. The async result value, if it's a value type (it has to be boxed as an object, since the calling
/// code doesn't know what type it's going to be).
/// </remarks>
/// <param name="target">The object whose method is to be executed.</param>
/// <param name="parameters">Parameters to pass to the method.</param>
/// <returns>An object that you can "await" to get the method return value.</returns>
public ObjectMethodExecutorAwaitable ExecuteAsync(object target, params object[] parameters)
if (index < 0 || index > MethodParameters.Length - 1)
{
return _executorAsync(target, parameters);
throw new ArgumentOutOfRangeException(nameof(index));
}

public object GetDefaultValueForParameter(int index)
return _parameterDefaultValues[index];
}

private static MethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo)
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object?[]), "parameters");

// Build parameter list
var paramInfos = methodInfo.GetParameters();
var parameters = new List<Expression>(paramInfos.Length);
for (int i = 0; i < paramInfos.Length; i++)
{
if (_parameterDefaultValues == null)
{
throw new InvalidOperationException(
$"Cannot call {nameof(GetDefaultValueForParameter)}, because no parameter default values were supplied.");
}

if (index < 0 || index > MethodParameters.Length - 1)
{
throw new ArgumentOutOfRangeException(nameof(index));
}

return _parameterDefaultValues[index];
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);

// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
}

private static MethodExecutor GetExecutor(MethodInfo methodInfo, TypeInfo targetTypeInfo)
// Call method
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);

// methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)"
// Create function
if (methodCall.Type == typeof(void))
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");

// Build parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (var i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);

// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
}

// Call method
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);

// methodCall is "((Ttarget) target) method((T0) parameters[0], (T1) parameters[1], ...)"
// Create function
if (methodCall.Type == typeof(void))
{
var lambda = Expression.Lambda<VoidMethodExecutor>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return WrapVoidMethod(voidExecutor);
}
else
{
// must coerce methodCall to match ActionExecutor signature
var castMethodCall = Expression.Convert(methodCall, typeof(object));
var lambda = Expression.Lambda<MethodExecutor>(castMethodCall, targetParameter, parametersParameter);
return lambda.Compile();
}
var lambda = Expression.Lambda<VoidMethodExecutor>(methodCall, targetParameter, parametersParameter);
var voidExecutor = lambda.Compile();
return WrapVoidMethod(voidExecutor);
}
else
{
// must coerce methodCall to match ActionExecutor signature
var castMethodCall = Expression.Convert(methodCall, typeof(object));
var lambda = Expression.Lambda<MethodExecutor>(castMethodCall, targetParameter, parametersParameter);
return lambda.Compile();
}
}

private static MethodExecutor WrapVoidMethod(VoidMethodExecutor executor)
{
return delegate (object target, object?[]? parameters)
{
executor(target, parameters);
return null;
};
}

private static MethodExecutor WrapVoidMethod(VoidMethodExecutor executor)
private static MethodExecutorAsync GetExecutorAsync(
MethodInfo methodInfo,
TypeInfo targetTypeInfo,
CoercedAwaitableInfo coercedAwaitableInfo)
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");

// Build parameter list
var paramInfos = methodInfo.GetParameters();
var parameters = new List<Expression>(paramInfos.Length);
for (int i = 0; i < paramInfos.Length; i++)
{
return delegate(object target, object[] parameters)
{
executor(target, parameters);
return null;
};
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);

// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
}

private static MethodExecutorAsync GetExecutorAsync(
MethodInfo methodInfo,
TypeInfo targetTypeInfo,
CoercedAwaitableInfo coercedAwaitableInfo)
// Call method
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);

// Using the method return value, construct an ObjectMethodExecutorAwaitable based on
// the info we have about its implementation of the awaitable pattern. Note that all
// the funcs/actions we construct here are precompiled, so that only one instance of
// each is preserved throughout the lifetime of the ObjectMethodExecutor.

// var getAwaiterFunc = (object awaitable) =>
// (object)((CustomAwaitableType)awaitable).GetAwaiter();
var customAwaitableParam = Expression.Parameter(typeof(object), "awaitable");
var awaitableInfo = coercedAwaitableInfo.AwaitableInfo;
var postCoercionMethodReturnType = coercedAwaitableInfo.CoercerResultType ?? methodInfo.ReturnType;
var getAwaiterFunc = Expression.Lambda<Func<object, object>>(
Expression.Convert(
Expression.Call(
Expression.Convert(customAwaitableParam, postCoercionMethodReturnType),
awaitableInfo.GetAwaiterMethod),
typeof(object)),
customAwaitableParam).Compile();

// var isCompletedFunc = (object awaiter) =>
// ((CustomAwaiterType)awaiter).IsCompleted;
var isCompletedParam = Expression.Parameter(typeof(object), "awaiter");
var isCompletedFunc = Expression.Lambda<Func<object, bool>>(
Expression.MakeMemberAccess(
Expression.Convert(isCompletedParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterIsCompletedProperty),
isCompletedParam).Compile();

var getResultParam = Expression.Parameter(typeof(object), "awaiter");
Func<object, object> getResultFunc;
if (awaitableInfo.ResultType == typeof(void))
{
// Parameters to executor
var targetParameter = Expression.Parameter(typeof(object), "target");
var parametersParameter = Expression.Parameter(typeof(object[]), "parameters");

// Build parameter list
var parameters = new List<Expression>();
var paramInfos = methodInfo.GetParameters();
for (var i = 0; i < paramInfos.Length; i++)
{
var paramInfo = paramInfos[i];
var valueObj = Expression.ArrayIndex(parametersParameter, Expression.Constant(i));
var valueCast = Expression.Convert(valueObj, paramInfo.ParameterType);

// valueCast is "(Ti) parameters[i]"
parameters.Add(valueCast);
}

// Call method
var instanceCast = Expression.Convert(targetParameter, targetTypeInfo.AsType());
var methodCall = Expression.Call(instanceCast, methodInfo, parameters);

// Using the method return value, construct an ObjectMethodExecutorAwaitable based on
// the info we have about its implementation of the awaitable pattern. Note that all
// the funcs/actions we construct here are precompiled, so that only one instance of
// each is preserved throughout the lifetime of the ObjectMethodExecutor.

// var getAwaiterFunc = (object awaitable) =>
// (object)((CustomAwaitableType)awaitable).GetAwaiter();
var customAwaitableParam = Expression.Parameter(typeof(object), "awaitable");
var awaitableInfo = coercedAwaitableInfo.AwaitableInfo;
var postCoercionMethodReturnType = coercedAwaitableInfo.CoercerResultType ?? methodInfo.ReturnType;
var getAwaiterFunc = Expression.Lambda<Func<object, object>>(
// var getResultFunc = (object awaiter) =>
// {
// ((CustomAwaiterType)awaiter).GetResult(); // We need to invoke this to surface any exceptions
// return (object)null;
// };
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Block(
Expression.Call(
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
Expression.Constant(null)
),
getResultParam).Compile();
}
else
{
// var getResultFunc = (object awaiter) =>
// (object)((CustomAwaiterType)awaiter).GetResult();
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Convert(
Expression.Call(
Expression.Convert(customAwaitableParam, postCoercionMethodReturnType),
awaitableInfo.GetAwaiterMethod),
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
typeof(object)),
customAwaitableParam).Compile();

// var isCompletedFunc = (object awaiter) =>
// ((CustomAwaiterType)awaiter).IsCompleted;
var isCompletedParam = Expression.Parameter(typeof(object), "awaiter");
var isCompletedFunc = Expression.Lambda<Func<object, bool>>(
Expression.MakeMemberAccess(
Expression.Convert(isCompletedParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterIsCompletedProperty),
isCompletedParam).Compile();

var getResultParam = Expression.Parameter(typeof(object), "awaiter");
Func<object, object> getResultFunc;
if (awaitableInfo.ResultType == typeof(void))
{
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Block(
Expression.Call(
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
Expression.Constant(null)
),
getResultParam).Compile();
}
else
{
getResultFunc = Expression.Lambda<Func<object, object>>(
Expression.Convert(
Expression.Call(
Expression.Convert(getResultParam, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterGetResultMethod),
typeof(object)),
getResultParam).Compile();
}

// var onCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).OnCompleted(continuation);
getResultParam).Compile();
}

// var onCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).OnCompleted(continuation);
// };
var onCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var onCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
var onCompletedFunc = Expression.Lambda<Action<object, Action>>(
Expression.Call(
Expression.Convert(onCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterOnCompletedMethod,
onCompletedParam2),
onCompletedParam1,
onCompletedParam2).Compile();

Action<object, Action>? unsafeOnCompletedFunc = null;
if (awaitableInfo.AwaiterUnsafeOnCompletedMethod != null)
{
// var unsafeOnCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).UnsafeOnCompleted(continuation);
// };
var onCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var onCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
var onCompletedFunc = Expression.Lambda<Action<object, Action>>(
var unsafeOnCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var unsafeOnCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
unsafeOnCompletedFunc = Expression.Lambda<Action<object, Action>>(
Expression.Call(
Expression.Convert(onCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterOnCompletedMethod,
onCompletedParam2),
onCompletedParam1,
onCompletedParam2).Compile();

Action<object, Action> unsafeOnCompletedFunc = null;
if (awaitableInfo.AwaiterUnsafeOnCompletedMethod != null)
{
// var unsafeOnCompletedFunc = (object awaiter, Action continuation) => {
// ((CustomAwaiterType)awaiter).UnsafeOnCompleted(continuation);
// };
var unsafeOnCompletedParam1 = Expression.Parameter(typeof(object), "awaiter");
var unsafeOnCompletedParam2 = Expression.Parameter(typeof(Action), "continuation");
unsafeOnCompletedFunc = Expression.Lambda<Action<object, Action>>(
Expression.Call(
Expression.Convert(unsafeOnCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterUnsafeOnCompletedMethod,
unsafeOnCompletedParam2),
unsafeOnCompletedParam1,
unsafeOnCompletedParam2).Compile();
}

// If we need to pass the method call result through a coercer function to get an
// awaitable, then do so.
var coercedMethodCall = coercedAwaitableInfo.RequiresCoercion
? Expression.Invoke(coercedAwaitableInfo.CoercerExpression, methodCall)
: (Expression) methodCall;

// return new ObjectMethodExecutorAwaitable(
// (object)coercedMethodCall,
// getAwaiterFunc,
// isCompletedFunc,
// getResultFunc,
// onCompletedFunc,
// unsafeOnCompletedFunc);
var returnValueExpression = Expression.New(
_objectMethodExecutorAwaitableConstructor,
Expression.Convert(coercedMethodCall, typeof(object)),
Expression.Constant(getAwaiterFunc),
Expression.Constant(isCompletedFunc),
Expression.Constant(getResultFunc),
Expression.Constant(onCompletedFunc),
Expression.Constant(unsafeOnCompletedFunc, typeof(Action<object, Action>)));

var lambda =
Expression.Lambda<MethodExecutorAsync>(returnValueExpression, targetParameter, parametersParameter);
return lambda.Compile();
Expression.Convert(unsafeOnCompletedParam1, awaitableInfo.AwaiterType),
awaitableInfo.AwaiterUnsafeOnCompletedMethod,
unsafeOnCompletedParam2),
unsafeOnCompletedParam1,
unsafeOnCompletedParam2).Compile();
}

private delegate ObjectMethodExecutorAwaitable MethodExecutorAsync(object target, params object[] parameters);

private delegate object MethodExecutor(object target, params object[] parameters);

private delegate void VoidMethodExecutor(object target, object[] parameters);
// If we need to pass the method call result through a coercer function to get an
// awaitable, then do so.
var coercedMethodCall = coercedAwaitableInfo.RequiresCoercion
? Expression.Invoke(coercedAwaitableInfo.CoercerExpression, methodCall)
: (Expression)methodCall;

// return new ObjectMethodExecutorAwaitable(
// (object)coercedMethodCall,
// getAwaiterFunc,
// isCompletedFunc,
// getResultFunc,
// onCompletedFunc,
// unsafeOnCompletedFunc);
var returnValueExpression = Expression.New(
_objectMethodExecutorAwaitableConstructor,
Expression.Convert(coercedMethodCall, typeof(object)),
Expression.Constant(getAwaiterFunc),
Expression.Constant(isCompletedFunc),
Expression.Constant(getResultFunc),
Expression.Constant(onCompletedFunc),
Expression.Constant(unsafeOnCompletedFunc, typeof(Action<object, Action>)));

var lambda = Expression.Lambda<MethodExecutorAsync>(returnValueExpression, targetParameter, parametersParameter);
return lambda.Compile();
}
}

+ 85
- 89
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorAwaitable.cs Zobrazit soubor

@@ -1,119 +1,115 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable disable

using System;
using System.Runtime.CompilerServices;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
namespace Microsoft.Extensions.Internal;

/// <summary>
/// Provides a common awaitable structure that <see cref="ObjectMethodExecutor.ExecuteAsync"/> can
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
/// application-defined custom awaitable.
/// </summary>
internal readonly struct ObjectMethodExecutorAwaitable
{
/// <summary>
/// Provides a common awaitable structure that <see cref="ObjectMethodExecutor.ExecuteAsync" /> can
/// return, regardless of whether the underlying value is a System.Task, an FSharpAsync, or an
/// application-defined custom awaitable.
/// </summary>
internal readonly struct ObjectMethodExecutorAwaitable
private readonly object _customAwaitable;
private readonly Func<object, object> _getAwaiterMethod;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;

// Perf note: since we're requiring the customAwaitable to be supplied here as an object,
// this will trigger a further allocation if it was a value type (i.e., to box it). We can't
// fix this by making the customAwaitable type generic, because the calling code typically
// does not know the type of the awaitable/awaiter at compile-time anyway.
//
// However, we could fix it by not passing the customAwaitable here at all, and instead
// passing a func that maps directly from the target object (e.g., controller instance),
// target method (e.g., action method info), and params array to the custom awaiter in the
// GetAwaiter() method below. In effect, by delaying the actual method call until the
// upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance.
// This optimization is not currently implemented because:
// [1] It would make no difference when the awaitable was an object type, which is
// by far the most common scenario (e.g., System.Task<T>).
// [2] It would be complex - we'd need some kind of object pool to track all the parameter
// arrays until we needed to use them in GetAwaiter().
// We can reconsider this in the future if there's a need to optimize for ValueTask<T>
// or other value-typed awaitables.

public ObjectMethodExecutorAwaitable(
object customAwaitable,
Func<object, object> getAwaiterMethod,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaitable = customAwaitable;
_getAwaiterMethod = getAwaiterMethod;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}

public Awaiter GetAwaiter()
{
var customAwaiter = _getAwaiterMethod(_customAwaitable);
return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod, _unsafeOnCompletedMethod);
}

public readonly struct Awaiter : ICriticalNotifyCompletion
{
private readonly object _customAwaitable;
private readonly Func<object, object> _getAwaiterMethod;
private readonly object _customAwaiter;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;

// Perf note: since we're requiring the customAwaitable to be supplied here as an object,
// this will trigger a further allocation if it was a value type (i.e., to box it). We can't
// fix this by making the customAwaitable type generic, because the calling code typically
// does not know the type of the awaitable/awaiter at compile-time anyway.
//
// However, we could fix it by not passing the customAwaitable here at all, and instead
// passing a func that maps directly from the target object (e.g., controller instance),
// target method (e.g., action method info), and params array to the custom awaiter in the
// GetAwaiter() method below. In effect, by delaying the actual method call until the
// upstream code calls GetAwaiter on this ObjectMethodExecutorAwaitable instance.
// This optimization is not currently implemented because:
// [1] It would make no difference when the awaitable was an object type, which is
// by far the most common scenario (e.g., System.Task<T>).
// [2] It would be complex - we'd need some kind of object pool to track all the parameter
// arrays until we needed to use them in GetAwaiter().
// We can reconsider this in the future if there's a need to optimize for ValueTask<T>
// or other value-typed awaitables.

public ObjectMethodExecutorAwaitable(
object customAwaitable,
Func<object, object> getAwaiterMethod,
public Awaiter(
object customAwaiter,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaitable = customAwaitable;
_getAwaiterMethod = getAwaiterMethod;
_customAwaiter = customAwaiter;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}

public Awaiter GetAwaiter()
public bool IsCompleted => _isCompletedMethod(_customAwaiter);

public object GetResult() => _getResultMethod(_customAwaiter);

public void OnCompleted(Action continuation)
{
var customAwaiter = _getAwaiterMethod(_customAwaitable);
return new Awaiter(customAwaiter, _isCompletedMethod, _getResultMethod, _onCompletedMethod,
_unsafeOnCompletedMethod);
_onCompletedMethod(_customAwaiter, continuation);
}

public struct Awaiter : ICriticalNotifyCompletion
public void UnsafeOnCompleted(Action continuation)
{
private readonly object _customAwaiter;
private readonly Func<object, bool> _isCompletedMethod;
private readonly Func<object, object> _getResultMethod;
private readonly Action<object, Action> _onCompletedMethod;
private readonly Action<object, Action> _unsafeOnCompletedMethod;

public Awaiter(
object customAwaiter,
Func<object, bool> isCompletedMethod,
Func<object, object> getResultMethod,
Action<object, Action> onCompletedMethod,
Action<object, Action> unsafeOnCompletedMethod)
{
_customAwaiter = customAwaiter;
_isCompletedMethod = isCompletedMethod;
_getResultMethod = getResultMethod;
_onCompletedMethod = onCompletedMethod;
_unsafeOnCompletedMethod = unsafeOnCompletedMethod;
}

public bool IsCompleted => _isCompletedMethod(_customAwaiter);

public object GetResult()
{
return _getResultMethod(_customAwaiter);
}

public void OnCompleted(Action continuation)
{
_onCompletedMethod(_customAwaiter, continuation);
}

public void UnsafeOnCompleted(Action continuation)
{
// If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted.
// If not, fall back on using its OnCompleted.
//
// Why this is safe:
// - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it
// needs the execution context to be preserved (which it signals by calling OnCompleted), or
// that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not*
// to preserve and restore the context, so we prefer that where possible.
// - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted,
// there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen
// if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to
// pass the call on to the underlying awaitable's OnCompleted method.
// If the underlying awaitable implements ICriticalNotifyCompletion, use its UnsafeOnCompleted.
// If not, fall back on using its OnCompleted.
//
// Why this is safe:
// - Implementing ICriticalNotifyCompletion is a way of saying the caller can choose whether it
// needs the execution context to be preserved (which it signals by calling OnCompleted), or
// that it doesn't (which it signals by calling UnsafeOnCompleted). Obviously it's faster *not*
// to preserve and restore the context, so we prefer that where possible.
// - If a caller doesn't need the execution context to be preserved and hence calls UnsafeOnCompleted,
// there's no harm in preserving it anyway - it's just a bit of wasted cost. That's what will happen
// if a caller sees that the proxy implements ICriticalNotifyCompletion but the proxy chooses to
// pass the call on to the underlying awaitable's OnCompleted method.

var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod;
underlyingMethodToUse(_customAwaiter, continuation);
}
var underlyingMethodToUse = _unsafeOnCompletedMethod ?? _onCompletedMethod;
underlyingMethodToUse(_customAwaiter, continuation);
}
}
}

+ 119
- 115
src/DotNetCore.CAP/Internal/ObjectMethodExecutor/ObjectMethodExecutorFSharpSupport.cs Zobrazit soubor

@@ -1,5 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable disable

using System;
using System.Linq;
@@ -8,139 +10,141 @@ using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.Internal
namespace Microsoft.Extensions.Internal;

/// <summary>
/// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying
/// an <see cref="Expression"/> for mapping instances of that type to a C# awaitable.
/// </summary>
/// <remarks>
/// The main design goal here is to avoid taking a compile-time dependency on
/// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references
/// to FSharp types have to be constructed dynamically at runtime.
/// </remarks>
internal static class ObjectMethodExecutorFSharpSupport
{
/// <summary>
/// Helper for detecting whether a given type is FSharpAsync`1, and if so, supplying
/// an <see cref="Expression" /> for mapping instances of that type to a C# awaitable.
/// </summary>
/// <remarks>
/// The main design goal here is to avoid taking a compile-time dependency on
/// FSharp.Core.dll, because non-F# applications wouldn't use it. So all the references
/// to FSharp types have to be constructed dynamically at runtime.
/// </remarks>
internal static class ObjectMethodExecutorFSharpSupport
private static readonly object _fsharpValuesCacheLock = new object();
private static Assembly _fsharpCoreAssembly;
private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod;
private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty;
private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty;
public static bool TryBuildCoercerFromFSharpAsyncToAwaitable(
Type possibleFSharpAsyncType,
out Expression coerceToAwaitableExpression,
out Type awaitableType)
{
private static readonly object _fsharpValuesCacheLock = new object();
private static Assembly _fsharpCoreAssembly;
private static MethodInfo _fsharpAsyncStartAsTaskGenericMethod;
private static PropertyInfo _fsharpOptionOfTaskCreationOptionsNoneProperty;
private static PropertyInfo _fsharpOptionOfCancellationTokenNoneProperty;

public static bool TryBuildCoercerFromFSharpAsyncToAwaitable(
Type possibleFSharpAsyncType,
out Expression coerceToAwaitableExpression,
out Type awaitableType)
var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType
? possibleFSharpAsyncType.GetGenericTypeDefinition()
: null;

if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType))
{
var methodReturnGenericType = possibleFSharpAsyncType.IsGenericType
? possibleFSharpAsyncType.GetGenericTypeDefinition()
: null;
coerceToAwaitableExpression = null;
awaitableType = null;
return false;
}

if (!IsFSharpAsyncOpenGenericType(methodReturnGenericType))
{
coerceToAwaitableExpression = null;
awaitableType = null;
return false;
}
var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single();
awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType);

// coerceToAwaitableExpression = (object fsharpAsync) =>
// {
// return (object)FSharpAsync.StartAsTask<TResult>(
// (Microsoft.FSharp.Control.FSharpAsync<TResult>)fsharpAsync,
// FSharpOption<TaskCreationOptions>.None,
// FSharpOption<CancellationToken>.None);
// };
var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod
.MakeGenericMethod(awaiterResultType);
var coerceToAwaitableParam = Expression.Parameter(typeof(object));
coerceToAwaitableExpression = Expression.Lambda(
Expression.Convert(
Expression.Call(
startAsTaskClosedMethod,
Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType),
Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty),
Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)),
typeof(object)),
coerceToAwaitableParam);

return true;
}

var awaiterResultType = possibleFSharpAsyncType.GetGenericArguments().Single();
awaitableType = typeof(Task<>).MakeGenericType(awaiterResultType);

// coerceToAwaitableExpression = (object fsharpAsync) =>
// {
// return (object)FSharpAsync.StartAsTask<TResult>(
// (Microsoft.FSharp.Control.FSharpAsync<TResult>)fsharpAsync,
// FSharpOption<TaskCreationOptions>.None,
// FSharpOption<CancellationToken>.None);
// };
var startAsTaskClosedMethod = _fsharpAsyncStartAsTaskGenericMethod
.MakeGenericMethod(awaiterResultType);
var coerceToAwaitableParam = Expression.Parameter(typeof(object));
coerceToAwaitableExpression = Expression.Lambda(
Expression.Convert(
Expression.Call(
startAsTaskClosedMethod,
Expression.Convert(coerceToAwaitableParam, possibleFSharpAsyncType),
Expression.MakeMemberAccess(null, _fsharpOptionOfTaskCreationOptionsNoneProperty),
Expression.MakeMemberAccess(null, _fsharpOptionOfCancellationTokenNoneProperty)),
typeof(object)),
coerceToAwaitableParam);

return true;
private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType)
{
var typeFullName = possibleFSharpAsyncGenericType?.FullName;
if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal))
{
return false;
}

private static bool IsFSharpAsyncOpenGenericType(Type possibleFSharpAsyncGenericType)
lock (_fsharpValuesCacheLock)
{
var typeFullName = possibleFSharpAsyncGenericType?.FullName;
if (!string.Equals(typeFullName, "Microsoft.FSharp.Control.FSharpAsync`1", StringComparison.Ordinal))
if (_fsharpCoreAssembly != null)
{
return false;
// Since we've already found the real FSharpAsync.Core assembly, we just have
// to check that the supplied FSharpAsync`1 type is the one from that assembly.
return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly;
}

lock (_fsharpValuesCacheLock)
else
{
if (_fsharpCoreAssembly != null)
{
return possibleFSharpAsyncGenericType.Assembly == _fsharpCoreAssembly;
}

// We'll keep trying to find the FSharp types/values each time any type called
// FSharpAsync`1 is supplied.
return TryPopulateFSharpValueCaches(possibleFSharpAsyncGenericType);
}
}
}

private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType)
{
var assembly = possibleFSharpAsyncGenericType.Assembly;
var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1");
var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync");
private static bool TryPopulateFSharpValueCaches(Type possibleFSharpAsyncGenericType)
{
var assembly = possibleFSharpAsyncGenericType.Assembly;
var fsharpOptionType = assembly.GetType("Microsoft.FSharp.Core.FSharpOption`1");
var fsharpAsyncType = assembly.GetType("Microsoft.FSharp.Control.FSharpAsync");

if (fsharpOptionType == null || fsharpAsyncType == null)
{
return false;
}
if (fsharpOptionType == null || fsharpAsyncType == null)
{
return false;
}

// Get a reference to FSharpOption<TaskCreationOptions>.None
var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType
.MakeGenericType(typeof(TaskCreationOptions));
_fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType
.GetTypeInfo()
.GetRuntimeProperty("None");

// Get a reference to FSharpOption<CancellationToken>.None
var fsharpOptionOfCancellationTokenType = fsharpOptionType
.MakeGenericType(typeof(CancellationToken));
_fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType
.GetTypeInfo()
.GetRuntimeProperty("None");

// Get a reference to FSharpAsync.StartAsTask<>
var fsharpAsyncMethods = fsharpAsyncType
.GetRuntimeMethods()
.Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal));
foreach (var candidateMethodInfo in fsharpAsyncMethods)
// Get a reference to FSharpOption<TaskCreationOptions>.None
var fsharpOptionOfTaskCreationOptionsType = fsharpOptionType
.MakeGenericType(typeof(TaskCreationOptions));
_fsharpOptionOfTaskCreationOptionsNoneProperty = fsharpOptionOfTaskCreationOptionsType
.GetRuntimeProperty("None");

// Get a reference to FSharpOption<CancellationToken>.None
var fsharpOptionOfCancellationTokenType = fsharpOptionType
.MakeGenericType(typeof(CancellationToken));
_fsharpOptionOfCancellationTokenNoneProperty = fsharpOptionOfCancellationTokenType
.GetRuntimeProperty("None");

// Get a reference to FSharpAsync.StartAsTask<>
var fsharpAsyncMethods = fsharpAsyncType
.GetRuntimeMethods()
.Where(m => m.Name.Equals("StartAsTask", StringComparison.Ordinal));
foreach (var candidateMethodInfo in fsharpAsyncMethods)
{
var parameters = candidateMethodInfo.GetParameters();
if (parameters.Length == 3
&& TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType)
&& parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType
&& parameters[2].ParameterType == fsharpOptionOfCancellationTokenType)
{
var parameters = candidateMethodInfo.GetParameters();
if (parameters.Length == 3
&& TypesHaveSameIdentity(parameters[0].ParameterType, possibleFSharpAsyncGenericType)
&& parameters[1].ParameterType == fsharpOptionOfTaskCreationOptionsType
&& parameters[2].ParameterType == fsharpOptionOfCancellationTokenType)
{
// This really does look like the correct method (and hence assembly).
_fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo;
_fsharpCoreAssembly = assembly;
break;
}
// This really does look like the correct method (and hence assembly).
_fsharpAsyncStartAsTaskGenericMethod = candidateMethodInfo;
_fsharpCoreAssembly = assembly;
break;
}

return _fsharpCoreAssembly != null;
}

private static bool TypesHaveSameIdentity(Type type1, Type type2)
{
return type1.Assembly == type2.Assembly
&& string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal)
&& string.Equals(type1.Name, type2.Name, StringComparison.Ordinal);
}
return _fsharpCoreAssembly != null;
}

private static bool TypesHaveSameIdentity(Type type1, Type type2)
{
return type1.Assembly == type2.Assembly
&& string.Equals(type1.Namespace, type2.Namespace, StringComparison.Ordinal)
&& string.Equals(type1.Name, type2.Name, StringComparison.Ordinal);
}
}

+ 1
- 1
src/DotNetCore.CAP/Internal/PublisherSentFailedException.cs Zobrazit soubor

@@ -11,7 +11,7 @@ namespace DotNetCore.CAP.Internal
{
}

public PublisherSentFailedException(string message, Exception ex) : base(message, ex)
public PublisherSentFailedException(string message, Exception? ex) : base(message, ex)
{
}
}

+ 1
- 1
src/DotNetCore.CAP/Internal/SnowflakeId.cs Zobrazit soubor

@@ -20,7 +20,7 @@ namespace DotNetCore.CAP.Internal
public const int TimestampLeftShift = SequenceBits + WorkerIdBits + DatacenterIdBits;
private const long SequenceMask = -1L ^ (-1L << SequenceBits);

private static SnowflakeId _snowflakeId;
private static SnowflakeId? _snowflakeId;

private readonly object _lock = new object();
private static readonly object SLock = new object();


+ 1
- 1
src/DotNetCore.CAP/Internal/TopicAttribute.cs Zobrazit soubor

@@ -35,6 +35,6 @@ namespace DotNetCore.CAP.Internal
/// kafka --> groups.id
/// rabbit MQ --> queue.name
/// </summary>
public string Group { get; set; }
public string Group { get; set; } = default!;
}
}

+ 2
- 2
src/DotNetCore.CAP/Messages/FailedInfo.cs Zobrazit soubor

@@ -4,10 +4,10 @@ namespace DotNetCore.CAP.Messages
{
public class FailedInfo
{
public IServiceProvider ServiceProvider { get; set; }
public IServiceProvider ServiceProvider { get; set; } = default!;

public MessageType MessageType { get; set; }

public Message Message { get; set; }
public Message Message { get; set; } = default!;
}
}

+ 12
- 13
src/DotNetCore.CAP/Messages/Message.cs Zobrazit soubor

@@ -3,51 +3,50 @@

using System;
using System.Collections.Generic;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Messages
{
public class Message
{
/// <summary>
/// System.Text.Json requires that class explicitly has a parameterless constructor
/// System.Text.Json requires that class explicitly has a parameter less constructor
/// and public properties have a setter.
/// </summary>
public Message() { }
public Message()
{
Headers = new Dictionary<string, string?>();
}

public Message(IDictionary<string, string> headers, [CanBeNull] object value)
public Message(IDictionary<string, string?> headers, object? value)
{
Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Value = value;
}

public IDictionary<string, string> Headers { get; set; }
public IDictionary<string, string?> Headers { get; set; }

[CanBeNull]
public object Value { get; set; }
public object? Value { get; set; }
}

public static class MessageExtensions
{
public static string GetId(this Message message)
{
message.Headers.TryGetValue(Headers.MessageId, out var value);
return value;
return message.Headers[Headers.MessageId]!;
}

public static string GetName(this Message message)
{
message.Headers.TryGetValue(Headers.MessageName, out var value);
return value;
return message.Headers[Headers.MessageName]!;
}

public static string GetCallbackName(this Message message)
public static string? GetCallbackName(this Message message)
{
message.Headers.TryGetValue(Headers.CallbackName, out var value);
return value;
}

public static string GetGroup(this Message message)
public static string? GetGroup(this Message message)
{
message.Headers.TryGetValue(Headers.Group, out var value);
return value;


+ 7
- 9
src/DotNetCore.CAP/Messages/TransportMessage.cs Zobrazit soubor

@@ -3,7 +3,6 @@

using System;
using System.Collections.Generic;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Messages
{
@@ -13,7 +12,7 @@ namespace DotNetCore.CAP.Messages
[Serializable]
public class TransportMessage
{
public TransportMessage(IDictionary<string, string> headers, [CanBeNull] byte[] body)
public TransportMessage(IDictionary<string, string?> headers, byte[]? body)
{
Headers = headers ?? throw new ArgumentNullException(nameof(headers));
Body = body;
@@ -22,30 +21,29 @@ namespace DotNetCore.CAP.Messages
/// <summary>
/// Gets the headers of this message
/// </summary>
public IDictionary<string, string> Headers { get; }
public IDictionary<string, string?> Headers { get; }

/// <summary>
/// Gets the body object of this message
/// </summary>
[CanBeNull]
public byte[] Body { get; }
public byte[]? Body { get; }

public string GetId()
{
return Headers.TryGetValue(Messages.Headers.MessageId, out var value) ? value : null;
return Headers[Messages.Headers.MessageId]!;
}

public string GetName()
{
return Headers.TryGetValue(Messages.Headers.MessageName, out var value) ? value : null;
return Headers[Messages.Headers.MessageName]!;
}

public string GetGroup()
public string? GetGroup()
{
return Headers.TryGetValue(Messages.Headers.Group, out var value) ? value : null;
}
public string GetCorrelationId()
public string? GetCorrelationId()
{
return Headers.TryGetValue(Messages.Headers.CorrelationId, out var value) ? value : null;
}


+ 6
- 6
src/DotNetCore.CAP/Monitoring/MessageDto.cs Zobrazit soubor

@@ -7,15 +7,15 @@ namespace DotNetCore.CAP.Monitoring
{
public class MessageDto
{
public string Id { get; set; }
public string Id { get; set; } = default!;

public string Version { get; set; }
public string Version { get; set; } = default!;

public string Group { get; set; }
public string? Group { get; set; }

public string Name { get; set; }
public string Name { get; set; } = default!;

public string Content { get; set; }
public string? Content { get; set; }

public DateTime Added { get; set; }

@@ -23,6 +23,6 @@ namespace DotNetCore.CAP.Monitoring

public int Retries { get; set; }

public string StatusName { get; set; }
public string StatusName { get; set; } = default!;
}
}

+ 4
- 4
src/DotNetCore.CAP/Monitoring/MessageQueryDto.cs Zobrazit soubor

@@ -9,13 +9,13 @@ namespace DotNetCore.CAP.Monitoring
{
public MessageType MessageType { get; set; }

public string Group { get; set; }
public string? Group { get; set; }

public string Name { get; set; }
public string? Name { get; set; }

public string Content { get; set; }
public string? Content { get; set; }

public string StatusName { get; set; }
public string? StatusName { get; set; }

public int CurrentPage { get; set; }



+ 1
- 1
src/DotNetCore.CAP/Monitoring/PagedQueryResult.cs Zobrazit soubor

@@ -4,7 +4,7 @@ namespace DotNetCore.CAP.Monitoring
{
public class PagedQueryResult<T>
{
public IList<T> Items { get; set; }
public IList<T>? Items { get; set; }

public long Totals { get; set; }



+ 1
- 1
src/DotNetCore.CAP/OperateResult.cs Zobrazit soubor

@@ -22,7 +22,7 @@ namespace DotNetCore.CAP
/// </summary>
public bool Succeeded { get; set; }

public Exception Exception { get; set; }
public Exception? Exception { get; set; }

/// <summary>
/// An <see cref="IEnumerable{T}" /> of <see cref="OperateError" />s containing an errors


+ 1
- 1
src/DotNetCore.CAP/Persistence/IDataStorage.cs Zobrazit soubor

@@ -14,7 +14,7 @@ namespace DotNetCore.CAP.Persistence

Task ChangeReceiveStateAsync(MediumMessage message, StatusName state);

MediumMessage StoreMessage(string name, Message content, object dbTransaction = null);
MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null);

void StoreReceivedExceptionMessage(string name, string group, string content);



+ 3
- 3
src/DotNetCore.CAP/Persistence/MediumMessage.cs Zobrazit soubor

@@ -5,11 +5,11 @@ namespace DotNetCore.CAP.Persistence
{
public class MediumMessage
{
public string DbId { get; set; }
public string DbId { get; set; } = default!;

public Message Origin { get; set; }
public Message Origin { get; set; } = default!;

public string Content { get; set; }
public string Content { get; set; } = default!;

public DateTime Added { get; set; }



+ 4
- 4
src/DotNetCore.CAP/Processor/IDispatcher.Default.cs Zobrazit soubor

@@ -21,10 +21,10 @@ namespace DotNetCore.CAP.Processor
private readonly CapOptions _options;
private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly CancellationTokenSource _cts = new ();

private Channel<MediumMessage> _publishedChannel;
private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;
private Channel<MediumMessage> _publishedChannel = default!;
private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel = default!;

public Dispatcher(ILogger<Dispatcher> logger,
IMessageSender sender,
@@ -125,7 +125,7 @@ namespace DotNetCore.CAP.Processor
var result = await _sender.SendAsync(message);
if (!result.Succeeded)
{
_logger.MessagePublishException(message.Origin.GetId(), result.ToString(),
_logger.MessagePublishException(message.Origin?.GetId(), result.ToString(),
result.Exception);
}
}


+ 3
- 4
src/DotNetCore.CAP/Processor/IDispatcher.PerGroup.cs Zobrazit soubor

@@ -22,11 +22,10 @@ namespace DotNetCore.CAP.Processor
private readonly CapOptions _options;
private readonly ISubscribeDispatcher _executor;
private readonly ILogger<Dispatcher> _logger;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly CancellationTokenSource _cts = new ();

private Channel<MediumMessage> _publishedChannel;
// private Channel<(MediumMessage, ConsumerExecutorDescriptor)> _receivedChannel;
private ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor)>> _receivedChannels;
private Channel<MediumMessage> _publishedChannel = default!;
private ConcurrentDictionary<string, Channel<(MediumMessage, ConsumerExecutorDescriptor)>> _receivedChannels = default!;
private CancellationToken _stoppingToken;

public DispatcherPerGroup(


+ 2
- 2
src/DotNetCore.CAP/Processor/IProcessingServer.Cap.cs Zobrazit soubor

@@ -19,8 +19,8 @@ namespace DotNetCore.CAP.Processor
private readonly ILoggerFactory _loggerFactory;
private readonly IServiceProvider _provider;

private Task _compositeTask;
private ProcessingContext _context;
private Task? _compositeTask;
private ProcessingContext _context = default!;
private bool _disposed;

public CapProcessingServer(


+ 1
- 1
src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs Zobrazit soubor

@@ -84,7 +84,7 @@ namespace DotNetCore.CAP.Processor
}
catch (Exception ex)
{
_logger.LogWarning(1, ex, "Get messages from storage failed. Retrying...", typeof(T).Name);
_logger.LogWarning(1, ex, "Get messages from storage failed. Retrying...");

return Enumerable.Empty<T>();
}


+ 1
- 5
src/DotNetCore.CAP/Processor/ProcessingContext.cs Zobrazit soubor

@@ -10,11 +10,7 @@ namespace DotNetCore.CAP.Processor
{
public class ProcessingContext : IDisposable
{
private IServiceScope _scope;

public ProcessingContext()
{
}
private IServiceScope? _scope;

private ProcessingContext(ProcessingContext other)
{


+ 3
- 3
src/DotNetCore.CAP/Serialization/ISerializer.JsonUtf8.cs Zobrazit soubor

@@ -34,7 +34,7 @@ namespace DotNetCore.CAP.Serialization
return Task.FromResult(new TransportMessage(message.Headers, jsonBytes));
}

public Task<Message> DeserializeAsync(TransportMessage transportMessage, Type valueType)
public Task<Message> DeserializeAsync(TransportMessage transportMessage, Type? valueType)
{
if (valueType == null || transportMessage.Body == null)
{
@@ -51,12 +51,12 @@ namespace DotNetCore.CAP.Serialization
return JsonSerializer.Serialize(message, _jsonSerializerOptions);
}

public Message Deserialize(string json)
public Message? Deserialize(string json)
{
return JsonSerializer.Deserialize<Message>(json, _jsonSerializerOptions);
}

public object Deserialize(object value, Type valueType)
public object? Deserialize(object value, Type valueType)
{
if (value is JsonElement jsonElement)
{


+ 3
- 4
src/DotNetCore.CAP/Serialization/ISerializer.cs Zobrazit soubor

@@ -4,7 +4,6 @@
using System;
using System.Threading.Tasks;
using DotNetCore.CAP.Messages;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Serialization
{
@@ -23,17 +22,17 @@ namespace DotNetCore.CAP.Serialization
/// <summary>
/// Deserialize the given string into a <see cref="Message"/>
/// </summary>
Message Deserialize(string json);
Message? Deserialize(string json);

/// <summary>
/// Deserialize the given <see cref="TransportMessage"/> back into a <see cref="Message"/>
/// </summary>
Task<Message> DeserializeAsync(TransportMessage transportMessage, [CanBeNull] Type valueType);
Task<Message> DeserializeAsync(TransportMessage transportMessage, Type? valueType);

/// <summary>
/// Deserialize the given object with the given Type into an object
/// </summary>
object Deserialize(object value, Type valueType);
object? Deserialize(object value, Type valueType);

/// <summary>
/// Check if the given object is of Json type, e.g. JToken or JsonElement


+ 3
- 4
src/DotNetCore.CAP/Transport/BrokerAddress.cs Zobrazit soubor

@@ -1,11 +1,10 @@
using System.Linq;
using JetBrains.Annotations;

namespace DotNetCore.CAP.Transport
{
public struct BrokerAddress
{
public BrokerAddress([NotNull]string address)
public BrokerAddress(string address)
{
if (address.Contains("$"))
{
@@ -21,7 +20,7 @@ namespace DotNetCore.CAP.Transport
}
}

public BrokerAddress([NotNull]string name, [CanBeNull]string endpoint)
public BrokerAddress(string name, string? endpoint)
{
Name = name;
Endpoint = endpoint;
@@ -29,7 +28,7 @@ namespace DotNetCore.CAP.Transport

public string Name { get; set; }

public string Endpoint { get; set; }
public string? Endpoint { get; set; }

public override string ToString()
{


+ 1
- 1
src/DotNetCore.CAP/Transport/IConsumerClient.cs Zobrazit soubor

@@ -47,7 +47,7 @@ namespace DotNetCore.CAP.Transport
/// <summary>
/// Reject message and resumption
/// </summary>
void Reject([CanBeNull] object sender);
void Reject(object? sender);

event EventHandler<TransportMessage> OnMessageReceived;



+ 1
- 1
src/DotNetCore.CAP/Transport/MqLogType.cs Zobrazit soubor

@@ -31,7 +31,7 @@ namespace DotNetCore.CAP.Transport

public class LogMessageEventArgs : EventArgs
{
public string Reason { get; set; }
public string? Reason { get; set; }

public MqLogType LogType { get; set; }
}

Načítá se…
Zrušit
Uložit