-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathintegrated_pipeline.py
More file actions
1416 lines (1193 loc) · 61.9 KB
/
integrated_pipeline.py
File metadata and controls
1416 lines (1193 loc) · 61.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import json
import sys
import argparse
import requests
from typing import Dict, List, Optional
from pathlib import Path
from tqdm import tqdm
# Add necessary paths
root_path = os.path.dirname(os.path.abspath(__file__))
print(root_path)
sys.path.append(root_path)
sys.path.append(os.path.join(root_path, "Models/ProTrek"))
# Import required modules
from tools.interproscan import InterproScan
from Bio.Blast.Applications import NcbiblastpCommandline
from utils.utils import extract_interproscan_metrics, get_seqnid, extract_blast_metrics, rename_interproscan_keys
from tools.go_integration_pipeline import GOIntegrationPipeline
from utils.generate_protein_prompt import generate_prompt, get_interpro_manager, get_lmdb_connection
from utils.openai_access import call_chatgpt
# Add MPR import
from utils.mpr import MultipleProcessRunnerSimplifier
class IntegratedProteinPipeline:
def __init__(self,
             blast_database: str = "uniprot_swissprot",
             expect_value: float = 0.01,
             blast_num_threads: int = 256,  # Number of BLAST threads
             interproscan_path: str = "interproscan/interproscan-5.75-106.0/interproscan.sh",
             interproscan_libraries: List[str] = None,
             go_topk: int = 1,
             selected_info_types: List[str] = None,
             pfam_descriptions_path: str = None,
             go_info_path: str = None,
             interpro_data_path: str = None,
             lmdb_path: str = None,
             n_process_prompt: int = 256,  # Number of parallel processes for prompt generation
             n_process_llm: int = 64,  # Number of parallel processes for LLM answer generation
             is_enzyme: bool = False,  # Whether the protein is an enzyme
             skip_protrek_check: bool = False,  # Whether to skip the ProTrek check
             use_foldseek: bool = True,  # Whether to use Foldseek
             foldseek_database: str = "foldseek_db/sp",  # Foldseek database path
             foldseek_num_threads: int = 64,  # Number of Foldseek threads
             pdb_dir: str = None,  # Directory containing PDB files for foldseek
             auto_download_pdb: bool = True,  # Whether to automatically download PDB files
             pdb_download_timeout: int = 30,  # Timeout for PDB download in seconds
             pdb_download_processes: int = 64,  # Number of processes for PDB download
             failed_pdb_ids_path: str = None,  # Path to save failed PDB download IDs
             args: argparse.Namespace = None):
    """
    Integrated protein analysis pipeline.

    Args:
        blast_database: BLAST database name.
        expect_value: BLAST E-value threshold (also reused for Foldseek).
        blast_num_threads: Number of threads for BLAST.
        interproscan_path: Path to the InterProScan executable.
        interproscan_libraries: List of InterProScan libraries to use.
        go_topk: Top-k parameter for GO integration.
        selected_info_types: Information types to include in the prompt
            generation. Defaults to ['motif', 'go', 'protrek'].
        pfam_descriptions_path: Path to the Pfam descriptions file.
        go_info_path: Path to the GO information file.
        interpro_data_path: Path to the InterPro data file.
        lmdb_path: Path to the LMDB database of QA pairs.
        n_process_prompt: Number of parallel processes for prompt generation.
        n_process_llm: Number of parallel processes for LLM answer generation.
        is_enzyme: Whether the protein is an enzyme.
        skip_protrek_check: Whether to skip checking for ProTrek results.
        use_foldseek: Whether to use Foldseek for remote homology search.
        foldseek_database: Path to the Foldseek database.
        foldseek_num_threads: Number of threads for Foldseek.
        pdb_dir: Directory containing PDB files for Foldseek.
        auto_download_pdb: Whether to automatically download missing
            AlphaFold PDB files.
        pdb_download_timeout: Per-request timeout (seconds) for PDB download.
        pdb_download_processes: Number of processes for PDB download.
        failed_pdb_ids_path: File to record protein IDs whose PDB download failed.
        args: Optional namespace providing pre-computed result paths
            (interproscan_info_path, blast_info_path, foldseek_info_path,
            protrek_dir). NOTE(review): when a namespace is passed, all four
            attributes must exist or this raises AttributeError — confirm
            callers always define them.
    """
    self.blast_database = blast_database
    self.expect_value = expect_value
    self.blast_num_threads = blast_num_threads
    self.interproscan_path = interproscan_path
    # Default InterProScan member databases.
    # NOTE(review): "SFLM" looks like a possible typo for the SFLD library — confirm.
    self.interproscan_libraries = interproscan_libraries or [
        "PFAM", "PIRSR", "PROSITE_PROFILES", "SUPERFAMILY", "PRINTS",
        "PANTHER", "CDD", "GENE3D", "NCBIFAM", "SFLM", "MOBIDB_LITE",
        "COILS", "PROSITE_PATTERNS", "FUNFAM", "SMART"
    ]
    self.go_topk = go_topk
    self.selected_info_types = selected_info_types or ['motif', 'go', 'protrek']  # Add protrek
    self.n_process_prompt = n_process_prompt
    self.n_process_llm = n_process_llm
    self.is_enzyme = is_enzyme
    self.skip_protrek_check = skip_protrek_check
    self.use_foldseek = use_foldseek
    self.foldseek_database = foldseek_database
    self.foldseek_num_threads = foldseek_num_threads
    self.pdb_dir = pdb_dir
    self.auto_download_pdb = auto_download_pdb
    self.pdb_download_timeout = pdb_download_timeout
    self.pdb_download_processes = pdb_download_processes
    self.failed_pdb_ids_path = failed_pdb_ids_path
    # File path configuration
    self.pfam_descriptions_path = pfam_descriptions_path
    self.go_info_path = go_info_path
    self.interpro_data_path = interpro_data_path
    self.lmdb_path = lmdb_path
    # Paths to pre-computed tool outputs; when set, step1 loads these JSON
    # files instead of re-running the corresponding tool.
    self.interproscan_info_path = args.interproscan_info_path if args else None
    self.blast_info_path = args.blast_info_path if args else None
    self.foldseek_info_path = args.foldseek_info_path if args else None
    self.protrek_dir = args.protrek_dir if args else None
    # Initialize the GO integration pipeline
    self.go_pipeline = GOIntegrationPipeline(
        topk=self.go_topk,
        use_foldseek=self.use_foldseek,
        foldseek_database=self.foldseek_database
    )
    # Initialize the InterPro manager (if needed): only required when the
    # prompt asks for info types beyond the built-in motif/go/protrek ones.
    self.interpro_manager = None
    other_types = [t for t in self.selected_info_types if t not in ['motif', 'go', 'protrek']]
    if other_types and self.interpro_data_path:
        try:
            from utils.generate_protein_prompt import get_interpro_manager
            self.interpro_manager = get_interpro_manager(self.interpro_data_path, None)
        except Exception as e:
            # Best-effort: prompt generation degrades to the built-in types.
            print(f"Failed to initialize InterPro manager: {str(e)}")
def get_prompt_template(self, selected_info_types=None):
    """
    Gets the prompt template, supporting optional information types and enzyme designation.

    Args:
        selected_info_types: Info-type keys ('motif', 'go', 'protrek', or
            InterPro member-database names) that gate which template
            sections may render. Defaults to ['motif', 'go'].

    Returns:
        A Jinja2 template string: the enzyme- or function-specific base
        prompt followed by the structured "input information" section.
    """
    if selected_info_types is None:
        selected_info_types = ['motif', 'go']
    # Select a different base prompt based on the is_enzyme parameter
    if self.is_enzyme:
        from utils.prompts import ENZYME_PROMPT
        base_prompt = ENZYME_PROMPT
    else:
        from utils.prompts import FUNCTION_PROMPT
        base_prompt = FUNCTION_PROMPT
    # The template below is runtime data rendered by Jinja2. Note that the
    # 'protrek' section only renders when motif evidence is absent AND GO
    # lookup failed — ProTrek serves as a fallback evidence source.
    PROMPT_TEMPLATE = base_prompt + '\n' + """
input information:
{%- if 'motif' in selected_info_types and motif_pfam %}
motif:{% for motif_id, motif_info in motif_pfam.items() %}
{{motif_id}}: {{motif_info}}
{% endfor %}
{%- endif %}
{%- if 'go' in selected_info_types and go_data.status == 'success' %}
GO:{% for go_entry in go_data.go_annotations %}
▢ GO term{{loop.index}}: {{go_entry.go_id}} (source: {{go_entry.source}}, E-value: {{go_entry.evalue}})
• definition: {{ go_data.all_related_definitions.get(go_entry.go_id, 'not found definition') }}
{% endfor %}
{%- endif %}
{%- for info_type in selected_info_types %}
{%- if info_type == 'protrek' and interpro_descriptions.get('protrek') and not motif_pfam and go_data.status == 'error' %}
protrek is a tool that can be used to predict the function of a protein.
To be specific, ProTrek is a tri-modal protein language model that jointly models protein sequence, structure and function (SSF).
It employs contrastive learning with three core alignment strategies:
(1) using structure as the supervision signal for AA sequences and vice versa,
(2) mutual supervision between sequences and functions, and
(3) mutual supervision between structures and functions.
This tri-modal alignment training enables ProTrek to tightly associate SSF by bringing genuine sample pairs (sequence-structure, sequence-function, and structure-function) closer together while pushing negative samples farther apart in the latent space.
These are the protrek output descriptions of the protein, the matching score is the score of the description in the protrek output.
The higher the matching score, the more relevant the description is to the protein. 15 represents that the description is quite relevant to the protein.
protrek:{% for description,score in interpro_descriptions.protrek %}
▢ Related description {{loop.index}}: {{description}} (matching score: {{score}})
{% endfor %}
{%- elif info_type not in ['motif', 'go', 'protrek'] and interpro_descriptions.get(info_type) %}
{{info_type}}:{% for ipr_id, ipr_info in interpro_descriptions[info_type].items() %}
▢ {{ipr_id}}: {{ipr_info.name}}
• description: {{ipr_info.abstract}}
{% endfor %}
{%- endif %}
{%- endfor %}
{%- if question %}
question: \n {{question}}
{%- else %}
question: \n what is the function of the protein?
{%- endif %}
"""
    return PROMPT_TEMPLATE
def _pdb_to_fasta(self, pdb_dir: str, output_fasta: str) -> str:
    """
    Write the sequences extracted from PDB input to a FASTA file so that
    BLAST and InterProScan can consume them.

    Args:
        pdb_dir: Directory containing PDB files (or a single PDB file).
        output_fasta: Path to output FASTA file.

    Returns:
        Path to the generated FASTA file.
    """
    # Pull one sequence per structure, keyed by PDB ID.
    sequences = self._extract_sequences_from_pdb(pdb_dir)
    records = [f">{name}\n{chain}\n" for name, chain in sequences.items()]
    with open(output_fasta, 'w') as handle:
        handle.writelines(records)
    print(f"Converted {len(sequences)} PDB files to FASTA format: {output_fasta}")
    return output_fasta
def _extract_sequences_from_pdb(self, pdb_dir: str) -> Dict[str, str]:
"""
Extract sequences from PDB file(s).
Args:
pdb_dir: Path to PDB file or directory containing PDB files.
Returns:
Dictionary mapping PDB IDs to sequences.
"""
seq_dict = {}
if os.path.isfile(pdb_dir):
# Single PDB file
pdb_files = [pdb_dir]
elif os.path.isdir(pdb_dir):
# Directory containing PDB files
pdb_files = [os.path.join(pdb_dir, f) for f in os.listdir(pdb_dir)
if f.endswith('.pdb')]
else:
raise ValueError(f"PDB file or directory not found: {pdb_dir}")
for pdb_path in pdb_files:
try:
# Extract PDB ID from filename
pdb_id = os.path.basename(pdb_path).replace('.pdb', '')
# Read PDB file and extract sequence
sequence = self._read_pdb_sequence(pdb_path)
if sequence:
seq_dict[pdb_id] = sequence
except Exception as e:
print(f"Warning: Could not extract sequence from {pdb_path}: {str(e)}")
continue
return seq_dict
def _read_pdb_sequence(self, pdb_path: str) -> str:
"""
Read amino acid sequence from PDB file.
Args:
pdb_path: Path to PDB file.
Returns:
Amino acid sequence string.
"""
sequence = ""
atom_lines = []
with open(pdb_path, 'r') as f:
for line in f:
if line.startswith('ATOM') and line[12:16].strip() == 'CA':
# Extract residue information
res_name = line[17:20].strip()
res_num = int(line[22:26].strip())
atom_lines.append((res_num, res_name))
# Sort by residue number and extract sequence
atom_lines.sort(key=lambda x: x[0])
# Map 3-letter codes to 1-letter codes
aa_map = {
'ALA': 'A', 'ARG': 'R', 'ASN': 'N', 'ASP': 'D', 'CYS': 'C',
'GLN': 'Q', 'GLU': 'E', 'GLY': 'G', 'HIS': 'H', 'ILE': 'I',
'LEU': 'L', 'LYS': 'K', 'MET': 'M', 'PHE': 'F', 'PRO': 'P',
'SER': 'S', 'THR': 'T', 'TRP': 'W', 'TYR': 'Y', 'VAL': 'V'
}
for res_num, res_name in atom_lines:
if res_name in aa_map:
sequence += aa_map[res_name]
return sequence
def _check_and_run_protrek(self, input_fasta: str, protrek_dir: str):
    """
    Checks if protrek_dir contains results for all required sequences; if not, runs the ProTrek script.

    A protein's result is considered present when '<id>.json' exists in
    protrek_dir. The ProTrek shell script streams its output directly to
    the terminal and any failure (or Ctrl-C) is re-raised to the caller.

    Args:
        input_fasta: Path to the input FASTA file.
        protrek_dir: Directory for ProTrek results.
    """
    import subprocess
    from Bio import SeqIO

    # Collect every protein ID present in the FASTA input.
    with open(input_fasta, 'r') as handle:
        all_ids = [record.id for record in SeqIO.parse(handle, 'fasta')]

    # A protein is "done" when its <id>.json result file already exists.
    os.makedirs(protrek_dir, exist_ok=True)
    missing_ids = [
        pid for pid in all_ids
        if not os.path.exists(os.path.join(protrek_dir, f"{pid}.json"))
    ]
    if not missing_ids:
        print(f"✓ ProTrek results already exist for all proteins.")
        return

    print(f"⚠ Found {len(missing_ids)} proteins missing ProTrek results. Starting ProTrek script...")
    script_path = os.path.join(root_path, "scripts", "run_protrek_text.sh")
    # No capture_output: the script's output goes straight to the terminal.
    try:
        print(f"Running command: bash {script_path} {input_fasta} {protrek_dir}")
        subprocess.run(
            ["bash", script_path, input_fasta, protrek_dir],
            check=True
        )
        print("✓ ProTrek script ran successfully.")
    except subprocess.CalledProcessError as e:
        print(f"✗ ProTrek script failed with return code: {e.returncode}")
        raise
    except KeyboardInterrupt:
        print("\n⚠ User interrupted the ProTrek script.")
        print("Hint: You can run the ProTrek script manually later, or it will be automatically detected on the next pipeline run.")
        raise
def _check_and_download_pdb_files(self, input_fasta: str, pdb_dir: str) -> str:
"""
Check and download PDB files (if necessary), supporting a retry mechanism.
Args:
input_fasta: Path to the input FASTA file
pdb_dir: PDB file directory
Returns:
Updated pdb_dir path
"""
if not self.auto_download_pdb:
print("Automatic PDB file download is disabled")
return pdb_dir
# Extract protein IDs from the FASTA file
protein_ids = []
with open(input_fasta, 'r') as f:
for line in f:
if line.startswith('>'):
protein_id = line.strip()[1:].split()[0] # Take the part before the first space as the ID
protein_ids.append(protein_id)
print(f"Extracted {len(protein_ids)} protein IDs from the FASTA file")
# Check which PDB files are missing
missing_ids = []
os.makedirs(pdb_dir, exist_ok=True)
for protein_id in protein_ids:
pdb_file = os.path.join(pdb_dir, f"AF-{protein_id}-F1-model_v6.pdb")
if not os.path.exists(pdb_file):
missing_ids.append(protein_id)
if not missing_ids:
print(f"✓ All PDB files already exist")
return pdb_dir
print(f"⚠ Found {len(missing_ids)} proteins missing PDB files, starting download...")
# Use multiple processes to download PDB files, supporting retries
self._download_pdb_files_with_retry(missing_ids, pdb_dir)
return pdb_dir
def _download_pdb_files_with_retry(self, protein_ids: List[str], pdb_dir: str, max_retries: int = 10) -> None:
"""
PDB file download with retry mechanism.
Args:
protein_ids: List of protein IDs to download
pdb_dir: PDB file save directory
max_retries: Maximum number of retries
"""
failed_ids = protein_ids.copy()
retry_count = 0
while failed_ids and retry_count < max_retries:
retry_count += 1
print(f"\nAttempting to download PDB files for the {retry_count} time...")
print(f"Remaining number of proteins to download: {len(failed_ids)}")
# Download the currently failed IDs
self._download_pdb_files_parallel(failed_ids, pdb_dir)
# Check which files are still missing
still_missing = []
for protein_id in failed_ids:
pdb_file = os.path.join(pdb_dir, f"AF-{protein_id}-F1-model_v6.pdb")
if not os.path.exists(pdb_file):
still_missing.append(protein_id)
failed_ids = still_missing
if failed_ids:
print(f"After the {retry_count} attempt, {len(failed_ids)} proteins still failed to download")
if retry_count < max_retries:
print(f"Will retry downloading these proteins in the {retry_count + 1} attempt...")
else:
print(f"✓ All PDB files downloaded successfully!")
break
if failed_ids:
print(f"\n⚠ After {max_retries} attempts, {len(failed_ids)} proteins could not be downloaded")
print("These proteins will not have corresponding PDB structures, but sequence information processing will continue")
else:
print(f"\n✓ All PDB files finished downloading!")
def _download_pdb_files_parallel(self, protein_ids: List[str], pdb_dir: str) -> None:
    """
    Download PDB files in parallel.

    Fetches AlphaFold model files (AF-<id>-F1-model_v6.pdb) over HTTP from
    the EBI AlphaFold server using a pool of worker processes. Failed IDs
    are both written to a text file (one per line) and counted from the
    workers' return values.

    Args:
        protein_ids: List of protein IDs to download
        pdb_dir: PDB file save directory
    """
    print(f"Starting parallel download of {len(protein_ids)} PDB files...")
    def download_single_pdb(process_id, idx, protein_id, writer):
        """Download a single PDB file"""
        # Worker callback for MultipleProcessRunnerSimplifier: returns None
        # on success and the protein ID on failure; failures are also
        # appended via `writer` (presumably backed by `save_path` —
        # TODO confirm against utils.mpr).
        url = f"https://alphafold.ebi.ac.uk/files/AF-{protein_id}-F1-model_v6.pdb"
        output_path = os.path.join(pdb_dir, f"AF-{protein_id}-F1-model_v6.pdb")
        try:
            response = requests.get(url, timeout=self.pdb_download_timeout)
            if response.status_code == 200:
                with open(output_path, 'wb') as f:
                    f.write(response.content)
                print(f"✓ Successfully downloaded {protein_id}")
                return None
            else:
                print(f"✗ Download of {protein_id} failed, status code: {response.status_code}")
                if writer:
                    writer.write(f"{protein_id}\n")
                return protein_id
        except Exception as e:
            # Network errors/timeouts are treated the same as HTTP failures.
            print(f"✗ Error during download of {protein_id}: {str(e)}")
            if writer:
                writer.write(f"{protein_id}\n")
            return protein_id
    # Set failed ID save path (defaults to a file inside pdb_dir)
    if self.failed_pdb_ids_path is None:
        failed_ids_path = os.path.join(pdb_dir, "failed_downloads.txt")
    else:
        failed_ids_path = self.failed_pdb_ids_path
    # Use MPR for parallel download; "queue" strategy hands IDs to workers
    # as they become free.
    mprs = MultipleProcessRunnerSimplifier(
        data=protein_ids,
        do=download_single_pdb,
        n_process=self.pdb_download_processes,
        split_strategy="queue",
        save_path=failed_ids_path,
        return_results=True
    )
    results = mprs.run()
    # Count download results: workers return the ID on failure, None on success.
    failed_ids = [result for result in results if result is not None]
    success_count = len(protein_ids) - len(failed_ids)
    print(f"PDB download complete: Success {success_count}/{len(protein_ids)}")
    if failed_ids:
        print(f"Failed protein IDs saved to: {failed_ids_path}")
def _is_uniprot_id(self, protein_id: str) -> bool:
"""
Check if the protein ID is in UniProt ID format.
Args:
protein_id: Protein ID
Returns:
Whether it is a UniProt ID
"""
# UniProt IDs are usually 6 characters long, containing letters and numbers
# Example: P12345, Q9Y6K9, A0A0A0A0A0
if len(protein_id) == 6 and protein_id.isalnum():
return True
# Longer UniProt IDs are also supported
if len(protein_id) >= 6 and protein_id.isalnum():
return True
return False
def step1_run_tools(self, input_fasta: str, temp_dir: str = "temp") -> tuple:
    """
    Step 1: Run BLAST, InterProScan, and optionally Foldseek analysis.

    Each tool can be skipped by supplying a pre-computed result file via
    the corresponding ``self.*_info_path`` attribute, in which case the
    saved JSON is loaded instead of re-running the tool.

    Args:
        input_fasta: Path to the input FASTA file (or PDB directory if using PDB input).
        temp_dir: Directory for temporary files.

    Returns:
        A tuple: (interproscan_info, blast_info, foldseek_info).
    """
    print("Step 1: Running BLAST, InterProScan, and Foldseek analysis...")
    # Create temporary directory
    os.makedirs(temp_dir, exist_ok=True)
    # If automatic PDB download is enabled and pdb_dir exists, check and download missing PDB files
    if self.auto_download_pdb and self.pdb_dir:
        print("Checking and downloading missing PDB files...")
        self.pdb_dir = self._check_and_download_pdb_files(input_fasta, self.pdb_dir)
    # Get sequence dictionary (protein ID -> sequence)
    seq_dict = get_seqnid(input_fasta)
    print(f"Read {len(seq_dict)} sequences.")
    # ---- BLAST ----
    print("Running BLAST analysis...")
    if self.blast_info_path is None:
        blast_xml = os.path.join(temp_dir, "blast_results.xml")
        blast_cmd = NcbiblastpCommandline(
            query=input_fasta,
            db=self.blast_database,
            out=blast_xml,
            outfmt=5,  # XML format
            evalue=self.expect_value,
            num_threads=self.blast_num_threads
        )
        blast_cmd()
        # Extract BLAST results, keyed by query ID
        blast_results = extract_blast_metrics(blast_xml)
        blast_info = {}
        for uid, info in blast_results.items():
            blast_info[uid] = {"sequence": seq_dict[uid], "blast_results": info}
    else:
        # Reuse previously saved BLAST results
        blast_info = json.load(open(self.blast_info_path))
    # ---- Foldseek (optional) ----
    foldseek_info = {}
    if self.use_foldseek:
        print("Running Foldseek analysis...")
        if self.foldseek_info_path is None:
            try:
                from tools.foldseek import run_foldseek_analysis
                foldseek_output = os.path.join(temp_dir, "foldseek_results.m8")
                # Use PDB directory for foldseek analysis
                if self.pdb_dir and os.path.isdir(self.pdb_dir):
                    pdb_dir = self.pdb_dir
                else:
                    # Try to find PDB files in the same directory as input_fasta
                    input_dir = os.path.dirname(input_fasta)
                    pdb_dir = input_dir
                    print(f"Warning: No PDB directory specified, using input directory: {pdb_dir}")
                foldseek_info = run_foldseek_analysis(
                    pdb_file=pdb_dir,
                    database=self.foldseek_database,
                    evalue=self.expect_value,  # reuses the BLAST E-value threshold
                    num_threads=self.foldseek_num_threads,
                    output_file=foldseek_output,
                    temp_dir=os.path.join(temp_dir, "foldseek_tmp")
                )
                print(f"Foldseek analysis complete: Found results for {len(foldseek_info)} proteins.")
            except Exception as e:
                # Foldseek is best-effort: on failure the pipeline degrades
                # to sequence-only evidence instead of aborting.
                print(f"Warning: Foldseek analysis failed: {str(e)}")
                print("Continuing without Foldseek results...")
                foldseek_info = {}
        else:
            foldseek_info = json.load(open(self.foldseek_info_path))
    # Ensure foldseek_info keys match FASTA sequence IDs
    # If the keys in the foldseek results are in PDB filename format, they need to be converted to original protein IDs
    if foldseek_info:
        corrected_foldseek_info = {}
        for protein_id in seq_dict.keys():
            # Try different key formats to match foldseek results
            possible_keys = [
                protein_id,  # Original ID
                f"AF-{protein_id}-F1-model_v6",  # PDB filename format
                f"AF-{protein_id}-F1-model_v6.pdb"  # Full filename
            ]
            matched_key = None
            for key in possible_keys:
                if key in foldseek_info:
                    matched_key = key
                    break
            if matched_key:
                corrected_foldseek_info[protein_id] = foldseek_info[matched_key]
            else:
                # If no corresponding foldseek results are found, create empty results
                corrected_foldseek_info[protein_id] = {
                    "sequence": seq_dict[protein_id],
                    "foldseek_results": []
                }
                print(f"Warning: No Foldseek results found for protein {protein_id}")
        foldseek_info = corrected_foldseek_info
    # ---- InterProScan ----
    print("Running InterProScan analysis...")
    if self.interproscan_info_path is None:
        interproscan_json = os.path.join(temp_dir, "interproscan_results.json")
        interproscan = InterproScan(self.interproscan_path)
        input_args = {
            "fasta_file": input_fasta,
            "goterms": True,
            "pathways": True,
            "save_dir": interproscan_json
        }
        interproscan.run(**input_args)
        # Extract InterProScan results; note they are keyed by SEQUENCE here,
        # so we re-key them by protein ID below.
        interproscan_results = extract_interproscan_metrics(
            interproscan_json,
            librarys=self.interproscan_libraries
        )
        interproscan_info = {}
        for id, seq in seq_dict.items():
            info = interproscan_results[seq]
            info = rename_interproscan_keys(info)
            interproscan_info[id] = {"sequence": seq, "interproscan_results": info}
    else:
        interproscan_info = json.load(open(self.interproscan_info_path))
    # Note: Do not clean up temp files here, as they might be saved to the tool_results directory.
    # Cleanup is handled in the finally block of the run() method.
    print(f"Step 1 complete: Processed {len(interproscan_info)} proteins.")
    return interproscan_info, blast_info, foldseek_info
def step2_integrate_go_information(self, interproscan_info: Dict, blast_info: Dict,
foldseek_info: Dict = None) -> Dict:
"""
Step 2: Integrate GO information.
Args:
interproscan_info: InterProScan results.
blast_info: BLAST results.
foldseek_info: Foldseek results (optional).
Returns:
A dictionary of integrated GO information.
"""
print("Step 2: Integrating GO information...")
# Use the GO integration pipeline for first-level filtering
protein_go_dict = self.go_pipeline.first_level_filtering(interproscan_info, blast_info, foldseek_info)
print(f"Step 2 complete: Integrated GO information for {len(protein_go_dict)} proteins.")
return protein_go_dict
def step3_generate_prompts(self, interproscan_info: Dict, blast_info: Dict,
                           protein_go_dict: Dict) -> Dict:
    """
    Step 3: Generate protein prompts.

    When an LMDB path is configured, every protein may carry several QA
    pairs; one prompt record is emitted per pair, keyed by a running
    global index. Otherwise a single function-question prompt is built
    per protein, keyed by protein ID.

    Args:
        interproscan_info: InterProScan results.
        blast_info: BLAST results.
        protein_go_dict: Integrated GO information.

    Returns:
        A dictionary mapping protein IDs to prompts (or QA pairs if lmdb is used).
    """
    print("Step 3: Generating protein prompts...")
    # The prompt generator expects GO data keyed by protein ID.
    temp_go_data = dict(protein_go_dict)
    prompts_data = {}
    if self.lmdb_path:
        # LMDB mode: expand each protein into its QA pairs.
        from utils.generate_protein_prompt import get_qa_data
        global_index = 0
        for protein_id in tqdm(interproscan_info.keys(), desc="Generating prompts"):
            for qa_pair in get_qa_data(protein_id, self.lmdb_path):
                prompt = self._generate_prompt_from_memory(
                    protein_id, interproscan_info, temp_go_data, qa_pair['question']
                )
                if prompt:
                    prompts_data[global_index] = {
                        "index": global_index,
                        "protein_id": protein_id,
                        "prompt": prompt,
                        "question": qa_pair['question'],
                        "ground_truth": qa_pair['ground_truth'],
                        "question_type": qa_pair['question_type']
                    }
                    global_index += 1
    else:
        # Plain mode: one prompt per protein with the default question.
        for protein_id in tqdm(interproscan_info.keys(), desc="Generating prompts"):
            prompt = self._generate_prompt_from_memory(
                protein_id, interproscan_info, temp_go_data
            )
            if prompt:
                prompts_data[protein_id] = prompt
    print(f"Step 3 complete: Generated {len(prompts_data)} prompts.")
    return prompts_data
def step3_generate_prompts_parallel(self, interproscan_info: Dict, blast_info: Dict,
                                    protein_go_dict: Dict) -> Dict:
    """
    Step 3: Generate protein prompts in parallel.

    Same contract as ``step3_generate_prompts``, but the work is fanned out
    over ``self.n_process_prompt`` workers via
    MultipleProcessRunnerSimplifier. Workers serialize each result as one
    JSON line; the parent parses those lines back into a dict.
    ``blast_info`` is not referenced in this method; it is kept for a
    signature parallel to step3_generate_prompts.

    Args:
        interproscan_info: InterProScan results.
        blast_info: BLAST results.
        protein_go_dict: Integrated GO information.

    Returns:
        A dictionary mapping protein IDs to prompts (or QA pairs if lmdb is used).
    """
    print("Step 3: Generating protein prompts in parallel...")
    # Create a temporary GO integration file format (for the generate_prompt function)
    temp_go_data = {}
    for protein_id, go_ids in protein_go_dict.items():
        temp_go_data[protein_id] = go_ids
    # Share read-only inputs with workers through instance attributes.
    # NOTE(review): this relies on MPR making `self` (and these attributes)
    # visible inside worker processes — confirm against utils.mpr.
    self._shared_interproscan_info = interproscan_info
    self._shared_temp_go_data = temp_go_data
    if self.lmdb_path:
        # If an lmdb path is provided, process QA data
        from utils.generate_protein_prompt import get_qa_data
        # Prepare all QA tasks up front; global_index is assigned here (one
        # per QA pair) so results can be re-keyed stably after the fan-out.
        qa_tasks = []
        global_index = 0
        for protein_id in interproscan_info.keys():
            qa_pairs = get_qa_data(protein_id, self.lmdb_path)
            for qa_pair in qa_pairs:
                qa_tasks.append({
                    'global_index': global_index,
                    'protein_id': protein_id,
                    'question': qa_pair['question'],
                    'ground_truth': qa_pair['ground_truth'],
                    'question_type': qa_pair['question_type']
                })
                global_index += 1
        print(f"Preparing to process {len(qa_tasks)} QA tasks in parallel...")
        # Use MPR for parallel processing
        prompts_data = {}
        def process_qa_task(process_id, idx, qa_task, writer):
            # Worker: build the prompt for one QA pair and emit it as a JSON line.
            try:
                # Initialize lmdb connection for each process
                if self.lmdb_path:
                    get_lmdb_connection(self.lmdb_path)
                protein_id = qa_task['protein_id']
                question = qa_task['question']
                ground_truth = qa_task['ground_truth']
                global_index = qa_task['global_index']
                question_type = qa_task['question_type']
                # Generate prompt
                prompt = self._generate_prompt_from_memory(
                    protein_id, self._shared_interproscan_info,
                    self._shared_temp_go_data, question
                )
                if prompt:
                    result = {
                        "index": global_index,
                        "protein_id": protein_id,
                        "prompt": prompt,
                        "question": question,
                        "ground_truth": ground_truth,
                        "question_type": question_type
                    }
                    if writer:
                        writer.write(json.dumps(result) + '\n')
            except Exception as e:
                # Per-task failures are logged and skipped, not fatal.
                print(f"Error processing QA task (protein_id: {qa_task['protein_id']}): {str(e)}")
        # Run parallel processing
        mprs = MultipleProcessRunnerSimplifier(
            data=qa_tasks,
            do=process_qa_task,
            n_process=self.n_process_prompt,
            split_strategy="static",
            return_results=True
        )
        results = mprs.run()
        # Parse results (one JSON object per line emitted by the workers)
        for result_line in results:
            try:
                result = json.loads(result_line)
                prompts_data[result['index']] = result
            except Exception as e:
                print(f"Error parsing result: {str(e)}")
    else:
        # If no lmdb path, process by protein ID
        protein_ids = list(interproscan_info.keys())
        print(f"Preparing to process {len(protein_ids)} proteins in parallel...")
        def process_protein(process_id, idx, protein_id, writer):
            # Worker: build the default function-question prompt for one protein.
            try:
                prompt = self._generate_prompt_from_memory(
                    protein_id, self._shared_interproscan_info,
                    self._shared_temp_go_data
                )
                if prompt and writer:
                    result = {
                        "protein_id": protein_id,
                        "prompt": prompt
                    }
                    writer.write(json.dumps(result) + '\n')
            except Exception as e:
                print(f"Error processing protein (protein_id: {protein_id}): {str(e)}")
        # Run parallel processing
        mprs = MultipleProcessRunnerSimplifier(
            data=protein_ids,
            do=process_protein,
            n_process=self.n_process_prompt,
            split_strategy="static",
            return_results=True
        )
        results = mprs.run()
        # Parse results
        prompts_data = {}
        for result_line in results:
            try:
                result = json.loads(result_line)
                prompts_data[result['protein_id']] = result['prompt']
            except Exception as e:
                print(f"Error parsing result: {str(e)}")
    print(f"Step 3 complete: Generated {len(prompts_data)} prompts in parallel.")
    return prompts_data
def _generate_prompt_from_memory(self, protein_id: str, interproscan_info: Dict,
protein_go_dict: Dict, question: str = None) -> str:
"""
Generates a prompt from in-memory data, including complete motif and GO definitions.
"""
try:
from utils.protein_go_analysis import get_go_definition
from jinja2 import Template
# Get GO analysis results (new format with sources)
go_data_raw = protein_go_dict.get(protein_id, {})
go_ids = go_data_raw.get("go_ids", []) if isinstance(go_data_raw, dict) else go_data_raw
go_sources = go_data_raw.get("go_sources", {}) if isinstance(go_data_raw, dict) else {}
go_annotations = []
all_related_definitions = {}
if go_ids:
for go_id in go_ids:
# Ensure correct GO ID format
clean_go_id = go_id.split(":")[-1] if ":" in go_id else go_id
# Get source and e-value information
source_info = go_sources.get(clean_go_id, {})
source = source_info.get("source", "Unknown")
evalue = source_info.get("evalue", None)
go_annotations.append({
"go_id": clean_go_id,
"source": source,
"evalue": evalue if evalue else "N/A"
})
# Get GO definition
if self.go_info_path and os.path.exists(self.go_info_path):
definition = get_go_definition(clean_go_id, self.go_info_path)
if definition:
all_related_definitions[clean_go_id] = definition
# Get motif information
motif_pfam = {}
if self.pfam_descriptions_path and os.path.exists(self.pfam_descriptions_path):
try:
# Extract pfam info from interproscan results
interproscan_results = interproscan_info[protein_id].get('interproscan_results', {})
pfam_entries = interproscan_results.get('pfam_id', [])
# Load pfam descriptions
with open(self.pfam_descriptions_path, 'r') as f:
pfam_descriptions = json.load(f)
# Build motif_pfam dictionary
for entry in pfam_entries:
for pfam_id, ipr_id in entry.items():
if pfam_id and pfam_id in pfam_descriptions:
motif_pfam[pfam_id] = pfam_descriptions[pfam_id]['description']
except Exception as e:
print(f"Error getting motif information: {str(e)}")
# Get InterPro description information
interpro_descriptions = {}
other_types = [t for t in self.selected_info_types if t not in ['motif', 'go', 'protrek']]
if other_types and self.interpro_manager:
try:
interpro_descriptions = self.interpro_manager.get_description(protein_id, other_types)
except Exception as e:
print(f"Error getting InterPro description information: {str(e)}")
# Get ProTrek description information (only if motif and GO are absent)
if 'protrek' in self.selected_info_types and not motif_pfam and not go_annotations:
if self.protrek_dir is not None:
protrek_path = os.path.join(self.protrek_dir, f"{protein_id}.json")
with open(protrek_path, 'r') as f:
protrek_descriptions = json.load(f)
if protrek_descriptions:
interpro_descriptions['protrek'] = protrek_descriptions
else:
try:
from utils.get_protrek_text import get_protrek_text
sequence = interproscan_info[protein_id]['sequence']
protrek_descriptions = get_protrek_text(sequence, topk=3)
if protrek_descriptions:
interpro_descriptions['protrek'] = protrek_descriptions
except Exception as e:
print(f"Error getting ProTrek information: {str(e)}")
# Prepare template data
template_data = {
"protein_id": protein_id,
"selected_info_types": self.selected_info_types,
"go_data": {
"status": "success" if go_annotations else "error",
"go_annotations": go_annotations,
"all_related_definitions": all_related_definitions
},
"motif_pfam": motif_pfam,
"interpro_descriptions": interpro_descriptions,
"question": question
}
# Generate prompt using the template
PROMPT_TEMPLATE = self.get_prompt_template(self.selected_info_types)
template = Template(PROMPT_TEMPLATE)
return template.render(**template_data)
except Exception as e:
print(f"Error generating prompt (protein_id: {protein_id}): {str(e)}")
# If an error occurs, return a simplified version of the prompt
return self._generate_simple_prompt(protein_id, interproscan_info, protein_go_dict, question)
def _generate_simple_prompt(self, protein_id: str, interproscan_info: Dict,
protein_go_dict: Dict, question: str = None) -> str:
"""
Generates a simplified version of the prompt (as a fallback).
"""
# Get protein sequence
sequence = interproscan_info[protein_id].get('sequence', '')