import os
import json
import platform
import subprocess
import numpy as np
import nibabel as nb
from lazyfmri import utils
from fmriproc import transform
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
rb = utils.color.BOLD+utils.color.RED
gb = utils.color.BOLD+utils.color.GREEN
end = utils.color.END
opj = os.path.join
[docs]
def define_mr_parameters(pars_file=None, ups=False, is_mp2rage=True):
"""define_mr_parameters
Returns a dictionary of MR acquisition parameters for MP2RAGE or MP2RAGE-ME sequences.
Parameters can either be loaded from a JSON file or returned as a predefined default
(including Universal Pulses presets for MP2RAGE). The function performs basic validation
to ensure required keys are present.
Parameters
----------
pars_file : str or None, optional
Path to a JSON file containing sequence parameters. If None, default values are used.
ups : bool, optional
If True, use Universal Pulses (UPs) parameters when `is_mp2rage` is True and `pars_file` is not provided.
Ignored when `is_mp2rage` is False. Default is False.
is_mp2rage : bool, optional
If True, load or define parameters for an MP2RAGE sequence. If False, parameters for an
MP2RAGE-ME (multi-echo) sequence are used. Default is True.
Returns
-------
dict
Dictionary containing MR parameters, with required fields depending on the sequence type:
If `is_mp2rage` is True:
- "TR": Repetition time of MP2RAGE sequence (in seconds)
- "inv_times": List of inversion times (in seconds)
- "fa": List of flip angles for the two readouts (in degrees)
- "nZ": Number of slices
- "FLASH_tr": List of readout TRs (in seconds)
If `is_mp2rage` is False:
- "echo_times": List of echo times (in seconds)
- "inv_times": List of inversion times (in seconds)
- "TR": Repetition time of the MP2RAGE-ME sequence (in seconds)
- "fa": List of flip angles
- "nZ": Number of slices
- "FLASH_tr": List of readout TRs (in seconds)
Raises
------
ValueError
If a required parameter key is missing from the provided JSON file.
Notes
-----
This function is useful for standardized MP2RAGE or MP2RAGE-ME parameter extraction during
simulation, reconstruction, or post-processing workflows. Universal Pulses (UPs) presets can
be enabled with `ups=True` for modified acquisition protocols.
"""
if is_mp2rage:
mapper = {
"TR": "MPRAGE_tr",
"inv_times": "invtimesAB",
"fa": "flipangleABdegree",
"nZ": "nZslices",
"FLASH_tr": "FLASH_tr"
}
# set default 0.7 MP2RAGE Phillips parameters
if not isinstance(pars_file, str):
pars = {
"TR": 5.5,
"inv_times": [0.8,2.7],
"fa": [6,6],
"nZ": 200,
"FLASH_tr": [0.0062,0.0062]
}
# UPs
if ups:
print("Using parameters for Universal Pulses (UPs)")
pars = {
"TR": 6.778,
"inv_times": [0.67,3.754],
"fa": [4,4],
"nZ": 150,
"FLASH_tr": [0.0062,0.031273]
}
else:
print(f"Reading parameters from '{pars_file}'")
with open(pars_file) as f:
pars = json.load(f)
for i in list(mapper.keys()):
if i not in list(pars.keys()):
raise ValueError(f"Missing key '{i}' in {pars_file}")
pars_dict = {}
for key, val in mapper.items():
pars_dict[val] = pars[key]
return pars_dict
else:
# mp2rageme
mapper = {
"echo_times": "echo_times",
"TR": "MPRAGE_tr",
"inv_times": "invtimesAB",
"fa": "flipangleABdegree",
"nZ": "nZslices",
"FLASH_tr": "FLASH_tr"
}
if not isinstance(pars_file, str):
pars = {
"echo_times": [0.006, 0.0145, 0.023, 0.0315],
"inv_times": [0.67, 3.68],
"mp2rage_tr": 6.723,
"fa": [4,4],
"nZ": 150,
"FLASH_tr": [0.0062,0.31246]
}
else:
print(f"Reading parameters from '{pars_file}'")
with open(pars_file) as f:
pars = json.load(f)
for i in list(mapper.keys()):
if i not in list(pars.keys()):
raise ValueError(f"Missing key '{i}' in {pars_file}")
pars_dict = {}
for key, val in mapper.items():
pars_dict[val] = pars[key]
return pars_dict
[docs]
def get_minmax(file):
"""get_minmax
Compute the minimum and maximum intensity values of a neuroimaging file using FSL's ImageStats.
Parameters
----------
file: str
Path to the neuroimaging file (e.g., NIfTI format).
Returns
----------
list: A list containing two float values:
- The minimum intensity value in the image.
- The maximum intensity value in the image.
Example
----------
>>> minmax = get_minmax("path/to/image.nii.gz")
>>> print(minmax)
[0.0, 255.0]
Notes:
- Requires Nipype and FSL to be installed and configured.
- The function uses the '-R' option in ImageStats, which returns the range of intensity values.
- Ensure the input file is in a format supported by FSL, such as .nii or .nii.gz.
"""
from nipype.interfaces import fsl
stats = fsl.ImageStats(in_file=file, op_string="-R")
res = stats.run()
return res.outputs.out_stat
[docs]
def write_pymp2rage_json(file, params):
"""Write json file for pymp2rage output"""
json_file = file.replace('.nii.gz', '.json')
if not os.path.isfile(json_file):
print(" Writing "+gb+json_file+end)
with open(json_file, "w+") as f:
json.dump(params, f, indent=4)
return json_file
[docs]
def write_pymp2rage_nifti(
file,
descriptor,
obj,
input_files,
is_mp2rageme=False
):
"""write_pymp2rage_nifti
Calculate and write multiparametric maps to an output directory.
Parameters
----------
file: str
Path to the output NIfTI file.
descriptor: str
The type of map to be generated (e.g., "t1map", "r2starmap").
obj: pymp2rage.MP2RAGE or pymp2rage.MEMP2RAGE object
An object containing the computed map attributes.
input_files: list or str
List of input file paths used for computation.
is_mp2rageme: bool, optional
Indicates whether the MEMP2RAGE sequence is used. Defaults to False (MP2RAGE).
Returns
----------
dict: A dictionary containing:
- "nifti" (str): Path to the saved NIfTI file.
- "json" (str): Path to the corresponding JSON metadata file.
Notes
----------
- Uses `pymp2rage` for processing and writing multiparametric maps.
- The `descriptor` determines the algorithm and estimation method.
- Ensures output files are written only if they do not already exist.
- Writes metadata to a JSON file with details on estimation methodology and input data.
Parameters
----------
>>> output = write_pymp2rage_nifti("output.nii.gz", "t1map", obj, input_files)
>>> print(output["nifti"], output["json"])
"""
from pymp2rage import version
if os.path.isfile(file):
print(f" {os.path.basename(file)} already exists")
return
base_path = None
try:
pref = os.environ.get("SUBJECT_PREFIX", "sub-")
except Exception:
pref = "sub-"
sp = input_files[0].split(os.sep)
for i in sp:
if i.startswith(pref) and not i.endswith('.nii.gz'):
base_path = os.path.join(*sp[sp.index(i) + 1: -1])
break
if not is_mp2rageme:
ref = "Marques et al., 2010"
sequence_name = "MP2RAGE"
else:
ref = "Caan et al., 2019"
sequence_name = "MEMP2RAGE"
estimation_algorithms = {
"r2starmap": "Ordinary Least Squares in Log-space",
"t1w_uni": f"{sequence_name} unified T1-weighted image",
"t1map": f"{sequence_name} T1 map",
"t2starw": "Ordinary Least Squares in Log-space",
"t2starmap": f"{sequence_name} unified T1-weighted image",
"s0": "Ordinary Least Squares in Log-space"
}
index_ranges = {
"r2starmap": np.arange(2, 10, 2),
"t1w_uni": range(4),
"t1map": range(4),
"t2starw": np.arange(2, 10, 2),
"t2starmap": range(4),
"s0": np.arange(2, 10, 2)
}
if descriptor not in estimation_algorithms:
print(f" Unknown descriptor: {descriptor}")
return
else:
print(" Writing "+gb+file+end)
getattr(obj, descriptor).to_filename(file)
params = {
"BasedOn": [os.path.join(base_path, os.path.basename(input_files[i])) for i in index_ranges[descriptor]],
"EstimationReference": ref,
"EstimationAlgorithm": estimation_algorithms[descriptor],
"EstimationSoftwareName": "pymp2rage",
"EstimationSoftwareVer": f"{version.__version__}",
"EstimationSoftwareLang": f"python {platform.python_version()}",
"EstimationSoftwareEnv": f"{platform.platform()}"
}
json_file = write_pymp2rage_json(file, params)
return {
"nifti": file,
"json": json_file
}
[docs]
def slice_timings_to_json(
json_file,
**kwargs
):
"""slice_timings_to_json
Compute and update slice timing information in a JSON file or dictionary.
Parameters
----------
json_file: str, dict
Path to the JSON file containing metadata or a dictionary
Returns
----------
dict or None:
- If `json_file` is a dictionary, returns the updated dictionary with slice timings.
- If `json_file` is a file, updates the file in place and returns None.
Raises
----------
ValueError: If the input is neither a JSON file path nor a dictionary.
Notes
----------
- If `tr` is provided but differs from the TR in the JSON file, the function logs a warning
and takes the TR from the JSON file.
- Uses the `slice_timings` function to compute slice timings.
- If `json_file` is a string (file path), the function updates the file in place.
- If `json_file` is a dictionary, it returns the updated dictionary.
Example
----------
>>> updated_data = slice_timings_to_json("metadata.json", nr_slices=32, tr=2.0)
>>> print(updated_data["SliceTiming"])
"""
write_file = True
if isinstance(json_file, str):
with open(json_file) as f:
data = json.load(f)
elif isinstance(json_file, dict):
write_file = False
data = json_file
else:
raise ValueError(f"Input must be a json-file or dictionary, not {type(json_file)}")
# TR from json file is usually more reliable
if "RepetitionTime" in list(data.keys()):
if "tr" in list(kwargs.keys()):
tr = float(kwargs["tr"])
if data["RepetitionTime"] != tr:
utils.verbose(
f" WARNING: specified TR ({tr}) does not match TR in json file ({data['RepetitionTime']}). Taking TR from json file!",
True
)
tr = data["RepetitionTime"]
# update kwargs
kwargs = utils.update_kwargs(
kwargs,
"tr",
tr,
force=True
)
# get the list of slice timings
st = slice_timings(**kwargs)
# add slicetiming key
st_dict = {"SliceTiming": st}
data.update(st_dict)
# write file or return updated dictionary
if write_file:
with open(json_file, 'w') as f:
json.dump(data, f, indent=4)
else:
return data
[docs]
def slice_timings(
nr_slices: int,
tr: float,
mb_factor: int = 1,
order: str = "ascending",
interleaved: bool = False,
file: str = None
) -> list:
"""
Compute slice timing values for an fMRI acquisition sequence.
Parameters
----------
nr_slices: int
Total number of slices in the acquisition.
tr: float
Repetition time (TR) of the sequence in seconds.
mb_factor: int, optional
Multi-band acceleration factor. Defaults to 1.
order: str, optional
Order of slice acquisitions: 'ascending' or 'descending'.
interleaved: bool, optional
If True, use interleaved ordering within each band.
file: str, optional
Use slice timings in a specified txt file. Must match number of slices!
Returns
-------
list
A list of length `nr_slices` giving the acquisition time (in seconds)
for each slice index.
Raises
------
ValueError
If `nr_slices` or `tr` is not provided, or if `order` is invalid.
"""
if file:
if not os.path.isfile(file):
raise FileNotFoundError(f"Input timing file '{file}' not found.")
data = np.loadtxt(file)
return data.tolist()
# Validate inputs
if nr_slices is None or tr is None:
raise ValueError("Both `nr_slices` and `tr` must be provided.")
if order not in ("ascending", "descending"):
raise ValueError("`order` must be 'ascending' or 'descending'.")
if nr_slices % mb_factor != 0:
raise ValueError("`nr_slices` must be divisible by `mb_factor`.")
# Number of acquisitions per band
per_band = nr_slices // mb_factor
# Base acquisition times within one band
base_times = np.linspace(0, tr, per_band, endpoint=False)
# Determine within-band index order
if interleaved:
# Interleave step size = round(sqrt(per_band))
step_size = int(round(np.sqrt(per_band)))
band_order = []
for offset in range(step_size):
band_order.extend(range(offset, per_band, step_size))
else:
band_order = list(range(per_band))
band_order = np.asarray(band_order, dtype=int)
# Reverse if descending
if order == "descending":
band_order = band_order[::-1]
# Full slice indices
full_order = np.concatenate([
band_order + b * per_band
for b in range(mb_factor)
])
tiled_times = np.tile(base_times, mb_factor)
slice_times = np.zeros(nr_slices)
slice_times[full_order] = tiled_times
return slice_times.tolist()
def bin_fov(img, thresh=0,out=None, fsl=False):
"""bin_fov
This function returns a binarized version of the input image. If no output name was specified,
it will return the dataframe in nifti-format.
Parameters
----------
img: str
path to input image
thresh: int
threshold to use (default = 0)
out: str
path to output image (default is None, and will return the data array of the image)
fsl: bool
if you reeeally want a binary image also run fslmaths -bin.. Can only be in combination with an output image and on a linux system with FSL (default is false)
Example
---------
from fmriproc.image import bin_fov
file = bin_fov("/path/to/image.nii.gz")
bin_fov("/path/to/image.nii.gz", thresh=1, out="/path/to/image.nii.gz", fsl=True)
bin_fov("/path/to/image.nii.gz", thres=2)
"""
img_file = nb.load(img) # load file
img_data = img_file.get_fdata() # get data
empty = np.zeros_like(img_data)
empty[img_data <= thresh] = 1
img_bin_img = nb.Nifti1Image(empty, header=img_file.header, affine=img_file.affine)
if out is not None:
img_bin_img.to_filename(out)
# also run fslmaths for proper binarization
if fsl:
cmd_txt = f"fslmaths {out} -bin {out}"
utils.run_shell_wrapper(cmd_txt, verb=True)
else:
return img_bin_img
[docs]
def reorient_img(img, code="RAS", out=None, qform="orig"):
"""reorient_img
Python wrapper for fslswapdim to reorient an input image given an orientation code. Valid options are: RAS, AIL for now. You can also specify "nb" as code, which reorients the input image to nibabel's RAS+ convention. If no output name is given, it will overwrite the input image.
Parameters
----------
img: str
nifti-image to be reoriented
code: str, optional
code for new orientation (default = "RAS")
out: str, optional
string to output nifti image
qform: str,int, optional
set qform code to original (str 'orig' = default) or a specified integer
Returns
----------
str
if `out=="None"`, then `img` will receive the new coordinate code, otherwise `out` is created.
The function only operates, it doesn't actually return something that can be captured in a variable
Examples
----------
reorient_img("input.nii.gz", code="RAS", out="output.nii.gz")
reorient_img("input.nii.gz", code="AIL", qform=1)
reorient_img("input.nii.gz", code="AIL", qform='orig')
"""
if out is not None:
new = out
else:
new = img
if code.upper() != "NB":
try:
os.environ['FSLDIR']
except:
raise OSError(
f'Could not find FSLDIR variable. Are we on a linux system with FSL?')
pairs = {"L": "LR", "R": "RL", "A": "AP",
"P": "PA", "S": "SI", "I": "IS"}
orient = "{} {} {}".format(
pairs[code[0].upper()],
pairs[code[1].upper()],
pairs[code[2].upper()])
cmd_txt = "fslswapdim {} {} {}".format(img, orient, new)
utils.verbose(cmd_txt, highlight=True)
utils.run_shell_wrapper(cmd_txt)
elif code.upper() == "NB":
# use nibabel's RAS
img_nb = nb.load(img)
img_hdr = img_nb.header
if qform == "orig":
qform = img_hdr['qform_code']
ras = nb.as_closest_canonical(img_nb)
if qform != 0:
ras.header['qform_code'] = np.array([qform], dtype=np.int16)
else:
# set to 1 if original qform code = 0
ras.header['qform_code'] = np.array([1], dtype=np.int16)
ras.to_filename(new)
else:
raise ValueError(f"Code '{code}' not yet implemented")
[docs]
def create_line_from_slice(
in_file,
out_file=None,
width=16,
fold="FH",
keep_input=False,
shift=0):
"""create_line_from_slice
This creates a binary image of the outline of the line. The line's dimensions are 16 voxels of 0.25mm x 2.5 mm (slice thickness) and 0.25 mm (frequency encoding direction). We know that the middle of the line is at the center of the slice, so the entire line encompasses 8 voxels up/down from the center. This is represented by the 'width' flag, set to 16 by default
Parameters
----------
in_file: str
path to image that should be used as reference (generally this should be the 1 slice file of the first run or something)
out_file: str, optional
path specifying the output name of the newly created 'line' or 'beam' file
width: int, optional
how many voxels should we use to define the line. Remember that the center of the line is width/2, so if it's set to 16, we take 8 voxels below center and 8 voxels above center.
fold: str, optional
string denoting the type of foldover direction that was used. We can find this in the info-file in the pycortex directory and can either be *FH* (line = `LR`), or *LR* (line = `FH`)
keep_input: boolean, optional
Keep the native input of the input data rather than binarizing the input image
shift: int, optional
Sometimes the slice had to be moved in `foldover` direction to place the saturation slabs in the right position. We need to correct for this. This argument moves the line `shift` millimeter in `fold` direction. For instance, if `fold="FH"` and `shift=2`, the line will be moved up. Use negative values to move the line down.
Returns
----------
nibabel.Nifti1Image
if `out_file==None`, a niimg-object is returned
str
an actual file if the specified output name is created (*NOT RETURNED*)
Examples
----------
img = create_line_from_slice("input.nii.gz")
img
<nibabel.nifti1.Nifti1Image at 0x7f5a1de00df0>
img.to_filename('sub-001_ses-2_task-LR_run-8_bold.nii.gz')
"""
img = False
if isinstance(in_file, str):
img = True
in_img = nb.load(in_file)
in_data = in_img.get_fdata()
vox_size = in_img.header["pixdim"][1]
elif isinstance(in_file, np.ndarray):
in_data = in_file.copy()
vox_size = 0.25
if len(in_data.shape) == 4:
in_data = np.squeeze(in_data, 3)
empty_img = np.zeros_like(in_data)
upper, lower = int((empty_img.shape[0]//2)+(int(width)/2)), int((empty_img.shape[0]//2)-(int(width)/2))
# account for shift (assuming voxel size 0.25mm)
if shift != 0:
shift = int(round(shift/vox_size,0))
upper += shift
lower += shift
# print(fold.lower())
if fold.lower() == "rl" or fold.lower() == "lr":
# add axis if we're dealing with nifti-image, otherwise return 2D-array
beam = np.ones((int(width), empty_img.shape[0]))
if img:
beam = beam[...,np.newaxis]
if keep_input == False:
empty_img[lower:upper] = beam
elif keep_input:
empty_img[lower:upper] = beam*in_data[lower:upper]
elif fold.lower() == "fh" or fold.lower() == "hf":
# add axis if we're dealing with nifti-image, otherwise return 2D-array
beam = np.ones((empty_img.shape[0], int(width)))
if img:
beam = beam[...,np.newaxis]
if keep_input == False:
empty_img[:,lower:upper] = beam
elif keep_input:
empty_img[:,lower:upper] = beam*in_data[:,lower:upper ]
else:
raise NotImplementedError(f"Unknown option {fold}, probably not implemented yet..")
if img:
line = nb.Nifti1Image(empty_img, affine=in_img.affine,header=in_img.header)
if out_file is not None:
line.to_filename(out_file)
else:
return line
else:
return empty_img
[docs]
def create_ribbon_from_beam(
in_file,
ribbon,
out_file=None):
"""create_ribbon_from_beam
This creates a binary image of the outline of the ribbon based on the beam image (see :func:`fmriproc.image.create_line_from_slice`). The line's dimensions are 16 voxels of 0.25mm x 2.5 mm (slice thickness) and 0.25 mm (frequency encoding direction). We know that the middle of the line is at the center of the slice, so the entire line encompasses 8 voxels up/down from the center. We then select only the voxels from `ribbon`.
Parameters
----------
in_file: str
path to image that should be used as reference (generally this should be the 1 slice file of the first run or something)
ribbon: list, tuple
input representing the ribbon, e.g., `(358,363)`
out_file: str, optional
path specifying the output name of the newly created 'line' or 'beam' file
Returns
----------
nibabel.Nifti1Image
if `out_file==None`, a niimg-object is returned
str
an actual file if the specified output name is created (*NOT RETURNED*)
"""
beam_img = nb.load(in_file)
beam_data = beam_img.get_fdata()
# insert ribbon voxels
rib_beam = np.zeros_like(beam_data)
beam_loc = np.where(beam_data>0)
rib_beam[ribbon[0]:ribbon[1],beam_loc[1][0]:beam_loc[1][-1]+1] = 1
# save
rib_img = nb.Nifti1Image(rib_beam, affine=beam_img.affine, header=beam_img.header)
if out_file is not None:
rib_img.to_filename(out_file)
else:
return rib_img
[docs]
def get_max_coordinate(in_img):
"""get_max_coordinate
Fetches the point with the maximum value given an input image_file. Useful if you want to find the voxel with the highest value after warping a binary file with ANTs. Mind you, it outputs the VOXEL value, not the actual index of the array. The VOXEL value = idx_array+1 to account for different indexing in nifti & python.
Parameters
----------
in_img: str,numpy.ndarray,nibabel.Nifti1Image
nibabel.Nifti1Image object, string to nifti image or numpy array
Returns
----------
np.ndarray
if only 1 max value was detected
list
list of np.ndarray containing the voxel coordinates with max value
Examples
----------
.. code-block:: python
get_max_coordinate('sub-001_space-ses1_hemi-L_vert-875.nii.gz')
array([142, 48, 222])
.. code-block:: python
get_max_coordinate('sub-001_space-ses1_hemi-R_vert-6002.nii.gz')
[array([139, 35, 228]), array([139, 36, 228])]
"""
if isinstance(in_img, np.ndarray):
img_data = in_img
elif isinstance(in_img, nb.Nifti1Image):
img_data = in_img.get_fdata()
elif isinstance(in_img, str):
img_data = nb.load(in_img).get_fdata()
else:
raise NotImplementedError(
"Unknown input type; must either be a numpy array, a nibabel Nifti1Image, or a string pointing to a nifti-image")
max_val = img_data.max()
# .reshape(1,3).flatten()
coord = np.array([np.where(img_data == max_val)[i] for i in range(0, 3)])
if coord.shape[-1] > 1:
l = []
for i in np.arange(0, coord.shape[-1]):
l.append(coord[:, i])
else:
l = coord[:, 0]
return l
[docs]
def get_isocenter(img):
"""get_isocenter
This function returns the RAS coordinates of the input image's origin. This resembles the
offset relative to the magnetic isocenter
Parameters
----------
img: str,nibabel.Nifti1Image
input image to extract the isocenter coordinate from
Returns
----------
numpy.ndarray
array containing isocenter coordinate from `img`
Example
----------
.. code-block:: python
img = 'sub-001_space-ses1_hemi-R_vert-6002.nii.gz'
get_isocenter(img)
array([ 0.27998984, 1.49000375, -15.34000604])
"""
# get origin in RAS
if isinstance(img, nb.Nifti1Image):
fn = img
elif isinstance(img, str):
fn = nb.load(img)
else:
raise ValueError("Unknown input format for img")
vox_center = (np.array(fn.shape) - 1) / 2.
origin = transform.native_to_scanner(img, vox_center, addone=False)
return origin
[docs]
def bin_fov(img, thresh=0, out=None, fsl=False):
"""bin_fov
This function returns a binarized version of the input image. If no output name was specified,
it will return the dataframe in nifti-format
Parameters
----------
img: str
path to input image
thresh: int
threshold to use (default = 0)
out: str
path to output image (default is None, and will return the data array of the image)
fsl: bool
if you reeeally want a binary image also run fslmaths -bin.. Can only be in combination with an output image and on a linux system with FSL (default is false)
Returns
----------
nibabel.Nifti1Image
niimg-object with a binarized FOV of input image `img`
Example
----------
.. code-block:: python
file = bin_fov("/path/to/image.nii.gz")
bin_fov("/path/to/image.nii.gz", thresh=1, out="/path/to/image.nii.gz", fsl=True)
bin_fov("/path/to/image.nii.gz", thres=2)
"""
img_file = nb.load(img) # load file
img_data = img_file.get_fdata() # get data
# img_bin = (img_data > thresh).astype(int) # get binary mask where value != 0
# img_bin = ndimage.binary_fill_holes(img_bin).astype(int) # fill any holes
img_data[img_data <= thresh] = 0
img_bin_img = nb.Nifti1Image(
img_data, header=img_file.header, affine=img_file.affine)
if out is not None:
img_bin_img.to_filename(out)
# also run fslmaths for proper binarization
if fsl:
cmd_txt = "fslmaths {in_img} -bin {out_img}".format(
in_img=out, out_img=out)
place = utils.get_base_dir()[1]
if place != "win":
utils.run_shell_wrapper(cmd_txt, verb=True)
else:
return img_bin_img
[docs]
def mgz2nii(input, output=None, return_type='file'):
"""wrapper for call_mriconvert to convert mgz image to nifti"""
if not isinstance(input, str):
raise ValueError("Please use a string-like path to the file")
if not output:
output = input.split('.')[0]+'.nii.gz'
cmd = ('call_mriconvert', input, output)
L = utils.decode(subprocess.check_output(cmd))
if return_type == "file":
return output
elif return_type == "nifti":
return nb.load(output)
else:
raise NotImplementedError(f"Can't deal with return_type {return_type}. Please use 'file' or 'nifti'")
[docs]
def nii2mgz(input, output=None, return_type='file'):
"""wrapper for mri_convert to convert mgz image to nifti"""
import os
import nibabel as nb
if not isinstance(input, str):
raise ValueError("Please use a string-like path to the file")
if not output:
output = input.split('.')[0]+'.mgz'
utils.run_shell_wrapper(f'mri_convert --in_type nii --out_type mgz {input} {output}', verb=True)
if return_type == "file":
return output
elif return_type == "mgz":
return nb.load(output)
else:
raise NotImplementedError(f"Can't deal with return_type {return_type}. Please use 'file' or 'nifti'")
[docs]
def tissue2line(data, line=None):
"""tissue2line
Project tissue probability maps to the line by calculating the probability of each tissue type in each voxel of the 16x720 beam and then average these to get a 1x720 line. Discrete tissues are assigned by means of the highest probability of a particular tissue type.
Parameters
----------
data: list,numpy.ndarray,str
for tissue data: list of three numpy array/nifti images/strings describing the probability of white matter/gray matter and CSF
line: str,nibabel.Nifti1Image,numpy.ndarray
used for the direction of the line and should have the same dimensions as `data`. Generally this is the output from create_line_from_slice
Returns
----------
numpy.ndarray
(1,720) array of your `data` in the line
"""
# load in reference line data
if isinstance(line, str):
ref = nb.load(line).get_fdata()
elif isinstance(line, nb.Nifti1Image):
ref = line.get_fdata()
elif isinstance(line, np.ndarray):
ref = line
else:
raise ValueError("Unknown input type for line; should be a string, nifti-image, or numpy array")
if isinstance(data, list):
# we have receive a list, assuming tissue probability maps.
if len(data) > 3:
raise ValueError(f'Data contains {len(data)} items, this should be three: 1) WM prob, 2) GM prob, 3) CSF prob')
if isinstance(data[0], str):
input = [nb.load(i).get_fdata() for i in data]
elif isinstance(data[0], nb.Nifti1Image):
input = [i.get_fdata() for i in data]
elif isinstance(data[0], np.ndarray):
input = data
# remove existing 4th dimension
input = [np.squeeze(i, axis=3) for i in input if len(i.shape) == 4]
for i in input:
if i.shape != ref.shape:
raise ValueError(f"Dimensions of line [{ref.shape}] do not match dimension of input seg [{i.shape}]")
# put wm/gm/csf in three channels of a numpy array
prob_stack = np.dstack([input[0],input[1],input[2]])
prob_stack_avg = np.average(prob_stack, axis=1)
# normalize averages between 0-1
scaler = MinMaxScaler()
scaler.fit(prob_stack_avg)
avg_norm = scaler.transform(prob_stack_avg)
output = []
lut = {'wm':2,'gm':1,'csf':0}
# avg_norm has 3 columns; 1st = WM, 2nd = GM, 3rd = CSF
for i,r in enumerate(avg_norm):
max_val = np.amax(r)
# check tissue type only if non-zero value. If all probabilities are 0 is should be set to zero regardless
if max_val == 0:
output.append(lut['csf'])
else:
# make list of each row for nicer indexing
idx = list(r).index(max_val)
if idx == 0:
# type = 'wm' = '1' in nighres segmentation
output.append(lut['wm'])
elif idx == 1:
# type = 'gm' = '2' in nighres segmentation
output.append(lut['gm'])
elif idx == 2:
# type = 'csf' = '0' in nighres segmentation
output.append(lut['csf'])
output = np.array(output)[:,np.newaxis]
return output
[docs]
def layers2line(data):
"""layers2line
Project layer delineations to the line by calculating the probability of each layer membership in each voxel of the 16x720 beam and then average these to get a 1x720 line. Discrete layers are assigned by means of the highest probability of a particular layer membership.
Parameters
----------
data: list, numpy.ndarray, str
(720,720) numpy array with the middle 16 rows containing information about the layers
line: str,nibabel.Nifti1Image,numpy.ndarray
used for the direction of the line and should have the same dimensions as `data`. Generally this is the output from create_line_from_slice
Returns
----------
numpy.ndarray
(1,720) array of your `data` in the line
"""
data[data == 0] = np.nan # convert zeros to NaN
avg = np.nanmean(data, axis=1) # average by ignoring zeros
avg = np.nan_to_num(avg) # convert zeros back to NaN otherwise int() will fail
avg_layers = np.array([int(i) for i in avg if np.isnan(i) == False])[:,np.newaxis]
return avg_layers
[docs]
def clip_image(img, thr=None, val=None, return_type="image", out_file=None):
"""clip_image
This function takes an image or numpy array, and takes the <thr>-value as threshold to clip the maximum value of the input. This is especially useful to increase contrast in the masked_mp2rage-files. For some reason, the masking can be detrimental for the contrast resulting in failing segmentations.
Parameters
----------
img: numpy.ndarray, nibabel.Nifti1Image, str
input data; can either be a numpy array, a Nifti-image, or a string pointing to a nifti-file.
thr: float, optional
cut-off to use as threshold of image intensity. This represents 0.5% occuring values, after which a drop of intensities is seen.
val: float, optional
cut-off to use as threshold of image intensity. This represents a hard threshold of literal image values
return_type: str, optional
output type; can either be a numpy array ('arr'), a Nifti-image ('nib'), or a file ('file'). In the case of the latter, an output name should be provided
out_file: str, optional
string to output name if 'return_type="file"' is specified
Returns
----------
nibabel.Nifti1Image
if `return_type=="file"`, a niimg-object is returned
numpy.ndarray
if `return_type=="arr"`, an array with the shape of `img` is returned
str
if `return_type=="file"`, a new file is created and nothing is returned
Example
----------
.. code-block:: python
new_img = clip_image("input.nii.gz", thr=0.005, return_type="nib")
clip_image("input.nii.gz", return_type='file', out_file='output.nii.gz')
"""
if not img:
raise ValueError("Need an input image")
if isinstance(img, np.ndarray):
input = img
aff,hdr = None,None
elif isinstance(img, nb.Nifti1Image):
input = img.get_fdata()
aff,hdr = img.affine, img.header
elif isinstance(img, str):
input = nb.load(img).get_fdata()
aff,hdr = nb.load(img).affine, nb.load(img).header
else:
raise ValueError("Unknown input type; should be either a numpy.ndarray, nibabel.Nifti1Image, or str to nifti-file")
if thr:
ax = plt.hist(input.ravel(), bins=256)
freq, vals = ax[0], ax[1]
thr_freq = np.amax(freq)*thr
idx = utils.find_nearest(freq, thr_freq)[0]
cutoff = vals[idx]
else:
if val:
cutoff = val
else:
raise ValueError("Either specify 'thr' or 'val'")
new_data = np.clip(input,0,cutoff)
if return_type == "arr":
return new_data
elif return_type == "nib":
if not isinstance(aff, np.ndarray) and not isinstance(hdr, nb.Nifti1Header):
raise ValueError("Need an affine-matrix and header for this operation; did you input a Nifti-like object or path to nifti-file?")
return nb.Nifti1Image(new_data, affine=aff, header=hdr)
elif return_type == "file":
if not isinstance(aff, np.ndarray) and not isinstance(hdr, nb.Nifti1Header):
raise ValueError("Need an affine-matrix and header for this operation; did you input a Nifti-like object or path to nifti-file?")
if not out_file:
raise ValueError("Please specify an output name for this operation")
nb.Nifti1Image(new_data, affine=aff, header=hdr).to_filename(out_file)
else:
raise ValueError(f"Unknown option {return_type} specified; please use 'arr', 'nib', or 'file'")
[docs]
def tsnr(img,file_name=None, clip=None):
"""tsnr
This function calculates the temporal SNR of an input image. Can also create 3D tSNR
map, but generally will just output the mean tSNR
Parameters
----------
img: numpy.ndarray, nibabel.Nifti1Image, str
input data; can either be a numpy array, a Nifti-image, or a string pointing to a nifti-file.
file_name: str, optional
path to tSNR-map file. Set to None by default
clip: int, float, optional
lip the values to tSNR. Default is None
Returns
----------
numpy.ndarray
tSNR in array with shape of `img` (minus last dimension if `img.shape`>3)
Example
----------
.. code-block:: python
tsnr = tsnr('path/to/func.nii.gz')
"""
# ignore divide-by-zero error
np.seterr(divide='ignore', invalid='ignore')
if isinstance(img, nb.Nifti1Image):
img = img
elif isinstance(img, str):
img = nb.load(img)
else:
raise ValueError("Unknown input type. Can be string pointing to nifti-file or an nb.Nifti1Image object")
data = img.get_fdata()
affine = img.affine
hdr = img.header
mean_d = np.mean(data,axis=-1)
std_d = np.std(data,axis=-1)
tsnr = mean_d/std_d
tsnr[np.where(np.isinf(tsnr))] = 0
if isinstance(clip, (int,float)):
tsnr[tsnr>clip] = 0
# calculate mean
mean_tsnr = np.nanmean(np.ravel(tsnr))
if isinstance(file_name, str):
nb.Nifti1Image(
tsnr,
affine=affine,
header=hdr).to_filename(file_name)
return mean_tsnr
[docs]
def massp_to_table(label_file, out=None, nr_structures=31, unit="vox"):
"""massp_to_table
This function creates a tabular output from the label image generated by call_nighresmassp (or any other version of MASSP). Just input the 'label' file and hit go. You can either have the dataframe returned or create a file by specifying an output name.
Parameters
----------
label_file: str
path to nifti image (MASSP-output 'label' file)
out: str
path to output name. You can either safe it as a csv, json, txt, or pickle file.
nr_structures: int
set to 31 by default as per the massp script
unit: str
output unit (voxels or mm3). Should be 'vox' for voxel output, or 'mm' for mm^3
Returns
----------
dict
dictionary containing the average volume of each ROI in `label_file` in units of voxel count or mm^3
Example
----------
.. code-block:: python
file = massp_to_table('sub-001_desc-massp_label.nii.gz', out='massp_lut.json')
file
'massp_lut.json'
.. code-block:: python
massp_to_table(
'sub-001_desc-massp_label.nii.gz',
unit="mm"
)
{'Str-l': 10702.2163,
'Str-r': 10816.1125,
'STN-l': 136.6179,
'STN-r': 149.2731,
'SN-l': 540.4317,
'SN-r': 532.9537,
...
}
"""
try:
from nighres.parcellation.massp import labels_17structures
except Exception:
raise ImportError(f"Could not import 'nighres'.. Please install")
img = nb.load(label_file)
img_data = img.get_fdata()
d = {}
for i in np.arange(0,nr_structures):
d[labels_17structures[i]] = np.count_nonzero(img_data == i+1)
# convert to mm3?
if unit == "mm":
# multiply dimensions to get mm^3 per voxel
dims = img.header['pixdim']
mm_dim = dims[1]*dims[2]*dims[3]
# multiply this with all elements in dict
for key in d:
d[key] = d[key]*mm_dim
if out:
# different extensions are possible: csv, txt, json, or pickle
ext =out.split('.')[-1]
if ext == "csv":
import csv
w = csv.writer(open(out, "w"))
for key, val in d.items():
w.writerow([key, val])
elif ext == "txt":
f = open(out,"w")
f.write( str(d) )
f.close()
elif ext == "json":
import json
json = json.dumps(d, indent=4)
f = open(out,"w")
f.write(json)
f.close()
elif ext == "pkl":
import pickle
f = open(out,"wb")
pickle.dump(d,f)
f.close()
else:
print(f"Unknown file extension '{ext}'. Use 'csv', 'txt', 'json', or 'pkl'. Returning dictionary itself")
return d
# raise ValueError(f"Unknown file extension '{ext}'. Use 'csv', 'txt', 'json', or 'pkl'. Returning dictionary itself")
return out
else:
return d
[docs]
def massp_mask_img(label_file, img_to_mask, out=None, nr_structures=31):
"""massp_mask_img
This function extracts the average of each structure in the MASSP-segmentation given an image (e.g.,
T1map)
Parameters
----------
label_file: str
path to nifti image (MASSP-output 'label' file)
img_to_mask: str
path to nifti image for which we want to extract the averages in the MASSP-structures
out: str
path to output name. You can either safe it as a csv, json, txt, or pickle file.
nr_structures: int
set to 31 by default as per the massp script
Returns
----------
dict
dictionary containing the average value of each ROI in the units of `img_to_mask`
Example
----------
.. code-block:: python
file = massp_mask_img(sub-001_desc-massp_label.nii.gz', 'sub-001_T1map.nii.gz', out='massp_t1map.json')
file
'massp_t1map.json'
.. code-block:: python
massp_mask_img('sub-001_desc-massp_label.nii.gz', 'sub-001_T1map.nii.gz', out='massp_t1map.json')
{'Str-l': 1502,234,
'Str-r': 1081.1125,
'STN-l': 1326.6179,
'STN-r': 1492.2731,
'SN-l': 1540.4317,
'SN-r': 1532.9537,
...
}
"""
try:
from nighres.parcellation.massp import labels_17structures
except Exception:
raise ImportError(f"Could not import 'nighres'.. Please install")
label_img = nb.load(label_file)
label_data = label_img.get_fdata()
img_to_mask = nb.load(img_to_mask)
img_to_mask_data = img_to_mask.get_fdata()
d = {}
for i in np.arange(0,nr_structures):
mask = np.zeros_like(label_data)
mask[label_data == i+1] = 1
mask = 1-mask
d[labels_17structures[i]] = np.ma.masked_array(img_to_mask_data,mask=mask).mean()
# save into file
if out:
# different extensions are possible: csv, txt, json, or pickle
ext = out.split('.')[-1]
if ext == "csv":
import csv
w = csv.writer(open(out, "w"))
for key, val in d.items():
w.writerow([key, val])
elif ext == "txt":
f = open(out,"w")
f.write( str(d) )
f.close()
elif ext == "json":
import json
json = json.dumps(d, indent=4)
f = open(out,"w")
f.write(json)
f.close()
elif ext == "pkl":
import pickle
f = open(out,"wb")
pickle.dump(d,f)
f.close()
else:
print(f"Unknown file extension '{ext}'. Use 'csv', 'txt', 'json', or 'pkl'. Returning dictionary itself")
return d
# raise ValueError(f"Unknown file extension '{ext}'. Use 'csv', 'txt', 'json', or 'pkl'. Returning dictionary itself")
return out
else:
return d