Browse Source

Code cleanup

master
Savorboard 5 years ago
parent
commit
82416af83a
9 changed files with 15 additions and 542 deletions
  1. +2
    -2
      src/DotNetCore.CAP/Diagnostics/CapDiagnosticListenerNames.cs
  2. +0
    -378
      src/DotNetCore.CAP/Internal/CapCache.cs
  3. +3
    -10
      src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs
  4. +0
    -85
      src/DotNetCore.CAP/Internal/HashCodeCombiner.cs
  5. +1
    -1
      src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs
  6. +5
    -4
      src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs
  7. +0
    -21
      src/DotNetCore.CAP/Internal/IMongoTransaction.cs
  8. +4
    -2
      src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs
  9. +0
    -39
      src/DotNetCore.CAP/Internal/WaitHandleEx.cs

+ 2
- 2
src/DotNetCore.CAP/Diagnostics/CapDiagnosticListenerNames.cs View File

@@ -8,10 +8,10 @@ namespace DotNetCore.CAP.Diagnostics
/// </summary>
public static class CapDiagnosticListenerNames
{
public const string DiagnosticListenerName = "CapDiagnosticListener";

private const string CapPrefix = "DotNetCore.CAP.";

public const string DiagnosticListenerName = "CapDiagnosticListener";

public const string BeforePublishMessageStore = CapPrefix + "WritePublishMessageStoreBefore";
public const string AfterPublishMessageStore = CapPrefix + "WritePublishMessageStoreAfter";
public const string ErrorPublishMessageStore = CapPrefix + "WritePublishMessageStoreError";


+ 0
- 378
src/DotNetCore.CAP/Internal/CapCache.cs View File

@@ -1,378 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace DotNetCore.CAP.Internal
{
#region Cache<T> class

/// <summary>
/// This is a generic cache subsystem based on key/value pairs, where key is generic, too. Key must be unique.
/// Every cache entry has its own timeout.
/// Cache is thread safe and will delete expired entries on its own using System.Threading.Timers (which run on
/// <see cref="ThreadPool" /> threads).
/// </summary>
// ReSharper disable once InheritdocConsiderUsage
// ReSharper disable once InconsistentNaming
internal class Cache<K, T> : IDisposable
{
#region Constructor and class members

private readonly Dictionary<K, T> _cache = new Dictionary<K, T>();
private readonly Dictionary<K, Timer> _timers = new Dictionary<K, Timer>();
private readonly ReaderWriterLockSlim _locker = new ReaderWriterLockSlim();

#endregion

#region IDisposable implementation & Clear

private bool disposed;

/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

/// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
/// <param name="disposing">
/// <c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.
/// </param>
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
disposed = true;

if (disposing)
{
// Dispose managed resources.
Clear();
_locker.Dispose();
}

// Dispose unmanaged resources
}
}

/// <summary>
/// Clears the entire cache and disposes all active timers.
/// </summary>
public void Clear()
{
_locker.EnterWriteLock();
try
{
try
{
foreach (var t in _timers.Values)
{
t.Dispose();
}
}
catch
{
}

_timers.Clear();
_cache.Clear();
}
finally
{
_locker.ExitWriteLock();
}
}

#endregion

#region CheckTimer

// Checks whether a specific timer already exists and adds a new one, if not
private void CheckTimer(K key, TimeSpan? cacheTimeout, bool restartTimerIfExists)
{
Timer timer;

if (_timers.TryGetValue(key, out timer))
{
if (restartTimerIfExists)
{
timer.Change(
cacheTimeout ?? Timeout.InfiniteTimeSpan,
Timeout.InfiniteTimeSpan);
}
}
else
{
_timers.Add(
key,
new Timer(
RemoveByTimer,
key,
cacheTimeout ?? Timeout.InfiniteTimeSpan,
Timeout.InfiniteTimeSpan));
}
}

private void RemoveByTimer(object state)
{
Remove((K) state);
}

#endregion

#region AddOrUpdate, Get, Remove, Exists, Clear

/// <summary>
/// Adds or updates the specified cache-key with the specified cacheObject and applies a specified timeout (in seconds)
/// to this key.
/// </summary>
/// <param name="key">The cache-key to add or update.</param>
/// <param name="cacheObject">The cache object to store.</param>
/// <param name="cacheTimeout">
/// The cache timeout (lifespan) of this object. Must be 1 or greater.
/// Specify Timeout.Infinite to keep the entry forever.
/// </param>
/// <param name="restartTimerIfExists">
/// (Optional). If set to <c>true</c>, the timer for this cacheObject will be reset if the object already
/// exists in the cache. (Default = false).
/// </param>
public void AddOrUpdate(K key, T cacheObject, TimeSpan? cacheTimeout, bool restartTimerIfExists = false)
{
if (disposed)
{
return;
}

_locker.EnterWriteLock();
try
{
CheckTimer(key, cacheTimeout, restartTimerIfExists);

if (!_cache.ContainsKey(key))
{
_cache.Add(key, cacheObject);
}
else
{
_cache[key] = cacheObject;
}
}
finally
{
_locker.ExitWriteLock();
}
}

/// <summary>
/// Adds or updates the specified cache-key with the specified cacheObject and applies <c>Timeout.Infinite</c> to this
/// key.
/// </summary>
/// <param name="key">The cache-key to add or update.</param>
/// <param name="cacheObject">The cache object to store.</param>
public void AddOrUpdate(K key, T cacheObject)
{
AddOrUpdate(key, cacheObject, Timeout.InfiniteTimeSpan);
}

/// <summary>
/// Gets the cache entry with the specified key or returns <c>default(T)</c> if the key is not found.
/// </summary>
/// <param name="key">The cache-key to retrieve.</param>
/// <returns>The object from the cache or <c>default(T)</c>, if not found.</returns>
public T this[K key] => Get(key);

/// <summary>
/// Gets the cache entry with the specified key or return <c>default(T)</c> if the key is not found.
/// </summary>
/// <param name="key">The cache-key to retrieve.</param>
/// <returns>The object from the cache or <c>default(T)</c>, if not found.</returns>
public T Get(K key)
{
if (disposed)
{
return default(T);
}

_locker.EnterReadLock();
try
{
T rv;
return _cache.TryGetValue(key, out rv) ? rv : default(T);
}
finally
{
_locker.ExitReadLock();
}
}

/// <summary>
/// Tries to gets the cache entry with the specified key.
/// </summary>
/// <param name="key">The key.</param>
/// <param name="value">(out) The value, if found, or <c>default(T)</c>, if not.</param>
/// <returns><c>True</c>, if <c>key</c> exists, otherwise <c>false</c>.</returns>
public bool TryGet(K key, out T value)
{
if (disposed)
{
value = default(T);
return false;
}

_locker.EnterReadLock();
try
{
return _cache.TryGetValue(key, out value);
}
finally
{
_locker.ExitReadLock();
}
}

/// <summary>
/// Removes a series of cache entries in a single call for all key that match the specified key pattern.
/// </summary>
/// <param name="keyPattern">The key pattern to remove. The Predicate has to return true to get key removed.</param>
public void Remove(Predicate<K> keyPattern)
{
if (disposed)
{
return;
}

_locker.EnterWriteLock();
try
{
var removers = (from k in _cache.Keys.Cast<K>()
where keyPattern(k)
select k).ToList();

foreach (var workKey in removers)
{
try
{
_timers[workKey].Dispose();
}
catch
{
}

_timers.Remove(workKey);
_cache.Remove(workKey);
}
}
finally
{
_locker.ExitWriteLock();
}
}

/// <summary>
/// Removes the specified cache entry with the specified key.
/// If the key is not found, no exception is thrown, the statement is just ignored.
/// </summary>
/// <param name="key">The cache-key to remove.</param>
public void Remove(K key)
{
if (disposed)
{
return;
}

_locker.EnterWriteLock();
try
{
if (_cache.ContainsKey(key))
{
try
{
_timers[key].Dispose();
}
catch
{
}

_timers.Remove(key);
_cache.Remove(key);
}
}
finally
{
_locker.ExitWriteLock();
}
}

/// <summary>
/// Checks if a specified key exists in the cache.
/// </summary>
/// <param name="key">The cache-key to check.</param>
/// <returns><c>True</c> if the key exists in the cache, otherwise <c>False</c>.</returns>
public bool Exists(K key)
{
if (disposed)
{
return false;
}

_locker.EnterReadLock();
try
{
return _cache.ContainsKey(key);
}
finally
{
_locker.ExitReadLock();
}
}

#endregion
}

#endregion

#region Other Cache classes (derived)

/// <summary>
/// This is a generic cache subsystem based on key/value pairs, where key is a string.
/// You can add any item to this cache as long as the key is unique, so treat keys as something like namespaces and
/// build them with a
/// specific system/syntax in your application.
/// Every cache entry has its own timeout.
/// Cache is thread safe and will delete expired entries on its own using System.Threading.Timers (which run on
/// <see cref="ThreadPool" /> threads).
/// </summary>
/// <summary>
/// The non-generic Cache class instanciates a Cache{object} that can be used with any type of (mixed) contents.
/// It also publishes a static <c>.Global</c> member, so a cache can be used even without creating a dedicated
/// instance.
/// The <c>.Global</c> member is lazy instanciated.
/// </summary>
internal class CapCache : Cache<string, object>
{
#region Static Global Cache instance

private static readonly Lazy<CapCache> global = new Lazy<CapCache>();

/// <summary>
/// Gets the global shared cache instance valid for the entire process.
/// </summary>
/// <value>
/// The global shared cache instance.
/// </value>
public static CapCache Global => global.Value;

#endregion
}

#endregion
}

+ 3
- 10
src/DotNetCore.CAP/Internal/ConsumerInvokerFactory.cs View File

@@ -8,21 +8,14 @@ namespace DotNetCore.CAP.Internal
{
internal class ConsumerInvokerFactory : IConsumerInvokerFactory
{
private readonly ILoggerFactory _loggerFactory;
//private readonly IMessagePacker _messagePacker;
//
//private readonly IModelBinderFactory _modelBinderFactory;
private readonly ILoggerFactory _loggerFactory;
private readonly IServiceProvider _serviceProvider;

public ConsumerInvokerFactory(
ILoggerFactory loggerFactory,
//IMessagePacker messagePacker,
//IModelBinderFactory modelBinderFactory,
ILoggerFactory loggerFactory,
IServiceProvider serviceProvider)
{
_loggerFactory = loggerFactory;
//_messagePacker = messagePacker;
//_modelBinderFactory = modelBinderFactory;
_loggerFactory = loggerFactory;
_serviceProvider = serviceProvider;
}



+ 0
- 85
src/DotNetCore.CAP/Internal/HashCodeCombiner.cs View File

@@ -1,85 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections;
using System.Collections.Generic;
using System.Runtime.CompilerServices;

namespace DotNetCore.CAP.Internal
{
internal struct HashCodeCombiner
{
private long _combinedHash64;

public int CombinedHash
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
get { return _combinedHash64.GetHashCode(); }
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private HashCodeCombiner(long seed)
{
_combinedHash64 = seed;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(IEnumerable e)
{
if (e == null)
{
Add(0);
}
else
{
var count = 0;
foreach (var o in e)
{
Add(o);
count++;
}

Add(count);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static implicit operator int(HashCodeCombiner self)
{
return self.CombinedHash;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(int i)
{
_combinedHash64 = ((_combinedHash64 << 5) + _combinedHash64) ^ i;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(string s)
{
var hashCode = s != null ? s.GetHashCode() : 0;
Add(hashCode);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add(object o)
{
var hashCode = o != null ? o.GetHashCode() : 0;
Add(hashCode);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Add<TValue>(TValue value, IEqualityComparer<TValue> comparer)
{
var hashCode = value != null ? comparer.GetHashCode(value) : 0;
Add(hashCode);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static HashCodeCombiner Start()
{
return new HashCodeCombiner(0x1505L);
}
}
}

+ 1
- 1
src/DotNetCore.CAP/Internal/IConsumerInvoker.Default.cs View File

@@ -17,7 +17,7 @@ namespace DotNetCore.CAP.Internal
private readonly ILogger _logger;
private readonly IServiceProvider _serviceProvider;
public DefaultConsumerInvoker(ILoggerFactory loggerFactory,IServiceProvider serviceProvider)
public DefaultConsumerInvoker(ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
_logger = loggerFactory.CreateLogger<DefaultConsumerInvoker>();


+ 5
- 4
src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs View File

@@ -157,9 +157,6 @@ namespace DotNetCore.CAP.Internal
{
tracingTimestamp = TracingBefore(transportMessage, _serverAddress);

var startTime = DateTimeOffset.UtcNow;
var stopwatch = Stopwatch.StartNew();

var name = transportMessage.GetName();
var group = transportMessage.GetGroup();

@@ -171,7 +168,11 @@ namespace DotNetCore.CAP.Internal
if (!canFindSubscriber)
{
var error = $"Message can not be found subscriber. Name:{name}, Group:{group}. {Environment.NewLine} see: https://github.com/dotnetcore/CAP/issues/63";
throw new SubscriberNotFoundException(error);
var ex = new SubscriberNotFoundException(error);

TracingError(tracingTimestamp, transportMessage, client.ServersAddress, ex);

throw ex;
}

var type = executor.Parameters.FirstOrDefault(x => x.IsFromCap == false)?.ParameterType;


+ 0
- 21
src/DotNetCore.CAP/Internal/IMongoTransaction.cs View File

@@ -1,21 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Internal
{
public interface IMongoTransaction : IDisposable
{
/// <summary>
/// If set true, the session.CommitTransaction() will be called automatically.
/// </summary>
/// <value></value>
bool AutoCommit { get; set; }

Task<IMongoTransaction> BeginAsync(bool autoCommit = true);

IMongoTransaction Begin(bool autoCommit = true);
}
}

+ 4
- 2
src/DotNetCore.CAP/Internal/ISubscriberExecutor.Default.cs View File

@@ -196,9 +196,11 @@ namespace DotNetCore.CAP.Internal
}
catch (Exception ex)
{
TracingError(tracingTimestamp, message.Origin, descriptor.MethodInfo, ex);
var e = new SubscriberExecutionFailedException(ex.Message, ex);
TracingError(tracingTimestamp, message.Origin, descriptor.MethodInfo, e);

throw new SubscriberExecutionFailedException(ex.Message, ex);
throw e;
}
}



+ 0
- 39
src/DotNetCore.CAP/Internal/WaitHandleEx.cs View File

@@ -1,39 +0,0 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace DotNetCore.CAP.Internal
{
public static class WaitHandleEx
{
public static Task WaitAnyAsync(WaitHandle handle1, WaitHandle handle2, TimeSpan timeout)
{
var t1 = handle1.WaitOneAsync(timeout);
var t2 = handle2.WaitOneAsync(timeout);
return Task.WhenAny(t1, t2);
}

public static async Task<bool> WaitOneAsync(this WaitHandle handle, TimeSpan timeout)
{
RegisteredWaitHandle registeredHandle = null;
try
{
var tcs = new TaskCompletionSource<bool>();
registeredHandle = ThreadPool.RegisterWaitForSingleObject(
handle,
(state, timedOut) => ((TaskCompletionSource<bool>) state).TrySetResult(!timedOut),
tcs,
timeout,
true);
return await tcs.Task;
}
finally
{
registeredHandle?.Unregister(null);
}
}
}
}

Loading…
Cancel
Save