From 0476ed438f2152c97e320ec338374f1fe2891aff Mon Sep 17 00:00:00 2001 From: yangxiaodong Date: Fri, 26 May 2017 18:33:18 +0800 Subject: [PATCH] add rabbitmq proejct --- .../Cap.Consistency.RabbitMQ.csproj | 16 +++++ .../RabbitMQConsumerClient.cs | 72 +++++++++++++++++++ .../RabbitMQConsumerClientFactory.cs | 14 ++++ .../RabbitMQTopicAttribute.cs | 15 ++++ 4 files changed, 117 insertions(+) create mode 100644 src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj create mode 100644 src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs create mode 100644 src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs create mode 100644 src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs diff --git a/src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj b/src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj new file mode 100644 index 0000000..b0de771 --- /dev/null +++ b/src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj @@ -0,0 +1,16 @@ + + + + netstandard1.6 + 1.6.1 + + + + + + + + + + + \ No newline at end of file diff --git a/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs new file mode 100644 index 0000000..a0c2949 --- /dev/null +++ b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs @@ -0,0 +1,72 @@ +using System; +using System.Text; +using System.Threading.Tasks; +using Cap.Consistency.Consumer; +using Cap.Consistency.Infrastructure; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace Cap.Consistency.RabbitMQ +{ + public class RabbitMQConsumerClient : IConsumerClient + { + public const string TYPE = "topic"; + + private readonly string _exchange; + private readonly string _hostName; + + private IConnectionFactory _connectionFactory; + private IConnection _connection; + private IModel _channel; + + private string _queueName; + + public event EventHandler MessageReceieved; + + public RabbitMQConsumerClient(string exchange, string hostName) { + _exchange = exchange; + _hostName = hostName; + + InitClient(); + } + + private void InitClient() { + _connectionFactory = new ConnectionFactory { HostName = _hostName }; + _connection = _connectionFactory.CreateConnection(); + _channel = _connection.CreateModel(); + _channel.ExchangeDeclare(exchange: _exchange, type: TYPE); + _queueName = _channel.QueueDeclare().QueueName; + } + + public void Listening(TimeSpan timeout) { + + // Task.Delay(timeout).Wait(); + + var consumer = new EventingBasicConsumer(_channel); + consumer.Received += OnConsumerReceived; + _channel.BasicConsume(_queueName, true, consumer); + } + + public void Subscribe(string topic) { + _channel.QueueBind(_queueName, _exchange, topic); + } + + public void Subscribe(string topic, int partition) { + _channel.QueueBind(_queueName, _exchange, topic); + } + + public void Dispose() { + _channel.Dispose(); + _connection.Dispose(); + } + + private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) { + var message = new DeliverMessage { + MessageKey = e.RoutingKey, + Body = e.Body, + Value = Encoding.UTF8.GetString(e.Body) + }; + MessageReceieved?.Invoke(sender, message); + } + } +} diff --git a/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs new file mode 100644 index 0000000..d66f6a7 --- /dev/null +++ b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Consumer; + +namespace Cap.Consistency.RabbitMQ +{ + public class RabbitMQConsumerClientFactory : IConsumerClientFactory + { + public IConsumerClient Create(string groupId, string clientHostAddress) { + return new RabbitMQConsumerClient(groupId, clientHostAddress); + } + } +} diff --git a/src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs b/src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs new file mode 100644 index 0000000..62f5e3b --- /dev/null +++ b/src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Cap.Consistency.Abstractions; + +namespace Cap.Consistency.RabbitMQ +{ + public class RabbitMQTopicAttribute : TopicAttribute + { + + public RabbitMQTopicAttribute(string routingKey) : base(routingKey) { + + } + } +}