@@ -70,13 +70,15 @@ namespace DotNetCore.CAP.NATS | |||||
private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) | private void Subscription_MessageHandler(object sender, MsgHandlerEventArgs e) | ||||
{ | { | ||||
var headers = new Dictionary<string, string>(); | var headers = new Dictionary<string, string>(); | ||||
foreach (string h in e.Message.Header.Keys) | foreach (string h in e.Message.Header.Keys) | ||||
{ | { | ||||
headers.Add(h, e.Message.Header[h]); | headers.Add(h, e.Message.Header[h]); | ||||
} | } | ||||
var tranmessage = new TransportMessage(headers, e.Message.Data); | |||||
tranmessage.Headers.Add(Headers.Group, _groupId); | |||||
OnMessageReceived?.Invoke(e.Message.Reply, tranmessage); | |||||
headers.Add(Headers.Group, _groupId); | |||||
OnMessageReceived?.Invoke(e.Message.Reply, new TransportMessage(headers, e.Message.Data)); | |||||
} | } | ||||
public void Commit(object sender) | public void Commit(object sender) | ||||
@@ -66,7 +66,6 @@ namespace DotNetCore.CAP.Internal | |||||
public static bool IsInnerIP(string ipAddress) | public static bool IsInnerIP(string ipAddress) | ||||
{ | { | ||||
bool isInnerIp; | |||||
var ipNum = GetIpNum(ipAddress); | var ipNum = GetIpNum(ipAddress); | ||||
//Private IP: | //Private IP: | ||||
@@ -80,9 +79,9 @@ namespace DotNetCore.CAP.Internal | |||||
var bEnd = GetIpNum("172.31.255.255"); | var bEnd = GetIpNum("172.31.255.255"); | ||||
var cBegin = GetIpNum("192.168.0.0"); | var cBegin = GetIpNum("192.168.0.0"); | ||||
var cEnd = GetIpNum("192.168.255.255"); | var cEnd = GetIpNum("192.168.255.255"); | ||||
isInnerIp = IsInner(ipNum, aBegin, aEnd) || IsInner(ipNum, bBegin, bEnd) || IsInner(ipNum, cBegin, cEnd); | |||||
return isInnerIp; | |||||
return IsInner(ipNum, aBegin, aEnd) || IsInner(ipNum, bBegin, bEnd) || IsInner(ipNum, cBegin, cEnd); | |||||
} | } | ||||
private static long GetIpNum(string ipAddress) | private static long GetIpNum(string ipAddress) | ||||
{ | { | ||||
var ip = ipAddress.Split('.'); | var ip = ipAddress.Split('.'); | ||||
@@ -30,7 +30,7 @@ namespace DotNetCore.CAP.Internal | |||||
ServiceProvider = service; | ServiceProvider = service; | ||||
_dispatcher = service.GetRequiredService<IDispatcher>(); | _dispatcher = service.GetRequiredService<IDispatcher>(); | ||||
_storage = service.GetRequiredService<IDataStorage>(); | _storage = service.GetRequiredService<IDataStorage>(); | ||||
_capOptions = service.GetService<IOptions<CapOptions>>().Value; | |||||
_capOptions = service.GetRequiredService<IOptions<CapOptions>>().Value; | |||||
Transaction = new AsyncLocal<ICapTransaction>(); | Transaction = new AsyncLocal<ICapTransaction>(); | ||||
} | } | ||||
@@ -27,7 +27,7 @@ namespace DotNetCore.CAP.Processor | |||||
_serviceProvider = serviceProvider; | _serviceProvider = serviceProvider; | ||||
_waitingInterval = TimeSpan.FromSeconds(options.Value.CollectorCleaningInterval); | _waitingInterval = TimeSpan.FromSeconds(options.Value.CollectorCleaningInterval); | ||||
var initializer = _serviceProvider.GetService<IStorageInitializer>(); | |||||
var initializer = _serviceProvider.GetRequiredService<IStorageInitializer>(); | |||||
_tableNames = new[] { initializer.GetPublishedTableName(), initializer.GetReceivedTableName() }; | _tableNames = new[] { initializer.GetPublishedTableName(), initializer.GetReceivedTableName() }; | ||||
} | } | ||||
@@ -42,7 +42,7 @@ namespace DotNetCore.CAP.Processor | |||||
var time = DateTime.Now; | var time = DateTime.Now; | ||||
do | do | ||||
{ | { | ||||
deletedCount = await _serviceProvider.GetService<IDataStorage>() | |||||
deletedCount = await _serviceProvider.GetRequiredService<IDataStorage>() | |||||
.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); | .DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); | ||||
if (deletedCount != 0) | if (deletedCount != 0) | ||||