@@ -40,14 +40,23 @@ def _unpack_resource(self, name):
4040 os .chmod (path_to_unpack , st .st_mode | stat .S_IEXEC )
4141 self .cli_path = path_to_unpack
4242
43- def get_command_prefix (self , subcmds : list [ str ] ) -> list [str ]:
43+ def _get_cli_common_args (self ) -> list [str ]:
4444 return [
4545 self .cli_path ,
4646 '--verbose' ,
4747 '--endpoint' , self .endpoint ,
4848 '--database={}' .format (self .database ),
49+ ]
50+
51+ @property
52+ def workload_topic_name (self ) -> str :
53+ return f'{ self .table_prefix } '
54+
55+ def get_command_prefix (self , subcmds : list [str ]) -> list [str ]:
56+ return [
57+ * self ._get_cli_common_args (),
4958 'workload' , 'topic'
50- ] + subcmds + ['--topic' , f' { self .table_prefix } ' ]
59+ ] + subcmds + ['--topic' , self .workload_topic_name ]
5160
5261 def cmd_run (self , cmd ):
5362 logger .debug (f"Running cmd { cmd } " )
@@ -60,6 +69,20 @@ def __loop(self):
6069 self .cmd_run (
6170 self .get_command_prefix (subcmds = ['init' , '-c' , self .consumers , '-p' , self .producers ])
6271 )
72+ # adjust
73+ self .cmd_run ([
74+ * self ._get_cli_common_args (),
75+ 'topic' , 'alter' ,
76+ '--retention-period=2s' ,
77+ self .workload_topic_name ,
78+ ])
79+ self .cmd_run ([
80+ * self ._get_cli_common_args (),
81+ 'topic' , 'consumer' , 'add' ,
82+ f'--availability-period={ int (self .duration ) * 9 // 10 } s' ,
83+ '--consumer' , 'data_holder' ,
84+ self .workload_topic_name ,
85+ ])
6386 # run
6487 run_cmd_args = ['run' , 'full' , '-s' , self .duration , '--byte-rate' , '100M' , '--use-tx' , '--tx-commit-interval' , '2000' , '-p' , self .producers , '-c' , self .consumers ]
6588 if self .limit_memory_usage :
0 commit comments