[med-svn] [Git][med-team/heudiconv][upstream] New upstream version 1.3.2
Yaroslav Halchenko (@yoh)
gitlab at salsa.debian.org
Wed Nov 13 21:06:25 GMT 2024
Yaroslav Halchenko pushed to branch upstream at Debian Med / heudiconv
Commits:
8a063dd1 by Yaroslav Halchenko at 2024-11-13T13:24:59-05:00
New upstream version 1.3.2
- - - - -
17 changed files:
- PKG-INFO
- README.rst
- heudiconv.egg-info/PKG-INFO
- heudiconv/_version.py
- heudiconv/bids.py
- heudiconv/convert.py
- heudiconv/dicoms.py
- heudiconv/external/dlad.py
- heudiconv/heuristics/reproin.py
- heudiconv/info.py
- heudiconv/parser.py
- heudiconv/tests/test_bids.py
- heudiconv/tests/test_dicoms.py
- heudiconv/tests/test_regression.py
- heudiconv/tests/test_utils.py
- heudiconv/tests/utils.py
- heudiconv/utils.py
Changes:
=====================================
PKG-INFO
=====================================
@@ -1,19 +1,19 @@
Metadata-Version: 2.1
Name: heudiconv
-Version: 1.1.6
+Version: 1.3.2
Summary: Heuristic DICOM Converter
Author: HeuDiConv team and contributors
License: Apache 2.0
Classifier: Environment :: Console
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: Apache Software License
-Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
+Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering
Classifier: Typing :: Typed
-Requires-Python: >=3.8
+Requires-Python: >=3.9
License-File: LICENSE
Requires-Dist: dcmstack>=0.8
Requires-Dist: etelemetry
=====================================
README.rst
=====================================
@@ -4,6 +4,10 @@
`a heuristic-centric DICOM converter`
+.. image:: https://joss.theoj.org/papers/10.21105/joss.05839/status.svg
+ :target: https://doi.org/10.21105/joss.05839
+ :alt: JOSS Paper
+
.. image:: https://img.shields.io/badge/docker-nipy/heudiconv:latest-brightgreen.svg?logo=docker&style=flat
:target: https://hub.docker.com/r/nipy/heudiconv/tags/
:alt: Our Docker image
@@ -36,6 +40,10 @@
:target: https://repology.org/project/python:heudiconv/versions
:alt: PyPI
+.. image:: https://img.shields.io/badge/RRID-SCR__017427-blue
+ :target: https://identifiers.org/RRID:SCR_017427
+ :alt: RRID
+
About
-----
=====================================
heudiconv.egg-info/PKG-INFO
=====================================
@@ -1,19 +1,19 @@
Metadata-Version: 2.1
Name: heudiconv
-Version: 1.1.6
+Version: 1.3.2
Summary: Heuristic DICOM Converter
Author: HeuDiConv team and contributors
License: Apache 2.0
Classifier: Environment :: Console
Classifier: Intended Audience :: Science/Research
Classifier: License :: OSI Approved :: Apache Software License
-Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
+Classifier: Programming Language :: Python :: 3.12
Classifier: Topic :: Scientific/Engineering
Classifier: Typing :: Typed
-Requires-Python: >=3.8
+Requires-Python: >=3.9
License-File: LICENSE
Requires-Dist: dcmstack>=0.8
Requires-Dist: etelemetry
=====================================
heudiconv/_version.py
=====================================
@@ -1 +1 @@
-__version__ = "1.1.6"
+__version__ = "1.3.2"
=====================================
heudiconv/bids.py
=====================================
@@ -31,7 +31,7 @@ from .utils import (
remove_suffix,
save_json,
set_readonly,
- strptime_micr,
+ strptime_bids,
update_json,
)
@@ -77,6 +77,7 @@ AllowedFmapParameterMatching = [
"ImagingVolume",
"ModalityAcquisitionLabel",
"CustomAcquisitionLabel",
+ "PlainAcquisitionLabel",
"Force",
]
# Key info returned by get_key_info_for_fmap_assignment when
@@ -755,6 +756,10 @@ def get_key_info_for_fmap_assignment(
custom_label = BIDSFile.parse(op.basename(json_file))["acq"]
# Get the custom acquisition label, acq_label is None if no custom field found
key_info = [custom_label]
+ elif matching_parameter == "PlainAcquisitionLabel":
+ # always base the decision on <acq> label
+ plain_label = BIDSFile.parse(op.basename(json_file))["acq"]
+ key_info = [plain_label]
elif matching_parameter == "Force":
# We want to force the matching, so just return some string
# regardless of the image
@@ -947,18 +952,16 @@ def select_fmap_from_compatible_groups(
k for k, v in acq_times_fmaps.items() if v == first_acq_time
][0]
elif criterion == "Closest":
- json_acq_time = strptime_micr(
+ json_acq_time = strptime_bids(
acq_times[
# remove session folder and '.json', add '.nii.gz':
remove_suffix(remove_prefix(json_file, sess_folder + op.sep), ".json")
+ ".nii.gz"
- ],
- "%Y-%m-%dT%H:%M:%S[.%f]",
+ ]
)
# differences in acquisition time (abs value):
diff_fmaps_acq_times = {
- k: abs(strptime_micr(v, "%Y-%m-%dT%H:%M:%S[.%f]") - json_acq_time)
- for k, v in acq_times_fmaps.items()
+ k: abs(strptime_bids(v) - json_acq_time) for k, v in acq_times_fmaps.items()
}
min_diff_acq_times = sorted(diff_fmaps_acq_times.values())[0]
selected_fmap_key = [
=====================================
heudiconv/convert.py
=====================================
@@ -562,6 +562,15 @@ def convert(
for item in items:
prefix, outtypes, item_dicoms = item
+ if isinstance(outtypes, str): # type: ignore[unreachable]
+ lgr.warning( # type: ignore[unreachable]
+ "Provided output types %r of type 'str' instead "
+ "of a tuple for prefix %r. Likely need to fix-up your heuristic. "
+ "Meanwhile we are 'manually' converting to 'tuple'",
+ outtypes,
+ prefix,
+ )
+ outtypes = (outtypes,)
prefix_dirname = op.dirname(prefix)
outname_bids = prefix + ".json"
bids_outfiles = []
=====================================
heudiconv/dicoms.py
=====================================
@@ -32,7 +32,8 @@ from .utils import (
get_typed_attr,
load_json,
set_readonly,
- strptime_micr,
+ strptime_dcm_da_tm,
+ strptime_dcm_dt,
)
if TYPE_CHECKING:
@@ -46,6 +47,11 @@ with warnings.catch_warnings():
# suppress warning
import nibabel.nicom.dicomwrappers as dw
+# TODO: remove the kludge whenever
+# https://github.com/moloney/dcmstack/pull/90 is merged and released
+if not hasattr(dcm, "read_file"):
+ dcm.read_file = dcm.dcmread
+
lgr = logging.getLogger(__name__)
total_files = 0
# Might be monkey patched by user heuristic to tune desired compression level.
@@ -87,13 +93,21 @@ def create_seqinfo(
image_type = get_typed_attr(dcminfo, "ImageType", tuple, ())
is_moco = "MOCO" in image_type
series_desc = get_typed_attr(dcminfo, "SeriesDescription", str, "")
+ protocol_name = get_typed_attr(dcminfo, "ProtocolName", str, "")
- if dcminfo.get([0x18, 0x24]):
- # GE and Philips
- sequence_name = dcminfo[0x18, 0x24].value
- elif dcminfo.get([0x19, 0x109C]):
- # Siemens
- sequence_name = dcminfo[0x19, 0x109C].value
+ for k, m in (
+ ([0x18, 0x24], "GE and Philips"),
+ ([0x19, 0x109C], "Siemens"),
+ ([0x18, 0x9005], "Siemens XA"),
+ ):
+ if v := dcminfo.get(k):
+ sequence_name = v.value
+ lgr.debug(
+ "Identified sequence name as %s coming from the %r family of MR scanners",
+ sequence_name,
+ m,
+ )
+ break
else:
sequence_name = ""
@@ -128,7 +142,7 @@ def create_seqinfo(
dim4=size[3],
TR=TR,
TE=TE,
- protocol_name=dcminfo.ProtocolName,
+ protocol_name=protocol_name,
is_motion_corrected=is_moco,
is_derived="derived" in [x.lower() for x in image_type],
patient_id=dcminfo.get("PatientID"),
@@ -526,19 +540,16 @@ def get_datetime_from_dcm(dcm_data: dcm.FileDataset) -> Optional[datetime.dateti
3. SeriesDate & SeriesTime (0008,0021); (0008,0031)
"""
- acq_date = dcm_data.get("AcquisitionDate", "").strip()
- acq_time = dcm_data.get("AcquisitionTime", "").strip()
- if acq_date and acq_time:
- return strptime_micr(acq_date + acq_time, "%Y%m%d%H%M%S[.%f]")
-
- acq_dt = dcm_data.get("AcquisitionDateTime", "").strip()
- if acq_dt:
- return strptime_micr(acq_dt, "%Y%m%d%H%M%S[.%f]")
-
- series_date = dcm_data.get("SeriesDate", "").strip()
- series_time = dcm_data.get("SeriesTime", "").strip()
- if series_date and series_time:
- return strptime_micr(series_date + series_time, "%Y%m%d%H%M%S[.%f]")
+
+ def check_tag(x: str) -> bool:
+ return x in dcm_data and dcm_data[x].value.strip()
+
+ if check_tag("AcquisitionDate") and check_tag("AcquisitionTime"):
+ return strptime_dcm_da_tm(dcm_data, "AcquisitionDate", "AcquisitionTime")
+ if check_tag("AcquisitionDateTime"):
+ return strptime_dcm_dt(dcm_data, "AcquisitionDateTime")
+ if check_tag("SeriesDate") and check_tag("SeriesTime"):
+ return strptime_dcm_da_tm(dcm_data, "SeriesDate", "SeriesTime")
return None
=====================================
heudiconv/external/dlad.py
=====================================
@@ -156,12 +156,12 @@ def add_to_datalad(
# Provide metadata for sensitive information
sensitive_patterns = [
- "sourcedata",
+ "sourcedata/**",
"*_scans.tsv", # top level
"*/*_scans.tsv", # within subj
"*/*/*_scans.tsv", # within sess/subj
- "*/anat", # within subj
- "*/*/anat", # within ses/subj
+ "*/anat/*", # within subj
+ "*/*/anat/*", # within ses/subj
]
for sp in sensitive_patterns:
mark_sensitive(ds, sp, annexed_files)
=====================================
heudiconv/heuristics/reproin.py
=====================================
@@ -280,8 +280,8 @@ def fix_canceled_runs(seqinfo: list[SeqInfo]) -> list[SeqInfo]:
"""Function that adds cancelme_ to known bad runs which were forgotten"""
if not fix_accession2run:
return seqinfo # nothing to do
- for i, s in enumerate(seqinfo):
- accession_number = s.accession_number
+ for i, curr_seqinfo in enumerate(seqinfo):
+ accession_number = curr_seqinfo.accession_number
if accession_number and accession_number in fix_accession2run:
lgr.info(
"Considering some runs possibly marked to be "
@@ -292,12 +292,12 @@ def fix_canceled_runs(seqinfo: list[SeqInfo]) -> list[SeqInfo]:
# a single accession, but left as is for now
badruns = fix_accession2run[accession_number]
badruns_pattern = "|".join(badruns)
- if re.match(badruns_pattern, s.series_id):
- lgr.info("Fixing bad run {0}".format(s.series_id))
+ if re.match(badruns_pattern, curr_seqinfo.series_id):
+ lgr.info("Fixing bad run {0}".format(curr_seqinfo.series_id))
fixedkwargs = dict()
for key in series_spec_fields:
- fixedkwargs[key] = "cancelme_" + getattr(s, key)
- seqinfo[i] = s._replace(**fixedkwargs)
+ fixedkwargs[key] = "cancelme_" + getattr(curr_seqinfo, key)
+ seqinfo[i] = curr_seqinfo._replace(**fixedkwargs)
return seqinfo
@@ -341,11 +341,11 @@ def _apply_substitutions(
seqinfo: list[SeqInfo], substitutions: list[tuple[str, str]], subs_scope: str
) -> None:
lgr.info("Considering %s substitutions", subs_scope)
- for i, s in enumerate(seqinfo):
+ for i, curr_seqinfo in enumerate(seqinfo):
fixed_kwargs = dict()
# need to replace both protocol_name series_description
for key in series_spec_fields:
- oldvalue = value = getattr(s, key)
+ oldvalue = value = getattr(curr_seqinfo, key)
# replace all I need to replace
for substring, replacement in substitutions:
value = re.sub(substring, replacement, value)
@@ -353,7 +353,7 @@ def _apply_substitutions(
lgr.info(" %s: %r -> %r", key, oldvalue, value)
fixed_kwargs[key] = value
# namedtuples are immutable
- seqinfo[i] = s._replace(**fixed_kwargs)
+ seqinfo[i] = curr_seqinfo._replace(**fixed_kwargs)
def fix_seqinfo(seqinfo: list[SeqInfo]) -> list[SeqInfo]:
@@ -402,32 +402,34 @@ def infotodict(
run_label: Optional[str] = None # run-
dcm_image_iod_spec: Optional[str] = None
skip_derived = False
- for s in seqinfo:
+ for curr_seqinfo in seqinfo:
# XXX: skip derived sequences, we don't store them to avoid polluting
# the directory, unless it is the motion corrected ones
# (will get _rec-moco suffix)
- if skip_derived and s.is_derived and not s.is_motion_corrected:
- skipped.append(s.series_id)
- lgr.debug("Ignoring derived data %s", s.series_id)
+ if skip_derived and curr_seqinfo.is_derived and not curr_seqinfo.is_motion_corrected:
+ skipped.append(curr_seqinfo.series_id)
+ lgr.debug("Ignoring derived data %s", curr_seqinfo.series_id)
continue
# possibly apply present formatting in the series_description or protocol name
for f in "series_description", "protocol_name":
- s = s._replace(**{f: getattr(s, f).format(**s._asdict())})
+ curr_seqinfo = curr_seqinfo._replace(
+ **{f: getattr(curr_seqinfo, f).format(**curr_seqinfo._asdict())}
+ )
template = None
suffix = ""
# seq = []
- # figure out type of image from s.image_info -- just for checking ATM
+ # figure out type of image from curr_seqinfo.image_info -- just for checking ATM
# since we primarily rely on encoded in the protocol name information
prev_dcm_image_iod_spec = dcm_image_iod_spec
- if len(s.image_type) > 2:
+ if len(curr_seqinfo.image_type) > 2:
# https://dicom.innolitics.com/ciods/cr-image/general-image/00080008
# 0 - ORIGINAL/DERIVED
# 1 - PRIMARY/SECONDARY
# 3 - Image IOD specific specialization (optional)
- dcm_image_iod_spec = s.image_type[2]
+ dcm_image_iod_spec = curr_seqinfo.image_type[2]
image_type_datatype = {
# Note: P and M are too generic to make a decision here, could be
# for different datatypes (bold, fmap, etc)
@@ -443,7 +445,7 @@ def infotodict(
series_info = {} # For please lintian and its friends
for sfield in series_spec_fields:
- svalue = getattr(s, sfield)
+ svalue = getattr(curr_seqinfo, sfield)
series_info = parse_series_spec(svalue)
if series_info: # looks like a valid spec - we are done
series_spec = svalue
@@ -454,10 +456,10 @@ def infotodict(
if not series_info:
series_spec = None # we cannot know better
lgr.warning(
- "Could not determine the series name by looking at " "%s fields",
+ "Could not determine the series name by looking at %s fields",
", ".join(series_spec_fields),
)
- skipped_unknown.append(s.series_id)
+ skipped_unknown.append(curr_seqinfo.series_id)
continue
if dcm_image_iod_spec and dcm_image_iod_spec.startswith("MIP"):
@@ -476,14 +478,14 @@ def infotodict(
series_spec,
)
- # if s.is_derived:
+ # if curr_seqinfo.is_derived:
# # Let's for now stash those close to original images
# # TODO: we might want a separate tree for all of this!?
# # so more of a parameter to the create_key
# #datatype += '/derivative'
# # just keep it lower case and without special characters
# # XXXX what for???
- # #seq.append(s.series_description.lower())
+ # #seq.append(curr_seqinfo.series_description.lower())
# prefix = os.path.join('derivatives', 'scanner')
# else:
# prefix = ''
@@ -493,14 +495,14 @@ def infotodict(
# Figure out the datatype_suffix (BIDS _suffix)
#
# If none was provided -- let's deduce it from the information we find:
- # analyze s.protocol_name (series_id is based on it) for full name mapping etc
+ # analyze curr_seqinfo.protocol_name (series_id is based on it) for full name mapping etc
if not datatype_suffix:
if datatype == "func":
if "_pace_" in series_spec:
datatype_suffix = "pace" # or should it be part of seq-
- elif "P" in s.image_type:
+ elif "P" in curr_seqinfo.image_type:
datatype_suffix = "phase"
- elif "M" in s.image_type:
+ elif "M" in curr_seqinfo.image_type:
datatype_suffix = "bold"
else:
# assume bold by default
@@ -526,7 +528,7 @@ def infotodict(
# since they are complementary files produced along-side with original
# ones.
#
- if s.series_description.endswith("_SBRef"):
+ if curr_seqinfo.series_description.endswith("_SBRef"):
datatype_suffix = "sbref"
if not datatype_suffix:
@@ -550,7 +552,7 @@ def infotodict(
# XXX if we have a known earlier study, we need to always
# increase the run counter for phasediff because magnitudes
# were not acquired
- if get_study_hash([s]) == "9d148e2a05f782273f6343507733309d":
+ if get_study_hash([curr_seqinfo]) == "9d148e2a05f782273f6343507733309d":
current_run += 1
else:
raise RuntimeError(
@@ -583,10 +585,10 @@ def infotodict(
run_label = None
# yoh: had a wrong assumption
- # if s.is_motion_corrected:
- # assert s.is_derived, "Motion corrected images must be 'derived'"
+ # if curr_seqinfo.is_motion_corrected:
+ # assert curr_seqinfo.is_derived, "Motion corrected images must be 'derived'"
- if s.is_motion_corrected and "rec-" in series_info.get("bids", ""):
+ if curr_seqinfo.is_motion_corrected and "rec-" in series_info.get("bids", ""):
raise NotImplementedError(
"want to add _rec-moco but there is _rec- already"
)
@@ -611,7 +613,7 @@ def infotodict(
from_series_info("acq"),
# But we want to add an indicator in case it was motion corrected
# in the magnet. ref sample /2017/01/03/qa
- None if not s.is_motion_corrected else "rec-moco",
+ None if not curr_seqinfo.is_motion_corrected else "rec-moco",
from_series_info("dir"),
series_info.get("bids"),
run_label,
@@ -621,7 +623,7 @@ def infotodict(
suffix = "_".join(filter(bool, filename_suffix_parts)) # type: ignore[arg-type]
# # .series_description in case of
- # sdesc = s.study_description
+ # sdesc = curr_seqinfo.study_description
# # temporary aliases for those phantoms which we already collected
# # so we rename them into this
# #MAPPING
@@ -638,13 +640,16 @@ def infotodict(
# https://github.com/nipy/heudiconv/issues/145
outtype: tuple[str, ...]
if (
- "_Scout" in s.series_description
+ "_Scout" in curr_seqinfo.series_description
or (
datatype == "anat"
and datatype_suffix
and datatype_suffix.startswith("scout")
)
- or (s.series_description.lower() == s.protocol_name.lower() + "_setter")
+ or (
+ curr_seqinfo.series_description.lower()
+ == curr_seqinfo.protocol_name.lower() + "_setter"
+ )
):
outtype = ("dicom",)
else:
@@ -654,7 +659,7 @@ def infotodict(
# we wanted ordered dict for consistent demarcation of dups
if template not in info:
info[template] = []
- info[template].append(s.series_id)
+ info[template].append(curr_seqinfo.series_id)
if skipped:
lgr.info("Skipped %d sequences: %s" % (len(skipped), skipped))
@@ -762,7 +767,7 @@ def infotoids(seqinfos: Iterable[SeqInfo], outdir: str) -> dict[str, Optional[st
# So -- use `outdir` and locator etc to see if for a given locator/subject
# and possible ses+ in the sequence names, so we would provide a sequence
- # So might need to go through parse_series_spec(s.protocol_name)
+ # So might need to go through parse_series_spec(curr_seqinfo.protocol_name)
# to figure out presence of sessions.
ses_markers: list[str] = []
=====================================
heudiconv/info.py
=====================================
@@ -11,15 +11,15 @@ CLASSIFIERS = [
"Environment :: Console",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: Apache Software License",
- "Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
"Topic :: Scientific/Engineering",
"Typing :: Typed",
]
-PYTHON_REQUIRES = ">=3.8"
+PYTHON_REQUIRES = ">=3.9"
REQUIRES = [
# not usable in some use cases since might be just a downloader, not binary
=====================================
heudiconv/parser.py
=====================================
@@ -9,6 +9,7 @@ import os
import os.path as op
import re
import shutil
+import sys
from types import ModuleType
from typing import Optional
@@ -22,7 +23,18 @@ atexit.register(tempdirs.cleanup)
_VCS_REGEX = r"%s\.(?:git|gitattributes|svn|bzr|hg)(?:%s|$)" % (op.sep, op.sep)
-_UNPACK_FORMATS = tuple(sum((x[1] for x in shutil.get_unpack_formats()), []))
+
+def _get_unpack_formats() -> dict[str, bool]:
+ """For each extension return if it is a tar"""
+ out = {}
+ for _, exts, d in shutil.get_unpack_formats():
+ for e in exts:
+ out[e] = bool(re.search(r"\btar\b", d.lower()))
+ return out
+
+
+_UNPACK_FORMATS = _get_unpack_formats()
+_TAR_UNPACK_FORMATS = tuple(k for k, is_tar in _UNPACK_FORMATS.items() if is_tar)
@docstring_parameter(_VCS_REGEX)
@@ -114,7 +126,7 @@ def get_extracted_dicoms(fl: Iterable[str]) -> ItemsView[Optional[str], list[str
# needs sorting to keep the generated "session" label deterministic
for _, t in enumerate(sorted(fl)):
- if not t.endswith(_UNPACK_FORMATS):
+ if not t.endswith(tuple(_UNPACK_FORMATS)):
sessions[None].append(t)
continue
@@ -127,7 +139,14 @@ def get_extracted_dicoms(fl: Iterable[str]) -> ItemsView[Optional[str], list[str
# check content and sanitize permission bits before extraction
os.chmod(tmpdir, mode=0o700)
- shutil.unpack_archive(t, extract_dir=tmpdir)
+ # For tar (only!) starting with 3.12 we should provide filter
+ # (enforced in 3.14) on how to filter/safe-guard filenames.
+ kws: dict[str, str] = {}
+ if sys.version_info >= (3, 12) and t.endswith(_TAR_UNPACK_FORMATS):
+ # Allow for a user-workaround if would be desired
+ # see e.g. https://docs.python.org/3.12/library/tarfile.html#extraction-filters
+ kws["filter"] = os.environ.get("HEUDICONV_TAR_FILTER", "tar")
+ shutil.unpack_archive(t, extract_dir=tmpdir, **kws) # type: ignore[arg-type]
archive_content = list(find_files(regex=".*", topdir=tmpdir))
@@ -247,7 +266,7 @@ def get_study_sessions(
"`infotoids` to heuristic file or "
"provide `--subjects` option"
)
- lgr.warning(
+ lgr.info(
"Heuristic is missing an `infotoids` method, assigning "
"empty method and using provided subject id %s. "
"Provide `session` and `locator` fields for best results.",
=====================================
heudiconv/tests/test_bids.py
=====================================
@@ -179,6 +179,24 @@ def test_get_key_info_for_fmap_assignment(
json_name, matching_parameter="CustomAcquisitionLabel"
)
+ # 7) matching_parameter = 'PlainAcquisitionLabel'
+ A_LABEL = gen_rand_label(label_size, label_seed)
+ for d in ["fmap", "func", "dwi", "anat"]:
+ (tmp_path / d).mkdir(parents=True, exist_ok=True)
+
+ for dirname, fname, expected_key_info in [
+ ("fmap", f"sub-foo_acq-{A_LABEL}_epi.json", A_LABEL),
+ ("func", f"sub-foo_task-foo_acq-{A_LABEL}_bold.json", A_LABEL),
+ ("func", f"sub-foo_task-bar_acq-{A_LABEL}_bold.json", A_LABEL),
+ ("dwi", f"sub-foo_acq-{A_LABEL}_dwi.json", A_LABEL),
+ ("anat", f"sub-foo_acq-{A_LABEL}_T1w.json", A_LABEL),
+ ]:
+ json_name = op.join(tmp_path, dirname, fname)
+ save_json(json_name, {SHIM_KEY: A_SHIM})
+ assert [expected_key_info] == get_key_info_for_fmap_assignment(
+ json_name, matching_parameter="PlainAcquisitionLabel"
+ )
+
# Finally: invalid matching_parameters:
assert (
get_key_info_for_fmap_assignment(json_name, matching_parameter="Invalid") == []
=====================================
heudiconv/tests/test_dicoms.py
=====================================
@@ -12,6 +12,8 @@ import pytest
from heudiconv.cli.run import main as runner
from heudiconv.convert import nipype_convert
from heudiconv.dicoms import (
+ create_seqinfo,
+ dw,
embed_dicom_and_nifti_metadata,
get_datetime_from_dcm,
get_reproducible_int,
@@ -19,7 +21,7 @@ from heudiconv.dicoms import (
parse_private_csa_header,
)
-from .utils import TESTS_DATA_PATH
+from .utils import TEST_DICOM_PATHS, TESTS_DATA_PATH
# Public: Private DICOM tags
DICOM_FIELDS_TO_TEST = {"ProtocolName": "tProtocolName"}
@@ -178,9 +180,17 @@ def test_get_datetime_from_dcm_wo_dt() -> None:
assert get_datetime_from_dcm(XA30_enhanced_dcm) is None
-def test_get_reproducible_int() -> None:
- dcmfile = op.join(TESTS_DATA_PATH, "phantom.dcm")
+@pytest.mark.parametrize("dcmfile", TEST_DICOM_PATHS)
+def test_create_seqinfo(
+ dcmfile: str,
+) -> None:
+ mw = dw.wrapper_from_file(dcmfile)
+ seqinfo = create_seqinfo(mw, [dcmfile], op.basename(dcmfile))
+ assert seqinfo.sequence_name
+
+@pytest.mark.parametrize("dcmfile", TEST_DICOM_PATHS)
+def test_get_reproducible_int(dcmfile: str) -> None:
assert type(get_reproducible_int([dcmfile])) is int
=====================================
heudiconv/tests/test_regression.py
=====================================
@@ -55,6 +55,7 @@ def test_conversion(
heuristic,
anon_cmd,
template="sourcedata/sub-{subject}/*/*/*.tgz",
+ xargs=["--datalad"],
)
runner(args) # run conversion
@@ -96,6 +97,18 @@ def test_conversion(
for key in keys:
assert orig[key] == conv[key]
+ # validate sensitive marking
+ from datalad.api import Dataset
+
+ ds = Dataset(outdir)
+ all_meta = dict(ds.repo.get_metadata("."))
+ target_rec = {"distribution-restrictions": ["sensitive"]}
+ for pth, meta in all_meta.items():
+ if "anat" in pth or "scans.tsv" in pth:
+ assert meta == target_rec
+ else:
+ assert meta == {}
+
@pytest.mark.skipif(not have_datalad, reason="no datalad")
def test_multiecho(
=====================================
heudiconv/tests/test_utils.py
=====================================
@@ -9,6 +9,7 @@ from pathlib import Path
from typing import IO, Any
from unittest.mock import patch
+import pydicom as dcm
import pytest
from heudiconv.utils import (
@@ -22,6 +23,9 @@ from heudiconv.utils import (
remove_prefix,
remove_suffix,
save_json,
+ strptime_bids,
+ strptime_dcm_da_tm,
+ strptime_dcm_dt,
strptime_micr,
update_json,
)
@@ -178,14 +182,98 @@ def test_get_datetime() -> None:
],
)
def test_strptime_micr(dt: str, fmt: str) -> None:
+ with pytest.warns(DeprecationWarning):
+ target = datetime.strptime(dt, fmt)
+ assert strptime_micr(dt, fmt) == target
+ assert strptime_micr(dt, fmt + "[.%f]") == target
+ assert strptime_micr(dt + ".0", fmt + "[.%f]") == target
+ assert strptime_micr(dt + ".000000", fmt + "[.%f]") == target
+ assert strptime_micr(dt + ".1", fmt + "[.%f]") == datetime.strptime(
+ dt + ".1", fmt + ".%f"
+ )
+
+
+@pytest.mark.parametrize(
+ "dt, fmt",
+ [
+ ("2023-04-02T11:47:09", "%Y-%m-%dT%H:%M:%S"),
+ ("2023-04-02T11:47:09.0", "%Y-%m-%dT%H:%M:%S.%f"),
+ ("2023-04-02T11:47:09.000000", "%Y-%m-%dT%H:%M:%S.%f"),
+ ("2023-04-02T11:47:09.1", "%Y-%m-%dT%H:%M:%S.%f"),
+ ("2023-04-02T11:47:09-0900", "%Y-%m-%dT%H:%M:%S%z"),
+ ("2023-04-02T11:47:09.1-0900", "%Y-%m-%dT%H:%M:%S.%f%z"),
+ ],
+)
+def test_strptime_bids(dt: str, fmt: str) -> None:
target = datetime.strptime(dt, fmt)
- assert strptime_micr(dt, fmt) == target
- assert strptime_micr(dt, fmt + "[.%f]") == target
- assert strptime_micr(dt + ".0", fmt + "[.%f]") == target
- assert strptime_micr(dt + ".000000", fmt + "[.%f]") == target
- assert strptime_micr(dt + ".1", fmt + "[.%f]") == datetime.strptime(
- dt + ".1", fmt + ".%f"
- )
+ assert strptime_bids(dt) == target
+
+
+@pytest.mark.parametrize(
+ "tm, tm_fmt",
+ [
+ ("114709.1", "%H%M%S.%f"),
+ ("114709", "%H%M%S"),
+ ("1147", "%H%M"),
+ ("11", "%H"),
+ ],
+)
+@pytest.mark.parametrize(
+ "offset, offset_fmt",
+ [
+ ("-0900", "%z"),
+ ("", ""),
+ ],
+)
+def test_strptime_dcm_da_tm(tm: str, tm_fmt: str, offset: str, offset_fmt: str) -> None:
+ da = "20230402"
+ da_fmt = "%Y%m%d"
+ target = datetime.strptime(da + tm + offset, da_fmt + tm_fmt + offset_fmt)
+ ds = dcm.dataset.Dataset()
+ ds["AcquisitionDate"] = dcm.DataElement("AcquisitionDate", "DA", da)
+ ds["AcquisitionTime"] = dcm.DataElement("AcquisitionTime", "TM", tm)
+ if offset:
+ ds[(0x0008, 0x0201)] = dcm.DataElement((0x0008, 0x0201), "SH", offset)
+ assert strptime_dcm_da_tm(ds, "AcquisitionDate", "AcquisitionTime") == target
+
+
+@pytest.mark.parametrize(
+ "dt, dt_fmt",
+ [
+ ("20230402114709.1-0400", "%Y%m%d%H%M%S.%f%z"),
+ ("20230402114709-0400", "%Y%m%d%H%M%S%z"),
+ ("202304021147-0400", "%Y%m%d%H%M%z"),
+ ("2023040211-0400", "%Y%m%d%H%z"),
+ ("20230402-0400", "%Y%m%d%z"),
+ ("202304-0400", "%Y%m%z"),
+ ("2023-0400", "%Y%z"),
+ ("20230402114709.1", "%Y%m%d%H%M%S.%f"),
+ ("20230402114709", "%Y%m%d%H%M%S"),
+ ("202304021147", "%Y%m%d%H%M"),
+ ("2023040211", "%Y%m%d%H"),
+ ("20230402", "%Y%m%d"),
+ ("202304", "%Y%m"),
+ ("2023", "%Y"),
+ ],
+)
+@pytest.mark.parametrize(
+ "offset, offset_fmt",
+ [
+ ("-0900", "%z"),
+ ("", ""),
+ ],
+)
+def test_strptime_dcm_dt(dt: str, dt_fmt: str, offset: str, offset_fmt: str) -> None:
+ target = None
+ if dt_fmt[-2:] == "%z" and offset:
+ target = datetime.strptime(dt, dt_fmt)
+ else:
+ target = datetime.strptime(dt + offset, dt_fmt + offset_fmt)
+ ds = dcm.dataset.Dataset()
+ ds["AcquisitionDateTime"] = dcm.DataElement("AcquisitionDateTime", "DT", dt)
+ if offset:
+ ds[(0x0008, 0x0201)] = dcm.DataElement((0x0008, 0x0201), "SH", offset)
+ assert strptime_dcm_dt(ds, "AcquisitionDateTime") == target
def test_remove_suffix() -> None:
=====================================
heudiconv/tests/utils.py
=====================================
@@ -1,5 +1,6 @@
from __future__ import annotations
+from glob import glob
import logging
import os.path as op
from pathlib import Path
@@ -9,6 +10,14 @@ import heudiconv.heuristics
HEURISTICS_PATH = op.join(heudiconv.heuristics.__path__[0])
TESTS_DATA_PATH = op.join(op.dirname(__file__), "data")
+# Do relative to curdir to shorten in a typical application,
+# and side-effect test that tests do not change curdir.
+TEST_DICOM_PATHS = [
+ op.relpath(x)
+ for x in glob(op.join(TESTS_DATA_PATH, "**/*.dcm"), recursive=True)
+ # exclude PhoenixDocuments
+ if "PhoenixDocument" not in x
+]
lgr = logging.getLogger(__name__)
=====================================
heudiconv/utils.py
=====================================
@@ -4,7 +4,7 @@ from __future__ import annotations
from collections.abc import Callable
from collections.abc import Mapping as MappingABC
import copy
-from datetime import datetime
+import datetime
from glob import glob
import hashlib
import json
@@ -35,6 +35,10 @@ from typing import (
cast,
overload,
)
+import warnings
+
+import pydicom as dcm
+from pydicom.tag import TagType
lgr = logging.getLogger(__name__)
@@ -662,13 +666,13 @@ def get_datetime(date: str, time: str, *, microseconds: bool = True) -> str:
# add dummy microseconds if not available for strptime to parse
time += ".000000"
td = time + ":" + date
- datetime_str = datetime.strptime(td, "%H%M%S.%f:%Y%m%d").isoformat()
+ datetime_str = datetime.datetime.strptime(td, "%H%M%S.%f:%Y%m%d").isoformat()
if not microseconds:
datetime_str = datetime_str.split(".", 1)[0]
return datetime_str
-def strptime_micr(date_string: str, fmt: str) -> datetime:
+def strptime_micr(date_string: str, fmt: str) -> datetime.datetime:
r"""
Decorate strptime while supporting optional [.%f] in the format at the end
@@ -681,12 +685,156 @@ def strptime_micr(date_string: str, fmt: str) -> datetime:
'.\d+' regex and not if it does not.
"""
+ warnings.warn(
+ "strptime_micr() is deprecated, please use strptime() instead.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
optional_micr = "[.%f]"
if fmt.endswith(optional_micr):
fmt = fmt[: -len(optional_micr)]
if re.search(r"\.\d+$", date_string):
fmt += ".%f"
- return datetime.strptime(date_string, fmt)
+ return datetime.datetime.strptime(date_string, fmt)
+
+
+def datetime_utc_offset(
+ datetime_obj: datetime.datetime, utc_offset: str
+) -> datetime.datetime:
+ """set the datetime's tzinfo by parsing an utc offset string"""
+ # https://dicom.innolitics.com/ciods/electromyogram/sop-common/00080201
+ extract_offset = re.match(r"([+\-]?)(\d{2})(\d{2})", utc_offset)
+ if extract_offset is None:
+ raise ValueError(f"utc offset {utc_offset} is not valid")
+ sign, hours, minutes = extract_offset.groups()
+ sign = -1 if sign == "-" else 1
+ hours, minutes = int(hours), int(minutes)
+ tzinfo = datetime.timezone(sign * datetime.timedelta(hours=hours, minutes=minutes))
+ return datetime_obj.replace(tzinfo=tzinfo)
+
+
+def strptime(datetime_string: str, fmts: list[str]) -> datetime.datetime:
+ """
+ Try datetime.strptime on a list of formats returning the first successful attempt.
+
+ Parameters
+ ----------
+ datetime_string: str
+ Datetime string to parse
+ fmts: list[str]
+ List of format strings
+ """
+ datetime_str = datetime_string
+ for fmt in fmts:
+ try:
+ return datetime.datetime.strptime(datetime_str, fmt)
+ except ValueError:
+ pass
+ raise ValueError(f"Unable to parse datetime string: {datetime_str}")
+
+
+def strptime_bids(datetime_string: str) -> datetime.datetime:
+ """
+ Create a datetime object from a bids datetime string.
+
+ Parameters
+ ----------
+ date_string: str
+ Datetime string to parse
+ """
+ # https://bids-specification.readthedocs.io/en/stable/common-principles.html#units
+ fmts = [
+ "%Y-%m-%dT%H:%M:%S.%f%z",
+ "%Y-%m-%dT%H:%M:%S%z",
+ "%Y-%m-%dT%H:%M:%S.%f",
+ "%Y-%m-%dT%H:%M:%S",
+ ]
+ datetime_obj = strptime(datetime_string, fmts)
+ return datetime_obj
+
+
+def strptime_dcm_da_tm(
+ dcm_data: dcm.Dataset, da_tag: TagType, tm_tag: TagType
+) -> datetime.datetime:
+ """
+ Create a datetime object from a dicom DA tag and TM tag.
+
+ Parameters
+ ----------
+ dcm_data : dcm.Dataset
+ DICOM with header, e.g., as read by pydicom.dcmread.
+ da_tag: str
+ Dicom tag with DA value representation
+ tm_tag: str
+ Dicom tag with TM value representation
+ """
+ # https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_6.2.html
+ date_str = dcm_data[da_tag].value
+ fmts = [
+ "%Y%m%d",
+ ]
+ date = strptime(date_str, fmts)
+
+ time_str = dcm_data[tm_tag].value
+ fmts = ["%H", "%H%M", "%H%M%S", "%H%M%S.%f"]
+ time = strptime(time_str, fmts)
+
+ datetime_obj = datetime.datetime.combine(date.date(), time.time())
+
+ if utc_offset_dcm := dcm_data.get((0x0008, 0x0201)):
+ utc_offset = utc_offset_dcm.value
+ datetime_obj = (
+ datetime_utc_offset(datetime_obj, utc_offset)
+ if utc_offset
+ else datetime_obj
+ )
+ return datetime_obj
+
+
+def strptime_dcm_dt(dcm_data: dcm.Dataset, dt_tag: TagType) -> datetime.datetime:
+ """
+ Create a datetime object from a dicom DT tag.
+
+ Parameters
+ ----------
+ dcm_data : dcm.FileDataset
+ DICOM with header, e.g., as read by pydicom.dcmread.
+ Objects with __getitem__ and have those keys with values properly formatted may also work
+ da_tag: str
+ Dicom tag with DT value representation
+ """
+ # https://dicom.nema.org/medical/dicom/current/output/chtml/part05/sect_6.2.html
+ datetime_str = dcm_data[dt_tag].value
+ fmts = [
+ "%Y%z",
+ "%Y%m%z",
+ "%Y%m%d%z",
+ "%Y%m%d%H%z",
+ "%Y%m%d%H%M%z",
+ "%Y%m%d%H%M%S%z",
+ "%Y%m%d%H%M%S.%f%z",
+ "%Y",
+ "%Y%m",
+ "%Y%m%d",
+ "%Y%m%d%H",
+ "%Y%m%d%H%M",
+ "%Y%m%d%H%M%S",
+ "%Y%m%d%H%M%S.%f",
+ ]
+ datetime_obj = strptime(datetime_str, fmts)
+
+ if utc_offset_dcm := dcm_data.get((0x0008, 0x0201)):
+ if utc_offset := utc_offset_dcm.value:
+ datetime_obj2 = datetime_utc_offset(datetime_obj, utc_offset)
+ if datetime_obj.tzinfo and datetime_obj2 != datetime_obj:
+ lgr.warning(
+ "Unexpectedly previously parsed datetime %s contains zoneinfo which is different from the one obtained from DICOMs UTFOffset field: %s",
+ datetime_obj,
+ datetime_obj2,
+ )
+ else:
+ datetime_obj = datetime_obj2
+ return datetime_obj
def remove_suffix(s: str, suf: str) -> str:
View it on GitLab: https://salsa.debian.org/med-team/heudiconv/-/commit/8a063dd1a55467da732c9cbdadfbc892239d5ea7
--
View it on GitLab: https://salsa.debian.org/med-team/heudiconv/-/commit/8a063dd1a55467da732c9cbdadfbc892239d5ea7
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20241113/445fb618/attachment-0001.htm>
More information about the debian-med-commit
mailing list