From c0f9e882cb745e35680e3e0f893d6a6cd82997da Mon Sep 17 00:00:00 2001 From: Margherita Molaro <48129834+marghe-molaro@users.noreply.github.com> Date: Mon, 1 Sep 2025 10:56:35 +0100 Subject: [PATCH 1/3] Remove reference to squeeze factors in test_rescaling_capabilities_based_on_squeeze_factors and when checking if mode 1 appointments can go ahead --- src/tlo/methods/healthsystem.py | 104 ++++++++++++++++++++------------ tests/test_healthsystem.py | 52 ++++++++-------- 2 files changed, 91 insertions(+), 65 deletions(-) diff --git a/src/tlo/methods/healthsystem.py b/src/tlo/methods/healthsystem.py index 0c81fe4026..a0461829fb 100644 --- a/src/tlo/methods/healthsystem.py +++ b/src/tlo/methods/healthsystem.py @@ -1606,6 +1606,66 @@ def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_f return appt_footprint_times + def get_total_minutes_of_this_officer_in_this_district(self, current_capabilities, _officer): + """Returns the minutes of current capabilities for the officer identified (this officer type in this + facility_id).""" + return current_capabilities.get(_officer) + + def get_total_minutes_of_this_officer_in_all_district(self, current_capabilities, _officer): + """Returns the minutes of current capabilities for the officer identified in all districts (this officer + type in this all facilities of the same level in all districts).""" + + def split_officer_compound_string(cs) -> Tuple[int, str]: + """Returns (facility_id, officer_type) for the officer identified in the string of the form: + 'FacilityID_{facility_id}_Officer_{officer_type}'.""" + _, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_") + return int(_facility_id), _officer_type + + def _match(_this_officer, facility_ids: List[int], officer_type: str): + """Returns True if the officer identified is of the identified officer_type and is in one of the + facility_ids.""" + this_facility_id, this_officer_type = split_officer_compound_string(_this_officer) + return (this_officer_type == officer_type) and (this_facility_id in facility_ids) + + facility_id, officer_type = split_officer_compound_string(_officer) + facility_level = self._facility_by_facility_id[int(facility_id)].level + facilities_of_same_level_in_all_district = [ + _fac.id for _fac in self._facilities_for_each_district[facility_level].values() + ] + + officers_in_the_same_level_in_all_districts = [ + _officer for _officer in current_capabilities.keys() if + _match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type) + ] + + return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts) + + + def check_if_all_required_officers_have_nonzero_capabilities(self, expected_time_requests)-> bool: + """Check if all officers required by the appt footprint are available to perform the HSI""" + + ok_to_run = True + + for officer in expected_time_requests.keys(): + if self.compute_squeeze_factor_to_district_level: + availability = self.get_total_minutes_of_this_officer_in_this_district(self.capabilities_today, officer) + else: + availability = self.get_total_minutes_of_this_officer_in_all_district(self.capabilities_today, officer) + + # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 + if availability is None: + logger.warning( + key="message", + data=(f"Requested officer {officer} is not contemplated by health system. ") + ) + availability = 0.0 + + if availability == 0.0: + ok_to_run = False + + return ok_to_run + + def get_squeeze_factors(self, footprints_per_event, total_footprint, current_capabilities, compute_squeeze_factor_to_district_level: bool ): @@ -1636,48 +1696,16 @@ def get_squeeze_factors(self, footprints_per_event, total_footprint, current_cap (position in array matches that in the all_call_today list). """ - def get_total_minutes_of_this_officer_in_this_district(_officer): - """Returns the minutes of current capabilities for the officer identified (this officer type in this - facility_id).""" - return current_capabilities.get(_officer) - - def get_total_minutes_of_this_officer_in_all_district(_officer): - """Returns the minutes of current capabilities for the officer identified in all districts (this officer - type in this all facilities of the same level in all districts).""" - - def split_officer_compound_string(cs) -> Tuple[int, str]: - """Returns (facility_id, officer_type) for the officer identified in the string of the form: - 'FacilityID_{facility_id}_Officer_{officer_type}'.""" - _, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_") - return int(_facility_id), _officer_type - - def _match(_this_officer, facility_ids: List[int], officer_type: str): - """Returns True if the officer identified is of the identified officer_type and is in one of the - facility_ids.""" - this_facility_id, this_officer_type = split_officer_compound_string(_this_officer) - return (this_officer_type == officer_type) and (this_facility_id in facility_ids) - - facility_id, officer_type = split_officer_compound_string(_officer) - facility_level = self._facility_by_facility_id[int(facility_id)].level - facilities_of_same_level_in_all_district = [ - _fac.id for _fac in self._facilities_for_each_district[facility_level].values() - ] - officers_in_the_same_level_in_all_districts = [ - _officer for _officer in current_capabilities.keys() if - _match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type) - ] - - return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts) # 1) Compute the load factors for each officer type at each facility that is # called-upon in this list of HSIs load_factor = {} for officer, call in total_footprint.items(): if compute_squeeze_factor_to_district_level: - availability = get_total_minutes_of_this_officer_in_this_district(officer) + availability = self.get_total_minutes_of_this_officer_in_this_district(current_capabilities, officer) else: - availability = get_total_minutes_of_this_officer_in_all_district(officer) + availability = self.get_total_minutes_of_this_officer_in_all_district(current_capabilities, officer) # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 if availability is None: @@ -2066,11 +2094,13 @@ def run_individual_level_events_in_mode_0_or_1(self, _appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT # Mode 0: All HSI Event run, with no squeeze - # Mode 1: All HSI Events run with squeeze provided latter is not inf + # Mode 1: All HSI Events run provided all required officers have non-zero capabilities ok_to_run = True - if self.mode_appt_constraints == 1 and squeeze_factor == float('inf'): - ok_to_run = False + if self.mode_appt_constraints == 1: + if event.expected_time_requests: + ok_to_run = self.check_if_all_required_officers_have_nonzero_capabilities(event.expected_time_requests) + if ok_to_run: diff --git a/tests/test_healthsystem.py b/tests/test_healthsystem.py index 62c6970196..41598bfd4f 100644 --- a/tests/test_healthsystem.py +++ b/tests/test_healthsystem.py @@ -391,7 +391,7 @@ def test_run_in_mode_1_with_capacity(tmpdir, seed): @pytest.mark.slow -def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): +def test_rescaling_capabilities_based_on_load_factors(tmpdir, seed): # Capabilities should increase when a HealthSystem that has low capabilities changes mode with # the option `scale_to_effective_capabilities` set to `True`. @@ -404,6 +404,7 @@ def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): "directory": tmpdir, "custom_levels": { "tlo.methods.healthsystem": logging.DEBUG, + "tlo.methods.healthsystem.summary": logging.INFO } }, resourcefilepath=resourcefilepath ) @@ -438,37 +439,32 @@ def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): hs_params['scale_to_effective_capabilities'] = True # Run the simulation - sim.make_initial_population(n=popsize) + sim.make_initial_population(n=1000) sim.simulate(end_date=end_date) check_dtypes(sim) # read the results - output = parse_log_file(sim.log_filepath, level=logging.DEBUG) - - # Do the checks - assert len(output['tlo.methods.healthsystem']['HSI_Event']) > 0 - hsi_events = output['tlo.methods.healthsystem']['HSI_Event'] - hsi_events['date'] = pd.to_datetime(hsi_events['date']).dt.year - - # Check that all squeeze factors were high in 2010, but not all were high in 2011 - # thanks to rescaling of capabilities - assert ( - hsi_events.loc[ - (hsi_events['Person_ID'] >= 0) & - (hsi_events['Number_By_Appt_Type_Code'] != {}) & - (hsi_events['date'] == 2010), - 'Squeeze_Factor' - ] >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - assert not ( - hsi_events.loc[ - (hsi_events['Person_ID'] >= 0) & - (hsi_events['Number_By_Appt_Type_Code'] != {}) & - (hsi_events['date'] == 2011), - 'Squeeze_Factor' - ] >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - + output = parse_log_file(sim.log_filepath, level=logging.INFO) + pd.set_option('display.max_columns', None) + capacity_by_officer_and_level = output['tlo.methods.healthsystem.summary']['Capacity_By_OfficerType_And_FacilityLevel'] + + # Filter rows for the two years + row_2010 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2010-12-31"].squeeze() + row_2011 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2011-12-31"].squeeze() + + # Dictionary to store results + results = {} + + for col in capacity_by_officer_and_level.columns: + if col == "date": + continue # skip the date column + if not (capacity_by_officer_and_level[col] == 0).any(): # check column is not all zeros + ratio = row_2010[col] / row_2011[col] + results[col] = ratio > 100 # Check that load has significantly reduced in second year, thanks to the significant rescaling of capabilities. (There is some degeneracy here, in that load could also be reduced due to declining demand. However it is extremely unlikely that demand for care would have dropped by a factor of 100 in second year, hence this is a fair test). + + assert all(results.values()) + + @pytest.mark.slow def test_run_in_mode_1_with_almost_no_capacity(tmpdir, seed): From 6bc44015b8ffc0dbb789b2c8b8b3b35a78b7fbd3 Mon Sep 17 00:00:00 2001 From: Margherita Molaro <48129834+marghe-molaro@users.noreply.github.com> Date: Mon, 1 Sep 2025 11:13:39 +0100 Subject: [PATCH 2/3] Remove outdated test test_run_in_mode_1_with_almost_no_capacity --- tests/test_healthsystem.py | 64 -------------------------------------- 1 file changed, 64 deletions(-) diff --git a/tests/test_healthsystem.py b/tests/test_healthsystem.py index 41598bfd4f..72aa6c210d 100644 --- a/tests/test_healthsystem.py +++ b/tests/test_healthsystem.py @@ -464,70 +464,6 @@ def test_rescaling_capabilities_based_on_load_factors(tmpdir, seed): assert all(results.values()) - - -@pytest.mark.slow -def test_run_in_mode_1_with_almost_no_capacity(tmpdir, seed): - # Events should run but (for those with non-blank footprints) with high squeeze factors - # (Mode 1 -> elastic constraints) - - # Establish the simulation object - sim = Simulation( - start_date=start_date, - seed=seed, - log_config={ - "filename": "log", - "directory": tmpdir, - "custom_levels": { - "tlo.methods.healthsystem": logging.DEBUG, - } - }, resourcefilepath=resourcefilepath - ) - - # Define the service availability - service_availability = ['*'] - - # Register the core modules - sim.register(demography.Demography(), - simplified_births.SimplifiedBirths(), - enhanced_lifestyle.Lifestyle(), - healthsystem.HealthSystem(service_availability=service_availability, - capabilities_coefficient=0.0000001, # This will mean that capabilities are - # very close to 0 everywhere. - # (If the value was 0, then it would - # be interpreted as the officers NEVER - # being available at a facility, - # which would mean the HSIs should not - # run (as opposed to running with - # a very high squeeze factor)). - mode_appt_constraints=1), - symptommanager.SymptomManager(), - healthseekingbehaviour.HealthSeekingBehaviour(), - mockitis.Mockitis(), - chronicsyndrome.ChronicSyndrome() - ) - - # Run the simulation - sim.make_initial_population(n=popsize) - sim.simulate(end_date=end_date) - check_dtypes(sim) - - # read the results - output = parse_log_file(sim.log_filepath, level=logging.DEBUG) - - # Do the checks - assert len(output['tlo.methods.healthsystem']['HSI_Event']) > 0 - hsi_events = output['tlo.methods.healthsystem']['HSI_Event'] - # assert hsi_events['did_run'].all() - assert ( - hsi_events.loc[(hsi_events['Person_ID'] >= 0) & (hsi_events['Number_By_Appt_Type_Code'] != {}), - 'Squeeze_Factor'] >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - assert (hsi_events.loc[hsi_events['Person_ID'] < 0, 'Squeeze_Factor'] == 0.0).all() - - # Check that some Mockitis cures occurred (though health system) - assert any(sim.population.props['mi_status'] == 'P') - @pytest.mark.slow def test_run_in_mode_2_with_capacity(tmpdir, seed): From 1606ff85030781b11a19339dc3b5df8708d5af38 Mon Sep 17 00:00:00 2001 From: Margherita Molaro <48129834+marghe-molaro@users.noreply.github.com> Date: Wed, 3 Sep 2025 10:47:27 +0100 Subject: [PATCH 3/3] Style fixes --- src/tlo/methods/healthsystem.py | 11 ++++++----- tests/test_healthsystem.py | 13 ++++++++++--- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/tlo/methods/healthsystem.py b/src/tlo/methods/healthsystem.py index a0461829fb..11b6e0a1fe 100644 --- a/src/tlo/methods/healthsystem.py +++ b/src/tlo/methods/healthsystem.py @@ -1640,10 +1640,10 @@ def _match(_this_officer, facility_ids: List[int], officer_type: str): return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts) - + def check_if_all_required_officers_have_nonzero_capabilities(self, expected_time_requests)-> bool: """Check if all officers required by the appt footprint are available to perform the HSI""" - + ok_to_run = True for officer in expected_time_requests.keys(): @@ -1651,7 +1651,7 @@ def check_if_all_required_officers_have_nonzero_capabilities(self, expected_time availability = self.get_total_minutes_of_this_officer_in_this_district(self.capabilities_today, officer) else: availability = self.get_total_minutes_of_this_officer_in_all_district(self.capabilities_today, officer) - + # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 if availability is None: logger.warning( @@ -1662,7 +1662,7 @@ def check_if_all_required_officers_have_nonzero_capabilities(self, expected_time if availability == 0.0: ok_to_run = False - + return ok_to_run @@ -2099,7 +2099,8 @@ def run_individual_level_events_in_mode_0_or_1(self, if self.mode_appt_constraints == 1: if event.expected_time_requests: - ok_to_run = self.check_if_all_required_officers_have_nonzero_capabilities(event.expected_time_requests) + ok_to_run = self.check_if_all_required_officers_have_nonzero_capabilities( + event.expected_time_requests) if ok_to_run: diff --git a/tests/test_healthsystem.py b/tests/test_healthsystem.py index 72aa6c210d..8e2fb907f7 100644 --- a/tests/test_healthsystem.py +++ b/tests/test_healthsystem.py @@ -446,8 +446,9 @@ def test_rescaling_capabilities_based_on_load_factors(tmpdir, seed): # read the results output = parse_log_file(sim.log_filepath, level=logging.INFO) pd.set_option('display.max_columns', None) - capacity_by_officer_and_level = output['tlo.methods.healthsystem.summary']['Capacity_By_OfficerType_And_FacilityLevel'] - + summary = output['tlo.methods.healthsystem.summary'] + capacity_by_officer_and_level = summary['Capacity_By_OfficerType_And_FacilityLevel'] + # Filter rows for the two years row_2010 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2010-12-31"].squeeze() row_2011 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2011-12-31"].squeeze() @@ -455,12 +456,18 @@ def test_rescaling_capabilities_based_on_load_factors(tmpdir, seed): # Dictionary to store results results = {} + # Check that load has significantly reduced in second year, thanks to the significant + # rescaling of capabilities. + # (There is some degeneracy here, in that load could also be reduced due to declining demand. + # However it is extremely unlikely that demand for care would have dropped by a factor of 100 + # in second year, hence this is a fair test). for col in capacity_by_officer_and_level.columns: if col == "date": continue # skip the date column if not (capacity_by_officer_and_level[col] == 0).any(): # check column is not all zeros ratio = row_2010[col] / row_2011[col] - results[col] = ratio > 100 # Check that load has significantly reduced in second year, thanks to the significant rescaling of capabilities. (There is some degeneracy here, in that load could also be reduced due to declining demand. However it is extremely unlikely that demand for care would have dropped by a factor of 100 in second year, hence this is a fair test). + + results[col] = ratio > 100 assert all(results.values())