@@ -31,18 +31,18 @@ namespace DotNetCore.CAP.MongoDB | |||||
_logger.LogDebug( | _logger.LogDebug( | ||||
$"Collecting expired data from collection [{_options.PublishedCollection}]."); | $"Collecting expired data from collection [{_options.PublishedCollection}]."); | ||||
var publishedCollection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
var receivedCollection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
var publishedCollection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection); | |||||
var receivedCollection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection); | |||||
await publishedCollection.BulkWriteAsync(new[] | await publishedCollection.BulkWriteAsync(new[] | ||||
{ | { | ||||
new DeleteManyModel<CapPublishedMessage>( | |||||
Builders<CapPublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||||
new DeleteManyModel<PublishedMessage>( | |||||
Builders<PublishedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||||
}); | }); | ||||
await receivedCollection.BulkWriteAsync(new[] | await receivedCollection.BulkWriteAsync(new[] | ||||
{ | { | ||||
new DeleteManyModel<CapReceivedMessage>( | |||||
Builders<CapReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||||
new DeleteManyModel<ReceivedMessage>( | |||||
Builders<ReceivedMessage>.Filter.Lt(x => x.ExpiresAt, DateTime.Now)) | |||||
}); | }); | ||||
await context.WaitAsync(_waitingInterval); | await context.WaitAsync(_waitingInterval); | ||||
@@ -39,9 +39,9 @@ namespace DotNetCore.CAP.MongoDB | |||||
throw new ArgumentNullException(nameof(message)); | throw new ArgumentNullException(nameof(message)); | ||||
} | } | ||||
var collection = _database.GetCollection<CapPublishedMessage>(_options.PublishedCollection); | |||||
var collection = _database.GetCollection<PublishedMessage>(_options.PublishedCollection); | |||||
var updateDef = Builders<CapPublishedMessage>.Update | |||||
var updateDef = Builders<PublishedMessage>.Update | |||||
.Set(x => x.Retries, message.Retries) | .Set(x => x.Retries, message.Retries) | ||||
.Set(x => x.Content, message.Content) | .Set(x => x.Content, message.Content) | ||||
.Set(x => x.ExpiresAt, message.ExpiresAt) | .Set(x => x.ExpiresAt, message.ExpiresAt) | ||||
@@ -57,9 +57,9 @@ namespace DotNetCore.CAP.MongoDB | |||||
throw new ArgumentNullException(nameof(message)); | throw new ArgumentNullException(nameof(message)); | ||||
} | } | ||||
var collection = _database.GetCollection<CapReceivedMessage>(_options.ReceivedCollection); | |||||
var collection = _database.GetCollection<ReceivedMessage>(_options.ReceivedCollection); | |||||
var updateDef = Builders<CapReceivedMessage>.Update | |||||
var updateDef = Builders<ReceivedMessage>.Update | |||||
.Set(x => x.Retries, message.Retries) | .Set(x => x.Retries, message.Retries) | ||||
.Set(x => x.Content, message.Content) | .Set(x => x.Content, message.Content) | ||||
.Set(x => x.ExpiresAt, message.ExpiresAt) | .Set(x => x.ExpiresAt, message.ExpiresAt) | ||||