129 changes: 95 additions & 34 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -30,8 +30,6 @@ TString TKafkaProduceActor::LogPrefix() {
sb << "Init ";
} else if (stateFunc == &TKafkaProduceActor::StateWork) {
sb << "Work ";
} else if (stateFunc == &TKafkaProduceActor::StateAccepting) {
sb << "Accepting ";
} else {
sb << "Unknown ";
}
@@ -229,22 +227,22 @@ void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const

void TKafkaProduceActor::ProcessRequests(const TActorContext& ctx) {
if (&TKafkaProduceActor::StateWork != CurrentStateFunc()) {
KAFKA_LOG_ERROR("Produce actor: Unexpected state");
return;
}

if (Requests.empty()) {
return;
}

if (EnqueueInitialization()) {
auto canProcess = EnqueueInitialization();
while (canProcess--) {
PendingRequests.push_back(std::make_shared<TPendingRequest>(Requests.front()));
Requests.pop_front();

ProcessRequest(PendingRequests.back(), ctx);
} else {
ProcessInitializationRequests(ctx);
}

ProcessInitializationRequests(ctx);
}

size_t TKafkaProduceActor::EnqueueInitialization() {
@@ -399,12 +397,10 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
if (pendingRequest->WaitResultCookies.empty()) {
// All requests were for unknown topics, or the request was empty
SendResults(ctx);
} else {
Become(&TKafkaProduceActor::StateAccepting);
}
}

void TKafkaProduceActor::HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) {
void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) {
auto r = request->Get();
auto cookie = r->Cookie;

@@ -422,12 +418,87 @@ void TKafkaProduceActor::HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::T
Become(&TKafkaProduceActor::StateWork);
ProcessRequests(ctx);
} else {
KAFKA_LOG_W("Still in Accepting state after TEvPartitionWriter::TEvWriteAccepted cause cookies are expected: " << JoinSeq(", ", expectedCookies));
KAFKA_LOG_W("Still in accepting after receive TEvPartitionWriter::TEvWriteAccepted cause cookies are expected: " << JoinSeq(", ", expectedCookies));
}
}

void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& /*ctx*/) {
KAFKA_LOG_D("Produce actor: Init " << request->Get()->ToString());

if (!request->Get()->IsSuccess()) {
auto sender = request->Sender;

if (WriterDied(sender, EKafkaErrors::UNKNOWN_SERVER_ERROR, request->Get()->GetError().Reason)) {
KAFKA_LOG_D("Produce actor: Received TEvPartitionWriter::TEvInitResult for " << sender << " with error: " << request->Get()->GetError().Reason);
return;
}

KAFKA_LOG_D("Produce actor: Received TEvPartitionWriter::TEvInitResult with unexpected writer " << sender);
}
}

void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvDisconnected::TPtr request, const TActorContext& /*ctx*/) {
auto sender = request->Sender;

if (WriterDied(sender, EKafkaErrors::NOT_LEADER_OR_FOLLOWER, TStringBuilder() << "Partition writer " << sender << " disconnected")) {
KAFKA_LOG_D("Produce actor: Received TEvPartitionWriter::TEvDisconnected for " << sender);
return;
}

KAFKA_LOG_D("Produce actor: Received TEvPartitionWriter::TEvDisconnected with unexpected writer " << sender);
}

bool TKafkaProduceActor::WriterDied(const TActorId& writerId, EKafkaErrors errorCode, TStringBuf errorMessage) {
auto findAndCleanWriter = [&]() -> std::pair<TString, ui32> {
for (auto it = TransactionalWriters.begin(); it != TransactionalWriters.end(); ++it) {
if (it->second.ActorId == writerId) {
auto id = it->first;
CleanWriter(id, writerId);
TransactionalWriters.erase(it);
return {id.TopicPath, id.PartitionId};
}
}

for (auto& [topicPath, partitionWriters] : NonTransactionalWriters) {
for (auto it = partitionWriters.begin(); it != partitionWriters.end(); ++it) {
if (it->second.ActorId == writerId) {
auto id = it->first;
CleanWriter({topicPath, static_cast<ui32>(id)}, writerId);
partitionWriters.erase(it);
return {topicPath, static_cast<ui32>(id)};
}
}
}

return {"", 0};
};

auto [topicPath, partitionId] = findAndCleanWriter();
if (topicPath.empty()) {
return false;
}

for (auto it = Cookies.begin(); it != Cookies.end();) {
auto cookie = it->first;
auto& info = it->second;

if (info.TopicPath == topicPath && info.PartitionId == partitionId) {
info.Request->Results[info.Position].ErrorCode = errorCode;
info.Request->Results[info.Position].ErrorMessage = errorMessage;
info.Request->WaitAcceptingCookies.erase(cookie);
info.Request->WaitResultCookies.erase(cookie);

if (info.Request->WaitAcceptingCookies.empty() && info.Request->WaitResultCookies.empty()) {
SendResults(ActorContext());
}

it = Cookies.erase(it);
} else {
++it;
}
}

return true;
}

void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) {
@@ -532,7 +603,8 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
size_t recordsCount = partitionData.Records.has_value() ? partitionData.Records->Records.size() : 0;
partitionResponse.Index = partitionData.Index;
if (EKafkaErrors::NONE_ERROR != result.ErrorCode) {
KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage << ", #01");
KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode)
<< ", ErrorMessage=" << result.ErrorMessage << ", #01");
partitionResponse.ErrorCode = result.ErrorCode;
metricsErrorCode = result.ErrorCode;
partitionResponse.ErrorMessage = result.ErrorMessage;
@@ -584,20 +656,6 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {

Send(Context->ConnectionId, new TEvKafka::TEvResponse(correlationId, response, metricsErrorCode));

if (!pendingRequest->WaitAcceptingCookies.empty()) {
if (!expired) {
TStringBuilder sb;
sb << "Produce actor: All TEvWriteResponse were received, but not all TEvWriteAccepted. Unreceived cookies:";
for(auto cookie : pendingRequest->WaitAcceptingCookies) {
sb << " " << cookie;
}
KAFKA_LOG_W(sb);
}
if (&TKafkaProduceActor::StateAccepting == CurrentStateFunc()) {
Become(&TKafkaProduceActor::StateWork);
}
}

for(auto cookie : pendingRequest->WaitAcceptingCookies) {
Cookies.erase(cookie);
}
@@ -607,6 +665,8 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {

PendingRequests.pop_front();
}

ProcessRequests(ctx);
}

void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx) {
@@ -681,18 +741,19 @@ void TKafkaProduceActor::SendWriteRequest(const TProduceRequestData::TTopicProdu
auto& result = pendingRequest->Results[position];
if (OK == writer.first) {
auto ownCookie = ++Cookie;
auto& cookieInfo = Cookies[ownCookie];
cookieInfo.TopicPath = topicPath;
cookieInfo.PartitionId = partitionId;
cookieInfo.Position = position;
cookieInfo.RuPerRequest = ruPerRequest;
cookieInfo.Request = pendingRequest;

pendingRequest->WaitAcceptingCookies.insert(ownCookie);
pendingRequest->WaitResultCookies.insert(ownCookie);

auto [error, ev] = Convert(transactionalId.GetOrElse(""), partitionData, topicPath, ownCookie, ClientDC, ruPerRequest);
if (error == EKafkaErrors::NONE_ERROR) {
auto& cookieInfo = Cookies[ownCookie];
cookieInfo.TopicPath = topicPath;
cookieInfo.PartitionId = partitionId;
cookieInfo.Position = position;
cookieInfo.RuPerRequest = ruPerRequest;
cookieInfo.Request = pendingRequest;

pendingRequest->WaitAcceptingCookies.insert(ownCookie);
pendingRequest->WaitResultCookies.insert(ownCookie);

ruPerRequest = false;
KAFKA_LOG_T("Sending TEvPartitionWriter::TEvWriteRequest to " << writer.second << " with cookie " << ownCookie);
Send(writer.second, std::move(ev));
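
For orientation, here is a minimal, self-contained sketch of the per-request cookie bookkeeping that replaces the removed StateAccepting: each write sent to a partition writer registers a cookie in both WaitAcceptingCookies and WaitResultCookies, and the produce response goes out only after both sets drain. The types and function names below are simplified stand-ins, not the actual YDB/actor classes.

// Sketch only: simplified stand-ins for the actor's per-request cookie sets.
#include <cstdint>
#include <iostream>
#include <set>

struct TPendingRequestSketch {
    std::set<uint64_t> WaitAcceptingCookies; // writes not yet accepted by a partition writer
    std::set<uint64_t> WaitResultCookies;    // writes not yet confirmed as written
    bool Answered = false;
};

// Analogue of handling TEvWriteAccepted directly in StateWork: just drop the cookie.
void OnWriteAccepted(TPendingRequestSketch& req, uint64_t cookie) {
    req.WaitAcceptingCookies.erase(cookie);
}

// Analogue of handling TEvWriteResponse: answer the request once nothing is pending.
void OnWriteResult(TPendingRequestSketch& req, uint64_t cookie) {
    req.WaitResultCookies.erase(cookie);
    if (req.WaitAcceptingCookies.empty() && req.WaitResultCookies.empty() && !req.Answered) {
        req.Answered = true;
        std::cout << "produce request answered\n";
    }
}

int main() {
    TPendingRequestSketch req;
    for (uint64_t cookie : {1, 2}) {
        req.WaitAcceptingCookies.insert(cookie);
        req.WaitResultCookies.insert(cookie);
    }
    // Accept/result events may interleave in any order; the answer goes out exactly once.
    OnWriteAccepted(req, 1);
    OnWriteResult(req, 1);
    OnWriteAccepted(req, 2);
    OnWriteResult(req, 2); // prints "produce request answered"
}

In the real actor this draining happens in the TEvWriteAccepted/TEvWriteResponse handlers and SendResults(); the sketch only illustrates the invariant.
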
36 changes: 11 additions & 25 deletions ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -24,10 +24,6 @@ using namespace NKikimrClient;
// Requests are processed in parallel, but it is guaranteed that the write order will be preserved.
// The order of responses to requests is also guaranteed.
//
// When the request begins to be processed, the actor enters the Accepting state. In this state, responses
// are expected from all TPartitionWriters confirming acceptance of the request (TEvWriteAccepted). After that,
// the actor switches back to the Work state. This guarantees the order of writing to each partition.
//
class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor> {
struct TPendingRequest;

@@ -52,9 +48,14 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>

// Handlers for many StateFunc
void Handle(TEvKafka::TEvWakeup::TPtr request, const TActorContext& ctx);

void Handle(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx);
void Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx);
void Handle(TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& ctx);
void Handle(TEvPartitionWriter::TEvDisconnected::TPtr request, const TActorContext& ctx);

void EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx);

void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);

@@ -67,8 +68,11 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleInit);

HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest);

HFunc(TEvPartitionWriter::TEvInitResult, Handle);
HFunc(TEvPartitionWriter::TEvWriteAccepted, Handle);
HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
HFunc(TEvPartitionWriter::TEvDisconnected, Handle);

HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
@@ -85,29 +89,11 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
LogEvent(*ev.Get());
switch (ev->GetTypeRewrite()) {
HFunc(TEvKafka::TEvProduceRequest, Handle);
HFunc(TEvPartitionWriter::TEvInitResult, Handle);
HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);

HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);

HFunc(TEvKafka::TEvWakeup, Handle);
sFunc(TEvents::TEvPoison, PassAway);
}
}

// StateAccepting - enqueue ProduceRequest parts to PartitionWriters
// This guarantees the order of responses according order of request
void HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx);

STATEFN(StateAccepting) {
LogEvent(*ev.Get());
switch (ev->GetTypeRewrite()) {
HFunc(TEvPartitionWriter::TEvWriteAccepted, HandleAccepting);

HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest);
HFunc(TEvPartitionWriter::TEvInitResult, Handle);
HFunc(TEvPartitionWriter::TEvWriteAccepted, Handle);
HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
HFunc(TEvPartitionWriter::TEvDisconnected, Handle);

HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
@@ -117,7 +103,6 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
}
}


// Logic
void ProcessRequests(const TActorContext& ctx);
void ProcessRequest(std::shared_ptr<TPendingRequest> pendingRequest, const TActorContext& ctx);
@@ -129,6 +114,7 @@ class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor>
void CleanTopics(const TActorContext& ctx);
void CleanWriters(const TActorContext& ctx);
std::pair<ETopicStatus, TActorId> PartitionWriter(const TTopicPartition& topicPartition, const TProducerInstanceId& producerInstanceId, const TMaybe<TString>& transactionalId, const TActorContext& ctx);
bool WriterDied(const TActorId& writerId, EKafkaErrors errorCode, TStringBuf errorMessage);

TString LogPrefix();
void LogEvent(IEventHandle& ev);
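
For the new failure path, here is a minimal sketch of the cleanup that WriterDied() performs when a partition writer reports TEvDisconnected or a failed TEvInitResult: every in-flight cookie bound to that topic/partition is failed with the chosen Kafka error, and a request is answered as soon as it has nothing left to wait for. The types below are simplified assumptions, not the real TKafkaProduceActor members.

// Sketch only: simplified stand-ins for the cookie map and per-request state.
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <string>

struct TRequestSketch {
    std::set<uint64_t> WaitCookies;          // cookies the request is still waiting for
    std::map<uint64_t, std::string> Errors;  // cookie -> error message
};

struct TCookieInfoSketch {
    std::string TopicPath;
    uint32_t PartitionId = 0;
    std::shared_ptr<TRequestSketch> Request;
};

// Fail all cookies bound to the dead writer's topic/partition.
void OnWriterDied(std::map<uint64_t, TCookieInfoSketch>& cookies,
                  const std::string& topicPath, uint32_t partitionId,
                  const std::string& error) {
    for (auto it = cookies.begin(); it != cookies.end();) {
        auto& [cookie, info] = *it;
        if (info.TopicPath == topicPath && info.PartitionId == partitionId) {
            info.Request->Errors[cookie] = error;
            info.Request->WaitCookies.erase(cookie);
            if (info.Request->WaitCookies.empty()) {
                std::cout << "request answered with error: " << error << "\n";
            }
            it = cookies.erase(it); // erase returns the next valid iterator
        } else {
            ++it;
        }
    }
}

int main() {
    auto request = std::make_shared<TRequestSketch>();
    std::map<uint64_t, TCookieInfoSketch> cookies;
    for (uint64_t cookie : {1, 2}) {
        request->WaitCookies.insert(cookie);
        cookies[cookie] = {"/Root/topic", 0, request};
    }
    OnWriterDied(cookies, "/Root/topic", 0, "partition writer disconnected");
}

In the diff above, the same pass also removes the dead writer from the transactional or non-transactional writer maps before failing the cookies.
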
31 changes: 31 additions & 0 deletions ydb/core/kafka_proxy/ut/ut_produce_actor.cpp
@@ -301,5 +301,36 @@ namespace {
UNIT_ASSERT(response != nullptr);
UNIT_ASSERT_VALUES_EQUAL(response->ErrorCode, NKafka::EKafkaErrors::REQUEST_TIMED_OUT);
}

Y_UNIT_TEST(OnProduce_andPipeDisconnected) {
i64 producerId = 1;
i32 producerEpoch = 2;

int writeRequestsCounter = 0;
int poisonPillCounter = 0;

auto observer = [&](TAutoPtr<IEventHandle>& input) {
if (input->CastAsLocal<TEvPartitionWriter::TEvWriteRequest>()) {
if (writeRequestsCounter++ == 0) {
auto r = std::make_unique<TEvPartitionWriter::TEvDisconnected>(TEvPartitionWriter::TEvWriteResponse::EErrorCode::InternalError);
Ctx->Runtime->Send(new IEventHandle(input->Sender, input->Recipient, r.release()));
return TTestActorRuntimeBase::EEventAction::DROP;
}
} else if (input->CastAsLocal<TEvents::TEvPoison>()) {
poisonPillCounter++;
}

return TTestActorRuntimeBase::EEventAction::PROCESS;
};

Ctx->Runtime->SetObserverFunc(observer);

SendProduce({}, producerId, producerEpoch);

auto response = Ctx->Runtime->GrabEdgeEvent<NKafka::TEvKafka::TEvResponse>();
UNIT_ASSERT(response);
UNIT_ASSERT_VALUES_EQUAL(response->ErrorCode, NKafka::EKafkaErrors::NOT_LEADER_OR_FOLLOWER);
UNIT_ASSERT_VALUES_EQUAL(std::dynamic_pointer_cast<NKafka::TProduceResponseData>(response->Response)->Responses[0].PartitionResponses[0].ErrorCode, NKafka::EKafkaErrors::NOT_LEADER_OR_FOLLOWER);
}
}
} // anonymous namespace