Skip to content
14 changes: 8 additions & 6 deletions ydb/core/formats/arrow/program/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ class TFetchingCalculationPolicy: public IMemoryCalculationPolicy {
return EStage::Fetching;
}
virtual ui64 GetReserveMemorySize(
const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const override {
if (limit) {
return std::max<ui64>(blobsSize, rawSize * (1.0 * *limit) / recordsCount);
} else {
return std::max<ui64>(blobsSize, rawSize);
}
const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> /*limit*/, const ui32 /*recordsCount*/) const override {
return std::max<ui64>(blobsSize, rawSize);
// FIXME after further memory usage investigation
// if (limit) {
// return std::max<ui64>(blobsSize, rawSize * (1.0 * *limit) / recordsCount);
// } else {
// return std::max<ui64>(blobsSize, rawSize);
// }
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,7 @@ RewriteInputForConstraint(const TExprBase& inputRows, const THashSet<TStringBuf>
hasUniqIndex |= (indexDesc->Type == TIndexDescription::EType::GlobalSyncUnique);
for (const auto& indexKeyCol : indexDesc->KeyColumns) {
if (inputColumns.contains(indexKeyCol)) {
if (!usedIndexes.contains(indexDesc->Name) &&
std::find(mainPk.begin(), mainPk.end(), indexKeyCol) == mainPk.end())
{
usedIndexes.insert(indexDesc->Name);
}
usedIndexes.insert(indexDesc->Name);
} else {
// input always contains key columns
YQL_ENSURE(std::find(mainPk.begin(), mainPk.end(), indexKeyCol) == mainPk.end());
Expand All @@ -352,6 +348,8 @@ RewriteInputForConstraint(const TExprBase& inputRows, const THashSet<TStringBuf>
}
}

AFL_ENSURE(!hasUniqIndex || !usedIndexes.empty());

if (!hasUniqIndex) {
missedKeyInput.clear();
}
Expand Down
114 changes: 114 additions & 0 deletions ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5972,6 +5972,120 @@ R"([[#;#;["Primary1"];[41u]];[["Secondary2"];[2u];["Primary2"];[42u]];[["Seconda
);
}
}

// Verifies UPSERT semantics on a table with a global secondary index on column b.
// Twin parameter Uniq selects the index kind: GLOBAL UNIQUE SYNC (duplicate or
// conflicting values of b must be rejected) vs plain GLOBAL SYNC (every upsert
// below is expected to succeed).
Y_UNIT_TEST_TWIN(IndexUpsert, Uniq) {
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings().SetKqpSettings({setting});
TKikimrRunner kikimr(serverSettings);

auto client = kikimr.GetQueryClient();

{
// Create the test table; the index is UNIQUE only in the Uniq variant.
const TString query(Q_(std::format(R"(
CREATE TABLE `/Root/TestTable` (
a Int32,
b Int32,
PRIMARY KEY(a,b),
INDEX ix_b GLOBAL {} SYNC ON (b)
);
)", Uniq ? "UNIQUE" : "")));

auto result = client.ExecuteQuery(
query,
NQuery::TTxControl::NoTx())
.ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
}

{
// Batch with distinct primary keys but the same index key (b=20):
// rejected as "Duplicated keys found." under the unique index,
// accepted otherwise.
const TString query(Q_(R"(
$v=[<|a:10,b:20|>,<|a:30,b:20|>];
UPSERT INTO `/Root/TestTable` SELECT * FROM AS_TABLE($v);
)"));

auto result = client.ExecuteQuery(
query,
NQuery::TTxControl::NoTx())
.ExtractValueSync();
if (Uniq) {
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(
result.GetIssues().ToString(),
"Duplicated keys found.",
result.GetIssues().ToString());
} else {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

{
// Batch with two fully identical rows (same PK, same index key):
// also reported as "Duplicated keys found." under the unique index.
const TString query(Q_(R"(
$v=[<|a:10,b:20|>,<|a:10,b:20|>];
UPSERT INTO `/Root/TestTable` SELECT * FROM AS_TABLE($v);
)"));

auto result = client.ExecuteQuery(
query,
NQuery::TTxControl::NoTx())
.ExtractValueSync();
if (Uniq) {
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(
result.GetIssues().ToString(),
"Duplicated keys found.",
result.GetIssues().ToString());
} else {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

{
// Batch with distinct index keys (b=10, b=30): must succeed in both
// variants. NOTE(review): under Uniq the earlier batches were rejected,
// so this is the first data actually written to the table.
const TString query(Q_(R"(
$v=[<|a:10,b:10|>,<|a:30,b:30|>];
UPSERT INTO `/Root/TestTable` SELECT * FROM AS_TABLE($v);
)"));

auto result = client.ExecuteQuery(
query,
NQuery::TTxControl::NoTx())
.ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

{
// New primary keys whose index keys (b=10, b=30) collide with rows
// already stored by the previous batch: rejected with
// "Conflict with existing key." under the unique index.
const TString query(Q_(R"(
$v=[<|a:20,b:10|>,<|a:20,b:30|>];
UPSERT INTO `/Root/TestTable` SELECT * FROM AS_TABLE($v);
)"));

auto result = client.ExecuteQuery(
query,
NQuery::TTxControl::NoTx())
.ExtractValueSync();
if (Uniq) {
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToString());
UNIT_ASSERT_STRING_CONTAINS_C(
result.GetIssues().ToString(),
"Conflict with existing key.",
result.GetIssues().ToString());
} else {
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}

{
// Index keys b=40 and b=50 are unused so far: succeeds in both variants.
const TString query(Q_(R"(
$v=[<|a:20,b:40|>,<|a:20,b:50|>];
UPSERT INTO `/Root/TestTable` SELECT * FROM AS_TABLE($v);
)"));

auto result = client.ExecuteQuery(
query,
NQuery::TTxControl::NoTx())
.ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
}
}

}
Expand Down
21 changes: 4 additions & 17 deletions ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,41 +178,28 @@ void TPartition::TryRunCompaction()
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();

if (BlobEncoder.DataKeysBody.size() >= blobsKeyCountLimit) {
CompactionInProgress = true;
Send(SelfId(), new TEvPQ::TEvRunCompaction(BlobEncoder.DataKeysBody.size()));
if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) {
LOG_D("No data for blobs compaction");
return;
}

size_t blobsCount = 0, blobsSize = 0, totalSize = 0;
size_t blobsCount = 0, blobsSize = 0;
for (; blobsCount < BlobEncoder.DataKeysBody.size(); ++blobsCount) {
const auto& k = BlobEncoder.DataKeysBody[blobsCount];
if (k.Size < compactedBlobSizeLowerBound) {
// неполный блоб. можно дописать
blobsSize += k.Size;
totalSize += k.Size;
if (blobsSize > 2 * MaxBlobSize) {
// KV не может отдать много
blobsSize -= k.Size;
totalSize -= k.Size;
break;
}
LOG_D("Blob key for append " << k.Key.ToString());
} else {
totalSize += k.Size;
LOG_D("Blob key for rename " << k.Key.ToString());
}
}
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes (" << totalSize << ")");

if (totalSize < GetCumulativeSizeLimit()) {
LOG_D("Need more data for compaction. " <<
"Blobs " << BlobEncoder.DataKeysBody.size() <<
", size " << totalSize << " (" << GetCumulativeSizeLimit() << ")");
return;
}

LOG_D("Run compaction for " << blobsCount << " blobs");
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes");

CompactionInProgress = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class TReadMetadataBase {

private:
YDB_ACCESSOR_DEF(TString, ScanIdentifier);
YDB_ACCESSOR_DEF(bool, FakeSort);
std::optional<ui64> FilteredCountLimit;
std::optional<ui64> RequestedLimit;
const ESorting Sorting = ESorting::ASC; // Sorting inside returned batches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ TConclusionStatus TScanHead::Start() {

TScanHead::TScanHead(std::unique_ptr<NCommon::ISourcesConstructor>&& sourcesConstructor, const std::shared_ptr<TSpecialReadContext>& context)
: Context(context) {
auto readMetadataContext = context->GetReadMetadata();
if (auto script = Context->GetSourcesAggregationScript()) {
SourcesCollection =
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), Context->GetReadMetadata()->GetLimitRobustOptional());
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), readMetadataContext->GetLimitRobustOptional());
SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection));
SyncPoints.emplace_back(std::make_shared<TSyncPointResultsAggregationControl>(
SourcesCollection, Context->GetSourcesAggregationScript(), Context->GetRestoreResultScript(), SyncPoints.size(), context));
} else if (Context->GetReadMetadata()->IsSorted()) {
if (Context->GetReadMetadata()->HasLimit()) {
} else if (readMetadataContext->IsSorted()) {
if (readMetadataContext->HasLimit() && !readMetadataContext->GetFakeSort()) {
auto collection = std::make_shared<TScanWithLimitCollection>(Context, std::move(sourcesConstructor));
SourcesCollection = collection;
SyncPoints.emplace_back(std::make_shared<TSyncPointLimitControl>(
Expand All @@ -39,7 +40,7 @@ TScanHead::TScanHead(std::unique_ptr<NCommon::ISourcesConstructor>&& sourcesCons
SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection));
} else {
SourcesCollection =
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), Context->GetReadMetadata()->GetLimitRobustOptional());
std::make_shared<TNotSortedCollection>(Context, std::move(sourcesConstructor), readMetadataContext->GetLimitRobustOptional());
SyncPoints.emplace_back(std::make_shared<TSyncPointResult>(SyncPoints.size(), context, SourcesCollection));
}
for (ui32 i = 0; i + 1 < SyncPoints.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ void TTxScan::Complete(const TActorContext& ctx) {
}
auto newRange = scannerConstructor->BuildReadMetadata(Self, read);
if (newRange.IsSuccess()) {
if (!request.HasReverse() && deduplicationEnabled) {
(*newRange)->SetFakeSort(true);
}
readMetadataRange = TValidator::CheckNotNull(newRange.DetachResult());
} else {
return SendError("cannot build metadata", newRange.GetErrorMessage(), ctx);
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/tx/datashard/type_serialization.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ TString DecimalToString(const std::pair<ui64, i64>& loHi, const NScheme::TTypeIn
using namespace NYql::NDecimal;

TInt128 val = FromHalfs(loHi.first, loHi.second);
return ToString(val, typeInfo.GetDecimalType().GetPrecision(), typeInfo.GetDecimalType().GetScale());
const char* result = ToString(val, MaxPrecision /*typeInfo.GetDecimalType().GetPrecision()*/, typeInfo.GetDecimalType().GetScale());
Y_ENSURE(result);

return result;
}

TString DyNumberToString(TStringBuf data) {
Expand All @@ -36,11 +39,15 @@ TString PgToString(TStringBuf data, const NScheme::TTypeInfo& typeInfo) {
}

bool DecimalToStream(const std::pair<ui64, i64>& loHi, IOutputStream& out, TString& err, const NScheme::TTypeInfo& typeInfo) {
Y_UNUSED(err);
using namespace NYql::NDecimal;

TInt128 val = FromHalfs(loHi.first, loHi.second);
out << ToString(val, typeInfo.GetDecimalType().GetPrecision(), typeInfo.GetDecimalType().GetScale());
const char* result = ToString(val, MaxPrecision /*typeInfo.GetDecimalType().GetPrecision()*/, typeInfo.GetDecimalType().GetScale());
if (!result) [[unlikely]] {
err = "Invalid Decimal binary representation";
return false;
}
out << result;
return true;
}

Expand Down
Loading
Loading