@@ -50,14 +50,10 @@ use hyperactor_telemetry::log_file_path;
5050use notify:: Watcher ;
5151use serde:: Deserialize ;
5252use serde:: Serialize ;
53- use tokio:: fs;
5453use tokio:: io;
5554use tokio:: io:: AsyncRead ;
5655use tokio:: io:: AsyncReadExt ;
57- use tokio:: io:: AsyncSeek ;
58- use tokio:: io:: AsyncSeekExt ;
5956use tokio:: io:: AsyncWriteExt ;
60- use tokio:: io:: SeekFrom ;
6157use tokio:: sync:: Mutex ;
6258use tokio:: sync:: Notify ;
6359use tokio:: sync:: RwLock ;
@@ -947,113 +943,6 @@ impl StreamFwder {
947943 }
948944}
949945
/// Result of processing file content from a monitored log file.
#[derive(Debug)]
struct FileProcessingResult {
    /// Complete lines found during processing; each entry excludes its
    /// trailing `\n` (the scanner copies only the bytes before the newline).
    lines: Vec<Vec<u8>>,
    /// Updated byte offset in the file after processing; the next read
    /// should resume from here.
    new_position: u64,
    /// Any remaining incomplete line data (bytes after the last newline),
    /// buffered so a subsequent read can complete the line.
    incomplete_line_buffer: Vec<u8>,
}
960-
/// Process new file content from a given position, extracting complete lines.
/// This function is extracted to enable easier unit testing without file system
/// dependencies.
///
/// Behavior (all visible in the body below):
/// - `current_position == file_size`: nothing new to read; the existing
///   partial-line buffer is carried forward unchanged.
/// - `current_position > file_size`: treated as truncation/rotation and the
///   read restarts from offset 0.
/// - Lines longer than `MAX_LINE_SIZE` are truncated and tagged with a
///   `"... [TRUNCATED]"` suffix.
/// - At most `max_buffer_size` lines are returned per call; when that cap is
///   hit, the returned `new_position` points just past the last consumed
///   newline so the remainder is re-read on the next call.
async fn process_file_content<R: AsyncRead + AsyncSeek + Unpin>(
    reader: &mut R,
    current_position: u64,
    file_size: u64,
    existing_line_buffer: Vec<u8>,
    max_buffer_size: usize,
) -> Result<FileProcessingResult> {
    // If position equals file size, we're at the end — no new bytes to scan.
    if current_position == file_size {
        return Ok(FileProcessingResult {
            lines: Vec::new(),
            new_position: current_position,
            incomplete_line_buffer: existing_line_buffer,
        });
    }

    // Handle potential file truncation/rotation: a saved position beyond the
    // current file size means the file shrank underneath us.
    let actual_position = if current_position > file_size {
        tracing::warn!(
            "File appears to have been truncated (position {} > file size {}), resetting to start",
            current_position,
            file_size
        );
        reader.seek(SeekFrom::Start(0)).await?;
        0
    } else {
        // current_position < file_size (equality returned early above).
        reader.seek(SeekFrom::Start(current_position)).await?;
        current_position
    };

    let mut buf = vec![0u8; 128 * 1024];
    let mut line_buffer = existing_line_buffer;
    let mut lines = Vec::with_capacity(max_buffer_size);
    // Bytes of fully-consumed chunks; used to compute the resume position.
    let mut processed_bytes = 0u64;

    loop {
        let bytes_read = reader.read(&mut buf).await?;
        if bytes_read == 0 {
            break;
        }

        let chunk = &buf[..bytes_read];

        let mut start = 0;
        while let Some(newline_pos) = chunk[start..].iter().position(|&b| b == b'\n') {
            let absolute_pos = start + newline_pos;

            // Append everything up to (but not including) the newline.
            line_buffer.extend_from_slice(&chunk[start..absolute_pos]);

            if !line_buffer.is_empty() {
                if line_buffer.len() > MAX_LINE_SIZE {
                    line_buffer.truncate(MAX_LINE_SIZE);
                    line_buffer.extend_from_slice(b"... [TRUNCATED]");
                }

                // Hand off the completed line, leaving a fresh buffer behind.
                let line_data = std::mem::replace(&mut line_buffer, Vec::with_capacity(2048));
                lines.push(line_data);
            }

            start = absolute_pos + 1;

            // Check if we've reached the max buffer size after adding each line
            if lines.len() >= max_buffer_size {
                // We've processed up to and including the current newline.
                // The new position is where we should start reading next time.
                let new_position = actual_position + processed_bytes + start as u64;

                // Don't save remaining data - we'll re-read it from the new
                // position. (line_buffer is empty here: it was just swapped
                // out when the final line was pushed.)
                return Ok(FileProcessingResult {
                    lines,
                    new_position,
                    incomplete_line_buffer: Vec::new(),
                });
            }
        }

        // Only add bytes to processed_bytes if we've fully processed this chunk
        processed_bytes += bytes_read as u64;

        // Bytes after the last newline form an incomplete line; keep them.
        if start < chunk.len() {
            line_buffer.extend_from_slice(&chunk[start..]);
        }
    }

    let new_position = actual_position + processed_bytes;

    Ok(FileProcessingResult {
        lines,
        new_position,
        incomplete_line_buffer: line_buffer,
    })
}
1056-
1057946/// Messages that can be sent to the LogForwarder
1058947#[ derive(
1059948 Debug ,
@@ -1554,6 +1443,7 @@ pub mod test_tap {
15541443
15551444#[ cfg( test) ]
15561445mod tests {
1446+
15571447 use std:: sync:: Arc ;
15581448 use std:: sync:: Mutex ;
15591449
@@ -1566,11 +1456,121 @@ mod tests {
15661456 use hyperactor:: mailbox:: DialMailboxRouter ;
15671457 use hyperactor:: mailbox:: MailboxServer ;
15681458 use hyperactor:: proc:: Proc ;
1459+ use tokio:: io:: AsyncSeek ;
1460+ use tokio:: io:: AsyncSeekExt ;
15691461 use tokio:: io:: AsyncWriteExt ;
1462+ use tokio:: io:: SeekFrom ;
15701463 use tokio:: sync:: mpsc;
15711464
15721465 use super :: * ;
15731466
/// Result of processing file content read starting at a saved offset.
#[derive(Debug)]
struct FileProcessingResult {
    /// Complete lines found during processing, each without its trailing
    /// newline byte.
    lines: Vec<Vec<u8>>,
    /// Updated position in the file after processing — the offset the next
    /// read should resume from.
    new_position: u64,
    /// Any remaining incomplete line data, buffered for subsequent reads.
    incomplete_line_buffer: Vec<u8>,
}
1477+
1478+ /// Process new file content from a given position, extracting complete lines
1479+ /// This function is extracted to enable easier unit testing without file system dependencies
1480+ async fn process_file_content < R : AsyncRead + AsyncSeek + Unpin > (
1481+ reader : & mut R ,
1482+ current_position : u64 ,
1483+ file_size : u64 ,
1484+ existing_line_buffer : Vec < u8 > ,
1485+ max_buffer_size : usize ,
1486+ ) -> Result < FileProcessingResult > {
1487+ // If position equals file size, we're at the end
1488+ if current_position == file_size {
1489+ return Ok ( FileProcessingResult {
1490+ lines : Vec :: new ( ) ,
1491+ new_position : current_position,
1492+ incomplete_line_buffer : existing_line_buffer,
1493+ } ) ;
1494+ }
1495+
1496+ // Handle potential file truncation/rotation
1497+ let actual_position = if current_position > file_size {
1498+ tracing:: warn!(
1499+ "File appears to have been truncated (position {} > file size {}), resetting to start" ,
1500+ current_position,
1501+ file_size
1502+ ) ;
1503+ reader. seek ( SeekFrom :: Start ( 0 ) ) . await ?;
1504+ 0
1505+ } else {
1506+ // current_position < file_size
1507+ reader. seek ( SeekFrom :: Start ( current_position) ) . await ?;
1508+ current_position
1509+ } ;
1510+
1511+ let mut buf = vec ! [ 0u8 ; 128 * 1024 ] ;
1512+ let mut line_buffer = existing_line_buffer;
1513+ let mut lines = Vec :: with_capacity ( max_buffer_size) ;
1514+ let mut processed_bytes = 0u64 ;
1515+
1516+ loop {
1517+ let bytes_read = reader. read ( & mut buf) . await ?;
1518+ if bytes_read == 0 {
1519+ break ;
1520+ }
1521+
1522+ let chunk = & buf[ ..bytes_read] ;
1523+
1524+ let mut start = 0 ;
1525+ while let Some ( newline_pos) = chunk[ start..] . iter ( ) . position ( |& b| b == b'\n' ) {
1526+ let absolute_pos = start + newline_pos;
1527+
1528+ line_buffer. extend_from_slice ( & chunk[ start..absolute_pos] ) ;
1529+
1530+ if !line_buffer. is_empty ( ) {
1531+ if line_buffer. len ( ) > MAX_LINE_SIZE {
1532+ line_buffer. truncate ( MAX_LINE_SIZE ) ;
1533+ line_buffer. extend_from_slice ( b"... [TRUNCATED]" ) ;
1534+ }
1535+
1536+ let line_data = std:: mem:: replace ( & mut line_buffer, Vec :: with_capacity ( 2048 ) ) ;
1537+ lines. push ( line_data) ;
1538+ }
1539+
1540+ start = absolute_pos + 1 ;
1541+
1542+ // Check if we've reached the max buffer size after adding each line
1543+ if lines. len ( ) >= max_buffer_size {
1544+ // We've processed up to and including the current newline
1545+ // The new position is where we should start reading next time
1546+ let new_position = actual_position + processed_bytes + start as u64 ;
1547+
1548+ // Don't save remaining data - we'll re-read it from the new position
1549+ return Ok ( FileProcessingResult {
1550+ lines,
1551+ new_position,
1552+ incomplete_line_buffer : Vec :: new ( ) ,
1553+ } ) ;
1554+ }
1555+ }
1556+
1557+ // Only add bytes to processed_bytes if we've fully processed this chunk
1558+ processed_bytes += bytes_read as u64 ;
1559+
1560+ if start < chunk. len ( ) {
1561+ line_buffer. extend_from_slice ( & chunk[ start..] ) ;
1562+ }
1563+ }
1564+
1565+ let new_position = actual_position + processed_bytes;
1566+
1567+ Ok ( FileProcessingResult {
1568+ lines,
1569+ new_position,
1570+ incomplete_line_buffer : line_buffer,
1571+ } )
1572+ }
1573+
15741574 #[ tokio:: test]
15751575 async fn test_forwarding_log_to_client ( ) {
15761576 // Setup the basics
0 commit comments