Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/rbo/kqp_constant_folding_stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ namespace {
namespace NKikimr {
namespace NKqp {

TConstantFoldingStage::TConstantFoldingStage() {
TConstantFoldingStage::TConstantFoldingStage() : IRBOStage("Constant folding stage") {
Props = ERuleProperties::RequireParents | ERuleProperties::RequireTypes;
}

Expand Down
154 changes: 123 additions & 31 deletions ydb/core/kqp/opt/rbo/kqp_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ using namespace NYql;
using namespace NNodes;

TString PrintRBOExpression(TExprNode::TPtr expr, TExprContext & ctx) {
if (expr->IsLambda()) {
expr = expr->Child(1);
}
try {
TConvertToAstSettings settings;
settings.AllowFreeArgs = true;
Expand Down Expand Up @@ -529,6 +532,8 @@ bool TOpMap::HasRenames() const {
return false;
}

// Returns explicit renames as pairs of <to, from>

TVector<std::pair<TInfoUnit, TInfoUnit>> TOpMap::GetRenames() const {
TVector<std::pair<TInfoUnit, TInfoUnit>> result;
for (auto &[iu, body] : MapElements) {
Expand All @@ -539,6 +544,34 @@ TVector<std::pair<TInfoUnit, TInfoUnit>> TOpMap::GetRenames() const {
return result;
}

// Returns the explicit renames of this map extended with "rename-like" map elements:
// elements whose lambda body is a single ToPg(...) or FromPg(...) call applied directly
// to a Member access. Such simple transformations are treated as preserving both key and
// column statistics properties; currently only the pg conversions are recognized.
// Every returned pair is <to, from>.

TVector<std::pair<TInfoUnit, TInfoUnit>> TOpMap::GetRenamesWithTransforms(TPlanProps& props) const {
    auto renames = GetRenames();

    for (auto& [target, element] : MapElements) {
        // Only lambda elements are inspected here; GetRenames() has already been consulted above
        if (!std::holds_alternative<TExprNode::TPtr>(element)) {
            continue;
        }

        auto body = TCoLambda(std::get<TExprNode::TPtr>(element)).Body().Ptr();

        // Accept only ToPg(Member(...)) / FromPg(Member(...)); the child is inspected
        // only after the callable name has matched
        const bool isPgConversion = body->IsCallable("ToPg") || body->IsCallable("FromPg");
        if (!isPgConversion || !body->ChildPtr(0)->IsCallable("Member")) {
            continue;
        }

        // NOTE(review): the meaning of the trailing `true` flag is defined by GetAllMembers,
        // which is not visible here - confirm before changing it
        TVector<TInfoUnit> sources;
        GetAllMembers(body, sources, props, true);

        // The element is reported as a rename only when it resolves to exactly one source IU
        if (sources.size() == 1) {
            renames.emplace_back(target, sources.front());
        }
    }

    return renames;
}


void TOpMap::RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction> &renameMap, TExprContext &ctx, const THashSet<TInfoUnit, TInfoUnit::THashFunction> &stopList) {
TVector<std::pair<TInfoUnit, std::variant<TInfoUnit, TExprNode::TPtr>>> newMapElements;

Expand Down Expand Up @@ -708,44 +741,47 @@ TConjunctInfo TOpFilter::GetConjunctInfo(TPlanProps& props) const {
lambdaBody = lambdaBody->ChildPtr(0);
}

TVector<TExprNode::TPtr> conjuncts;
if (lambdaBody->IsCallable("And")) {
for (auto conj : lambdaBody->Children()) {
auto conjObj = conj;
bool fromPg = false;
conjuncts.push_back(conj);
}
} else {
conjuncts.push_back(lambdaBody);
}

if (conj->IsCallable("FromPg")) {
conjObj = conj->ChildPtr(0);
fromPg = true;
}
for (auto conj : conjuncts) {
auto conjObj = conj;
bool fromPg = false;

TExprNode::TPtr leftArg;
TExprNode::TPtr rightArg;
if (TestAndExtractEqualityPredicate(conjObj, leftArg, rightArg)) {
TVector<TInfoUnit> conjIUs;
GetAllMembers(conj, conjIUs, props);

if (leftArg->IsCallable("Member") && rightArg->IsCallable("Member") && conjIUs.size() >= 2) {
TVector<TInfoUnit> leftIUs;
TVector<TInfoUnit> rightIUs;
GetAllMembers(leftArg, leftIUs, props);
GetAllMembers(rightArg, rightIUs, props);
res.JoinConditions.push_back(TJoinConditionInfo(conjObj, leftIUs[0], rightIUs[0]));
}
else {
TVector<TInfoUnit> conjIUs;
GetAllMembers(conj, conjIUs);
res.Filters.push_back(TFilterInfo(conj, conjIUs, fromPg));
}
} else {
if (conj->IsCallable("FromPg")) {
conjObj = conj->ChildPtr(0);
fromPg = true;
}

TExprNode::TPtr leftArg;
TExprNode::TPtr rightArg;
if (TestAndExtractEqualityPredicate(conjObj, leftArg, rightArg)) {
TVector<TInfoUnit> conjIUs;
GetAllMembers(conj, conjIUs, props);

if (leftArg->IsCallable("Member") && rightArg->IsCallable("Member") && conjIUs.size() >= 2) {
TVector<TInfoUnit> leftIUs;
TVector<TInfoUnit> rightIUs;
GetAllMembers(leftArg, leftIUs, props);
GetAllMembers(rightArg, rightIUs, props);
res.JoinConditions.push_back(TJoinConditionInfo(conjObj, leftIUs[0], rightIUs[0]));
}
else {
TVector<TInfoUnit> conjIUs;
GetAllMembers(conj, conjIUs, props);
GetAllMembers(conj, conjIUs);
res.Filters.push_back(TFilterInfo(conj, conjIUs, fromPg));
}
} else {
TVector<TInfoUnit> conjIUs;
GetAllMembers(conj, conjIUs, props);
res.Filters.push_back(TFilterInfo(conj, conjIUs, fromPg));
}
} else {
TVector<TInfoUnit> filterIUs;
GetAllMembers(lambdaBody, filterIUs, props);
res.Filters.push_back(TFilterInfo(lambdaBody, filterIUs));
}

return res;
Expand Down Expand Up @@ -785,10 +821,20 @@ void TOpJoin::RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFun
}
}

// Short display names of join algorithms, looked up by TOpJoin::ToString when it prints
// the ", Algo: " part of a join description.
// NOTE(review): only the four algorithms below are listed and the lookup is performed with
// .at(), which presumably throws for a key that is missing from the table - confirm that
// no other EJoinAlgoType value can be stored in Props.JoinAlgo, or extend the table.
const THashMap<EJoinAlgoType,TString> AlgoNames = {
{EJoinAlgoType::LookupJoin, "Lookup"},
{EJoinAlgoType::LookupJoinReverse, "ReverseLookup"},
{EJoinAlgoType::MapJoin, "Map"},
{EJoinAlgoType::GraceJoin, "Shuffle"}};

TString TOpJoin::ToString(TExprContext& ctx) {
Y_UNUSED(ctx);
TStringBuilder res;
res << "Join [";
res << "Join, Kind: " << JoinKind;
if (Props.JoinAlgo.has_value()) {
res << ", Algo: " << AlgoNames.at(*Props.JoinAlgo);
}
res << " [";
for (size_t i = 0; i < JoinKeys.size(); i++) {
auto [x,y] = JoinKeys[i];
res << x.GetFullName() + "=" + y.GetFullName();
Expand Down Expand Up @@ -899,6 +945,52 @@ TString TOpAggregate::ToString(TExprContext& ctx) {
return strBuilder;
}

/***
* OpCBOTree operator methods
*/
TOpCBOTree::TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TPositionHandle pos) :
IOperator(EOperator::CBOTree, pos),
TreeRoot(treeRoot),
TreeNodes({treeRoot})
{
Children = treeRoot->Children;
}

// Wraps an explicit set of operators into a CBO tree.
//   treeRoot  - root operator of the packaged subtree
//   treeNodes - operators that belong to the packaged subtree
//   pos       - position handle forwarded to IOperator
// The children of the resulting operator are the children of the packaged nodes that are
// not themselves part of treeNodes, i.e. the inputs entering the subtree from outside.
// They are collected in the order in which the nodes and their children are listed.
TOpCBOTree::TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TVector<std::shared_ptr<IOperator>> treeNodes, TPositionHandle pos)
    : IOperator(EOperator::CBOTree, pos)
    , TreeRoot(std::move(treeRoot))
    , TreeNodes(std::move(treeNodes))
{
    // Both parameters are taken by value and have been moved into the members above,
    // so only the members may be used from here on.
    for (const auto& node : TreeNodes) {
        for (const auto& child : node->Children) {
            const bool isInternal = std::find(TreeNodes.begin(), TreeNodes.end(), child) != TreeNodes.end();
            if (!isInternal) {
                Children.push_back(child);
            }
        }
    }
}

// Applies the rename map to every operator packaged in this tree by forwarding the call,
// with the same rename map, context and stop list, to each entry of TreeNodes in order.
void TOpCBOTree::RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction> &renameMap, TExprContext &ctx, const THashSet<TInfoUnit, TInfoUnit::THashFunction> &stopList) {
    // Iterate by const reference: copying a shared_ptr per iteration costs an atomic
    // reference count increment and decrement for no benefit
    for (const auto& node : TreeNodes) {
        node->RenameIUs(renameMap, ctx, stopList);
    }
}

// Renders the tree as "CBO Tree: [<node>, <node>, ...]", where every entry is the
// ToString() output of one packaged operator, in TreeNodes order.
TString TOpCBOTree::ToString(TExprContext& ctx) {
    TStringBuilder out;
    out << "CBO Tree: [";

    bool needSeparator = false;
    for (const auto& node : TreeNodes) {
        // The separator goes in front of every entry except the first one
        if (needSeparator) {
            out << ", ";
        }
        needSeparator = true;
        out << node->ToString(ctx);
    }

    out << "]";
    return out;
}



/**
* OpRoot operator methods
*/
Expand Down
30 changes: 28 additions & 2 deletions ydb/core/kqp/opt/rbo/kqp_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace NKqp {

using namespace NYql;

enum EOperator : ui32 { EmptySource, Source, Map, Project, Filter, Join, Aggregate, Limit, UnionAll, Root };
enum EOperator : ui32 { EmptySource, Source, Map, Project, Filter, Join, Aggregate, Limit, UnionAll, CBOTree, Root };

/* Represents aggregation phases. */
enum EAggregationPhase : ui32 {Intermediate, Final};
Expand Down Expand Up @@ -387,6 +387,7 @@ class IUnaryOperator : public IOperator {
IUnaryOperator(EOperator kind, TPositionHandle pos) : IOperator(kind, pos) {}
IUnaryOperator(EOperator kind, TPositionHandle pos, std::shared_ptr<IOperator> input) : IOperator(kind, pos) { Children.push_back(input); }
std::shared_ptr<IOperator> &GetInput() { return Children[0]; }
void SetInput(std::shared_ptr<IOperator> newInput) { Children[0] = newInput; }

virtual void ComputeMetadata(TRBOContext & ctx, TPlanProps & planProps) override;
virtual void ComputeStatistics(TRBOContext & ctx, TPlanProps & planProps) override;
Expand All @@ -402,6 +403,9 @@ class IBinaryOperator : public IOperator {

std::shared_ptr<IOperator> &GetLeftInput() { return Children[0]; }
std::shared_ptr<IOperator> &GetRightInput() { return Children[1]; }

void SetLeftInput(std::shared_ptr<IOperator> newInput) { Children[0] = newInput; }
void SetRightInput(std::shared_ptr<IOperator> newInput) { Children[1] = newInput; }
};

class TOpEmptySource : public IOperator {
Expand Down Expand Up @@ -446,6 +450,7 @@ class TOpMap : public IUnaryOperator {
bool HasRenames() const;
bool HasLambdas() const;
TVector<std::pair<TInfoUnit, TInfoUnit>> GetRenames() const;
TVector<std::pair<TInfoUnit, TInfoUnit>> GetRenamesWithTransforms(TPlanProps& props) const;
TVector<std::pair<TInfoUnit, TExprNode::TPtr>> GetLambdas() const;
void RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction> &renameMap, TExprContext &ctx, const THashSet<TInfoUnit, TInfoUnit::THashFunction> &stopList = {}) override;

Expand Down Expand Up @@ -553,6 +558,27 @@ class TOpLimit : public IUnaryOperator {
TExprNode::TPtr LimitCond;
};

/***
* This operator packages a subtree of operators in order to pass them to dynamic programming optimizer
* Currently it requires that the list of operators TreeNodes is in a post-order traversal of the tree
* No validation is currently used
*/
class TOpCBOTree : public IOperator {
public:
TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TPositionHandle pos);
TOpCBOTree(std::shared_ptr<IOperator> treeRoot, TVector<std::shared_ptr<IOperator>> treeNodes, TPositionHandle pos);

virtual TVector<TInfoUnit> GetOutputIUs() override { return TreeRoot->GetOutputIUs(); }
void RenameIUs(const THashMap<TInfoUnit, TInfoUnit, TInfoUnit::THashFunction> &renameMap, TExprContext &ctx, const THashSet<TInfoUnit, TInfoUnit::THashFunction> &stopList = {}) override;
virtual TString ToString(TExprContext& ctx) override;

virtual void ComputeMetadata(TRBOContext & ctx, TPlanProps & planProps) override;
virtual void ComputeStatistics(TRBOContext & ctx, TPlanProps & planProps) override;

std::shared_ptr<IOperator> TreeRoot;
TVector<std::shared_ptr<IOperator>> TreeNodes;
};

class TOpRoot : public IUnaryOperator {
public:
TOpRoot(std::shared_ptr<IOperator> input, TPositionHandle pos, TVector<TString> columnOrder);
Expand Down Expand Up @@ -596,7 +622,7 @@ class TOpRoot : public IUnaryOperator {
for (auto scalarSubplan : Root->PlanProps.ScalarSubplans.Get()) {
BuildDfsList(scalarSubplan, {}, size_t(0), visited);
}
auto child = ptr->Children[0];
auto child = ptr->GetInput();
BuildDfsList(child, {}, size_t(0), visited);
CurrElement = 0;
}
Expand Down
16 changes: 11 additions & 5 deletions ydb/core/kqp/opt/rbo/kqp_rbo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ bool ISimplifiedRule::TestAndApply(std::shared_ptr<IOperator> &input, TRBOContex
}
}

TRuleBasedStage::TRuleBasedStage(TVector<std::shared_ptr<IRule>> rules) : Rules(rules) {
TRuleBasedStage::TRuleBasedStage(TString stageName, TVector<std::shared_ptr<IRule>> rules) :
IRBOStage(stageName),
Rules(rules) {
for (auto & r : Rules) {
Props |= r->Props;
}
Expand All @@ -27,7 +29,7 @@ void ComputeRequiredProps(TOpRoot &root, ui32 props, TRBOContext &ctx) {
if (props & ERuleProperties::RequireParents) {
root.ComputeParents();
}
if (props & ERuleProperties::RequireTypes) {
if (props & (ERuleProperties::RequireTypes | ERuleProperties::RequireStatistics)) {
if (root.ComputeTypes(ctx) != IGraphTransformer::TStatus::Ok) {
Y_ENSURE(false, "RBO type annotation failed");
}
Expand Down Expand Up @@ -70,7 +72,11 @@ void TRuleBasedStage::RunStage(TOpRoot &root, TRBOContext &ctx) {
if (iter.Parent) {
iter.Parent->Children[iter.ChildIndex] = op;
} else {
root.Children[0] = op;
root.SetInput(op);
}

if (rule->LogRule) {
YQL_CLOG(TRACE, CoreDq) << "Plan after applying rule:\n" << root.PlanToString(ctx.ExprCtx);
}

ComputeRequiredProps(root, Props, ctx);
Expand All @@ -95,8 +101,8 @@ TExprNode::TPtr TRuleBasedOptimizer::Optimize(TOpRoot &root, TExprContext &ctx)
auto context = TRBOContext(KqpCtx, ctx, TypeCtx, RBOTypeAnnTransformer, FuncRegistry);

for (size_t idx = 0; idx < Stages.size(); idx++) {
YQL_CLOG(TRACE, CoreDq) << "Running stage: " << idx;
auto stage = Stages[idx];
YQL_CLOG(TRACE, CoreDq) << "Running stage: " << stage->StageName;
ComputeRequiredProps(root, stage->Props, context);
stage->RunStage(root, context);
YQL_CLOG(TRACE, CoreDq) << "After stage:\n" << root.PlanToString(ctx);
Expand All @@ -106,7 +112,7 @@ TExprNode::TPtr TRuleBasedOptimizer::Optimize(TOpRoot &root, TExprContext &ctx)

auto convertProps = ERuleProperties::RequireParents | ERuleProperties::RequireTypes | ERuleProperties::RequireStatistics;
ComputeRequiredProps(root, convertProps, context);
YQL_CLOG(TRACE, CoreDq) << "Final plan before generation:\n" << root.PlanToString(ctx, EPrintPlanOptions::PrintBasicMetadata | EPrintPlanOptions::PrintBasicStatistics);
YQL_CLOG(TRACE, CoreDq) << "Final plan before generation:\n" << root.PlanToString(ctx, EPrintPlanOptions::PrintFullMetadata | EPrintPlanOptions::PrintBasicStatistics);

return ConvertToPhysical(root, context, TypeAnnTransformer, PeepholeTransformer);
}
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/kqp/opt/rbo/kqp_rbo.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ enum ERuleProperties: ui32 {
class IRule {
public:
IRule(TString name) : RuleName(name) {}
IRule(TString name, ui32 props) : RuleName(name), Props(props) {}
IRule(TString name, ui32 props, bool logRule = false) : RuleName(name), Props(props), LogRule(logRule) {}

virtual bool TestAndApply(std::shared_ptr<IOperator> &input, TRBOContext &ctx, TPlanProps &props) = 0;

virtual ~IRule() = default;

TString RuleName;
ui32 Props{0x00};
bool LogRule = false;
};

/**
Expand All @@ -43,7 +44,7 @@ class IRule {
class ISimplifiedRule : public IRule {
public:
ISimplifiedRule(TString name) : IRule(name) {}
ISimplifiedRule(TString name, ui32 props) : IRule(name, props) {}
ISimplifiedRule(TString name, ui32 props, bool logRule = false) : IRule(name, props, logRule) {}

virtual std::shared_ptr<IOperator> SimpleTestAndApply(const std::shared_ptr<IOperator> &input, TRBOContext &ctx, TPlanProps &props) = 0;

Expand All @@ -58,17 +59,21 @@ class ISimplifiedRule : public IRule {
*/
class IRBOStage {
public:
    // stageName is a human readable label of the stage; the optimizer prints it to the
    // trace log before the stage is run.
    // The constructor is explicit so that a string is never implicitly treated as a stage;
    // derived classes are unaffected because they call it directly from their
    // member initializer lists.
    explicit IRBOStage(TString stageName) : StageName(stageName) {}

    // Runs the stage on the plan rooted at `root`
    virtual void RunStage(TOpRoot &root, TRBOContext &ctx) = 0;
    virtual ~IRBOStage() = default;

    // Bit mask of ERuleProperties that must be computed for the plan before the stage runs
    ui32 Props = 0x00;

    // Label of the stage, used for logging
    TString StageName;
};

/**
* Rule based stage is just a collection of rules
*/
class TRuleBasedStage : public IRBOStage {
public:
TRuleBasedStage(TVector<std::shared_ptr<IRule>> rules);
TRuleBasedStage(TString stageName, TVector<std::shared_ptr<IRule>> rules);
virtual void RunStage(TOpRoot &root, TRBOContext &ctx) override;

TVector<std::shared_ptr<IRule>> Rules;
Expand Down
Loading
Loading