Browse Source

add rabbitmq proejct

master
yangxiaodong 7 years ago
parent
commit
0476ed438f
4 changed files with 117 additions and 0 deletions
  1. +16
    -0
      src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj
  2. +72
    -0
      src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs
  3. +14
    -0
      src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs
  4. +15
    -0
      src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs

+ 16
- 0
src/Cap.Consistency.RabbitMQ/Cap.Consistency.RabbitMQ.csproj View File

@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard1.6</TargetFramework>
<NetStandardImplicitPackageVersion>1.6.1</NetStandardImplicitPackageVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="5.0.1-rc1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Cap.Consistency\Cap.Consistency.csproj" />
</ItemGroup>

</Project>

+ 72
- 0
src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClient.cs View File

@@ -0,0 +1,72 @@
using System;
using System.Text;
using System.Threading.Tasks;
using Cap.Consistency.Consumer;
using Cap.Consistency.Infrastructure;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Cap.Consistency.RabbitMQ
{
public class RabbitMQConsumerClient : IConsumerClient
{
public const string TYPE = "topic";

private readonly string _exchange;
private readonly string _hostName;

private IConnectionFactory _connectionFactory;
private IConnection _connection;
private IModel _channel;

private string _queueName;

public event EventHandler<DeliverMessage> MessageReceieved;

public RabbitMQConsumerClient(string exchange, string hostName) {
_exchange = exchange;
_hostName = hostName;

InitClient();
}

private void InitClient() {
_connectionFactory = new ConnectionFactory { HostName = _hostName };
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: _exchange, type: TYPE);
_queueName = _channel.QueueDeclare().QueueName;
}

public void Listening(TimeSpan timeout) {

// Task.Delay(timeout).Wait();

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += OnConsumerReceived;
_channel.BasicConsume(_queueName, true, consumer);
}

public void Subscribe(string topic) {
_channel.QueueBind(_queueName, _exchange, topic);
}

public void Subscribe(string topic, int partition) {
_channel.QueueBind(_queueName, _exchange, topic);
}

public void Dispose() {
_channel.Dispose();
_connection.Dispose();
}

private void OnConsumerReceived(object sender, BasicDeliverEventArgs e) {
var message = new DeliverMessage {
MessageKey = e.RoutingKey,
Body = e.Body,
Value = Encoding.UTF8.GetString(e.Body)
};
MessageReceieved?.Invoke(sender, message);
}
}
}

+ 14
- 0
src/Cap.Consistency.RabbitMQ/RabbitMQConsumerClientFactory.cs View File

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Consumer;

namespace Cap.Consistency.RabbitMQ
{
public class RabbitMQConsumerClientFactory : IConsumerClientFactory
{
public IConsumerClient Create(string groupId, string clientHostAddress) {
return new RabbitMQConsumerClient(groupId, clientHostAddress);
}
}
}

+ 15
- 0
src/Cap.Consistency.RabbitMQ/RabbitMQTopicAttribute.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using Cap.Consistency.Abstractions;

namespace Cap.Consistency.RabbitMQ
{
public class RabbitMQTopicAttribute : TopicAttribute
{

public RabbitMQTopicAttribute(string routingKey) : base(routingKey) {

}
}
}

Loading…
Cancel
Save