From 4728134df67b82da3cd808fb0c8792fac0b7e834 Mon Sep 17 00:00:00 2001 From: Connor Barker Date: Fri, 9 May 2025 15:21:10 -0400 Subject: [PATCH 1/2] make created at duplicates more obvious --- .../export/formatters/ExportFormatUtils.java | 1 - .../export/EnrolleeImportServiceTests.java | 11 +++--- pepper-import/scripts/translate.py | 35 +++++++++++++++++++ 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/bio/terra/pearl/core/service/export/formatters/ExportFormatUtils.java b/core/src/main/java/bio/terra/pearl/core/service/export/formatters/ExportFormatUtils.java index 9bbe6305cd..c7de23e6ec 100644 --- a/core/src/main/java/bio/terra/pearl/core/service/export/formatters/ExportFormatUtils.java +++ b/core/src/main/java/bio/terra/pearl/core/service/export/formatters/ExportFormatUtils.java @@ -10,7 +10,6 @@ import java.time.Instant; import java.time.LocalDate; import java.time.ZoneId; -import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.List; diff --git a/core/src/test/java/bio/terra/pearl/core/service/export/EnrolleeImportServiceTests.java b/core/src/test/java/bio/terra/pearl/core/service/export/EnrolleeImportServiceTests.java index e9447b8e32..6dc44f0eef 100644 --- a/core/src/test/java/bio/terra/pearl/core/service/export/EnrolleeImportServiceTests.java +++ b/core/src/test/java/bio/terra/pearl/core/service/export/EnrolleeImportServiceTests.java @@ -864,7 +864,7 @@ public void testImportMultipleSurveyResponses(TestInfo info) { "importTest1.complete", "true", "importTest1.importFirstName", "Jeff", "importTest1.importFavColors", "[\"red\", \"blue\"]", - "importTest1.createdAt", "2023-08-21 05:17AM", + "importTest1.createdAt", "2023-08-21 05:16AM", "importTest1[2].complete", "true", "importTest1[2].importFirstName", "Jeffrey", "importTest1[2].importFavColors", "[\"green\"]", @@ -894,11 +894,9 @@ public void testImportMultipleSurveyResponses(TestInfo info) { assertThat(latestResponse.getSurveyId(), equalTo(survey.getId())); assertThat(latestResponse.isComplete(), equalTo(true)); - assertThat(latestResponse.getLastUpdatedAt(), equalTo(instantFromZone("2023-08-21 05:17AM"))); + assertThat(latestResponse.getLastUpdatedAt(), equalTo(instantFromZone("2023-08-21 05:16AM"))); assertThat(latestResponse.getAnswers().stream().filter(answer -> answer.getQuestionStableId().equals("importFirstName")) .findFirst().get().getStringValue(), equalTo("Jeff")); - - } @Test @@ -915,9 +913,9 @@ public void testImportAndReimportMultipleSurveyResponses(TestInfo info) { surveyFactory.attachToEnv(survey, studyEnvBundle.getStudyEnv().getId(), true); String username = "asdf@asdf.com"; - String oldCompletionDateStr = "2023-08-20 05:17AM"; + String oldCompletionDateStr = "2023-08-24 05:16AM"; String latestCompletionDateStr = "2023-08-24 05:17AM"; - Instant oldCompletionDate = instantFromZone("2023-08-20 05:17AM"); + Instant oldCompletionDate = instantFromZone("2023-08-24 05:16AM"); Instant latestCompletionDate = instantFromZone("2023-08-24 05:17AM"); EnrolleeBundle enrolleeBundle = enrolleeFactory.enroll( @@ -1010,7 +1008,6 @@ public void testImportAndReimportMultipleSurveyResponses(TestInfo info) { assertThat(importedLatestTask.getSurveyResponseId(), equalTo(importedLatestResponse.getId())); assertThat(importedLatestResponse.getAnswers().size(), equalTo(2)); assertThat(importedLatestResponse.getAnswers().stream().map(Answer::valueAsString).collect(Collectors.toSet()), equalTo(Set.of("Alex", "[\"aquamarine\", \"purple\"]"))); - } private void verifyParticipant(ImportItem importItem, UUID studyEnvId, diff --git a/pepper-import/scripts/translate.py b/pepper-import/scripts/translate.py index 47b564dade..5709151d14 100644 --- a/pepper-import/scripts/translate.py +++ b/pepper-import/scripts/translate.py @@ -570,6 +570,41 @@ def apply_repeatable_translation(dsm_data: dict[str, Any], juniper_data: dict[st apply_translation(dsm_data, juniper_data, repeat_translation) module_repeat += 1 + if is_created_at_question(translation.juniper_question_definition): + warn_if_created_at_values_not_unique(juniper_data, dsm_data, translation) + + +def is_created_at_question(juniper_question: DataDefinition) -> bool: + return juniper_question.stable_id.endswith(".createdAt") + +def warn_if_created_at_values_not_unique(juniper_data: dict[str, Any], dsm_data: dict[str,Any], translation: Translation): + + if translation.juniper_question_definition.stable_id.startswith("AT_GROUP_REGISTRATION"): + return # no need to check for duplicates in registration form; latest is always taken there + all_values = [] + repeat = 1 + while True: + stable_id = get_juniper_response_stable_id(translation.juniper_question_definition, repeat) + if stable_id not in juniper_data: + break + + value = juniper_data[stable_id] + all_values.append({ + "stable_id": stable_id, "value": value + }) + repeat += 1 + + + for i in range(len(all_values)): + for j in range(i + 1, len(all_values)): + stable_id1 = all_values[i]["stable_id"] + stable_id2 = all_values[j]["stable_id"] + + val1 = all_values[i]["value"] + val2 = all_values[j]["value"] + if stable_id1 != stable_id2 and val1 == val2 and val1 != '' and val2 != '': + print(f'Warning: duplicate timestamp value {stable_id1} and {stable_id2} with value {val1} for pepper user {dsm_data["PROFILE.HRUID"]}') + def make_repeat_translation(translation: Translation, repeat: int) -> Translation: return Translation( From 022cdc24c96a8c0361e098e1f94d43a6056051bf Mon Sep 17 00:00:00 2001 From: Connor Barker Date: Mon, 12 May 2025 16:41:44 -0400 Subject: [PATCH 2/2] fix lots of issues --- pepper-import/scripts/translate.py | 229 +++++++++++++++++++++++++---- 1 file changed, 197 insertions(+), 32 deletions(-) diff --git a/pepper-import/scripts/translate.py b/pepper-import/scripts/translate.py index 5709151d14..f4ae1d97b6 100644 --- a/pepper-import/scripts/translate.py +++ b/pepper-import/scripts/translate.py @@ -3,11 +3,14 @@ import json import os.path import re +from collections.abc import Callable from copy import copy from datetime import datetime from enum import Enum from typing import Any, Union +from dateutil import parser +from dateutil.relativedelta import relativedelta from openpyxl import load_workbook @@ -57,6 +60,8 @@ def main(): parser.add_argument('-O', '--out-file', required=True) parser.add_argument('-L', '--limit', type=int, default=None) + parser.add_argument('-F', '--filter', help='Comma separated list of pepper short ids: only these participants will be exported') + args = parser.parse_args() ensure_files_exist([args.dsm_data_dict, args.juniper_data_dict, args.in_file, args.translation_override]) @@ -94,7 +99,7 @@ def main(): # 5: translate data # - parse the data & actually do the translation - dsm_data = parse_dsm_data(args.in_file) + dsm_data = parse_dsm_data(args.in_file, args.filter.split(',') if args.filter is not None else None) juniper_data = apply_translations(dsm_data, translations, args.limit) @@ -310,12 +315,16 @@ class TranslationOverride: constant_value = None value_if_present = None + derive : Callable | None = None + def __init__(self, dsm_stable_id: str | None, juniper_stable_id: str | None, constant_value=None, + derive: Callable | None = None, value_if_present=None): self.dsm_stable_id = dsm_stable_id self.juniper_stable_id = juniper_stable_id self.constant_value = constant_value self.value_if_present = value_if_present + self.derive = derive def parse_translation_override(filepath: str) -> list[TranslationOverride]: @@ -348,6 +357,39 @@ def __init__(self, dsm_question_definition: DataDefinition, juniper_question_def self.subquestion_translations = subquestion_translations or [] +def derive_age_at_submission(dsm_data: dict[str, Any], rpt: int) -> int | None: + # get the date of birth from the dsm data + registration_dob = dsm_data.get('REGISTRATION.REGISTRATION_DOB') + assent_date_of_birth = dsm_data.get('ASSENT.ASSENT_DOB') + consent_date_of_birth = dsm_data.get('CONSENT.CONSENT_DOB') + profile_dob = dsm_data.get('DSM.DATEOFBIRTH') + + possible_dobs = [registration_dob, profile_dob,assent_date_of_birth, consent_date_of_birth] + # filter out any None values + possible_dobs = list(filter(lambda dob: dob is not None and dob != '', possible_dobs)) + if len(possible_dobs) == 0: + return None + + date_of_birth = possible_dobs[0] + + # get the date of medical history submission + submission_stable_id = generate_dsm_module_repeat_stable_id("MEDICAL_HISTORY.COMPLETEDAT", rpt) + + # get the date of medical history submission from the juniper data + submission_date = dsm_data.get(submission_stable_id) + if submission_date is None or submission_date == '': + return None + + # convert the date of birth to a datetime object + date_of_birth = parser.parse(date_of_birth) + # convert the submission date to a datetime object + submission_date = parser.parse(submission_date) + + # calculate the age at submission + + difference = relativedelta(submission_date, date_of_birth) + return difference.years + default_translation_overrides = [ TranslationOverride('PROFILE.EMAIL', 'profile.contactEmail'), # if the user doesn't have an email, it usually means their proxy does, so check there @@ -358,6 +400,9 @@ def __init__(self, dsm_question_definition: DataDefinition, juniper_question_def TranslationOverride('PROFILE.FIRSTNAME', 'profile.givenName'), TranslationOverride('PROFILE.LASTNAME', 'profile.familyName'), TranslationOverride(None, 'enrollee.subject', constant_value='true'), + TranslationOverride(None, + 'MEDICAL_HISTORY.ageAtSubmission', + derive=derive_age_at_submission) ] @@ -386,8 +431,7 @@ def create_translations( dsm_question = next((q for q in all_dsm_questions if q.stable_id == override.dsm_stable_id), None) juniper_question = next((q for q in all_juniper_questions if q.stable_id == override.juniper_stable_id), None) - if (override.juniper_stable_id == '' or juniper_question is not None) and ( - override.dsm_stable_id == '' or dsm_question is not None): + if juniper_question is not None and (override.dsm_stable_id is None or dsm_question is not None): translations.append(Translation(dsm_question, juniper_question, override)) # remove from lists; we don't need to match these anymore if dsm_question in dsm_questions: @@ -429,12 +473,23 @@ def create_translations( return unmatched_dsm_questions, juniper_questions, translations +def find_translation_by_juniper_stable_id( + juniper_stable_id: str, + translations: list[Translation] +) -> Translation | None: + for translation in translations: + if translation.juniper_question_definition.stable_id == juniper_stable_id: + return translation + + return None + + def create_workflow_translations( dsm_questions: list[DataDefinition], juniper_questions: list[DataDefinition], translations: list[Translation] ): - known_survey_mappings = { + survey_date_mappings = { 'LASTUPDATEDAT': 'lastUpdatedAt', 'COMPLETEDAT': 'completedAt', 'CREATEDAT': 'createdAt' @@ -451,7 +506,7 @@ def create_workflow_translations( value_if_present='true'))) juniper_questions.remove(completed_juniper_question) - for (known_dsm_question, known_juniper_question) in known_survey_mappings.items(): + for (known_dsm_question, known_juniper_question) in survey_date_mappings.items(): # look for known DSM questions, e.g. survey completion if dsm_question.stable_id.endswith("." + known_dsm_question) and dsm_question.stable_id.count('.') == 1: survey_name = dsm_question.stable_id.split('.')[0] @@ -463,6 +518,29 @@ def create_workflow_translations( if dsm_question in dsm_questions: dsm_questions.remove(dsm_question) + # make sure that completedAt/createdAt are created if possible + # for translation in translations: + # if translation.juniper_question_definition.stable_id.endswith('.completedAt'): + # # find the corresponding createdAt question + # created_at_translation = find_translation_by_juniper_stable_id( + # translation.juniper_question_definition.stable_id.replace('.completedAt', '.createdAt'), + # translations + # ) + # + # juniper_created_at_question = next( + # (q for q in juniper_questions if q.stable_id == translation.juniper_question_definition.stable_id.replace('.completedAt', '.createdAt')), None) + # + # + # if created_at_translation is None and juniper_created_at_question is not None: + # print(f'Warning: createdAt question not found for module {juniper_created_at_question.stable_id.replace(".createdAt", "")}, using value from completedAt') + # created_at_translation = Translation( + # translation.dsm_question_definition, + # juniper_created_at_question, + # ) + # translations.append(created_at_translation) + # juniper_questions.remove(juniper_created_at_question) + # + def is_matched(q1: DataDefinition, q2: DataDefinition) -> bool: @@ -503,7 +581,7 @@ def validate_leftover_questions( exit(1) -def parse_dsm_data(filepath: str) -> list[dict[str, Any]]: +def parse_dsm_data(filepath: str, participant_filter: list[str] | None) -> list[dict[str, Any]]: raw_data = [] with open(filepath, 'r') as f: # pepper exports as tsv @@ -519,12 +597,32 @@ def parse_dsm_data(filepath: str) -> list[dict[str, Any]]: new_row = {} for i, value in enumerate(row): new_row[header[i]] = value + if participant_filter is not None and len(participant_filter) > 0: + # check if the user is in the filter + if new_row['PROFILE.HRUID'] not in participant_filter: + continue data.append(new_row) return data necessary_columns = ['account.username', 'profile.birthDate'] +not_repeatable_modules = [ + "DSM.PARTICIPANTDATA", +] + +def is_not_repeatable(dsm_question: DataDefinition | None): + if dsm_question is None or dsm_question.stable_id is None: + return False + + for module in not_repeatable_modules: + if dsm_question.stable_id.startswith(module): + return True + + return False + + + def apply_translations(data: list[dict[str, Any]], translations: list[Translation], limit: int) -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] @@ -537,12 +635,24 @@ def apply_translations(data: list[dict[str, Any]], translations: list[Translatio new_row = {} for translation in translations: - apply_repeatable_translation(row, new_row, translation) + if (translation.dsm_question_definition is None + and translation.translation_override is not None + and translation.translation_override.constant_value is not None): + # this is a constant value + new_row[translation.juniper_question_definition.stable_id] = translation.translation_override.constant_value + elif is_not_repeatable(translation.dsm_question_definition): + # only get the latest value, don't capture whole history of responses. + # this exists because not all DSM modules export createdAt/completedAt + # which is needed for juniper to capture history :'( + apply_translation(row, new_row, translation, module_repeat=1) + else: + # standard question + apply_repeatable_translation(row, new_row, translation) has_all_needed_columns = True for column in necessary_columns: if column not in new_row or new_row[column] is None or new_row[column] == '' : - print(f'Warning: skipping user with missing {column}') + print(f'Warning: skipping pepper user {row["PROFILE.HRUID"]} (missing {column})') has_all_needed_columns = False if has_all_needed_columns and juniper_enrollee_filter(new_row): @@ -550,6 +660,38 @@ def apply_translations(data: list[dict[str, Any]], translations: list[Translatio return out +def is_question_in_data(question: DataDefinition, data: dict[str, Any]) -> bool: + if question.stable_id in data: + return True + + if question.subquestions is not None: + for subquestion in question.subquestions: + if subquestion.stable_id in data: + return True + + return False + + +# def apply_latest_only(dsm_data: dict[str, Any], juniper_data: dict[str, Any], translation: Translation): +# module_repeat = 1 +# while True: +# # check if the next repeat exists in the dsm data +# repeat_translation = make_repeat_translation(translation, module_repeat+1) +# +# present = is_question_in_data(repeat_translation.dsm_question_definition, dsm_data) +# +# if not present: +# break # last tested index is the last one that exists +# module_repeat += 1 +# +# # now we have the last repeat that exists +# repeat_translation = make_repeat_translation(translation, module_repeat) +# +# # we want DSM_DATA_1000.question -> JUNIPER_DATA.question, not DSM_DATA_1000.question -> JUNIPER_DATA[1000].question +# repeat_translation.juniper_question_definition.stable_id = translation.juniper_question_definition.stable_id +# +# apply_translation(dsm_data, juniper_data, repeat_translation, module_repeat=module_repeat) + def apply_repeatable_translation(dsm_data: dict[str, Any], juniper_data: dict[str, Any], translation: Translation): # for each translation, we need to go through all possible # repeats of the question. e.g., @@ -564,16 +706,19 @@ def apply_repeatable_translation(dsm_data: dict[str, Any], juniper_data: dict[st repeat_translation = make_repeat_translation(translation, module_repeat) - if not is_question_in_data(repeat_translation.dsm_question_definition, dsm_data): + successful = apply_translation(dsm_data, juniper_data, repeat_translation, module_repeat=module_repeat) + if not successful: + # if the translation was not successful, then there's + # no more data to apply break - - apply_translation(dsm_data, juniper_data, repeat_translation) module_repeat += 1 if is_created_at_question(translation.juniper_question_definition): warn_if_created_at_values_not_unique(juniper_data, dsm_data, translation) + + def is_created_at_question(juniper_question: DataDefinition) -> bool: return juniper_question.stable_id.endswith(".createdAt") @@ -608,23 +753,12 @@ def warn_if_created_at_values_not_unique(juniper_data: dict[str, Any], dsm_data: def make_repeat_translation(translation: Translation, repeat: int) -> Translation: return Translation( - make_repeat_question(translation.dsm_question_definition, repeat), + make_repeat_question(translation.dsm_question_definition, repeat) if translation.dsm_question_definition is not None else None, make_repeat_question(translation.juniper_question_definition, repeat), translation.translation_override, list(map(lambda t: make_repeat_translation(t, repeat), translation.subquestion_translations)), ) -def is_question_in_data(question: DataDefinition, data: dict[str, Any]) -> bool: - if question.stable_id in data: - return True - - if question.subquestions is not None: - for subquestion in question.subquestions: - if subquestion.stable_id in data: - return True - - return False - def make_repeat_question(question: DataDefinition, repeat: int) -> DataDefinition: return DataDefinition( question.source, @@ -664,16 +798,31 @@ def generate_juniper_module_repeat_stable_id(stable_id: str, repeat: int) -> str return '.'.join(split) -def apply_translation(dsm_data: dict[str, Any], juniper_data: dict[str, Any], translation: Translation): - if translation.translation_override is not None and translation.translation_override.constant_value is not None: - juniper_data[ - translation.juniper_question_definition.stable_id - ] = translation.translation_override.constant_value - return +# returns whether or not apply was successful; if false, it means that DSM data did not exist +def apply_translation(dsm_data: dict[str, Any], juniper_data: dict[str, Any], translation: Translation, module_repeat: int | None) -> bool: + if translation.translation_override is not None: + if translation.translation_override.constant_value is not None: + juniper_data[ + translation.juniper_question_definition.stable_id + ] = translation.translation_override.constant_value + return True + elif translation.translation_override.derive is not None: + # if we have a derive function, call it + # and set the value in the juniper data + value = translation.translation_override.derive(dsm_data, module_repeat) + if value is not None: + juniper_repeat_stable_id = get_juniper_response_stable_id(translation.juniper_question_definition, module_repeat) + juniper_data[juniper_repeat_stable_id] = value + return True + else: + return False juniper_question = translation.juniper_question_definition dsm_question = translation.dsm_question_definition + if dsm_question is not None and not is_question_in_data(dsm_question, dsm_data): + return False + if juniper_question.question_type == 'paneldynamic': vals = get_dynamic_panel_values(translation, dsm_data) juniper_data[juniper_question.stable_id] = json.dumps(vals) if len(vals) > 0 else '' @@ -685,6 +834,7 @@ def apply_translation(dsm_data: dict[str, Any], juniper_data: dict[str, Any], tr simple_translate( translation, dsm_data, juniper_data ) + return True def simple_translate(translation: Translation, dsm_data: dict[str, Any], @@ -876,15 +1026,18 @@ def get_multi_panel_values(translation: Translation, dsm_data: dict[str, Any]) - def write_data(outfile: str, data: list[dict[str, Any]]): with open(outfile, 'w') as f: - writer = csv.writer(f) - writer.writerow(data[0].keys()) + writer = csv.DictWriter(f, fieldnames=get_headers(data)) + writer.writeheader() for row in data: - writer.writerow(row.values()) + writer.writerow(row) def dsm_enrollee_filter(row): if 'E2E' in row['PROFILE.FIRSTNAME']: print('Skipping E2E participant') return False + if row.get('DSM.PARTICIPANTDATA.REGISTRATION_TYPE') not in ['Self', 'Dependent']: + print('Skipping non-enrollee participant') + return False return True def juniper_enrollee_filter(row): @@ -893,5 +1046,17 @@ def juniper_enrollee_filter(row): return False return True +def get_headers(data: list[dict[str, Any]]) -> list[str]: + if len(data) == 0: + return [] + headers = set() + for row in data: + for key in row.keys(): + headers.add(key) + + headers = list(headers) + headers.sort() + return headers + if __name__ == '__main__': main()