Browse Source

refactor

master
Savorboard 6 years ago
parent
commit
362b6f5219
1 changed files with 63 additions and 194 deletions
  1. +63
    -194
      src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs

+ 63
- 194
src/DotNetCore.CAP/Abstractions/CapPublisherBase.cs View File

@@ -2,262 +2,131 @@
// Licensed under the MIT License. See License.txt in the project root for license information. // Licensed under the MIT License. See License.txt in the project root for license information.


using System; using System;
using System.Data;
using System.Diagnostics; using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using DotNetCore.CAP.Diagnostics; using DotNetCore.CAP.Diagnostics;
using DotNetCore.CAP.Infrastructure; using DotNetCore.CAP.Infrastructure;
using DotNetCore.CAP.Models; using DotNetCore.CAP.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;


namespace DotNetCore.CAP.Abstractions namespace DotNetCore.CAP.Abstractions
{ {
public abstract class CapPublisherBase : ICapPublisher, IDisposable
public abstract class CapPublisherBase : ICapPublisher
{ {
private readonly IDispatcher _dispatcher;
protected readonly ILogger _logger;
private readonly CapTransactionBase _capTransaction;
private readonly IMessagePacker _msgPacker;
private readonly IContentSerializer _serializer;

protected bool NotUseTransaction;


// diagnostics listener
// ReSharper disable once InconsistentNaming // ReSharper disable once InconsistentNaming
protected static readonly DiagnosticListener s_diagnosticListener = protected static readonly DiagnosticListener s_diagnosticListener =
new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName); new DiagnosticListener(CapDiagnosticListenerExtensions.DiagnosticListenerName);


protected CapPublisherBase(ILogger<CapPublisherBase> logger, IDispatcher dispatcher)
{
_logger = logger;
_dispatcher = dispatcher;
}

protected IDbConnection DbConnection { get; set; }
protected IDbTransaction DbTransaction { get; set; }
protected bool IsCapOpenedTrans { get; set; }
protected bool IsCapOpenedConn { get; set; }
protected bool IsUsingEF { get; set; }
protected IServiceProvider ServiceProvider { get; set; }

public void Publish<T>(string name, T contentObj, string callbackName = null)
protected CapPublisherBase(IServiceProvider service)
{ {
CheckIsUsingEF(name);
PrepareConnectionForEF();

PublishWithTrans(name, contentObj, callbackName);
}

public Task PublishAsync<T>(string name, T contentObj, string callbackName = null)
{
CheckIsUsingEF(name);
PrepareConnectionForEF();

return PublishWithTransAsync(name, contentObj, callbackName);
}

public void Publish<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);

PublishWithTrans(name, contentObj, callbackName);
ServiceProvider = service;
_capTransaction = service.GetRequiredService<CapTransactionBase>();
_msgPacker = service.GetRequiredService<IMessagePacker>();
_serializer = service.GetRequiredService<IContentSerializer>();
} }


public Task PublishAsync<T>(string name, T contentObj, IDbTransaction dbTransaction, string callbackName = null)
{
CheckIsAdoNet(name);
PrepareConnectionForAdo(dbTransaction);

return PublishWithTransAsync(name, contentObj, callbackName);
}

public virtual void PublishWithMongo<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
throw new NotImplementedException("Work for MongoDB only.");
}

public virtual Task PublishWithMongoAsync<T>(string name, T contentObj, IMongoTransaction mongoTransaction = null, string callbackName = null)
{
throw new NotImplementedException("Work for MongoDB only.");
}

protected void Enqueue(CapPublishedMessage message)
{
_dispatcher.EnqueueToPublish(message);
}

protected abstract void PrepareConnectionForEF();

protected abstract int Execute(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
protected IServiceProvider ServiceProvider { get; }


protected abstract Task<int> ExecuteAsync(IDbConnection dbConnection, IDbTransaction dbTransaction,
CapPublishedMessage message);
public ICapTransaction CapTransaction => _capTransaction;


protected virtual string Serialize<T>(T obj, string callbackName = null)
public void Publish<T>(string name, T contentObj, string callbackName = null)
{ {
var packer = (IMessagePacker)ServiceProvider.GetService(typeof(IMessagePacker));
string content;
if (obj != null)
{
if (Helper.IsComplexType(obj.GetType()))
{
var serializer = (IContentSerializer)ServiceProvider.GetService(typeof(IContentSerializer));
content = serializer.Serialize(obj);
}
else
{
content = obj.ToString();
}
}
else
{
content = string.Empty;
}

var message = new CapMessageDto(content)
var message = new CapPublishedMessage
{ {
CallbackName = callbackName
Name = name,
Content = Serialize(contentObj, callbackName),
StatusName = StatusName.Scheduled
}; };
return packer.Pack(message);
}

#region private methods


private void PrepareConnectionForAdo(IDbTransaction dbTransaction)
{
DbTransaction = dbTransaction ?? throw new ArgumentNullException(nameof(dbTransaction));
DbConnection = DbTransaction.Connection;
if (DbConnection.State != ConnectionState.Open)
{
IsCapOpenedConn = true;
DbConnection.Open();
}
PublishAsyncInternal(message).GetAwaiter().GetResult();
} }


private void CheckIsUsingEF(string name)
public async Task PublishAsync<T>(string name, T contentObj, string callbackName = null,
CancellationToken cancellationToken = default(CancellationToken))
{ {
if (name == null)
var message = new CapPublishedMessage
{ {
throw new ArgumentNullException(nameof(name));
}
Name = name,
Content = Serialize(contentObj, callbackName),
StatusName = StatusName.Scheduled
};


if (!IsUsingEF)
{
throw new InvalidOperationException(
"If you are using the EntityFramework, you need to configure the DbContextType first." +
" otherwise you need to use overloaded method with IDbTransaction.");
}
await PublishAsyncInternal(message);
} }


private void CheckIsAdoNet(string name)
protected async Task PublishAsyncInternal(CapPublishedMessage message)
{ {
if (name == null)
{
throw new ArgumentNullException(nameof(name));
}

if (IsUsingEF)
if (CapTransaction.DbTransaction == null)
{ {
throw new InvalidOperationException(
"If you are using the EntityFramework, you do not need to use this overloaded.");
NotUseTransaction = true;
CapTransaction.DbTransaction = GetDbTransaction();
} }
}


private async Task PublishWithTransAsync<T>(string name, T contentObj, string callbackName = null)
{
Guid operationId = default(Guid); Guid operationId = default(Guid);
var content = Serialize(contentObj, callbackName);

var message = new CapPublishedMessage
{
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};


try try
{ {
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message); operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);


var id = await ExecuteAsync(DbConnection, DbTransaction, message);

ClosedCap();
message.Id = await ExecuteAsync(message, CapTransaction);


if (id > 0)
if (message.Id > 0)
{ {
_logger.LogInformation($"message [{message}] has been persisted in the database.");
_capTransaction.AddToSent(message);
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message); s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
}


message.Id = id;
Enqueue(message);
if (NotUseTransaction || CapTransaction.AutoCommit)
{
_capTransaction.Commit();
} }
} }
catch (Exception e) catch (Exception e)
{ {
_logger.LogError(e, "An exception was occurred when publish message async. exception message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e); s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw; throw;
} }
}

private void PublishWithTrans<T>(string name, T contentObj, string callbackName = null)
{
Guid operationId = default(Guid);

var content = Serialize(contentObj, callbackName);

var message = new CapPublishedMessage
finally
{ {
Name = name,
Content = content,
StatusName = StatusName.Scheduled
};

try
{
Console.WriteLine("================22222222222222=====================");
operationId = s_diagnosticListener.WritePublishMessageStoreBefore(message);

var id = Execute(DbConnection, DbTransaction, message);
Console.WriteLine("================777777777777777777777=====================");
ClosedCap();

if (id > 0)
if (NotUseTransaction || CapTransaction.AutoCommit)
{ {
_logger.LogInformation($"message [{message}] has been persisted in the database.");
s_diagnosticListener.WritePublishMessageStoreAfter(operationId, message);
message.Id = id;
Enqueue(message);
_capTransaction.Dispose();
} }
} }
catch (Exception e)
{
_logger.LogError(e, "An exception was occurred when publish message. message:" + name);
s_diagnosticListener.WritePublishMessageStoreError(operationId, message, e);
Console.WriteLine(e);
throw;
}
} }


private void ClosedCap()
protected abstract object GetDbTransaction();

protected abstract Task<int> ExecuteAsync(CapPublishedMessage message,
ICapTransaction transaction,
CancellationToken cancel = default(CancellationToken));

protected virtual string Serialize<T>(T obj, string callbackName = null)
{ {
if (IsCapOpenedTrans)
string content;
if (obj != null)
{ {
DbTransaction.Commit();
DbTransaction.Dispose();
content = Helper.IsComplexType(obj.GetType())
? _serializer.Serialize(obj)
: obj.ToString();
} }

if (IsCapOpenedConn)
else
{ {
DbConnection.Dispose();
content = string.Empty;
} }
var message = new CapMessageDto(content)
{
CallbackName = callbackName
};
return _msgPacker.Pack(message);
} }

public void Dispose()
{
DbTransaction?.Dispose();
DbConnection?.Dispose();
}

#endregion private methods
} }
} }

Loading…
Cancel
Save