@@ -249,16 +249,16 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
249249 }
250250 }
251251
// Single-message convenience wrapper: wraps expectedMessage into a one-element list and
// forwards it, together with the remaining arguments, to ReadTopicMessages.
// In the added ('+') variant:
//   topicName       - forwarded unchanged to ReadTopicMessages.
//   expectedMessage - the only message expected to be read.
//   disposition     - a TInstant forwarded unchanged; when omitted, the default argument
//                     evaluates to TInstant::Now() - TDuration::Seconds(100) at the call site.
//   sort            - forwarded unchanged; defaults to false.
252- void ReadTopicMessage (const TString& topicName, const TString& expectedMessage, TDuration disposition = TDuration::Seconds(100 )) {
253- ReadTopicMessages (topicName, {expectedMessage}, disposition);
252+ void ReadTopicMessage (const TString& topicName, const TString& expectedMessage, TInstant disposition = TInstant::Now() - TDuration::Seconds(100 ), bool sort = false ) {
253+ ReadTopicMessages (topicName, {expectedMessage}, disposition, sort );
254254 }
255255
256- void ReadTopicMessages (const TString& topicName, const TVector<TString>& expectedMessages, TDuration disposition = TDuration::Seconds(100 )) {
256+ void ReadTopicMessages (const TString& topicName, const TVector<TString>& expectedMessages, TInstant disposition = TInstant::Now() - TDuration::Seconds(100 ), bool sort = false ) {
257257 NTopic::TReadSessionSettings readSettings;
258258 readSettings
259259 .WithoutConsumer ()
260260 .AppendTopics (
261- NTopic::TTopicReadSettings (topicName).ReadFromTimestamp (TInstant::Now () - disposition)
261+ NTopic::TTopicReadSettings (topicName).ReadFromTimestamp (disposition)
262262 .AppendPartitionIds (0 )
263263 );
264264
@@ -287,12 +287,19 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
287287 }
288288 }
289289
290- UNIT_ASSERT_GE (expectedMessages.size (), received.size ());
290+ UNIT_ASSERT_C (expectedMessages.size () >= received.size (), TStringBuilder ()
291+ << " expected #" << expectedMessages.size () << " messages ("
292+ << JoinSeq (" , " , expectedMessages) << " ), got #" << received.size () << " messages ("
293+ << JoinSeq (" , " , received) << " )" );
291294
292295 error = TStringBuilder () << " got new event, received #" << received.size () << " / " << expectedMessages.size () << " messages" ;
293296 return false ;
294297 });
295298
299+ if (sort) {
300+ std::sort (received.begin (), received.end ());
301+ }
302+
296303 UNIT_ASSERT_VALUES_EQUAL (received.size (), expectedMessages.size ());
297304 for (size_t i = 0 ; i < received.size (); ++i) {
298305 UNIT_ASSERT_VALUES_EQUAL (received[i], expectedMessages[i]);
@@ -2142,6 +2149,134 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
21422149 UNIT_ASSERT_STRING_CONTAINS (*ast, " DqCnStreamLookup" );
21432150 });
21442151 }
2152+
// Scenario covered by this test:
//   * A streaming query reads JSON rows from the input topic, LEFT JOINs them with an
//     S3-backed lookup on host == fqdn, groups by event inside a HOP window and writes
//     "event-time-payload-count" strings to the output topic.
//   * The initial S3 object has no "payload" for host2.example.com, and the query applies
//     Unwrap(l.payload) to the joined row (marked "Test failure here" in the query text).
//   * After writing a row for host2 the test polls `.metadata/script_executions` until
//     MAX(lease_generation) is at least 2, re-uploads the object with a payload for host2,
//     and then checks the output topic for key B and afterwards for keys A and B.
// NOTE(review): what exactly CheckScriptExecutionsCount(1, 1) verifies is not visible
// here - confirm against the fixture.
2153+ Y_UNIT_TEST_F (OffsetsAndStateRecoveryOnInternalRetry, TStreamingTestFixture) {
2154+ // Join with S3 used for introducing temporary failure and force retry on specific key
2155+
2156+ constexpr char sourceBucket[] = " test_streaming_query_recovery_on_internal_retry" ;
2157+ constexpr char objectContent[] = R"(
2158+ {"fqdn": "host1.example.com", "payload": "P1"}
2159+ {"fqdn": "host2.example.com" })" ;
2160+ constexpr char objectPath[] = " path/test_object.json" ;
2161+ CreateBucketWithObject (sourceBucket, objectPath, objectContent);
2162+
2163+ constexpr char inputTopicName[] = " internalRetryInputTopicName" ;
2164+ constexpr char outputTopicName[] = " internalRetryOutputTopicName" ;
2165+ CreateTopic (inputTopicName);
2166+ CreateTopic (outputTopicName);
2167+
2168+ constexpr char pqSourceName[] = " pqSourceName" ;
2169+ constexpr char s3SourceName[] = " s3Source" ;
2170+ CreatePqSource (pqSourceName);
2171+ CreateS3Source (sourceBucket, s3SourceName);
2172+
2173+ constexpr char queryName[] = " streamingQuery" ;
2174+ ExecQuery (fmt::format (R"(
2175+ CREATE STREAMING QUERY `{query_name}` AS
2176+ DO BEGIN
2177+ PRAGMA ydb.HashJoinMode = "map";
2178+ $s3_lookup = SELECT * FROM `{s3_source}`.`path/` WITH (
2179+ FORMAT = "json_each_row",
2180+ SCHEMA (
2181+ fqdn String NOT NULL,
2182+ payload String
2183+ )
2184+ );
2185+
2186+ -- Test that offsets are recovered
2187+ $pq_source = SELECT * FROM `{pq_source}`.`{input_topic}` WITH (
2188+ FORMAT = "json_each_row",
2189+ SCHEMA (
2190+ time String NOT NULL,
2191+ event String,
2192+ host String
2193+ )
2194+ );
2195+
2196+ $joined = SELECT
2197+ Unwrap(l.payload) AS payload, -- Test failure here
2198+ p.*
2199+ FROM $pq_source AS p
2200+ LEFT JOIN $s3_lookup AS l
2201+ ON (l.fqdn = p.host);
2202+
2203+ -- Test that state also recovered
2204+ $grouped = SELECT
2205+ event,
2206+ CAST(SOME(time) AS String) AS time,
2207+ SOME(payload) AS payload,
2208+ CAST(COUNT(*) AS String) AS count
2209+ FROM $joined
2210+ GROUP BY
2211+ HOP (CAST(time AS Timestamp), "PT1H", "PT1H", "PT0H"),
2212+ event;
2213+
2214+ INSERT INTO `{pq_source}`.`{output_topic}`
2215+ SELECT Unwrap(event || "-" || time || "-" || payload || "-" || count) FROM $grouped
2216+ END DO;)" ,
2217+ " query_name" _a = queryName,
2218+ " pq_source" _a = pqSourceName,
2219+ " s3_source" _a = s3SourceName,
2220+ " input_topic" _a = inputTopicName,
2221+ " output_topic" _a = outputTopicName
2222+ ));
2223+
2224+ CheckScriptExecutionsCount (1 , 1 );
2225+ Sleep (TDuration::Seconds (1 ));
2226+
2227+ // Fill HOP state for key A
2228+ WriteTopicMessages (inputTopicName, {
2229+ R"( {"time": "2025-08-24T00:00:00.000000Z", "event": "A", "host": "host1.example.com"})" ,
2230+ R"( {"time": "2025-08-25T00:00:00.000000Z", "event": "A", "host": "host1.example.com"})" ,
2231+ });
2232+ ReadTopicMessage (outputTopicName, " A-2025-08-24T00:00:00.000000Z-P1-1" );
2233+
2234+ Sleep (TDuration::Seconds (2 ));
// Capture the current time before the failing row is written; it is later passed as the
// read-start timestamp so the check for key B does not start from the default position.
2235+ auto readDisposition = TInstant::Now ();
2236+
2237+ // Write failure message for key B
2238+ WriteTopicMessage (inputTopicName, R"( {"time": "2025-08-24T00:00:00.000000Z", "event": "B", "host": "host2.example.com"})" );
2239+
2240+ // Wait script execution retry
2241+ WaitFor (TDuration::Seconds (10 ), " wait retry" , [&](TString& error) {
2242+ const auto & results = ExecQuery (R"(
2243+ SELECT MAX(lease_generation) AS generation FROM `.metadata/script_executions`;
2244+ )" );
2245+ UNIT_ASSERT_VALUES_EQUAL (results.size (), 1 );
2246+
2247+ std::optional<i64 > generation;
2248+ CheckScriptResult (results[0 ], 1 , 1 , [&](TResultSetParser& result) {
2249+ generation = result.ColumnParser (0 ).GetOptionalInt64 ();
2250+ });
2251+
// Report "not ready" while the generation is absent or below 2, recording it in `error`.
// NOTE(review): presumably WaitFor re-invokes this callback until it returns true or the
// 10 second limit passes - confirm against WaitFor.
2252+ if (!generation || *generation < 2 ) {
2253+ error = TStringBuilder () << " generation is: " << (generation ? ToString (*generation) : " null" );
2254+ return false ;
2255+ }
2256+
2257+ return true ;
2258+ });
2259+
2260+ // Resolve query failure
2261+ UploadObject (sourceBucket, objectPath, R"(
2262+ {"fqdn": "host1.example.com", "payload": "P1"}
2263+ {"fqdn": "host2.example.com", "payload": "P2" })" );
2264+ Sleep (TDuration::Seconds (2 ));
2265+
2266+ // Check that offset is restored
2267+ WriteTopicMessage (inputTopicName, R"( {"time": "2025-08-25T00:00:00.000000Z", "event": "B", "host": "host2.example.com"})" );
2268+ ReadTopicMessage (outputTopicName, " B-2025-08-24T00:00:00.000000Z-P2-1" , readDisposition);
2269+
2270+ Sleep (TDuration::Seconds (1 ));
2271+ readDisposition = TInstant::Now ();
2272+
2273+ // Check that HOP state is restored
2274+ WriteTopicMessage (inputTopicName, R"( {"time": "2025-08-26T00:00:00.000000Z", "event": "A", "host": "host1.example.com"})" );
// sort = true: the received messages are sorted before the element-wise comparison,
// so the expected list below is written in ascending order (A before B).
2275+ ReadTopicMessages (outputTopicName, {
2276+ " A-2025-08-25T00:00:00.000000Z-P1-1" ,
2277+ " B-2025-08-25T00:00:00.000000Z-P2-1"
2278+ }, readDisposition, /* sort */ true );
2279+ }
21452280}
21462281
21472282} // namespace NKikimr::NKqp
0 commit comments