Browse Source

Refactoring to specification

master
Savorboard 6 years ago
parent
commit
b886cb3e9f
15 changed files with 122 additions and 111 deletions
  1. +1
    -1
      src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs
  2. +1
    -0
      src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs
  3. +7
    -7
      src/DotNetCore.CAP.MongoDB/CapPublisher.cs
  4. +12
    -5
      src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj
  5. +3
    -4
      src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs
  6. +44
    -30
      src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs
  7. +14
    -10
      src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs
  8. +14
    -19
      src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs
  9. +9
    -11
      src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs
  10. +5
    -11
      src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs
  11. +3
    -3
      test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs
  12. +4
    -4
      test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs
  13. +1
    -1
      test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageTest.cs
  14. +1
    -1
      test/DotNetCore.CAP.MongoDB.Test/MongoDBTest.cs
  15. +3
    -4
      test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs

+ 1
- 1
src/DotNetCore.CAP.MongoDB/CAP.MongoDBCapOptionsExtension.cs View File

@@ -7,7 +7,7 @@ namespace DotNetCore.CAP.MongoDB
{
public class MongoDBCapOptionsExtension : ICapOptionsExtension
{
private Action<MongoDBOptions> _configure;
private readonly Action<MongoDBOptions> _configure;

public MongoDBCapOptionsExtension(Action<MongoDBOptions> configure)
{


+ 1
- 0
src/DotNetCore.CAP.MongoDB/CAP.Options.Extensions.cs View File

@@ -2,6 +2,7 @@ using System;
using DotNetCore.CAP;
using DotNetCore.CAP.MongoDB;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class CapOptionsExtensions


+ 7
- 7
src/DotNetCore.CAP.MongoDB/CapPublisher.cs View File

@@ -12,16 +12,18 @@ namespace DotNetCore.CAP.MongoDB
{
public class CapPublisher : CapPublisherBase, ICallbackPublisher
{
private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database;
private bool _isInTransaction = true;

public CapPublisher(ILogger<CapPublisherBase> logger, IDispatcher dispatcher,
IMongoClient client, MongoDBOptions options, IServiceProvider provider)
: base(logger, dispatcher)
public CapPublisher(
ILogger<CapPublisherBase> logger,
IDispatcher dispatcher,
IMongoClient client,
MongoDBOptions options,
IServiceProvider provider)
: base(logger, dispatcher)
{
_client = client;
_options = options;
_database = client.GetDatabase(_options.Database);
ServiceProvider = provider;
@@ -122,7 +124,6 @@ namespace DotNetCore.CAP.MongoDB
return message.Id;
}


private async Task PublishWithSessionAsync<T>(string name, T contentObj, IClientSessionHandle session, string callbackName)
{
Guid operationId = default(Guid);
@@ -159,7 +160,6 @@ namespace DotNetCore.CAP.MongoDB
throw;
}
}

private async Task<int> ExecuteAsync(IClientSessionHandle session, CapPublishedMessage message)
{
message.Id = await new MongoDBUtil().GetNextSequenceValueAsync(_database, _options.PublishedCollection, session);


+ 12
- 5
src/DotNetCore.CAP.MongoDB/DotNetCore.CAP.MongoDB.csproj View File

@@ -1,13 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>DotNetCore.CAP.MongoDB</AssemblyName>
<PackageTags>$(PackageTags);MongoDB</PackageTags>
</PropertyGroup>
<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.0\DotNetCore.CAP.MongoDB.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\DotNetCore.CAP\DotNetCore.CAP.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="MongoDB.Bson" Version="2.7.0" />
<PackageReference Include="MongoDB.Driver" Version="2.7.0" />


+ 3
- 4
src/DotNetCore.CAP.MongoDB/MongoDBCollectProcessor.cs View File

@@ -9,16 +9,15 @@ namespace DotNetCore.CAP.MongoDB
{
public class MongoDBCollectProcessor : ICollectProcessor
{
private readonly IMongoClient _client;
private readonly MongoDBOptions _options;
private readonly ILogger _logger;
private readonly IMongoDatabase _database;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);

public MongoDBCollectProcessor(IMongoClient client, MongoDBOptions options,
ILogger<MongoDBCollectProcessor> logger)
public MongoDBCollectProcessor(ILogger<MongoDBCollectProcessor> logger,
MongoDBOptions options,
IMongoClient client)
{
_client = client;
_options = options;
_logger = logger;
_database = client.GetDatabase(_options.Database);


+ 44
- 30
src/DotNetCore.CAP.MongoDB/MongoDBMonitoringApi.cs View File

@@ -12,16 +12,15 @@ namespace DotNetCore.CAP.MongoDB
{
public class MongoDBMonitoringApi : IMonitoringApi
{
private IMongoClient _client;
private MongoDBOptions _options;
private IMongoDatabase _database;
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database;

public MongoDBMonitoringApi(IMongoClient client, MongoDBOptions options)
{
_client = client ?? throw new ArgumentNullException(nameof(client));
var mongoClient = client ?? throw new ArgumentNullException(nameof(client));
_options = options ?? throw new ArgumentNullException(nameof(options));

_database = _client.GetDatabase(_options.Database);
_database = mongoClient.GetDatabase(_options.Database);
}

public StatisticsDto GetStatistics()
@@ -87,12 +86,12 @@ namespace DotNetCore.CAP.MongoDB
filter = filter & builder.Regex(x => x.Content, ".*" + queryDto.Content + ".*");
}

var result =
collection.Find(filter)
.SortByDescending(x => x.Added)
.Skip(queryDto.PageSize * queryDto.CurrentPage)
.Limit(queryDto.PageSize)
.ToList();
var result = collection
.Find(filter)
.SortByDescending(x => x.Added)
.Skip(queryDto.PageSize * queryDto.CurrentPage)
.Limit(queryDto.PageSize)
.ToList();

return result;
}
@@ -130,30 +129,45 @@ namespace DotNetCore.CAP.MongoDB
var endDate = DateTime.UtcNow;

var groupby = new BsonDocument {
{ "$group", new BsonDocument{
{ "_id", new BsonDocument {
{ "Key", new BsonDocument {
{ "$dateToString", new BsonDocument {
{ "format", "%Y-%m-%d %H:00:00"},
{ "date", "$Added"}
}}
}}
{
"$group", new BsonDocument {
{ "_id", new BsonDocument {
{ "Key", new BsonDocument{
{ "$dateToString", new BsonDocument {
{ "format", "%Y-%m-%d %H:00:00"},
{ "date", "$Added"}
}
}
}
}
}
},
{ "Count", new BsonDocument{{ "$sum", 1}}}
}
}
};

var match = new BsonDocument {
{ "$match", new BsonDocument {
{ "Added", new BsonDocument
{
{ "$gt", endDate.AddHours(-24) }
}
},
{ "StatusName",
new BsonDocument
{
{ "$eq", statusName}
}
}
},
{ "Count", new BsonDocument{
{ "$sum", 1}
}}
}}
}
}
};

var match = new BsonDocument { { "$match", new BsonDocument {
{ "Added", new BsonDocument { { "$gt", endDate.AddHours(-24) } } },
{ "StatusName", new BsonDocument { { "$eq", statusName} }
} } } };
var pipeline = new BsonDocument[] { match, groupby };
var pipeline = new[] { match, groupby };

var collection = _database.GetCollection<BsonDocument>(collectionName);
var result = collection.Aggregate<BsonDocument>(pipeline: pipeline).ToList();
var result = collection.Aggregate<BsonDocument>(pipeline).ToList();

var dic = new Dictionary<DateTime, int>();
for (var i = 0; i < 24; i++)


+ 14
- 10
src/DotNetCore.CAP.MongoDB/MongoDBStorage.cs View File

@@ -16,9 +16,9 @@ namespace DotNetCore.CAP.MongoDB
private readonly ILogger<MongoDBStorage> _logger;

public MongoDBStorage(CapOptions capOptions,
MongoDBOptions options,
IMongoClient client,
ILogger<MongoDBStorage> logger)
MongoDBOptions options,
IMongoClient client,
ILogger<MongoDBStorage> logger)
{
_capOptions = capOptions;
_options = options;
@@ -44,26 +44,30 @@ namespace DotNetCore.CAP.MongoDB
}

var database = _client.GetDatabase(_options.Database);
var names = (await database.ListCollectionNamesAsync())?.ToList();
var names = (await database.ListCollectionNamesAsync(cancellationToken: cancellationToken))?.ToList();

if (!names.Any(n => n == _options.ReceivedCollection))
{
await database.CreateCollectionAsync(_options.ReceivedCollection);
await database.CreateCollectionAsync(_options.ReceivedCollection, cancellationToken: cancellationToken);
}
if (!names.Any(n => n == _options.PublishedCollection))

if (names.All(n => n != _options.PublishedCollection))
{
await database.CreateCollectionAsync(_options.PublishedCollection);
await database.CreateCollectionAsync(_options.PublishedCollection, cancellationToken: cancellationToken);
}
if (!names.Any(n => n == "Counter"))

if (names.All(n => n != "Counter"))
{
await database.CreateCollectionAsync("Counter");
await database.CreateCollectionAsync("Counter", cancellationToken: cancellationToken);
var collection = database.GetCollection<BsonDocument>("Counter");
await collection.InsertManyAsync(new BsonDocument[]
{
new BsonDocument{{"_id", _options.PublishedCollection}, {"sequence_value", 0}},
new BsonDocument{{"_id", _options.ReceivedCollection}, {"sequence_value", 0}}
});
}, cancellationToken: cancellationToken);
}

_logger.LogDebug("Ensuring all create database tables script are applied.");
}
}
}

+ 14
- 19
src/DotNetCore.CAP.MongoDB/MongoDBStorageConnection.cs View File

@@ -10,8 +10,8 @@ namespace DotNetCore.CAP.MongoDB
{
public class MongoDBStorageConnection : IStorageConnection
{
private CapOptions _capOptions;
private MongoDBOptions _options;
private readonly CapOptions _capOptions;
private readonly MongoDBOptions _options;
private readonly IMongoClient _client;
private readonly IMongoDatabase _database;

@@ -58,10 +58,6 @@ namespace DotNetCore.CAP.MongoDB
return new MongoDBStorageTransaction(_client, _options);
}

public void Dispose()
{
}

public async Task<CapPublishedMessage> GetPublishedMessageAsync(int id)
{
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
@@ -72,13 +68,10 @@ namespace DotNetCore.CAP.MongoDB
{
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);
return await
collection.Find(x =>
x.Retries < _capOptions.FailedRetryCount
&& x.Added < fourMinsAgo
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200)
.ToListAsync();
return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200)
.ToListAsync();
}

public async Task<CapReceivedMessage> GetReceivedMessageAsync(int id)
@@ -92,12 +85,10 @@ namespace DotNetCore.CAP.MongoDB
var fourMinsAgo = DateTime.Now.AddMinutes(-4);
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

return await
collection.Find(x =>
x.Retries < _capOptions.FailedRetryCount
&& x.Added < fourMinsAgo
&& (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled)
).Limit(200).ToListAsync();
return await collection
.Find(x => x.Retries < _capOptions.FailedRetryCount && x.Added < fourMinsAgo && (x.StatusName == StatusName.Failed || x.StatusName == StatusName.Scheduled))
.Limit(200)
.ToListAsync();
}

public async Task<int> StoreReceivedMessageAsync(CapReceivedMessage message)
@@ -114,5 +105,9 @@ namespace DotNetCore.CAP.MongoDB

return message.Id;
}
public void Dispose()
{

}
}
}

+ 9
- 11
src/DotNetCore.CAP.MongoDB/MongoDBStorageTransaction.cs View File

@@ -7,17 +7,15 @@ namespace DotNetCore.CAP.MongoDB
{
internal class MongoDBStorageTransaction : IStorageTransaction
{
private IMongoClient _client;
private readonly MongoDBOptions _options;
private readonly IMongoDatabase _database;
private readonly IClientSessionHandle _session;

public MongoDBStorageTransaction(IMongoClient client, MongoDBOptions options)
{
_client = client;
_options = options;
_database = client.GetDatabase(options.Database);
_session = _client.StartSession();
_session = client.StartSession();
_session.StartTransaction();
}

@@ -41,10 +39,10 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection);

var updateDef = Builders<CapPublishedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);

collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
}
@@ -59,10 +57,10 @@ namespace DotNetCore.CAP.MongoDB
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection);

var updateDef = Builders<CapReceivedMessage>.Update
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);
.Set(x => x.Retries, message.Retries)
.Set(x => x.Content, message.Content)
.Set(x => x.ExpiresAt, message.ExpiresAt)
.Set(x => x.StatusName, message.StatusName);

collection.FindOneAndUpdate(_session, x => x.Id == message.Id, updateDef);
}


+ 5
- 11
src/DotNetCore.CAP.MongoDB/MongoDBUtil.cs View File

@@ -5,9 +5,9 @@ using MongoDB.Driver;

namespace DotNetCore.CAP.MongoDB
{
public class MongoDBUtil
internal class MongoDBUtil
{
FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>()
readonly FindOneAndUpdateOptions<BsonDocument> _options = new FindOneAndUpdateOptions<BsonDocument>()
{
ReturnDocument = ReturnDocument.After
};
@@ -43,15 +43,9 @@ namespace DotNetCore.CAP.MongoDB
var filter = new BsonDocument { { "_id", collectionName } };
var updateDef = Builders<BsonDocument>.Update.Inc("sequence_value", 1);

BsonDocument result;
if (session == null)
{
result = collection.FindOneAndUpdate(filter, updateDef, _options);
}
else
{
result = collection.FindOneAndUpdate(session, filter, updateDef, _options);
}
var result = session == null
? collection.FindOneAndUpdate(filter, updateDef, _options)
: collection.FindOneAndUpdate(session, filter, updateDef, _options);

if (result.TryGetValue("sequence_value", out var value))
{


+ 3
- 3
test/DotNetCore.CAP.MongoDB.Test/MongoDBMonitoringApiTest.cs View File

@@ -12,9 +12,9 @@ namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBMonitoringApiTest
{
private MongoClient _client;
private MongoDBOptions _options;
private MongoDBMonitoringApi _api;
private readonly MongoClient _client;
private readonly MongoDBOptions _options;
private readonly MongoDBMonitoringApi _api;

public MongoDBMonitoringApiTest()
{


+ 4
- 4
test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageConnectionTest.cs View File

@@ -12,10 +12,10 @@ namespace DotNetCore.CAP.MongoDB.Test
[TestCaseOrderer(PriorityOrderer.Name, PriorityOrderer.Assembly)]
public class MongoDBStorageConnectionTest
{
private MongoClient _client;
private MongoDBOptions _options;
private MongoDBStorage _storage;
private IStorageConnection _connection;
private readonly MongoClient _client;
private readonly MongoDBOptions _options;
private readonly MongoDBStorage _storage;
private readonly IStorageConnection _connection;

public MongoDBStorageConnectionTest()
{


+ 1
- 1
test/DotNetCore.CAP.MongoDB.Test/MongoDBStorageTest.cs View File

@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBStorageTest
{
private MongoClient _client;
private readonly MongoClient _client;

public MongoDBStorageTest()
{


+ 1
- 1
test/DotNetCore.CAP.MongoDB.Test/MongoDBTest.cs View File

@@ -8,7 +8,7 @@ namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBTest
{
private MongoClient _client;
private readonly MongoClient _client;

public MongoDBTest()
{


+ 3
- 4
test/DotNetCore.CAP.MongoDB.Test/MongoDBUtilTest.cs View File

@@ -11,17 +11,16 @@ namespace DotNetCore.CAP.MongoDB.Test
{
public class MongoDBUtilTest
{
private readonly MongoClient _client;
private readonly IMongoDatabase _database;
string _recieved = "ReceivedTest";

public MongoDBUtilTest()
{
_client = new MongoClient(ConnectionUtil.ConnectionString);
_database = _client.GetDatabase("CAP_Test");
var client = new MongoClient(ConnectionUtil.ConnectionString);
_database = client.GetDatabase("CAP_Test");

//Initialize MongoDB
if (!_database.ListCollectionNames().ToList().Any(x => x == "Counter"))
if (_database.ListCollectionNames().ToList().All(x => x != "Counter"))
{
var collection = _database.GetCollection<BsonDocument>("Counter");
collection.InsertOne(new BsonDocument { { "_id", _recieved }, { "sequence_value", 0 } });


Loading…
Cancel
Save