You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

68 lines
2.1 KiB

  1. using System;
  2. using System.Collections.Generic;
  3. using DotNetCore.CAP.Messages;
  4. using Microsoft.Extensions.Logging;
  5. namespace DotNetCore.CAP.Test.FakeInMemoryQueue
  6. {
  7. internal class InMemoryQueue
  8. {
  9. private readonly ILogger<InMemoryQueue> _logger;
  10. private static readonly object Lock = new object();
  11. private readonly Dictionary<string, (Action<TransportMessage>, List<string>)> _groupTopics;
  12. public Dictionary<string, TransportMessage> Messages { get; }
  13. public InMemoryQueue(ILogger<InMemoryQueue> logger)
  14. {
  15. _logger = logger;
  16. _groupTopics = new Dictionary<string, (Action<TransportMessage>, List<string>)>();
  17. Messages = new Dictionary<string, TransportMessage>();
  18. }
  19. public void Subscribe(string groupId, Action<TransportMessage> received, string topic)
  20. {
  21. lock (Lock)
  22. {
  23. if (_groupTopics.ContainsKey(groupId))
  24. {
  25. var topics = _groupTopics[groupId];
  26. if (!topics.Item2.Contains(topic))
  27. {
  28. topics.Item2.Add(topic);
  29. }
  30. }
  31. else
  32. {
  33. _groupTopics.Add(groupId, (received, new List<string> { topic }));
  34. }
  35. }
  36. }
  37. public void ClearSubscriber()
  38. {
  39. _groupTopics.Clear();
  40. }
  41. public void Send(string topic, TransportMessage message)
  42. {
  43. Messages.Add(topic, message);
  44. foreach (var groupTopic in _groupTopics)
  45. {
  46. if (groupTopic.Value.Item2.Contains(topic))
  47. {
  48. try
  49. {
  50. groupTopic.Value.Item1?.Invoke(message);
  51. }
  52. catch (Exception e)
  53. {
  54. _logger.LogError(e, $"Consumption message raises an exception. Group-->{groupTopic.Key} Name-->{topic}");
  55. }
  56. }
  57. }
  58. }
  59. }
  60. }