@@ -30,8 +30,6 @@ TString TKafkaProduceActor::LogPrefix() {
3030 sb << " Init " ;
3131 } else if (stateFunc == &TKafkaProduceActor::StateWork) {
3232 sb << " Work " ;
33- } else if (stateFunc == &TKafkaProduceActor::StateAccepting) {
34- sb << " Accepting " ;
3533 } else {
3634 sb << " Unknown " ;
3735 }
@@ -229,22 +227,22 @@ void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const
229227
230228void TKafkaProduceActor::ProcessRequests (const TActorContext& ctx) {
231229 if (&TKafkaProduceActor::StateWork != CurrentStateFunc ()) {
232- KAFKA_LOG_ERROR (" Produce actor: Unexpected state" );
233230 return ;
234231 }
235232
236233 if (Requests.empty ()) {
237234 return ;
238235 }
239236
240- if (EnqueueInitialization ()) {
237+ auto canProcess = EnqueueInitialization ();
238+ while (canProcess--) {
241239 PendingRequests.push_back (std::make_shared<TPendingRequest>(Requests.front ()));
242240 Requests.pop_front ();
243241
244242 ProcessRequest (PendingRequests.back (), ctx);
245- } else {
246- ProcessInitializationRequests (ctx);
247243 }
244+
245+ ProcessInitializationRequests (ctx);
248246}
249247
250248size_t TKafkaProduceActor::EnqueueInitialization () {
@@ -399,12 +397,10 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
399397 if (pendingRequest->WaitResultCookies .empty ()) {
400398 // All request for unknown topic or empty request
401399 SendResults (ctx);
402- } else {
403- Become (&TKafkaProduceActor::StateAccepting);
404400 }
405401}
406402
407- void TKafkaProduceActor::HandleAccepting (TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) {
403+ void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) {
408404 auto r = request->Get ();
409405 auto cookie = r->Cookie ;
410406
@@ -422,12 +418,87 @@ void TKafkaProduceActor::HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::T
422418 Become (&TKafkaProduceActor::StateWork);
423419 ProcessRequests (ctx);
424420 } else {
425- KAFKA_LOG_W (" Still in Accepting state after TEvPartitionWriter::TEvWriteAccepted cause cookies are expected: " << JoinSeq (" , " , expectedCookies));
421+ KAFKA_LOG_W (" Still in accepting after receive TEvPartitionWriter::TEvWriteAccepted cause cookies are expected: " << JoinSeq (" , " , expectedCookies));
426422 }
427423}
428424
429425void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& /* ctx*/ ) {
430426 KAFKA_LOG_D (" Produce actor: Init " << request->Get ()->ToString ());
427+
428+ if (!request->Get ()->IsSuccess ()) {
429+ auto sender = request->Sender ;
430+
431+ if (WriterDied (sender, EKafkaErrors::UNKNOWN_SERVER_ERROR, request->Get ()->GetError ().Reason )) {
432+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvInitResult for " << sender << " with error: " << request->Get ()->GetError ().Reason );
433+ return ;
434+ }
435+
436+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvInitResult with unexpected writer " << sender);
437+ }
438+ }
439+
440+ void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvDisconnected::TPtr request, const TActorContext& /* ctx*/ ) {
441+ auto sender = request->Sender ;
442+
443+ if (WriterDied (sender, EKafkaErrors::NOT_LEADER_OR_FOLLOWER, TStringBuilder () << " Partition writer " << sender << " disconnected" )) {
444+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvDisconnected for " << sender);
445+ return ;
446+ }
447+
448+ KAFKA_LOG_D (" Produce actor: Received TEvPartitionWriter::TEvDisconnected with unexpected writer " << sender);
449+ }
450+
451+ bool TKafkaProduceActor::WriterDied (const TActorId& writerId, EKafkaErrors errorCode, TStringBuf errorMessage) {
452+ auto findAndCleanWriter = [&]() -> std::pair<TString, ui32> {
453+ for (auto it = TransactionalWriters.begin (); it != TransactionalWriters.end (); ++it) {
454+ if (it->second .ActorId == writerId) {
455+ auto id = it->first ;
456+ CleanWriter (id, writerId);
457+ TransactionalWriters.erase (it);
458+ return {id.TopicPath , id.PartitionId };
459+ }
460+ }
461+
462+ for (auto & [topicPath, partitionWriters] : NonTransactionalWriters) {
463+ for (auto it = partitionWriters.begin (); it != partitionWriters.end (); ++it) {
464+ if (it->second .ActorId == writerId) {
465+ auto id = it->first ;
466+ CleanWriter ({topicPath, static_cast <ui32>(id)}, writerId);
467+ partitionWriters.erase (it);
468+ return {topicPath, static_cast <ui32>(id)};
469+ }
470+ }
471+ }
472+
473+ return {" " , 0 };
474+ };
475+
476+ auto [topicPath, partitionId] = findAndCleanWriter ();
477+ if (topicPath.empty ()) {
478+ return false ;
479+ }
480+
481+ for (auto it = Cookies.begin (); it != Cookies.end ();) {
482+ auto cookie = it->first ;
483+ auto & info = it->second ;
484+
485+ if (info.TopicPath == topicPath && info.PartitionId == partitionId) {
486+ info.Request ->Results [info.Position ].ErrorCode = errorCode;
487+ info.Request ->Results [info.Position ].ErrorMessage = errorMessage;
488+ info.Request ->WaitAcceptingCookies .erase (cookie);
489+ info.Request ->WaitResultCookies .erase (cookie);
490+
491+ if (info.Request ->WaitAcceptingCookies .empty () && info.Request ->WaitResultCookies .empty ()) {
492+ SendResults (ActorContext ());
493+ }
494+
495+ it = Cookies.erase (it);
496+ } else {
497+ ++it;
498+ }
499+ }
500+
501+ return true ;
431502}
432503
433504void TKafkaProduceActor::Handle (TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) {
@@ -532,7 +603,8 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
532603 size_t recordsCount = partitionData.Records .has_value () ? partitionData.Records ->Records .size () : 0 ;
533604 partitionResponse.Index = partitionData.Index ;
534605 if (EKafkaErrors::NONE_ERROR != result.ErrorCode ) {
535- KAFKA_LOG_ERROR (" Produce actor: Partition result with error: ErrorCode=" << static_cast <int >(result.ErrorCode ) << " , ErrorMessage=" << result.ErrorMessage << " , #01" );
606+ KAFKA_LOG_ERROR (" Produce actor: Partition result with error: ErrorCode=" << static_cast <int >(result.ErrorCode )
607+ << " , ErrorMessage=" << result.ErrorMessage << " , #01" );
536608 partitionResponse.ErrorCode = result.ErrorCode ;
537609 metricsErrorCode = result.ErrorCode ;
538610 partitionResponse.ErrorMessage = result.ErrorMessage ;
@@ -584,20 +656,6 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
584656
585657 Send (Context->ConnectionId , new TEvKafka::TEvResponse (correlationId, response, metricsErrorCode));
586658
587- if (!pendingRequest->WaitAcceptingCookies .empty ()) {
588- if (!expired) {
589- TStringBuilder sb;
590- sb << " Produce actor: All TEvWriteResponse were received, but not all TEvWriteAccepted. Unreceived cookies:" ;
591- for (auto cookie : pendingRequest->WaitAcceptingCookies ) {
592- sb << " " << cookie;
593- }
594- KAFKA_LOG_W (sb);
595- }
596- if (&TKafkaProduceActor::StateAccepting == CurrentStateFunc ()) {
597- Become (&TKafkaProduceActor::StateWork);
598- }
599- }
600-
601659 for (auto cookie : pendingRequest->WaitAcceptingCookies ) {
602660 Cookies.erase (cookie);
603661 }
@@ -607,6 +665,8 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
607665
608666 PendingRequests.pop_front ();
609667 }
668+
669+ ProcessRequests (ctx);
610670}
611671
612672void TKafkaProduceActor::ProcessInitializationRequests (const TActorContext& ctx) {
@@ -681,18 +741,19 @@ void TKafkaProduceActor::SendWriteRequest(const TProduceRequestData::TTopicProdu
681741 auto & result = pendingRequest->Results [position];
682742 if (OK == writer.first ) {
683743 auto ownCookie = ++Cookie;
684- auto & cookieInfo = Cookies[ownCookie];
685- cookieInfo.TopicPath = topicPath;
686- cookieInfo.PartitionId = partitionId;
687- cookieInfo.Position = position;
688- cookieInfo.RuPerRequest = ruPerRequest;
689- cookieInfo.Request = pendingRequest;
690-
691- pendingRequest->WaitAcceptingCookies .insert (ownCookie);
692- pendingRequest->WaitResultCookies .insert (ownCookie);
693744
694745 auto [error, ev] = Convert (transactionalId.GetOrElse (" " ), partitionData, topicPath, ownCookie, ClientDC, ruPerRequest);
695746 if (error == EKafkaErrors::NONE_ERROR) {
747+ auto & cookieInfo = Cookies[ownCookie];
748+ cookieInfo.TopicPath = topicPath;
749+ cookieInfo.PartitionId = partitionId;
750+ cookieInfo.Position = position;
751+ cookieInfo.RuPerRequest = ruPerRequest;
752+ cookieInfo.Request = pendingRequest;
753+
754+ pendingRequest->WaitAcceptingCookies .insert (ownCookie);
755+ pendingRequest->WaitResultCookies .insert (ownCookie);
756+
696757 ruPerRequest = false ;
697758 KAFKA_LOG_T (" Sending TEvPartitionWriter::TEvWriteRequest to " << writer.second << " with cookie " << ownCookie);
698759 Send (writer.second , std::move (ev));
0 commit comments