Skip to content

Commit efb3c7b

Browse files
author
Pavel Velikhov
committed
Work in progress
1 parent d01db5c commit efb3c7b

File tree

7 files changed

+182
-82
lines changed

7 files changed

+182
-82
lines changed

ydb/core/kqp/opt/rbo/kqp_operator.cpp

Lines changed: 56 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -701,44 +701,47 @@ TConjunctInfo TOpFilter::GetConjunctInfo(TPlanProps& props) const {
701701
lambdaBody = lambdaBody->ChildPtr(0);
702702
}
703703

704+
TVector<TExprNode::TPtr> conjuncts;
704705
if (lambdaBody->IsCallable("And")) {
705706
for (auto conj : lambdaBody->Children()) {
706-
auto conjObj = conj;
707-
bool fromPg = false;
707+
conjuncts.push_back(conj);
708+
}
709+
} else {
710+
conjuncts.push_back(lambdaBody);
711+
}
708712

709-
if (conj->IsCallable("FromPg")) {
710-
conjObj = conj->ChildPtr(0);
711-
fromPg = true;
712-
}
713+
for (auto conj : conjuncts) {
714+
auto conjObj = conj;
715+
bool fromPg = false;
713716

714-
TExprNode::TPtr leftArg;
715-
TExprNode::TPtr rightArg;
716-
if (TestAndExtractEqualityPredicate(conjObj, leftArg, rightArg)) {
717-
TVector<TInfoUnit> conjIUs;
718-
GetAllMembers(conj, conjIUs, props);
719-
720-
if (leftArg->IsCallable("Member") && rightArg->IsCallable("Member") && conjIUs.size() >= 2) {
721-
TVector<TInfoUnit> leftIUs;
722-
TVector<TInfoUnit> rightIUs;
723-
GetAllMembers(leftArg, leftIUs, props);
724-
GetAllMembers(rightArg, rightIUs, props);
725-
res.JoinConditions.push_back(TJoinConditionInfo(conjObj, leftIUs[0], rightIUs[0]));
726-
}
727-
else {
728-
TVector<TInfoUnit> conjIUs;
729-
GetAllMembers(conj, conjIUs);
730-
res.Filters.push_back(TFilterInfo(conj, conjIUs, fromPg));
731-
}
732-
} else {
717+
if (conj->IsCallable("FromPg")) {
718+
conjObj = conj->ChildPtr(0);
719+
fromPg = true;
720+
}
721+
722+
TExprNode::TPtr leftArg;
723+
TExprNode::TPtr rightArg;
724+
if (TestAndExtractEqualityPredicate(conjObj, leftArg, rightArg)) {
725+
TVector<TInfoUnit> conjIUs;
726+
GetAllMembers(conj, conjIUs, props);
727+
728+
if (leftArg->IsCallable("Member") && rightArg->IsCallable("Member") && conjIUs.size() >= 2) {
729+
TVector<TInfoUnit> leftIUs;
730+
TVector<TInfoUnit> rightIUs;
731+
GetAllMembers(leftArg, leftIUs, props);
732+
GetAllMembers(rightArg, rightIUs, props);
733+
res.JoinConditions.push_back(TJoinConditionInfo(conjObj, leftIUs[0], rightIUs[0]));
734+
}
735+
else {
733736
TVector<TInfoUnit> conjIUs;
734-
GetAllMembers(conj, conjIUs, props);
737+
GetAllMembers(conj, conjIUs);
735738
res.Filters.push_back(TFilterInfo(conj, conjIUs, fromPg));
736739
}
740+
} else {
741+
TVector<TInfoUnit> conjIUs;
742+
GetAllMembers(conj, conjIUs, props);
743+
res.Filters.push_back(TFilterInfo(conj, conjIUs, fromPg));
737744
}
738-
} else {
739-
TVector<TInfoUnit> filterIUs;
740-
GetAllMembers(lambdaBody, filterIUs, props);
741-
res.Filters.push_back(TFilterInfo(lambdaBody, filterIUs));
742745
}
743746

744747
return res;
@@ -898,9 +901,24 @@ TString TOpAggregate::ToString(TExprContext& ctx) {
898901
// Builds a CBO tree made of a single operator: treeRoot is both the root and the only
// entry of TreeNodes, and the tree copies the root's Children as its own children.
TOpCBOTree::TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TPositionHandle pos) :
899902
IOperator(EOperator::CBOTree, pos),
900903
TreeRoot(treeRoot),
901-
TreeNodes({treeRoot}) {
902-
Children = treeRoot->Children;
904+
TreeNodes({treeRoot})
905+
{
906+
Children = treeRoot->Children;
907+
}
908+
909+
// Builds a CBO tree from an explicit list of member operators.
//   treeRoot  - operator stored as TreeRoot
//   treeNodes - operators stored as TreeNodes
// The tree's Children are collected from the members: every child of a member operator
// that is not itself listed in treeNodes is appended, in iteration order.
// NOTE(review): a child referenced by more than one member would be appended more than once,
// and each membership test is a linear std::find over treeNodes - confirm both are acceptable.
// NOTE(review): nothing here checks that treeRoot is one of treeNodes.
TOpCBOTree::TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TVector<std::shared_ptr<IOperator>> treeNodes, TPositionHandle pos) :
910+
IOperator(EOperator::CBOTree, pos),
911+
TreeRoot(treeRoot),
912+
TreeNodes({treeNodes})
913+
{
914+
for (auto & n : treeNodes) {
915+
for (auto & c : n->Children) {
916+
// Only operators outside the tree become children of the tree.
917+
if (std::find(treeNodes.begin(), treeNodes.end(), c) == treeNodes.end()) {
917+
Children.push_back(c);
918+
}
919+
}
903920
}
921+
}
904922

905923
void TOpCBOTree::RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction> &renameMap, TExprContext &ctx, const THashSet<TInfoUnit, TInfoUnit::THashFunction> &stopList) {
906924
for (auto op : TreeNodes) {
@@ -911,9 +929,13 @@ void TOpCBOTree::RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THash
911929
// Renders the tree as "CBO Tree: [<node>, <node>, ...]", where each entry is the
// ToString(ctx) output of one element of TreeNodes, in TreeNodes order.
TString TOpCBOTree::ToString(TExprContext& ctx) {
912930
TStringBuilder res;
913931
res << "CBO Tree: [";
914-
for (auto op : TreeNodes) {
915-
res << op->ToString(ctx);
932+
for (size_t i=0; i < TreeNodes.size(); i++) {
933+
res << TreeNodes[i]->ToString(ctx);
934+
// Separator goes between entries only; size()-1 cannot wrap here because the loop
// body runs only when TreeNodes is non-empty.
if (i != TreeNodes.size()-1) {
935+
res << ", ";
936+
}
916937
}
938+
res << "]";
917939
return res;
918940
}
919941

ydb/core/kqp/opt/rbo/kqp_operator.h

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -363,6 +363,7 @@ class IUnaryOperator : public IOperator {
363363
IUnaryOperator(EOperator kind, TPositionHandle pos) : IOperator(kind, pos) {}
364364
IUnaryOperator(EOperator kind, TPositionHandle pos, std::shared_ptr<IOperator> input) : IOperator(kind, pos) { Children.push_back(input); }
365365
std::shared_ptr<IOperator> &GetInput() { return Children[0]; }
366+
// Replaces the operator's single input, i.e. overwrites Children[0].
// NOTE(review): Children[0] is not bounds-checked, and the IUnaryOperator(kind, pos)
// constructor does not push a child - presumably callers only use this once an input
// exists; confirm.
void SetInput(std::shared_ptr<IOperator> newInput) { Children[0] = newInput; }
366367

367368
virtual void ComputeMetadata(TRBOContext & ctx, TPlanProps & planProps) override;
368369
virtual void ComputeStatistics(TRBOContext & ctx, TPlanProps & planProps) override;
@@ -378,6 +379,9 @@ class IBinaryOperator : public IOperator {
378379

379380
std::shared_ptr<IOperator> &GetLeftInput() { return Children[0]; }
380381
std::shared_ptr<IOperator> &GetRightInput() { return Children[1]; }
382+
383+
// Replaces the left input, i.e. overwrites Children[0] (the slot GetLeftInput() returns).
void SetLeftInput(std::shared_ptr<IOperator> newInput) { Children[0] = newInput; }
384+
// Replaces the right input, i.e. overwrites Children[1] (the slot GetRightInput() returns).
void SetRightInput(std::shared_ptr<IOperator> newInput) { Children[1] = newInput; }
381385
};
382386

383387
class TOpEmptySource : public IOperator {
@@ -533,6 +537,7 @@ class TOpLimit : public IUnaryOperator {
533537
class TOpCBOTree : public IOperator {
534538
public:
535539
TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TPositionHandle pos);
540+
TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TVector<std::shared_ptr<IOperator>> treeNodes, TPositionHandle pos);
536541

537542
virtual TVector<TInfoUnit> GetOutputIUs() override { return TreeRoot->GetOutputIUs(); }
538543
void RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction> &renameMap, TExprContext &ctx, const THashSet<TInfoUnit, TInfoUnit::THashFunction> &stopList = {}) override;
@@ -588,7 +593,7 @@ class TOpRoot : public IUnaryOperator {
588593
for (auto scalarSubplan : Root->PlanProps.ScalarSubplans.Get()) {
589594
BuildDfsList(scalarSubplan, {}, size_t(0), visited);
590595
}
591-
auto child = ptr->Children[0];
596+
auto child = ptr->GetInput();
592597
BuildDfsList(child, {}, size_t(0), visited);
593598
CurrElement = 0;
594599
}

ydb/core/kqp/opt/rbo/kqp_rbo.cpp

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -72,7 +72,11 @@ void TRuleBasedStage::RunStage(TOpRoot &root, TRBOContext &ctx) {
7272
if (iter.Parent) {
7373
iter.Parent->Children[iter.ChildIndex] = op;
7474
} else {
75-
root.Children[0] = op;
75+
root.SetInput(op);
76+
}
77+
78+
if (rule->LogRule) {
79+
YQL_CLOG(TRACE, CoreDq) << "Plan after applying rule:\n" << root.PlanToString(ctx.ExprCtx);
7680
}
7781

7882
ComputeRequiredProps(root, Props, ctx);

ydb/core/kqp/opt/rbo/kqp_rbo.h

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -27,14 +27,15 @@ enum ERuleProperties: ui32 {
2727
// Base class of a rule: a name, a ui32 of rule properties, a logging flag, and the
// pure-virtual TestAndApply hook that concrete rules implement.
class IRule {
2828
public:
2929
// NOTE(review): this constructor does not initialise Props, and Props has no default
// member initialiser below, so its value is indeterminate for rules built this way.
IRule(TString name) : RuleName(name) {}
30-
IRule(TString name, ui32 props) : RuleName(name), Props(props) {}
30+
// logRule defaults to false, so existing two-argument callers keep compiling unchanged.
IRule(TString name, ui32 props, bool logRule = false) : RuleName(name), Props(props), LogRule(logRule) {}
3131

3232
// Takes the operator by non-const reference and returns a bool.
// Presumably the bool reports whether the rule was applied - confirm against implementations.
virtual bool TestAndApply(std::shared_ptr<IOperator> &input, TRBOContext &ctx, TPlanProps &props) = 0;
3333

3434
virtual ~IRule() = default;
3535

3636
TString RuleName;
3737
ui32 Props;
38+
// When true, the code that runs the rule logs the plan after the rule has been applied.
bool LogRule = false;
3839
};
3940

4041
/**
@@ -44,7 +45,7 @@ class IRule {
4445
class ISimplifiedRule : public IRule {
4546
public:
4647
ISimplifiedRule(TString name) : IRule(name) {}
47-
ISimplifiedRule(TString name, ui32 props) : IRule(name, props) {}
48+
ISimplifiedRule(TString name, ui32 props, bool logRule = false) : IRule(name, props, logRule) {}
4849

4950
virtual std::shared_ptr<IOperator> SimpleTestAndApply(const std::shared_ptr<IOperator> &input, TRBOContext &ctx, TPlanProps &props) = 0;
5051

ydb/core/kqp/opt/rbo/kqp_rbo_rules.cpp

Lines changed: 67 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -148,12 +148,29 @@ bool IsNullRejectingPredicate(const TFilterInfo &filter, TExprContext &ctx) {
148148

149149
// Combines two CBO trees into one tree rooted at a new join.
// A fresh TOpJoin is created over the two tree roots, taking Pos, JoinKind and JoinKeys
// from `join`; `join` itself is not modified. The resulting tree's node list is the left
// tree's nodes, then the right tree's nodes, then the new join.
std::shared_ptr<TOpCBOTree> JoinCBOTrees(std::shared_ptr<TOpCBOTree> & left, std::shared_ptr<TOpCBOTree> & right, std::shared_ptr<TOpJoin> &join) {
150150
auto newJoin = std::make_shared<TOpJoin>(left->TreeRoot, right->TreeRoot, join->Pos, join->JoinKind, join->JoinKeys);
151-
auto newTree = std::make_shared<TOpCBOTree>(newJoin, newJoin->Pos);
152-
newTree->TreeNodes = left->TreeNodes;
153-
newTree->TreeNodes.insert(newTree->TreeNodes.end(), right->TreeNodes.begin(), right->TreeNodes.end());
154-
newTree->Children = left->Children;
155-
newTree->Children.insert(newTree->Children.end(), right->Children.begin(), right->Children.end());
156-
return newTree;
151+
152+
auto treeNodes = left->TreeNodes;
153+
treeNodes.insert(treeNodes.end(), right->TreeNodes.begin(), right->TreeNodes.end());
154+
treeNodes.push_back(newJoin);
155+
156+
return std::make_shared<TOpCBOTree>(newJoin, treeNodes, newJoin->Pos);
157+
}
158+
159+
// Extends an existing CBO tree with `join`, which becomes the root of the returned tree.
// Unlike JoinCBOTrees this mutates `join` in place: the input that pointed at the tree is
// redirected to the tree's root operator. The node order of the new tree follows the side:
// tree nodes then join when the tree is the left input, join then tree nodes otherwise.
// NOTE(review): the side is detected by comparing join->GetLeftInput() with cboTree itself.
// If the tree sits under another operator on the left (e.g. a filter whose input is the
// tree), the comparison fails and the right input is overwritten instead - confirm callers
// never pass such a join, or detect the side explicitly.
std::shared_ptr<TOpCBOTree> AddJoinToCBOTree(std::shared_ptr<TOpCBOTree> & cboTree, std::shared_ptr<TOpJoin> &join) {
160+
TVector<std::shared_ptr<IOperator>> treeNodes;
161+
162+
if (join->GetLeftInput() == cboTree) {
163+
join->SetLeftInput(cboTree->TreeRoot);
164+
treeNodes.insert(treeNodes.end(), cboTree->TreeNodes.begin(), cboTree->TreeNodes.end());
165+
treeNodes.push_back(join);
166+
}
167+
else {
168+
// Any join whose left input is not the tree is treated as having the tree on the right.
join->SetRightInput(cboTree->TreeRoot);
169+
treeNodes.push_back(join);
170+
treeNodes.insert(treeNodes.end(), cboTree->TreeNodes.begin(), cboTree->TreeNodes.end());
171+
}
172+
173+
return std::make_shared<TOpCBOTree>(join, treeNodes, join->Pos);
157174
}
158175

159176
void ExtractConjuncts(TExprNode::TPtr node, TVector<TExprNode::TPtr> & conjuncts) {
@@ -344,7 +361,7 @@ bool TExtractJoinExpressionsRule::TestAndApply(std::shared_ptr<IOperator> &input
344361
filter->FilterLambda = newFilterLambda;
345362

346363
auto newMap = std::make_shared<TOpMap>(filter->GetInput(), input->Pos, mapElements, false);
347-
filter->Children[0] = newMap;
364+
filter->SetInput(newMap);
348365
return true;
349366
}
350367

@@ -396,7 +413,7 @@ bool TInlineScalarSubplanRule::TestAndApply(std::shared_ptr<IOperator> &input, T
396413

397414
TVector<std::pair<TInfoUnit,TInfoUnit>> joinKeys;
398415
auto cross = std::make_shared<TOpJoin>(child, limit, subplan->Pos, "Cross", joinKeys);
399-
unaryOp->Children[0] = cross;
416+
unaryOp->SetInput(cross);
400417

401418
props.ScalarSubplans.Remove(scalarIU);
402419

@@ -474,12 +491,12 @@ std::shared_ptr<IOperator> TPushMapRule::SimpleTestAndApply(const std::shared_pt
474491

475492
if (leftMapElements.size()) {
476493
auto leftInput = join->GetLeftInput();
477-
join->Children[0] = std::make_shared<TOpMap>(leftInput, input->Pos, leftMapElements, false);
494+
join->SetLeftInput(std::make_shared<TOpMap>(leftInput, input->Pos, leftMapElements, false));
478495
}
479496

480497
if (rightMapElements.size()) {
481498
auto rightInput = join->GetRightInput();
482-
join->Children[1] = std::make_shared<TOpMap>(rightInput, input->Pos, rightMapElements, false);
499+
join->SetRightInput(std::make_shared<TOpMap>(rightInput, input->Pos, rightMapElements, false));
483500
}
484501

485502
// If there was an enforcer on the input map, move it to the output
@@ -601,8 +618,8 @@ std::shared_ptr<IOperator> TPushFilterRule::SimpleTestAndApply(const std::shared
601618
join->JoinKind = "Inner";
602619
}
603620

604-
join->Children[0] = leftInput;
605-
join->Children[1] = rightInput;
621+
join->SetLeftInput(leftInput);
622+
join->SetRightInput(rightInput);
606623

607624
if (topLevelPreds.size()) {
608625
auto topFilterLambda = BuildFilterLambdaFromConjuncts(join->Pos, topLevelPreds, ctx.ExprCtx, props.PgSyntax);
@@ -669,46 +686,54 @@ std::shared_ptr<IOperator> TExpandCBOTreeRule::SimpleTestAndApply(const std::sha
669686

670687
bool leftSideCBOTree = true;
671688

672-
if (leftInput->Kind == EOperator::CBOTree) {
673-
cboTree = CastOperator<TOpCBOTree>(leftInput);
674-
}
675-
else if (leftInput->Kind == EOperator::Filter &&
676-
CastOperator<TOpFilter>(leftInput)->GetInput()->Kind == EOperator::CBOTree &&
677-
join->JoinKind == "Inner") {
678-
maybeFilter = CastOperator<TOpFilter>(leftInput);
679-
cboTree = CastOperator<TOpCBOTree>(maybeFilter->GetInput());
680-
}
681-
else if (rightInput->Kind == EOperator::CBOTree) {
682-
cboTree = CastOperator<TOpCBOTree>(rightInput);
683-
leftSideCBOTree = false;
684-
}
685-
else if (rightInput->Kind == EOperator::Filter &&
686-
CastOperator<TOpFilter>(rightInput)->GetInput()->Kind == EOperator::CBOTree &&
687-
join->JoinKind == "Inner") {
688-
maybeFilter = CastOperator<TOpFilter>(rightInput);
689-
cboTree = CastOperator<TOpCBOTree>(maybeFilter->GetInput());
690-
leftSideCBOTree = false;
691-
}
692-
else {
693-
return input;
689+
// Checks whether `op` is a CBO tree, or a filter directly on top of a CBO tree.
// Returns true and fills the out-parameters on a match:
//   - op is a CBOTree: cboTree is set, maybeFilter is left untouched;
//   - op is a Filter over a CBOTree and the captured join is "Inner": both are set.
// Returns false, leaving both out-parameters untouched, otherwise.
auto findCBOTree = [&join](const std::shared_ptr<IOperator>& op,
690+
std::shared_ptr<TOpCBOTree>& cboTree,
691+
std::shared_ptr<TOpFilter>& maybeFilter) {
692+
693+
if (op->Kind == EOperator::CBOTree) {
694+
cboTree = CastOperator<TOpCBOTree>(op);
695+
return true;
696+
}
697+
// A filter between the join and the tree is only accepted for inner joins.
if (op->Kind == EOperator::Filter &&
698+
CastOperator<TOpFilter>(op)->GetInput()->Kind == EOperator::CBOTree &&
699+
join->JoinKind == "Inner") {
700+
701+
maybeFilter = CastOperator<TOpFilter>(op);
702+
cboTree = CastOperator<TOpCBOTree>(maybeFilter->GetInput());
703+
return true;
704+
}
705+
706+
return false;
707+
};
708+
709+
// Look for the CBO tree on the left input first, then fall back to the right input.
// leftSideCBOTree is initialised to true above, so it only has to change when the tree
// was found on the right. The flag decides which join input is treated as "the other
// side" below; leaving it true here would make the other side the CBO tree's own side.
if (!findCBOTree(leftInput, cboTree, maybeFilter)) {
710+
if (!findCBOTree(rightInput, cboTree, maybeFilter)) {
711+
return input;
712+
} else {
713+
leftSideCBOTree = false;
714+
}
694715
}
695716

696717
std::shared_ptr<TOpFilter> maybeAnotherFilter;
697718
auto otherSide = leftSideCBOTree ? join->GetRightInput() : join->GetLeftInput();
698719
std::shared_ptr<TOpCBOTree> otherSideCBOTree;
699720

700721
if (otherSide->Kind == EOperator::Filter &&
701-
CastOperator<TOpFilter>(otherSide)->GetInput()->Kind == EOperator::CBOTree) {
722+
CastOperator<TOpFilter>(otherSide)->GetInput()->Kind == EOperator::CBOTree &&
723+
join->JoinKind == "Inner") {
724+
702725
maybeAnotherFilter = CastOperator<TOpFilter>(otherSide);
703726
otherSideCBOTree = CastOperator<TOpCBOTree>(maybeAnotherFilter->GetInput());
704-
} else {
705-
otherSideCBOTree = std::make_shared<TOpCBOTree>(otherSide, otherSide->Pos);
706727
}
707728

708-
if (leftSideCBOTree) {
709-
cboTree = JoinCBOTrees(cboTree, otherSideCBOTree, join);
729+
if (otherSideCBOTree) {
730+
if (leftSideCBOTree) {
731+
cboTree = JoinCBOTrees(cboTree, otherSideCBOTree, join);
732+
} else {
733+
cboTree = JoinCBOTrees(otherSideCBOTree, cboTree, join);
734+
}
710735
} else {
711-
cboTree = JoinCBOTrees(otherSideCBOTree, cboTree, join);
736+
cboTree = AddJoinToCBOTree(cboTree, join);
712737
}
713738

714739
if (maybeFilter && maybeAnotherFilter) {
@@ -718,7 +743,7 @@ std::shared_ptr<IOperator> TExpandCBOTreeRule::SimpleTestAndApply(const std::sha
718743
}
719744

720745
if (maybeFilter) {
721-
maybeFilter->Children[0] = cboTree;
746+
maybeFilter->SetInput(cboTree);
722747
return maybeFilter;
723748
} else {
724749
return cboTree;
@@ -810,7 +835,7 @@ bool TAssignStagesRule::TestAndApply(std::shared_ptr<IOperator> &input, TRBOCont
810835
}
811836
YQL_CLOG(TRACE, CoreDq) << "Assign stages join";
812837
} else if (input->Kind == EOperator::Filter || input->Kind == EOperator::Map) {
813-
auto childOp = input->Children[0];
838+
auto childOp = CastOperator<IUnaryOperator>(input)->GetInput();
814839
auto prevStageId = *(childOp->Props.StageId);
815840

816841
// If the child operator is a source, it requires its own stage

ydb/core/kqp/opt/rbo/kqp_rbo_rules.h

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -70,7 +70,7 @@ class TPushFilterRule : public ISimplifiedRule {
7070
*/
7171
class TBuildInitialCBOTreeRule : public ISimplifiedRule {
7272
public:
73-
TBuildInitialCBOTreeRule() : ISimplifiedRule("Building initial CBO tree", ERuleProperties::RequireParents) {}
73+
// The third argument (logRule = true) turns on IRule::LogRule for this rule.
TBuildInitialCBOTreeRule() : ISimplifiedRule("Building initial CBO tree", ERuleProperties::RequireParents, true) {}
7474

7575
virtual std::shared_ptr<IOperator> SimpleTestAndApply(const std::shared_ptr<IOperator> &input, TRBOContext &ctx, TPlanProps &props) override;
7676
};
@@ -80,7 +80,7 @@ class TBuildInitialCBOTreeRule : public ISimplifiedRule {
8080
*/
8181
class TExpandCBOTreeRule : public ISimplifiedRule {
8282
public:
83-
TExpandCBOTreeRule() : ISimplifiedRule("Expand CBO tree", ERuleProperties::RequireParents) {}
83+
// The third argument (logRule = true) turns on IRule::LogRule for this rule.
TExpandCBOTreeRule() : ISimplifiedRule("Expand CBO tree", ERuleProperties::RequireParents, true) {}
8484

8585
virtual std::shared_ptr<IOperator> SimpleTestAndApply(const std::shared_ptr<IOperator> &input, TRBOContext &ctx, TPlanProps &props) override;
8686
};

0 commit comments

Comments
 (0)