Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions bio2zarr/plink.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
ref_iter = self.bim.allele_2.values[start:stop]
gt_iter = self.bed_reader.iter_decode(start, stop)
for alt, ref, gt in zip(alt_iter, ref_iter, gt_iter):
alleles = np.full(num_alleles, constants.STR_FILL, dtype="O")
alleles = np.full(num_alleles, constants.STR_FILL, dtype="T")
alleles[0] = ref
alleles[1 : 1 + len(alt)] = alt
phased = np.zeros(gt.shape[0], dtype=bool)
Expand Down Expand Up @@ -246,13 +246,13 @@ def generate_schema(
),
vcz.ZarrArraySpec(
name="variant_allele",
dtype="O",
dtype="T",
dimensions=["variants", "alleles"],
description=None,
),
vcz.ZarrArraySpec(
name="variant_id",
dtype="O",
dtype="T",
dimensions=["variants"],
description=None,
),
Expand Down
4 changes: 2 additions & 2 deletions bio2zarr/tskit.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def iter_alleles_and_genotypes(self, start, stop, shape, num_alleles):
copy=False,
):
gt = np.full(shape, constants.INT_FILL, dtype=np.int8)
alleles = np.full(num_alleles, constants.STR_FILL, dtype="O")
alleles = np.full(num_alleles, constants.STR_FILL, dtype="T")
# length is the length of the REF allele unless other fields
# are included.
variant_length = len(variant.alleles[0])
Expand Down Expand Up @@ -200,7 +200,7 @@ def generate_schema(
vcz.ZarrArraySpec(
source=None,
name="variant_allele",
dtype="O",
dtype="T",
dimensions=["variants", "alleles"],
description="Alleles for each variant",
),
Expand Down
19 changes: 12 additions & 7 deletions bio2zarr/vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def smallest_dtype(self):
ret = "U1"
else:
assert self.vcf_type == "String"
ret = "O"
ret = "T"
return ret


Expand Down Expand Up @@ -582,7 +582,7 @@ def transform(self, vcf_value):
if vcf_value is None:
return self.missing_value # NEEDS TEST
assert self.dimension == 1
return np.array(vcf_value, ndmin=1, dtype="str")
return np.array(vcf_value, ndmin=1, dtype="T")


def get_vcf_field_path(base_path, vcf_field):
Expand Down Expand Up @@ -1141,13 +1141,15 @@ def fixed_field_spec(name, dtype, source=None, dimensions=("variants",)):
if dtype == "bool"
else None
)
codecs = vcz.default_zarr_codecs(dtype)
return vcz.ZarrArraySpec(
source=source,
name=name,
dtype=dtype,
description="",
dimensions=dimensions,
compressor=compressor,
codecs=codecs,
)

name_map = {field.full_name: field for field in self.metadata.fields}
Expand All @@ -1163,7 +1165,7 @@ def fixed_field_spec(name, dtype, source=None, dimensions=("variants",)):
),
fixed_field_spec(
name="variant_allele",
dtype="O",
dtype="T",
dimensions=["variants", "alleles"],
),
fixed_field_spec(
Expand All @@ -1173,7 +1175,7 @@ def fixed_field_spec(name, dtype, source=None, dimensions=("variants",)):
),
fixed_field_spec(
name="variant_id",
dtype="O",
dtype="T",
),
fixed_field_spec(
name="variant_id_mask",
Expand Down Expand Up @@ -1205,15 +1207,18 @@ def fixed_field_spec(name, dtype, source=None, dimensions=("variants",)):
dimensions=["variants", "samples"],
description="",
compressor=vcz.DEFAULT_ZARR_COMPRESSOR_BOOL.get_config(),
codecs=vcz.default_zarr_codecs("bool"),
)
)
gt_dtype = self.gt_field.smallest_dtype()
array_specs.append(
vcz.ZarrArraySpec(
name="call_genotype",
dtype=self.gt_field.smallest_dtype(),
dtype=gt_dtype,
dimensions=["variants", "samples", "ploidy"],
description="",
compressor=vcz.DEFAULT_ZARR_COMPRESSOR_GENOTYPES.get_config(),
codecs=vcz.default_zarr_codecs(gt_dtype),
)
)
array_specs.append(
Expand All @@ -1223,6 +1228,7 @@ def fixed_field_spec(name, dtype, source=None, dimensions=("variants",)):
dimensions=["variants", "samples", "ploidy"],
description="",
compressor=vcz.DEFAULT_ZARR_COMPRESSOR_BOOL.get_config(),
codecs=vcz.default_zarr_codecs("bool"),
)
)

Expand Down Expand Up @@ -1581,8 +1587,7 @@ def inspect(path):
raise ValueError(f"Path not found: {path}")
if (path / "metadata.json").exists():
obj = IntermediateColumnarFormat(path)
# NOTE: this is too strict, we should support more general Zarrs, see #276
elif (path / ".zmetadata").exists():
elif (path / ".zattrs").exists() or (path / "zarr.json").exists():
obj = vcz.VcfZarr(path)
else:
raise ValueError(f"{path} not in ICF or VCF Zarr format")
Expand Down
119 changes: 90 additions & 29 deletions bio2zarr/vcz.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@
cname="zstd", clevel=7, shuffle=numcodecs.Blosc.BITSHUFFLE
)


def default_zarr_compressors(dtype):
return zarr.codecs.BloscCodec(
cname="zstd",
clevel=7,
shuffle=zarr.codecs.BloscShuffle.bitshuffle,
typesize=np.dtype(dtype).itemsize,
).to_dict()


def default_zarr_codecs(dtype):
if dtype == "T":
return [
zarr.codecs.VLenUTF8Codec().to_dict(),
default_zarr_compressors(dtype),
]
else:
return [
zarr.codecs.BytesCodec().to_dict(),
default_zarr_compressors(dtype),
]


_fixed_field_descriptions = {
"variant_contig": "An identifier from the reference genome or an angle-bracketed ID"
" string pointing to a contig in the assembly file",
Expand Down Expand Up @@ -166,6 +189,7 @@ class ZarrArraySpec:
description: str
compressor: dict = None
filters: list = None
codecs: list = None
source: str = None

def __post_init__(self):
Expand Down Expand Up @@ -206,6 +230,7 @@ def from_field(
array_name=None,
compressor=None,
filters=None,
codecs=None,
):
prefix = "variant_"
dimensions = ["variants"]
Expand Down Expand Up @@ -258,6 +283,7 @@ def from_field(
description=vcf_field.description,
compressor=compressor,
filters=filters,
codecs=codecs,
)

def chunk_nbytes(self, schema):
Expand Down Expand Up @@ -643,55 +669,73 @@ def init(

def encode_samples(self, root):
samples = self.source.samples
dimension_names = ["samples"]
array = root.array(
"sample_id",
data=[sample.id for sample in samples],
shape=len(samples),
dtype="str",
compressor=DEFAULT_ZARR_COMPRESSOR,
dtype="T",
# compressor=DEFAULT_ZARR_COMPRESSOR,
compressor="auto", # TODO: shouldn't need to set this
compressors=default_zarr_compressors("T"),
chunks=(self.schema.get_chunks(["samples"])[0],),
dimension_names=dimension_names,
)
array.attrs["_ARRAY_DIMENSIONS"] = ["samples"]
# array.attrs["_ARRAY_DIMENSIONS"] = dimension_names
logger.debug("Samples done")

def encode_contigs(self, root):
contigs = self.source.contigs
dimension_names = ["contigs"]
array = root.array(
"contig_id",
data=[contig.id for contig in contigs],
shape=len(contigs),
dtype="str",
compressor=DEFAULT_ZARR_COMPRESSOR,
dtype="T",
# compressor=DEFAULT_ZARR_COMPRESSOR,
compressor="auto", # TODO: shouldn't need to set this
compressors=default_zarr_compressors("T"),
dimension_names=dimension_names,
)
array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
# array.attrs["_ARRAY_DIMENSIONS"] = dimension_names
if all(contig.length is not None for contig in contigs):
array = root.array(
"contig_length",
data=[contig.length for contig in contigs],
shape=len(contigs),
dtype=np.int64,
compressor=DEFAULT_ZARR_COMPRESSOR,
# compressor=DEFAULT_ZARR_COMPRESSOR,
compressor="auto", # TODO: shouldn't need to set this
compressors=default_zarr_compressors(np.int64),
dimension_names=dimension_names,
)
array.attrs["_ARRAY_DIMENSIONS"] = ["contigs"]
# array.attrs["_ARRAY_DIMENSIONS"] = dimension_names

def encode_filters(self, root):
filters = self.source.filters
dimension_names = ["filters"]
array = root.array(
"filter_id",
data=[filt.id for filt in filters],
shape=len(filters),
dtype="str",
compressor=DEFAULT_ZARR_COMPRESSOR,
dtype="T",
# compressor=DEFAULT_ZARR_COMPRESSOR,
compressor="auto", # TODO: shouldn't need to set this
compressors=default_zarr_compressors("T"),
dimension_names=dimension_names,
)
array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]
# array.attrs["_ARRAY_DIMENSIONS"] = dimension_names
array = root.array(
"filter_description",
data=[filt.description for filt in filters],
shape=len(filters),
dtype="str",
compressor=DEFAULT_ZARR_COMPRESSOR,
dtype="T",
# compressor=DEFAULT_ZARR_COMPRESSOR,
compressor="auto", # TODO: shouldn't need to set this
compressors=default_zarr_compressors("T"),
dimension_names=dimension_names,
)
array.attrs["_ARRAY_DIMENSIONS"] = ["filters"]
# array.attrs["_ARRAY_DIMENSIONS"] = dimension_names

def init_array(self, root, schema, array_spec, variants_dim_size):
kwargs = dict(zarr_utils.ZARR_FORMAT_KWARGS)
Expand All @@ -713,6 +757,12 @@ def init_array(self, root, schema, array_spec, variants_dim_size):
else:
kwargs["object_codec"] = numcodecs.VLenUTF8()

codecs = (
array_spec.codecs
if array_spec.codecs is not None
else default_zarr_codecs(array_spec.dtype)
)

if not zarr_utils.zarr_v3():
kwargs["dimension_separator"] = self.metadata.dimension_separator

Expand All @@ -724,15 +774,17 @@ def init_array(self, root, schema, array_spec, variants_dim_size):
shape=shape,
chunks=schema.get_chunks(array_spec.dimensions),
dtype=array_spec.dtype,
compressor=compressor,
filters=filters,
# compressor=compressor,
# filters=filters,
codecs=codecs,
dimension_names=array_spec.dimensions,
**kwargs,
)
a.attrs.update(
{
"description": array_spec.description,
# Dimension names are part of the spec in Zarr v3
"_ARRAY_DIMENSIONS": array_spec.dimensions,
# "_ARRAY_DIMENSIONS": array_spec.dimensions,
}
)
logger.debug(f"Initialised {a}")
Expand Down Expand Up @@ -977,11 +1029,18 @@ def finalise_array(self, name):
if not src.exists():
# Needs test
raise ValueError(f"Partition {partition} of {name} does not exist")
dest = self.arrays_path / name
# This is Zarr v2 specific. Chunks in v3 with start with "c" prefix.
chunk_files = [
path for path in src.iterdir() if not path.name.startswith(".")
]
dest = self.arrays_path / name / "c"
dest.mkdir(exist_ok=True)
src_chunks = src / "c"
if not src_chunks.exists():
chunk_files = []
else:
chunk_files = [
path
for path in src_chunks.iterdir()
if not path.name.startswith(".")
]
# TODO check for a count of then number of files. If we require a
# dimension_separator of "/" then we could make stronger assertions
# here, as we'd always have num_variant_chunks
Expand Down Expand Up @@ -1108,7 +1167,7 @@ def encode_all_partitions(

class VcfZarr:
def __init__(self, path):
if not (path / ".zmetadata").exists():
if not (path / ".zattrs").exists() and not (path / "zarr.json").exists():
raise ValueError("Not in VcfZarr format") # NEEDS TEST
self.path = path
self.root = zarr.open(path, mode="r")
Expand All @@ -1129,8 +1188,8 @@ def summary_table(self):
"avg_chunk_stored": core.display_size(int(stored / array.nchunks)),
"shape": str(array.shape),
"chunk_shape": str(array.chunks),
"compressor": str(array.compressor),
"filters": str(array.filters),
# "compressor": str(array.compressor),
# "filters": str(array.filters),
}
data.append(d)
return data
Expand Down Expand Up @@ -1192,20 +1251,22 @@ def create_index(self):
kwargs = {}
if not zarr_utils.zarr_v3():
kwargs["dimension_separator"] = "/"
dimension_names = [
"region_index_values",
"region_index_fields",
]
array = root.array(
"region_index",
data=index,
shape=index.shape,
chunks=index.shape,
dtype=index.dtype,
compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=0),
# compressor=numcodecs.Blosc("zstd", clevel=9, shuffle=0),
fill_value=None,
dimension_names=dimension_names,
**kwargs,
)
array.attrs["_ARRAY_DIMENSIONS"] = [
"region_index_values",
"region_index_fields",
]
# array.attrs["_ARRAY_DIMENSIONS"] = dimension_names

logger.info("Consolidating Zarr metadata")
zarr.consolidate_metadata(self.path)
2 changes: 1 addition & 1 deletion bio2zarr/zarr_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def zarr_v3() -> bool:

if zarr_v3():
# Use zarr format v2 even when running with zarr-python v3
ZARR_FORMAT_KWARGS = dict(zarr_format=2)
ZARR_FORMAT_KWARGS = dict(zarr_format=3)
else:
ZARR_FORMAT_KWARGS = dict()

Expand Down
Loading
Loading