Skip to content

Commit efd37ae

Browse files
authored
Support new secrets in Async Replication (#26530)
1 parent aeb72cf commit efd37ae

File tree

7 files changed

+94
-26
lines changed

7 files changed

+94
-26
lines changed

ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -316,21 +316,12 @@ NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(
316316
TActorSystem* actorSystem
317317
) {
318318
auto promise = NThreading::NewPromise<TEvDescribeSecretsResponse::TDescription>();
319-
if (actorSystem->AppData<TAppData>()->FeatureFlags.GetEnableSchemaSecrets()) {
320-
bool schemaSecrets = false;
321-
for (const auto& secretName : secretNames) {
322-
if (secretName.StartsWith('/')) {
323-
schemaSecrets = true;
324-
break;
325-
}
326-
}
327-
if (schemaSecrets) {
328-
actorSystem->Send(
329-
MakeKqpDescribeSchemaSecretServiceId(actorSystem->NodeId),
330-
new TDescribeSchemaSecretsService::TEvResolveSecret(userToken, database, secretNames, promise)
331-
);
332-
return promise.GetFuture();
333-
}
319+
if (UseSchemaSecrets(AppData()->FeatureFlags, secretNames)) {
320+
actorSystem->Send(
321+
MakeKqpDescribeSchemaSecretServiceId(actorSystem->NodeId),
322+
new TDescribeSchemaSecretsService::TEvResolveSecret(userToken, database, secretNames, promise)
323+
);
324+
return promise.GetFuture();
334325
}
335326

336327
actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", secretNames, promise));
@@ -397,4 +388,22 @@ IActor* TDescribeSchemaSecretsServiceFactory::CreateService() {
397388
return new TDescribeSchemaSecretsService();
398389
}
399390

391+
bool UseSchemaSecrets(const NKikimr::TFeatureFlags& flags, const TVector<TString>& secretNames) {
392+
if (!flags.GetEnableSchemaSecrets()) {
393+
return false;
394+
}
395+
396+
for (const auto& secretName : secretNames) {
397+
if (!secretName.StartsWith('/')) {
398+
return false;
399+
}
400+
}
401+
402+
return true; // New secrets are enabled and all of them start with '/'
403+
}
404+
405+
bool UseSchemaSecrets(const NKikimr::TFeatureFlags& flags, const TString& secretName) {
406+
return flags.GetEnableSchemaSecrets() && secretName.StartsWith('/');
407+
}
408+
400409
} // namespace NKikimr::NKqp

ydb/core/kqp/federated_query/kqp_federated_query_actors.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,4 +138,14 @@ class TDescribeSchemaSecretsServiceFactory : public IDescribeSchemaSecretsServic
138138
IActor* CreateService() override;
139139
};
140140

141+
NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> DescribeSecret(
142+
const TVector<TString>& secretNames,
143+
const TIntrusiveConstPtr<NACLib::TUserToken> userToken,
144+
const TString& database,
145+
TActorSystem* actorSystem
146+
);
147+
148+
bool UseSchemaSecrets(const NKikimr::TFeatureFlags& flags, const TVector<TString>& secretNames);
149+
bool UseSchemaSecrets(const NKikimr::TFeatureFlags& flags, const TString& secretName);
150+
141151
} // namespace NKikimr::NKqp

ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,19 @@ TStatus ExecuteGeneric(NYdb::NQuery::TQueryClient& queryClient, TSession& sessio
5252
}
5353
}
5454

55+
template<bool UseSchemaSecrets>
56+
void CreateSecret(TString& secretName, const TString& secretValue, TSession& session) {
57+
TString query;
58+
if constexpr (UseSchemaSecrets) {
59+
secretName = "/Root/" + secretName;
60+
query = Sprintf("CREATE SECRET `%s` WITH (value=\"%s\")", secretName.c_str(), secretValue.c_str());
61+
} else {
62+
query = Sprintf("CREATE OBJECT %s (TYPE SECRET) WITH value=\"%s\"", secretName.c_str(), secretValue.c_str());
63+
}
64+
const auto queryResult = session.ExecuteSchemeQuery(query).GetValueSync();
65+
UNIT_ASSERT_EQUAL_C(NYdb::EStatus::SUCCESS, queryResult.GetStatus(), queryResult.GetIssues().ToString());
66+
}
67+
5568
}
5669

5770
Y_UNIT_TEST_SUITE(KqpScheme) {
@@ -9206,10 +9219,13 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
92069219
}
92079220
}
92089221

9209-
Y_UNIT_TEST(CreateAsyncReplicationWithTokenSecret) {
9222+
Y_UNIT_TEST_TWIN(CreateAsyncReplicationWithTokenSecret, UseSchemaSecrets) {
92109223
using namespace NReplication;
92119224

92129225
TKikimrRunner kikimr("root@builtin");
9226+
if (UseSchemaSecrets) {
9227+
kikimr.GetTestServer().GetRuntime()->GetAppData(0).FeatureFlags.SetEnableSchemaSecrets(true);
9228+
}
92139229
auto repl = TReplicationClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
92149230
auto db = kikimr.GetTableClient();
92159231
auto session = db.CreateSession().GetValueSync().GetSession();
@@ -9230,17 +9246,20 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
92309246

92319247
// ok
92329248
{
9249+
TString secretId = "mysecretname";
9250+
const TString secretValue = "root@builtin";
9251+
CreateSecret<UseSchemaSecrets>(secretId, secretValue, session);
9252+
92339253
auto query = Sprintf(R"(
92349254
--!syntax_v1
9235-
CREATE OBJECT mysecret (TYPE SECRET) WITH (value = "root@builtin");
92369255
CREATE ASYNC REPLICATION `/Root/replication` FOR
92379256
`/Root/table` AS `/Root/replica`
92389257
WITH (
92399258
ENDPOINT = "%s",
92409259
DATABASE = "/Root",
9241-
TOKEN_SECRET_NAME = "mysecret"
9260+
TOKEN_SECRET_NAME = "%s"
92429261
);
9243-
)", kikimr.GetEndpoint().c_str());
9262+
)", kikimr.GetEndpoint().c_str(), secretId.c_str());
92449263

92459264
const auto result = session.ExecuteSchemeQuery(query).GetValueSync();
92469265
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

ydb/core/tx/replication/controller/replication.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class TReplication::TImpl: public TLagProvider {
4242
return;
4343
}
4444

45-
SecretResolver = ctx.Register(CreateSecretResolver(ctx.SelfID, ReplicationId, PathId, secretName, ++SecretResolverCookie));
45+
SecretResolver = ctx.Register(CreateSecretResolver(ctx.SelfID, ReplicationId, PathId, secretName, ++SecretResolverCookie, Database));
4646
}
4747

4848
ui64 GetExpectedSecretResolverCookie() const {

ydb/core/tx/replication/controller/secret_resolver.cpp

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
#include "private_events.h"
33
#include "secret_resolver.h"
44

5+
#include <ydb/core/kqp/common/events/script_executions.h>
6+
#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
57
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
68
#include <ydb/library/actors/core/actor_bootstrapped.h>
79
#include <ydb/library/actors/core/hfunc.h>
@@ -10,6 +12,8 @@
1012
#include <ydb/services/metadata/secret/snapshot.h>
1113
#include <ydb/services/metadata/service.h>
1214

15+
#include <util/generic/ptr.h>
16+
1317
namespace NKikimr::NReplication::NController {
1418

1519
class TSecretResolver: public TActorBootstrapped<TSecretResolver> {
@@ -40,8 +44,20 @@ class TSecretResolver: public TActorBootstrapped<TSecretResolver> {
4044
}
4145

4246
SecretId = NMetadata::NSecret::TSecretId(entry.SecurityObject->GetOwnerSID(), SecretName);
43-
Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()),
44-
new NMetadata::NProvider::TEvAskSnapshot(SnapshotFetcher()));
47+
if (NKqp::UseSchemaSecrets(AppData()->FeatureFlags, SecretId.GetSecretId())) {
48+
const TVector<TString> secretNames{SecretId.GetSecretId()};
49+
auto userToken = MakeIntrusiveConst<NACLib::TUserToken>(entry.SecurityObject->GetOwnerSID(), TVector<TString>());
50+
const auto actorSystem = ActorContext().ActorSystem();
51+
const auto replyActorId = SelfId();
52+
auto future = NKqp::DescribeSecret(secretNames, userToken, Database, actorSystem);
53+
future.Subscribe([actorSystem, replyActorId](const NThreading::TFuture<NKqp::TEvDescribeSecretsResponse::TDescription>& result) {
54+
actorSystem->Send(replyActorId, new NKqp::TEvDescribeSecretsResponse(result.GetValue()));
55+
});
56+
return;
57+
} else {
58+
Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()),
59+
new NMetadata::NProvider::TEvAskSnapshot(SnapshotFetcher()));
60+
}
4561
}
4662

4763
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
@@ -55,6 +71,15 @@ class TSecretResolver: public TActorBootstrapped<TSecretResolver> {
5571
Reply(secretValue.DetachResult());
5672
}
5773

74+
void Handle(NKqp::TEvDescribeSecretsResponse::TPtr& ev) {
75+
if (ev->Get()->Description.Status != Ydb::StatusIds::SUCCESS) {
76+
return Reply(false, ev->Get()->Description.Issues.ToOneLineString());
77+
}
78+
79+
Y_ENSURE(ev->Get()->Description.SecretValues.size() == 1);
80+
Reply(ev->Get()->Description.SecretValues[0]);
81+
}
82+
5883
template <typename... Args>
5984
void Reply(Args&&... args) {
6085
Send(Parent, new TEvPrivate::TEvResolveSecretResult(ReplicationId, std::forward<Args>(args)...), 0, Cookie);
@@ -66,12 +91,13 @@ class TSecretResolver: public TActorBootstrapped<TSecretResolver> {
6691
return NKikimrServices::TActivity::REPLICATION_CONTROLLER_SECRET_RESOLVER;
6792
}
6893

69-
explicit TSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName, const ui64 cookie)
94+
explicit TSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName, const ui64 cookie, const TString& database)
7095
: Parent(parent)
7196
, ReplicationId(rid)
7297
, PathId(pathId)
7398
, SecretName(secretName)
7499
, Cookie(cookie)
100+
, Database(database)
75101
, LogPrefix("SecretResolver", ReplicationId)
76102
{
77103
}
@@ -97,6 +123,7 @@ class TSecretResolver: public TActorBootstrapped<TSecretResolver> {
97123
switch (ev->GetTypeRewrite()) {
98124
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
99125
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
126+
hFunc(NKqp::TEvDescribeSecretsResponse, Handle);
100127
sFunc(TEvents::TEvWakeup, Bootstrap);
101128
sFunc(TEvents::TEvPoison, PassAway);
102129
}
@@ -108,15 +135,16 @@ class TSecretResolver: public TActorBootstrapped<TSecretResolver> {
108135
const TPathId PathId;
109136
const TString SecretName;
110137
const ui64 Cookie;
138+
const TString Database;
111139
const TActorLogPrefix LogPrefix;
112140

113141
static constexpr auto RetryInterval = TDuration::Seconds(1);
114142
NMetadata::NSecret::TSecretId SecretId;
115143

116144
}; // TSecretResolver
117145

118-
IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName, const ui64 cookie) {
119-
return new TSecretResolver(parent, rid, pathId, secretName, cookie);
146+
IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName, const ui64 cookie, const TString& database) {
147+
return new TSecretResolver(parent, rid, pathId, secretName, cookie, database);
120148
}
121149

122150
}

ydb/core/tx/replication/controller/secret_resolver.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@
44

55
namespace NKikimr::NReplication::NController {
66

7-
IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName, const ui64 cookie);
7+
IActor* CreateSecretResolver(const TActorId& parent, ui64 rid, const TPathId& pathId, const TString& secretName, const ui64 cookie, const TString& database);
88

99
}

ydb/core/tx/replication/controller/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ PEERDIR(
44
ydb/core/base
55
ydb/core/discovery
66
ydb/core/engine/minikql
7+
ydb/core/kqp/common/events
8+
ydb/core/kqp/federated_query
79
ydb/core/protos
810
ydb/core/tablet
911
ydb/core/tablet_flat

0 commit comments

Comments
 (0)