Skip to content

Commit 9158ec8

Browse files
authored
Merge pull request #287 from broadinstitute/jlc_ingest_h5ad_metadata
Extract metadata from h5ad and delocalize ingest-intermediate files to study bucket (SCP-4823)
2 parents f9c1ae2 + 93bea28 commit 9158ec8

File tree

5 files changed

+111
-59
lines changed

5 files changed

+111
-59
lines changed

ingest/anndata_.py

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@ def __init__(self, file_path, study_file_id, study_id, **kwargs):
1616
IngestFiles.__init__(
1717
self, file_path, allowed_file_types=self.ALLOWED_FILE_TYPES
1818
)
19-
# If performing cluster extraction, set obsm_keys
20-
extract_cluster = kwargs.get("extract_cluster")
21-
if extract_cluster == True:
22-
self.obsm_keys = kwargs["obsm_keys"]
23-
else:
24-
pass
2519

2620
def obtain_adata(self):
2721
try:
@@ -61,7 +55,8 @@ def generate_cluster_header(adata, clustering_name):
6155
else:
6256
msg = f"Too few dimensions for visualization in obsm \"{clustering_name}\", found {clustering_dimension}, expected 2 or 3."
6357
raise ValueError(msg)
64-
with open(f"{clustering_name}.cluster.anndata_segment.tsv", "w") as f:
58+
filename = AnnDataIngestor.set_clustering_filename(clustering_name)
59+
with open(filename, "w") as f:
6560
f.write('\t'.join(headers) + '\n')
6661

6762
@staticmethod
@@ -71,7 +66,8 @@ def generate_cluster_type_declaration(adata, clustering_name):
7166
"""
7267
clustering_dimension = adata.obsm[clustering_name].shape[1]
7368
types = ["TYPE", *["numeric"] * clustering_dimension]
74-
with open(f"{clustering_name}.cluster.anndata_segment.tsv", "a") as f:
69+
filename = AnnDataIngestor.set_clustering_filename(clustering_name)
70+
with open(filename, "a") as f:
7571
f.write('\t'.join(types) + '\n')
7672

7773
@staticmethod
@@ -83,27 +79,45 @@ def generate_cluster_body(adata, clustering_name):
8379
cluster_body = pd.concat(
8480
[cluster_cells, pd.DataFrame(adata.obsm[clustering_name])], axis=1
8581
)
82+
filename = AnnDataIngestor.set_clustering_filename(clustering_name)
8683
pd.DataFrame(cluster_body).to_csv(
87-
AnnDataIngestor.set_output_filename(clustering_name),
88-
sep="\t",
89-
mode="a",
90-
header=None,
91-
index=False,
84+
filename, sep="\t", mode="a", header=None, index=False
9285
)
9386

9487
@staticmethod
95-
def files_to_delocalize(arguments):
96-
# ToDo - check if names using obsm_keys need sanitization
97-
cluster_file_names = [AnnDataIngestor.set_output_filename(name) for name in arguments["obsm_keys"]]
98-
return cluster_file_names
88+
def set_clustering_filename(name):
89+
return f"h5ad_frag.cluster.{name}.tsv"
9990

10091
@staticmethod
101-
def set_output_filename(name):
102-
return f"{name}.cluster.anndata_segment.tsv"
92+
def generate_metadata_file(adata, output_name):
93+
"""
94+
Generate metadata NAME and TYPE lines
95+
"""
96+
headers = adata.obs.columns.tolist()
97+
types = []
98+
for header in headers:
99+
if pd.api.types.is_number(adata.obs[header]):
100+
types.append("NUMERIC")
101+
else:
102+
types.append("GROUP")
103+
headers.insert(0, "NAME")
104+
types.insert(0, "TYPE")
105+
with open(output_name, "w") as f:
106+
f.write('\t'.join(headers) + '\n')
107+
f.write('\t'.join(types) + '\n')
108+
adata.obs.to_csv(output_name, sep="\t", mode="a", header=None, index=True)
109+
110+
@staticmethod
111+
def clusterings_to_delocalize(arguments):
112+
# ToDo - check if names using obsm_keys need sanitization
113+
cluster_file_names = []
114+
for name in arguments["obsm_keys"]:
115+
cluster_file_names.append(AnnDataIngestor.set_clustering_filename(name))
116+
return cluster_file_names
103117

104118
@staticmethod
105-
def delocalize_cluster_files(file_path, study_file_id, files_to_delocalize):
106-
""" Copy cluster files to study bucket
119+
def delocalize_extracted_files(file_path, study_file_id, files_to_delocalize):
120+
""" Copy extracted files to study bucket
107121
"""
108122

109123
for file in files_to_delocalize:
@@ -112,5 +126,5 @@ def delocalize_cluster_files(file_path, study_file_id, files_to_delocalize):
112126
None,
113127
file_path,
114128
file,
115-
f"_scp_internal/anndata_ingest/{file}",
129+
f"_scp_internal/anndata_ingest/{study_file_id}/{file}",
116130
)

ingest/cli_parser.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -354,9 +354,9 @@ def create_parser():
354354
)
355355

356356
parser_anndata.add_argument(
357-
"--extract-cluster",
358-
action="store_true",
359-
help="Indicates clustering data should be extracted",
357+
"--extract",
358+
type=ast.literal_eval,
359+
help="Array of file types to extract, options include ['cluster', 'metadata']",
360360
)
361361

362362
parser_expression_writer = subparsers.add_parser(

ingest/ingest_pipeline.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_anndata --ingest-anndata --anndata-file ../tests/data/anndata/test.h5ad
3131
3232
# Ingest AnnData file - cluster extraction
33-
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_anndata --extract-cluster --ingest-anndata --anndata-file ../tests/data/anndata/test.h5ad --obsm-keys "['X_tsne']"
33+
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_anndata --extract "['cluster']" --ingest-anndata --anndata-file ../tests/data/anndata/test.h5ad --obsm-keys "['X_tsne']"
3434
3535
# Subsample cluster and metadata file
3636
python ingest_pipeline.py --study-id 5d276a50421aa9117c982845 --study-file-id 5dd5ae25421aa910a723a337 ingest_subsample --cluster-file ../tests/data/test_1k_cluster_data.csv --name cluster1 --cell-metadata-file ../tests/data/test_1k_metadata_Data.csv --subsample
@@ -483,14 +483,18 @@ def subsample(self):
483483
return 0
484484

485485
@custom_metric(config.get_metric_properties)
486-
def ingest_anndata(self):
487-
"""Ingests anndata files."""
486+
def extract_from_anndata(self):
487+
"""Extract data subsets from anndata file per SCP filetypes."""
488488
self.anndata = AnnDataIngestor(
489489
self.anndata_file, self.study_id, self.study_file_id, **self.kwargs
490490
)
491491
if self.anndata.validate():
492492
self.report_validation("success")
493-
if self.kwargs["extract_cluster"] == True:
493+
study_info = config.get_metric_properties()
494+
accession = study_info.get_properties()['studyAccession']
495+
file_id = study_info.get_properties()['fileName']
496+
outfile_prefix = f"{file_id}.{accession}"
497+
if self.kwargs.get("extract") and "cluster" in self.kwargs.get("extract"):
494498
if not self.kwargs["obsm_keys"]:
495499
self.kwargs["obsm_keys"] = ['X_tsne']
496500
for key in self.kwargs["obsm_keys"]:
@@ -499,6 +503,11 @@ def ingest_anndata(self):
499503
self.anndata.adata, key
500504
)
501505
AnnDataIngestor.generate_cluster_body(self.anndata.adata, key)
506+
if self.kwargs.get("extract") and "metadata" in self.kwargs.get("extract"):
507+
metadata_filename = f"h5ad_frag.metadata.tsv"
508+
AnnDataIngestor.generate_metadata_file(
509+
self.anndata.adata, metadata_filename
510+
)
502511
return 0
503512
# scanpy unable to open AnnData file
504513
else:
@@ -571,7 +580,7 @@ def run_ingest(ingest, arguments, parsed_args):
571580
elif "ingest_anndata" in arguments:
572581
if arguments["ingest_anndata"]:
573582
config.set_parent_event_name("ingest-pipeline:anndata:ingest")
574-
status_anndata = ingest.ingest_anndata()
583+
status_anndata = ingest.extract_from_anndata()
575584
status.append(status_anndata)
576585
elif "differential_expression" in arguments:
577586
config.set_parent_event_name("ingest-pipeline:differential-expression")
@@ -618,16 +627,24 @@ def exit_pipeline(ingest, status, status_cell_metadata, arguments):
618627
file_path, study_file_id, files_to_match
619628
)
620629
# for successful anndata jobs, need to delocalize intermediate ingest files
621-
elif (
622-
"extract_cluster" in arguments
623-
and arguments.get("extract_cluster") == True
624-
and all(i < 1 for i in status)
625-
):
630+
elif arguments.get("extract") and all(i < 1 for i in status):
626631
file_path, study_file_id = get_delocalization_info(arguments)
627632
# append status?
633+
files_to_delocalize = []
628634
if IngestFiles.is_remote_file(file_path):
629-
files_to_delocalize = AnnDataIngestor.files_to_delocalize(arguments)
630-
AnnDataIngestor.delocalize_cluster_files(
635+
# the next 3 lines are copied from 493-495, suggestions
636+
# welcomed for how to DRY this up
637+
study_info = config.get_metric_properties()
638+
accession = study_info.get_properties()['studyAccession']
639+
file_id = study_info.get_properties()['fileName']
640+
if "cluster" in arguments.get("extract"):
641+
files_to_delocalize.extend(
642+
AnnDataIngestor.clusterings_to_delocalize(arguments)
643+
)
644+
if "metadata" in arguments.get("extract"):
645+
metadata_filename = f"h5ad_frag.metadata.tsv"
646+
files_to_delocalize.append(metadata_filename)
647+
AnnDataIngestor.delocalize_extracted_files(
631648
file_path, study_file_id, files_to_delocalize
632649
)
633650
# all non-DE, non-anndata ingest jobs can exit on success

tests/test_anndata.py

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
from anndata_ import AnnDataIngestor
1212
from ingest_files import IngestFiles
1313

14-
class TestAnnDataIngestor(unittest.TestCase):
1514

15+
class TestAnnDataIngestor(unittest.TestCase):
1616
@staticmethod
1717
def setup_class(self):
1818
filepath_valid = "../tests/data/anndata/trimmed_compliant_pbmc3K.h5ad"
@@ -24,14 +24,16 @@ def setup_class(self):
2424
self.cluster_name = 'X_tsne'
2525
self.valid_kwargs = {'obsm_keys': [self.cluster_name]}
2626
self.anndata_ingest = AnnDataIngestor(*self.valid_args, **self.valid_kwargs)
27-
self.output_filename = f"{self.cluster_name}.cluster.anndata_segment.tsv"
27+
self.output_filename = f"h5ad_frag.cluster.{self.cluster_name}.tsv"
2828

2929
def teardown_method(self, _):
3030
if os.path.isfile(self.output_filename):
3131
os.remove(self.output_filename)
3232

3333
def test_minimal_valid_anndata(self):
34-
self.assertTrue(self.anndata_ingest.validate(), "expect known good file to open with scanpy")
34+
self.assertTrue(
35+
self.anndata_ingest.validate(), "expect known good file to open with scanpy"
36+
)
3537

3638
def test_truncated_anndata(self):
3739
truncated_input = AnnDataIngestor(*self.invalid_args)
@@ -47,9 +49,7 @@ def test_truncated_anndata(self):
4749

4850
def test_input_bad_suffix(self):
4951
bad_input = AnnDataIngestor(
50-
"../tests/data/anndata/bad.foo",
51-
self.study_id,
52-
self.study_file_id,
52+
"../tests/data/anndata/bad.foo", self.study_id, self.study_file_id
5353
)
5454
# passing obtain_data function to assertRaises using lambda
5555
# otherwise bad_input.obtain_data() is evaluated and triggers
@@ -64,40 +64,60 @@ def test_input_bad_suffix(self):
6464
def test_set_output_filename(self):
6565
cluster_name = "X_Umap"
6666
self.assertEqual(
67-
AnnDataIngestor.set_output_filename(cluster_name),
68-
"X_Umap.cluster.anndata_segment.tsv"
67+
AnnDataIngestor.set_clustering_filename(cluster_name),
68+
"h5ad_frag.cluster.X_Umap.tsv",
6969
)
7070

7171
def test_generate_cluster_header(self):
72-
self.anndata_ingest.generate_cluster_header(self.anndata_ingest.obtain_adata(), self.cluster_name)
72+
self.anndata_ingest.generate_cluster_header(
73+
self.anndata_ingest.obtain_adata(), self.cluster_name
74+
)
7375
with open(self.output_filename) as header_file:
7476
header = header_file.readline().split("\t")
75-
self.assertEqual(['NAME', 'X', "Y\n"], header, "did not find expected headers")
77+
self.assertEqual(
78+
['NAME', 'X', "Y\n"], header, "did not find expected headers"
79+
)
7680

7781
def test_generate_cluster_type_declaration(self):
78-
self.anndata_ingest.generate_cluster_type_declaration(self.anndata_ingest.obtain_adata(), self.cluster_name)
82+
self.anndata_ingest.generate_cluster_type_declaration(
83+
self.anndata_ingest.obtain_adata(), self.cluster_name
84+
)
7985
with open(self.output_filename) as header_file:
8086
header = header_file.readline().split("\t")
81-
self.assertEqual(['TYPE', 'numeric', "numeric\n"], header, "did not find expected headers")
87+
self.assertEqual(
88+
['TYPE', 'numeric', "numeric\n"],
89+
header,
90+
"did not find expected headers",
91+
)
8292

8393
def test_generate_cluster_body(self):
84-
self.anndata_ingest.generate_cluster_body(self.anndata_ingest.obtain_adata(), self.cluster_name)
94+
self.anndata_ingest.generate_cluster_body(
95+
self.anndata_ingest.obtain_adata(), self.cluster_name
96+
)
8597
with open(self.output_filename) as cluster_body:
8698
line = cluster_body.readline().split("\t")
8799
expected_line = ['AAACATACAACCAC-1', '16.009954', "-21.073845\n"]
88-
self.assertEqual(expected_line, line, 'did not get expected coordinates from cluster body')
100+
self.assertEqual(
101+
expected_line,
102+
line,
103+
'did not get expected coordinates from cluster body',
104+
)
89105

90106
def test_get_files_to_delocalize(self):
91-
files = AnnDataIngestor.files_to_delocalize(self.valid_kwargs)
107+
files = AnnDataIngestor.clusterings_to_delocalize(self.valid_kwargs)
92108
expected_files = [self.output_filename]
93109
self.assertEqual(expected_files, files)
94110

95111
def test_delocalize_files(self):
96112
# just create header, no reason to run full extract
97-
self.anndata_ingest.generate_cluster_header(self.anndata_ingest.obtain_adata(), self.cluster_name)
113+
self.anndata_ingest.generate_cluster_header(
114+
self.anndata_ingest.obtain_adata(), self.cluster_name
115+
)
98116
with patch('ingest_files.IngestFiles.delocalize_file'):
99117
AnnDataIngestor.delocalize_file(
100-
"gs://fake_bucket", self.study_id, AnnDataIngestor.files_to_delocalize(self.valid_kwargs)
118+
"gs://fake_bucket",
119+
self.study_id,
120+
AnnDataIngestor.clusterings_to_delocalize(self.valid_kwargs),
101121
)
102122
self.assertEqual(
103123
IngestFiles.delocalize_file.call_count,

tests/test_ingest.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@
4646
validate_arguments,
4747
IngestPipeline,
4848
exit_pipeline,
49-
run_ingest
49+
run_ingest,
5050
)
5151
from expression_files.expression_files import GeneExpression
5252

53+
5354
def mock_load(self, *args, **kwargs):
5455
"""Enables overwriting normal function with this placeholder.
5556
Returning the arguments enables tests to verify that the code invokes
@@ -682,17 +683,17 @@ def test_extract_cluster_file_from_anndata(self):
682683
"5dd5ae25421aa910a723a337",
683684
"ingest_anndata",
684685
"--ingest-anndata",
685-
"--extract-cluster",
686+
"--extract",
687+
"['cluster']",
686688
"--anndata-file",
687689
"../tests/data/anndata/trimmed_compliant_pbmc3K.h5ad",
688690
"--obsm-keys",
689-
"['X_tsne']"
690-
691+
"['X_tsne']",
691692
]
692693
ingest, arguments, status, status_cell_metadata = self.execute_ingest(args)
693694
self.assertEqual(len(status), 1)
694695
self.assertEqual(status[0], 0)
695-
filename = 'X_tsne.cluster.anndata_segment.tsv'
696+
filename = 'h5ad_frag.cluster.X_tsne.tsv'
696697
self.assertTrue(os.path.isfile(filename))
697698

698699

0 commit comments

Comments
 (0)