diff --git a/src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj b/src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj
new file mode 100644
index 0000000..b0de771
--- /dev/null
+++ b/src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj
@@ -0,0 +1,16 @@
+
+
+
+ netstandard1.6
+ 1.6.1
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs
new file mode 100644
index 0000000..a0c2949
--- /dev/null
+++ b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs
@@ -0,0 +1,72 @@
+using System;
+using System.Text;
+using System.Threading.Tasks;
+using Cap.Consistency.Consumer;
+using Cap.Consistency.Infrastructure;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+
+namespace Cap.Consistency.RabbitMQ
+{
+ public class RabbitMQConsumerClient : IConsumerClient
+ {
+ public const string TYPE = "topic";
+
+ private readonly string _exchange;
+ private readonly string _hostName;
+
+ private IConnectionFactory _connectionFactory;
+ private IConnection _connection;
+ private IModel _channel;
+
+ private string _queueName;
+
+ public event EventHandler MessageReceieved;
+
+ public RabbitMQConsumerClient(string exchange, string hostName) {
+ _exchange = exchange;
+ _hostName = hostName;
+
+ InitClient();
+ }
+
+ private void InitClient() {
+ _connectionFactory = new ConnectionFactory { HostName = _hostName };
+ _connection = _connectionFactory.CreateConnection();
+ _channel = _connection.CreateModel();
+ _channel.ExchangeDeclare(exchange: _exchange, type: TYPE);
+ _queueName = _channel.QueueDeclare().QueueName;
+ }
+
+ public void Listening(TimeSpan timeout) {
+
+ // Task.Delay(timeout).Wait();
+
+ var consumer = new EventingBasicConsumer(_channel);
+ consumer.Received += OnConsumerReceived;
+ _channel.BasicConsume(_queueName, true, consumer);
+ }
+
+ public void Subscribe(string topic) {
+ _channel.QueueBind(_queueName, _exchange, topic);
+ }
+
+ public void Subscribe(string topic, int partition) {
+ _channel.QueueBind(_queueName, _exchange, topic);
+ }
+
+ public void Dispose() {
+ _channel.Dispose();
+ _connection.Dispose();
+ }
+
+ private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) {
+ var message = new DeliverMessage {
+ MessageKey = e.RoutingKey,
+ Body = e.Body,
+ Value = Encoding.UTF8.GetString(e.Body)
+ };
+ MessageReceieved?.Invoke(sender, message);
+ }
+ }
+}
diff --git a/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs
new file mode 100644
index 0000000..d66f6a7
--- /dev/null
+++ b/src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Cap.Consistency.Consumer;
+
+namespace Cap.Consistency.RabbitMQ
+{
+ public class RabbitMQConsumerClientFactory : IConsumerClientFactory
+ {
+ public IConsumerClient Create(string groupId, string clientHostAddress) {
+ return new RabbitMQConsumerClient(groupId, clientHostAddress);
+ }
+ }
+}
diff --git a/src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs b/src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs
new file mode 100644
index 0000000..62f5e3b
--- /dev/null
+++ b/src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Cap.Consistency.Abstractions;
+
+namespace Cap.Consistency.RabbitMQ
+{
+ public class RabbitMQTopicAttribute : TopicAttribute
+ {
+
+ public RabbitMQTopicAttribute(string routingKey) : base(routingKey) {
+
+ }
+ }
+}