瀏覽代碼

Enable #nullable for postgresql storage.

master
Savorboard 3 年之前
父節點
當前提交
bddaf525e9
共有 9 個文件被更改,包括 22 次插入29 次删除
  1. +0
    -1
      src/Directory.Build.props
  2. +2
    -2
      src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs
  3. +1
    -1
      src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs
  4. +1
    -6
      src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj
  5. +5
    -5
      src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
  6. +5
    -5
      src/DotNetCore.CAP.PostgreSql/IDbConnection.Extensions.cs
  7. +2
    -2
      src/DotNetCore.CAP.PostgreSql/IDbContextTransaction.CAP.cs
  8. +5
    -5
      src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs
  9. +1
    -2
      src/DotNetCore.CAP/CAP.Builder.cs

+ 0
- 1
src/Directory.Build.props 查看文件

@@ -26,7 +26,6 @@

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All"/>
<PackageReference Include="JetBrains.Annotations" Version="2021.3.0" PrivateAssets="All" />
</ItemGroup>

</Project>

+ 2
- 2
src/DotNetCore.CAP.PostgreSql/CAP.EFOptions.cs 查看文件

@@ -16,11 +16,11 @@ namespace DotNetCore.CAP
/// </summary>
public string Schema { get; set; } = DefaultSchema;

internal Type DbContextType { get; set; }
internal Type? DbContextType { get; set; }

/// <summary>
/// Data version
/// </summary>
internal string Version { get; set; }
internal string Version { get; set; } = default!;
}
}

+ 1
- 1
src/DotNetCore.CAP.PostgreSql/CAP.PostgreSqlOptions.cs 查看文件

@@ -13,7 +13,7 @@ namespace DotNetCore.CAP
/// <summary>
/// Gets or sets the database's connection string that will be used to store database entities.
/// </summary>
public string ConnectionString { get; set; }
public string ConnectionString { get; set; } = default!;
}

internal class ConfigurePostgreSqlOptions : IConfigureOptions<PostgreSqlOptions>


+ 1
- 6
src/DotNetCore.CAP.PostgreSql/DotNetCore.CAP.PostgreSql.csproj 查看文件

@@ -2,15 +2,10 @@

<PropertyGroup>
<TargetFrameworks>net6.0;netstandard2.1</TargetFrameworks>
<AssemblyName>DotNetCore.CAP.PostgreSql</AssemblyName>
<Nullable>enable</Nullable>
<PackageTags>$(PackageTags);PostgreSQL</PackageTags>
</PropertyGroup>

<PropertyGroup>
<DocumentationFile>bin\$(Configuration)\netstandard2.1\DotNetCore.CAP.PostgreSql.xml</DocumentationFile>
<NoWarn>1701;1702;1705;CS1591</NoWarn>
</PropertyGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net6.0' ">
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="6.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="6.0.0" />


+ 5
- 5
src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs 查看文件

@@ -46,7 +46,7 @@ namespace DotNetCore.CAP.PostgreSql
public async Task ChangeReceiveStateAsync(MediumMessage message, StatusName state) =>
await ChangeMessageStateAsync(_recName, message, state);

public MediumMessage StoreMessage(string name, Message content, object dbTransaction = null)
public MediumMessage StoreMessage(string name, Message content, object? dbTransaction = null)
{
var sql =
$"INSERT INTO {_pubName} (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
@@ -69,7 +69,7 @@ namespace DotNetCore.CAP.PostgreSql
new NpgsqlParameter("@Content", message.Content),
new NpgsqlParameter("@Retries", message.Retries),
new NpgsqlParameter("@Added", message.Added),
new NpgsqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? (object)message.ExpiresAt.Value : DBNull.Value),
new NpgsqlParameter("@ExpiresAt", message.ExpiresAt.HasValue ? message.ExpiresAt.Value : DBNull.Value),
new NpgsqlParameter("@StatusName", nameof(StatusName.Scheduled))
};

@@ -84,7 +84,7 @@ namespace DotNetCore.CAP.PostgreSql
if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
dbTrans = dbContextTrans.GetDbTransaction();

var conn = dbTrans?.Connection;
var conn = dbTrans?.Connection!;
conn.ExecuteNonQuery(sql, dbTrans, sqlParams);
}

@@ -127,7 +127,7 @@ namespace DotNetCore.CAP.PostgreSql
new NpgsqlParameter("@Content", _serializer.Serialize(mdMessage.Origin)),
new NpgsqlParameter("@Retries", mdMessage.Retries),
new NpgsqlParameter("@Added", mdMessage.Added),
new NpgsqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? (object) mdMessage.ExpiresAt.Value : DBNull.Value),
new NpgsqlParameter("@ExpiresAt", mdMessage.ExpiresAt.HasValue ? mdMessage.ExpiresAt.Value : DBNull.Value),
new NpgsqlParameter("@StatusName", nameof(StatusName.Scheduled))
};

@@ -199,7 +199,7 @@ namespace DotNetCore.CAP.PostgreSql
messages.Add(new MediumMessage
{
DbId = reader.GetInt64(0).ToString(),
Origin = _serializer.Deserialize(reader.GetString(1)),
Origin = _serializer.Deserialize(reader.GetString(1))!,
Retries = reader.GetInt32(2),
Added = reader.GetDateTime(3)
});


+ 5
- 5
src/DotNetCore.CAP.PostgreSql/IDbConnection.Extensions.cs 查看文件

@@ -9,7 +9,7 @@ namespace DotNetCore.CAP.PostgreSql
{
internal static class DbConnectionExtensions
{
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction transaction = null,
public static int ExecuteNonQuery(this IDbConnection connection, string sql, IDbTransaction? transaction = null,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
@@ -33,7 +33,7 @@ namespace DotNetCore.CAP.PostgreSql
return command.ExecuteNonQuery();
}

public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T> readerFunc,
public static T ExecuteReader<T>(this IDbConnection connection, string sql, Func<IDataReader, T>? readerFunc,
params object[] sqlParams)
{
if (connection.State == ConnectionState.Closed)
@@ -51,7 +51,7 @@ namespace DotNetCore.CAP.PostgreSql

var reader = command.ExecuteReader();

T result = default;
T result = default!;
if (readerFunc != null)
{
result = readerFunc(reader);
@@ -77,14 +77,14 @@ namespace DotNetCore.CAP.PostgreSql

var objValue = command.ExecuteScalar();

T result = default;
T result = default!;
if (objValue != null)
{
var returnType = typeof(T);
var converter = TypeDescriptor.GetConverter(returnType);
if (converter.CanConvertFrom(objValue.GetType()))
{
result = (T)converter.ConvertFrom(objValue);
result = (T)converter.ConvertFrom(objValue)!;
}
else
{


+ 2
- 2
src/DotNetCore.CAP.PostgreSql/IDbContextTransaction.CAP.cs 查看文件

@@ -18,7 +18,7 @@ namespace Microsoft.EntityFrameworkCore.Storage
public CapEFDbTransaction(ICapTransaction transaction)
{
_transaction = transaction;
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction;
var dbContextTransaction = (IDbContextTransaction)_transaction.DbTransaction!;
TransactionId = dbContextTransaction.TransactionId;
}

@@ -58,7 +58,7 @@ namespace Microsoft.EntityFrameworkCore.Storage
{
get
{
var dbContextTransaction = (IDbContextTransaction) _transaction.DbTransaction;
var dbContextTransaction = (IDbContextTransaction) _transaction.DbTransaction!;
return dbContextTransaction.GetDbTransaction();
}
}


+ 5
- 5
src/DotNetCore.CAP.PostgreSql/IMonitoringApi.PostgreSql.cs 查看文件

@@ -27,9 +27,9 @@ namespace DotNetCore.CAP.PostgreSql
_recName = initializer.GetReceivedTableName();
}

public async Task<MediumMessage> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id);
public async Task<MediumMessage?> GetPublishedMessageAsync(long id) => await GetMessageAsync(_pubName, id);

public async Task<MediumMessage> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id);
public async Task<MediumMessage?> GetReceivedMessageAsync(long id) => await GetMessageAsync(_recName, id);

public StatisticsDto GetStatistics()
{
@@ -120,7 +120,7 @@ namespace DotNetCore.CAP.PostgreSql
Content = reader.GetString(index++),
Retries = reader.GetInt32(index++),
Added = reader.GetDateTime(index++),
ExpiresAt = reader.IsDBNull(index++) ? (DateTime?)null : reader.GetDateTime(index - 1),
ExpiresAt = reader.IsDBNull(index++) ? null : reader.GetDateTime(index - 1),
StatusName = reader.GetString(index)
});
}
@@ -242,14 +242,14 @@ select ""Key"",""Count"" from aggr where ""Key"" >= @minKey and ""Key"" <= @maxK
return result;
}

private async Task<MediumMessage> GetMessageAsync(string tableName, long id)
private async Task<MediumMessage?> GetMessageAsync(string tableName, long id)
{
var sql = $@"SELECT ""Id"" AS ""DbId"", ""Content"", ""Added"", ""ExpiresAt"", ""Retries"" FROM {tableName} WHERE ""Id""={id} FOR UPDATE SKIP LOCKED";

await using var connection = new NpgsqlConnection(_options.ConnectionString);
var mediumMessage = connection.ExecuteReader(sql, reader =>
{
MediumMessage message = null;
MediumMessage? message = null;

while (reader.Read())
{


+ 1
- 2
src/DotNetCore.CAP/CAP.Builder.cs 查看文件

@@ -6,7 +6,6 @@ using System.Linq;
using System.Reflection;
using DotNetCore.CAP.Filter;
using DotNetCore.CAP.Internal;
using JetBrains.Annotations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
// ReSharper disable UnusedMember.Global
@@ -78,7 +77,7 @@ namespace DotNetCore.CAP
/// Registers subscribers from the specified types.
/// </summary>
/// <param name="handlerAssemblyMarkerTypes"></param>
public CapBuilder AddSubscriberAssembly([NotNull] params Type[] handlerAssemblyMarkerTypes)
public CapBuilder AddSubscriberAssembly(params Type[] handlerAssemblyMarkerTypes)
{
if (handlerAssemblyMarkerTypes == null) throw new ArgumentNullException(nameof(handlerAssemblyMarkerTypes));



Loading…
取消
儲存