@@ -0,0 +1,56 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Reflection; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Messages; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Xunit; | |||
namespace DotNetCore.CAP.Test | |||
{ | |||
public class ConsumerInvokerTest | |||
{ | |||
private readonly IServiceProvider _serviceProvider; | |||
public ConsumerInvokerTest() | |||
{ | |||
var serviceCollection = new ServiceCollection(); | |||
serviceCollection.AddLogging(); | |||
serviceCollection.AddSingleton<IConsumerInvoker, DefaultConsumerInvoker>(); | |||
serviceCollection.AddTransient<FakeSubscriber>(); | |||
_serviceProvider = serviceCollection.BuildServiceProvider(); | |||
} | |||
private IConsumerInvoker ConsumerInvoker => _serviceProvider.GetService<IConsumerInvoker>(); | |||
[Fact] | |||
public async Task InvokeTest() | |||
{ | |||
var descriptor = new ConsumerExecutorDescriptor() | |||
{ | |||
Attribute = new CandidatesTopic("fake.output.integer"), | |||
ServiceTypeInfo = typeof(FakeSubscriber).GetTypeInfo(), | |||
ImplTypeInfo = typeof(FakeSubscriber).GetTypeInfo(), | |||
MethodInfo = typeof(FakeSubscriber).GetMethod(nameof(FakeSubscriber.OutputIntegerSub)), | |||
Parameters = new List<ParameterDescriptor>() | |||
}; | |||
var header = new Dictionary<string, string>(); | |||
var message = new Message(header, null); | |||
var context = new ConsumerContext(descriptor, message); | |||
var ret = await ConsumerInvoker.InvokeAsync(context); | |||
Assert.Equal(int.MaxValue, ret.Result); | |||
} | |||
} | |||
public class FakeSubscriber : ICapSubscribe | |||
{ | |||
[CapSubscribe("fake.output.integer")] | |||
public int OutputIntegerSub() | |||
{ | |||
return int.MaxValue; | |||
} | |||
} | |||
} |
@@ -6,20 +6,20 @@ | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" /> | |||
<PackageReference Include="System.Data.Common" Version="4.3.0" /> | |||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.0.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.0.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="3.0.0" /> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" /> | |||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1"> | |||
<PrivateAssets>all</PrivateAssets> | |||
<IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets> | |||
</PackageReference> | |||
<PackageReference Include="xunit" Version="2.4.1" /> | |||
<PackageReference Include="Microsoft.AspNetCore.Http" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" /> | |||
<PackageReference Include="Moq" Version="4.10.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\..\src\DotNetCore.CAP.InMemoryStorage\DotNetCore.CAP.InMemoryStorage.csproj" /> | |||
<ProjectReference Include="..\..\src\DotNetCore.CAP\DotNetCore.CAP.csproj" /> | |||
</ItemGroup> | |||
@@ -0,0 +1,18 @@ | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.DependencyInjection; | |||
namespace DotNetCore.CAP.Test.FakeInMemoryQueue | |||
{ | |||
internal sealed class FakeQueueOptionsExtension : ICapOptionsExtension | |||
{ | |||
public void AddServices(IServiceCollection services) | |||
{ | |||
services.AddSingleton<CapMessageQueueMakerService>(); | |||
services.AddSingleton<InMemoryQueue>(); | |||
services.AddSingleton<IConsumerClientFactory, InMemoryConsumerClientFactory>(); | |||
services.AddSingleton<ITransport, FakeInMemoryQueueTransport>(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,11 @@ | |||
namespace DotNetCore.CAP.Test.FakeInMemoryQueue | |||
{ | |||
public static class CapOptionsExtensions | |||
{ | |||
public static CapOptions UseFakeTransport(this CapOptions options) | |||
{ | |||
options.RegisterExtension(new FakeQueueOptionsExtension()); | |||
return options; | |||
} | |||
} | |||
} |
@@ -0,0 +1,41 @@ | |||
using System; | |||
using System.Threading.Tasks; | |||
using DotNetCore.CAP.Internal; | |||
using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.Logging; | |||
namespace DotNetCore.CAP.Test.FakeInMemoryQueue | |||
{ | |||
internal class FakeInMemoryQueueTransport : ITransport | |||
{ | |||
private readonly InMemoryQueue _queue; | |||
private readonly ILogger _logger; | |||
public FakeInMemoryQueueTransport(InMemoryQueue queue, ILogger<FakeInMemoryQueueTransport> logger) | |||
{ | |||
_queue = queue; | |||
_logger = logger; | |||
} | |||
public string Address { get; } = string.Empty; | |||
public Task<OperateResult> SendAsync(TransportMessage message) | |||
{ | |||
try | |||
{ | |||
_queue.Send(message.GetName(), message); | |||
_logger.LogDebug($"Event message [{message.GetName()}] has been published."); | |||
return Task.FromResult(OperateResult.Success); | |||
} | |||
catch (Exception ex) | |||
{ | |||
var wrapperEx = new PublisherSentFailedException(ex.Message, ex); | |||
return Task.FromResult(OperateResult.Failed(wrapperEx)); | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,75 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Threading; | |||
using DotNetCore.CAP.Messages; | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.Logging; | |||
namespace DotNetCore.CAP.Test.FakeInMemoryQueue | |||
{ | |||
internal sealed class InMemoryConsumerClient : IConsumerClient | |||
{ | |||
private readonly ILogger _logger; | |||
private readonly InMemoryQueue _queue; | |||
private readonly string _subscriptionName; | |||
public InMemoryConsumerClient( | |||
ILogger logger, | |||
InMemoryQueue queue, | |||
string subscriptionName) | |||
{ | |||
_logger = logger; | |||
_queue = queue; | |||
_subscriptionName = subscriptionName; | |||
} | |||
public event EventHandler<TransportMessage> OnMessageReceived; | |||
public event EventHandler<LogMessageEventArgs> OnLog; | |||
public string ServersAddress => string.Empty; | |||
public void Subscribe(IEnumerable<string> topics) | |||
{ | |||
if (topics == null) throw new ArgumentNullException(nameof(topics)); | |||
foreach (var topic in topics) | |||
{ | |||
_queue.Subscribe(_subscriptionName, OnConsumerReceived, topic); | |||
_logger.LogInformation($"InMemory message queue initialize the topic: {topic}"); | |||
} | |||
} | |||
public void Listening(TimeSpan timeout, CancellationToken cancellationToken) | |||
{ | |||
while (!cancellationToken.IsCancellationRequested) | |||
{ | |||
cancellationToken.WaitHandle.WaitOne(timeout); | |||
} | |||
} | |||
public void Commit() | |||
{ | |||
// ignore | |||
} | |||
public void Reject() | |||
{ | |||
// ignore | |||
} | |||
public void Dispose() | |||
{ | |||
_queue.ClearSubscriber(); | |||
} | |||
#region private methods | |||
private void OnConsumerReceived(TransportMessage e) | |||
{ | |||
OnMessageReceived?.Invoke(null, e); | |||
} | |||
#endregion private methods | |||
} | |||
} |
@@ -0,0 +1,23 @@ | |||
using DotNetCore.CAP.Transport; | |||
using Microsoft.Extensions.Logging; | |||
namespace DotNetCore.CAP.Test.FakeInMemoryQueue | |||
{ | |||
internal sealed class InMemoryConsumerClientFactory : IConsumerClientFactory | |||
{ | |||
private readonly ILoggerFactory _loggerFactory; | |||
private readonly InMemoryQueue _queue; | |||
public InMemoryConsumerClientFactory(ILoggerFactory loggerFactory, InMemoryQueue queue) | |||
{ | |||
_loggerFactory = loggerFactory; | |||
_queue = queue; | |||
} | |||
public IConsumerClient Create(string groupId) | |||
{ | |||
var logger = _loggerFactory.CreateLogger(typeof(InMemoryConsumerClient)); | |||
return new InMemoryConsumerClient(logger, _queue, groupId); | |||
} | |||
} | |||
} |
@@ -0,0 +1,67 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using DotNetCore.CAP.Messages; | |||
using Microsoft.Extensions.Logging; | |||
namespace DotNetCore.CAP.Test.FakeInMemoryQueue | |||
{ | |||
internal class InMemoryQueue | |||
{ | |||
private readonly ILogger<InMemoryQueue> _logger; | |||
private static readonly object Lock = new object(); | |||
private readonly Dictionary<string, (Action<TransportMessage>, List<string>)> _groupTopics; | |||
public Dictionary<string, TransportMessage> Messages { get; } | |||
public InMemoryQueue(ILogger<InMemoryQueue> logger) | |||
{ | |||
_logger = logger; | |||
_groupTopics = new Dictionary<string, (Action<TransportMessage>, List<string>)>(); | |||
Messages = new Dictionary<string, TransportMessage>(); | |||
} | |||
public void Subscribe(string groupId, Action<TransportMessage> received, string topic) | |||
{ | |||
lock (Lock) | |||
{ | |||
if (_groupTopics.ContainsKey(groupId)) | |||
{ | |||
var topics = _groupTopics[groupId]; | |||
if (!topics.Item2.Contains(topic)) | |||
{ | |||
topics.Item2.Add(topic); | |||
} | |||
} | |||
else | |||
{ | |||
_groupTopics.Add(groupId, (received, new List<string> { topic })); | |||
} | |||
} | |||
} | |||
public void ClearSubscriber() | |||
{ | |||
_groupTopics.Clear(); | |||
} | |||
public void Send(string topic, TransportMessage message) | |||
{ | |||
Messages.Add(topic, message); | |||
foreach (var groupTopic in _groupTopics) | |||
{ | |||
if (groupTopic.Value.Item2.Contains(topic)) | |||
{ | |||
try | |||
{ | |||
groupTopic.Value.Item1?.Invoke(message); | |||
} | |||
catch (Exception e) | |||
{ | |||
_logger.LogError(e, $"Consumption message raises an exception. Group-->{groupTopic.Key} Name-->{topic}"); | |||
} | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -72,7 +72,7 @@ namespace DotNetCore.CAP.Test | |||
} | |||
} | |||
class HomeController | |||
public class HomeController | |||
{ | |||
} | |||
@@ -1,26 +0,0 @@ | |||
using System.Linq; | |||
using Xunit; | |||
namespace DotNetCore.CAP.Test | |||
{ | |||
public class OperateResultTest | |||
{ | |||
[Fact] | |||
public void VerifyDefaultConstructor() | |||
{ | |||
var result = new OperateResult(); | |||
Assert.False(result.Succeeded); | |||
Assert.Empty(result.Errors); | |||
} | |||
[Fact] | |||
public void NullFaildUsesEmptyErrors() | |||
{ | |||
var result = OperateResult.Failed(); | |||
Assert.False(result.Succeeded); | |||
Assert.Empty(result.Errors); | |||
} | |||
} | |||
} |