Source code for fmriproc.roi
from lazyfmri import (
utils,
fitting,
dataset,
plotting,
)
import os
import json
import datetime
import numpy as np
import pandas as pd
import nibabel as nb
import matplotlib.pyplot as plt
from joblib import (
Parallel,
delayed,
)
opj = os.path.join
# subclass JSONEncoder
[docs]
class DateTimeEncoder(json.JSONEncoder):
#Override the default method
[docs]
def default(self, obj):
if isinstance(obj, (datetime.date, datetime.datetime)):
return obj.isoformat()
[docs]
class ExtractFromROIs():
"""ExtractFromROIs
Extract timecourses given a list of ROIs from functional files. Assumes the ROI and functional files are in the same space,
e.g., both in native functional space (in case of single-subject processing), or all in MNI-space (in which case multi-
subject processing is permitted). If the ROIs represent float data (e.g., zstat/tstat), it will extract the 15 highest
voxels within that ROI. If ROI is binary, all voxels will be used. See also :func:`fmriproc.roi.ExtractFromROIs.extract_data`.
Parameters
----------
func: str, nibabel.Nifti1Image, nibabel.GiftiImage, np.ndarray
Input functional data, either a string representing a path, a nibabel.Nifti1Image-object, a nibabel.GiftiImage-object,
or a numpy array
rois: list, str, optional
List of ROIs, either strings representing a path, nibabel.Nifti1Image-objects, or a numpy arrays or a directory con-
taining a bunch of ROIs. Use the "filters="-flag to further specify the input.
filters: str, list, optional
additional filters to search for ROI files. For instance, you have specified a directory with "--rois /path/to/ROIs",
but that directory contains a bunch of ROIs that you're not necessarily interested in. If you're only interested in
the files starting with "conv-\*", you can specify that here.
roi_names, list, str, optional
If empty, we will try to derive the ROI name from the filename (if they are strings).
E.g., from "bin.thr.ACC_L.nii.gz", it would extract the last elements after ".", so "ACC_L".
If the inputs are not strings, we will default to "roi_{}", so it's highly encouraged to use the *roi_names*-input
for non-string functional files.
ses: int, optional
Specify session ID; will be read from filename if possible using :func:`lazyfmri.utils.split_bids_components`
task: str, optional
Specify task ID; will be read from filename if possible using :func:`lazyfmri.utils.split_bids_components`
TR: int, optional
Repetition time to use for the dataframe formation, by default 2
subject: int, optional
Subject ID to use for the dataframe formation, by default 1
verbose: bool, optional
Make some noise, by default False
**kwargs: dict
Same as :func:`fmriproc.roi.ExtractFromROIs.extract_data`
Example
----------
.. code-block:: python
from fmriproc import roi
obj = roi.ExtractFromROIs(
"/path/to/fmriprep/sub-01/ses-1/func/sub-01_ses-1_task-task1_space-MNI152NLin6Asym_bold.nii.gz",
rois=[
"/path/to/rois/bin.thr.space-MNI152NLin6Asym.ACC_L.nii.gz,
"/path/to/rois/bin.thr.space-MNI152NLin6Asym.ACC_R.nii.gz
"/path/to/rois/bin.thr.space-MNI152NLin6Asym.PCC_L.nii.gz
"/path/to/rois/bin.thr.space-MNI152NLin6Asym.PCC_R.nii.gz
],
TR=1.5,
subject="01"
)
"""
def __init__(
self,
func,
rois=None,
filters=None,
roi_names=None,
ses=None,
task=None,
TR=2,
subject=1,
verbose=False,
**kwargs
):
self.func = func
self.rois = rois
self.filters = filters
self.TR = TR
self.subject = subject
self.verbose = verbose
self.roi_names = roi_names
self.ses = ses
self.task = task
if isinstance(self.func, (str,nb.Nifti1Image,np.ndarray,nb.GiftiImage)):
self.func = [self.func]
if isinstance(self.filters, str):
self.filters = [self.filters]
if isinstance(self.roi_names, str):
self.roi_names = [self.roi_names]
# decide input of ROIs
self.set_rois_input()
# check if roi_names == rois
if isinstance(self.roi_names, list):
if len(self.roi_names) != len(self.roi_list):
raise ValueError(f"Number of specified ROI names ({len(self.roi_names)}) does not match number of supplied ROI-files ({len(self.roi_list)})")
# run extractor
self.extracted_data = self.main_extractor(**kwargs)
[docs]
def set_rois_input(self):
if isinstance(self.rois, (nb.GiftiImage, nb.Nifti1Image, np.ndarray)):
self.roi_list = [self.rois]
elif isinstance(self.rois, str):
# input is directory
if os.path.isdir(self.rois):
self.roi_list = utils.FindFiles(
self.rois,
extension="nii.gz",
exclude="._"
).files
if isinstance(self.filters, list):
self.roi_list = utils.get_file_from_substring(
self.filters,
self.rois
)
elif os.path.isfile(self.rois):
# input is a file
self.roi_list = [self.rois]
else:
# input is invalid
raise TypeError(f"rois must be a directory or filepath, not {type(self.rois)}")
elif isinstance(self.rois, list):
self.roi_list = self.rois
else:
raise ValueError(f"rois must be a directory, filepath, or list of files, not {type(self.rois)}")
[docs]
def main_extractor(
self,
sep="_",
**kwargs
):
"""Loop through functional files to extract data using :func:`fmriproc.roi.ExtractFromROIs.extract_data`"""
df = []
# loop through functionals
for ix,func in enumerate(self.func):
# try to read subject/run info from file
subject = self.subject
self.run = ix+1
# load data
if isinstance(func, str):
utils.verbose(f"Func #{ix+1}: {func}", self.verbose)
else:
utils.verbose(f"Func #{ix+1}: input={type(func)}", self.verbose)
# load functional file
data = self.load_file(func)
T = data.shape[-1]
V = np.prod(data.shape[:-1]) # total # of voxels
flat_func = data.reshape(V, T) # shape: (V, T)
# pre-allocate storage
n_rois = len(self.roi_list)
all_data = np.empty((T, n_rois), dtype=float)
colnames = [None] * n_rois
# --- fill the array ---
for ix, roi in enumerate(self.roi_list):
# 1) name
colnames[ix] = self.extract_run_identifier(roi, ix, sep=sep)
# 2) load & extract
roi_dat = self.load_file(roi)
flat_roi = roi_dat.ravel()
extr_dat = self.extract_data(
flat_func,
flat_roi,
**kwargs
)
# 3) stick into the big array
all_data[:, ix] = extr_dat.squeeze()
# --- build the DataFrame just once ---
roi_df = pd.DataFrame(all_data, columns=colnames)
# --- prepare all of the columns you want to assign ---
assign_kwargs = {
"subject": subject,
"run": self.run,
"t": np.arange(T, dtype=float) * self.TR
}
# add ses and task only if they exist and are not None
for k in ("ses", "task"):
val = getattr(self, k, None)
if val is not None:
assign_kwargs[k] = val
# --- one-shot assign of everything ---
roi_df = roi_df.assign(**assign_kwargs)
df.append(roi_df)
df = pd.concat(df)
utils.verbose(f"Done", self.verbose)
return df
[docs]
def extract_run_identifier(
self,
roi,
ix,
sep="_",
):
roi_name = None
if isinstance(self.roi_names, list):
roi_name = self.roi_names[ix]
# extract ROI name from file
if roi_name is None:
if isinstance(roi, str):
utils.verbose(f" Dealing with roi: {roi}", self.verbose)
roi_name = os.path.basename(roi)
if roi_name.endswith("gz"):
roi_name = sep.join(roi_name.split(".")[:-2])
elif roi_name.endswith("nii"):
roi_name = sep.join(roi_name.split(".")[:-1])
else:
raise NotImplementedError()
try:
bids_comps = utils.split_bids_components(roi)
for key in ["ses", "task", "run"]:
if key in list(bids_comps.keys()):
setattr(self, key, bids_comps[key])
except Exception:
pass
else:
roi_name = f"roi_{ix+1}"
return roi_name
[docs]
@staticmethod
def extract_data(
func,
roi,
nr=15,
bsl=15,
highest=True,
psc=True,
zscore=False
):
"""extract_data
Extractor function depending on ROI input. If the ROI represents a zstat/tstat file (or file with float) values,
we will select by default the 15 highest values within the roi. To select the 15 lowest, set *highest=False*.
To change the number of voxels to extract, use the *nr*-input.
Parameters
----------
func: np.ndarray
2D timeseries representing the functional data (voxels, time)
roi: np.ndarray
int/float np.ndarray representing the ROI data (voxels,)
nr: int, optional
number of voxels to extract from data in case the input is float, by default 15.
if the input is binary, we will take the entire ROI.
bsl: int, optional
amount of time to use to calculate baseline for percent signal change conversion, by default 15s.
highest: bool, optional
If True, we select *nr* HIGHEST voxels from roi. If False, we select the *nr* LOWEST. By default True
psc: bool, optional
Normalize to percent signal change. Default is True.
zscore: bool, optional
Normalize using z-scoring. Default = False
Returns
----------
np.ndarray
extracted data (either normalized or not)
"""
# build mask
if 0 <= roi.max() <= 1:
# binary‐mask case
mask = roi > 0
else:
# pick top-n or bottom-n without full sort
if highest:
idx = np.argpartition(-roi, nr - 1)[:nr]
else:
idx = np.argpartition(roi, nr - 1)[:nr]
mask = np.zeros_like(roi, dtype=bool)
mask[idx] = True
# 3) extract & average timecourses
tc = func[mask].mean(axis=0) # shape (T,)
if zscore:
psc = False
if psc:
psc = utils.percent_change(
tc,
0,
baseline=bsl
)
return psc
else:
if zscore:
sd = tc.std()
m_ = tc.mean()
z = (tc-m_)/sd
return z
else:
return tc
[docs]
@staticmethod
def load_file(file):
"""Load file based on whether it is a string, numpy array, or nibabel.Nifti1Image object"""
if isinstance(file, str):
if file.endswith(".nii.gz"):
data = nb.load(file).get_fdata()
elif file.endswith(".gii"):
data = dataset.ParseGiftiFile(file).data
else:
raise NotImplementedError(f"{file} is not supported. Must be .nii.gz, or .gii")
elif isinstance(file, np.ndarray):
data = file.copy()
elif isinstance(file, nb.Nifti1Image):
data = file.get_fdata()
elif isinstance(file, nb.GiftiImage):
data = np.vstack([arr.data for arr in file.f_gif.darrays])
else:
raise TypeError(f"Input {file} is of type {type(file)}. Must be a string pointing to an existing file path, an numpy array, nb.Nifti1Image-object, or nb.GiftiImage-object")
return data
[docs]
class ExtractSubjects(ExtractFromROIs):
"""ExtractSubjects
Loop through all subjects in a FEAT-directory to extract timecourses from a set of ROIs using
:class:`fmriproc.roi.ExtractFromROIs()`.
Parameters
----------
ft_dir: str
path to 1st level FEAT, fMRIprep, or Pybest directory. For FEAT, We're explicitly searching for the
"filtered_func_data" files; for fMRIprep, we will look for "\*desc-preproc_bold.nii.gz" using the
native functional files as default. Use e.g., `space=MNI152NLin6Asym` to select FSL's MNI files.
For Pybest, we will either look in the "unzscored"-folder if you want surface-based files (e.g.,
'fsnative' or 'fsaverage') for "\*desc-denoised_bold.npy". For volumetric files, we will look in
the "masked" folder for "\*desc-denoised_bold.nii.gz". For both fMRIprep and Pybest, you can select
specific tasks with *task=["task1", "task3"]*. Otherwise all files will be considered. This can be
particularly problematic for surface-based files from Pybest, as there's extra files being outputted
by Pybest that make selection using these criteria difficult. For instance, if you have multiple runs,
it saves out an average without "run"-identifier. However, it outputs the same file if you only have
a single runs..
func: str, list, optional
Reuse existing csv-file (<output_dir>/<base_name>/<base_name>_desc-tcs.csv)
excl_subjs: list, optional
Exclude particular subjects present in the *ft_dir*, by default None
incl_subjs: str, optional
Include particular subjects present in the *ft_dir*, by default None
n_jobs: int, optional
Number of jobs to use for the extraction, by default set to the number of subjects present
verbose: bool, optional
Make some noise, by default False
in_file: str, optional
Directly specify a 4D file to extract timecourses from, rather than looking in
predefined folders such as fMRIPrep, FEAT, or Pybest..
**kwargs: dict
Same as :func:`fmriproc.roi.ExtractFromROIs.extract_data`
"""
def __init__(
self,
ft_dir=None,
in_file=None,
func=None,
space=None,
task=None,
excl_subjs=[],
incl_subjs=None,
n_jobs=None,
verbose=False,
**kwargs
):
self.in_file = in_file
self.ft_dir = ft_dir
self.excl_subjs = excl_subjs
self.incl_subjs = incl_subjs
self.n_jobs = n_jobs
self.verbose = verbose
self.func = func
self.space = space
self.task = task
if not isinstance(self.func, str):
# check for incl/excl subjects
self.set_subjects()
# set number of jobs
if not isinstance(self.n_jobs, int):
self.n_jobs = len(self.subjects)
# search files
self.search_for_files()
# extract
self.df_func = self.extract_subjects(**kwargs)
else:
self.read_tcs_file()
[docs]
def search_for_files(self):
# check if input if fmriprep
if not isinstance(self.in_file, str):
utils.verbose(f"Looking for files in '{self.ft_dir}'", self.verbose)
if os.path.basename(self.ft_dir) == "fmriprep":
self.all_files = self.find_fmriprep_files(self.ft_dir, space=self.space, task=self.task)
elif os.path.basename(self.ft_dir) == "pybest":
self.all_files = self.find_pybest_files(self.ft_dir, space=self.space, task=self.task)
else:
self.all_files = self.find_feat_files(self.ft_dir)
if isinstance(self.all_files, str):
self.all_files = [self.all_files]
else:
if not os.path.exists(self.in_file):
raise FileNotFoundError(f"Could not find specified file: {self.in_file}")
utils.verbose(f"Using file: {self.in_file}", self.verbose)
self.all_files = [self.in_file]
utils.verbose(f"Found {len(self.all_files)} file(s)", self.verbose)
[docs]
def read_tcs_file(self):
utils.verbose(f"Reading file: {self.func}", self.verbose)
self.df_func = pd.read_csv(self.func, dtype={"subject": str}) # bit dodgy..
self.df_func.set_index(["subject","run","t"], inplace=True)
# read subject IDs from dataframe
self.subj = utils.get_unique_ids(self.df_func, id="subject")
self.subjects = [f"sub-{i}" for i in self.subj]
[docs]
def set_subjects(self):
if not isinstance(self.incl_subjs, (str,list)):
self.subjects = utils.get_file_from_substring(
["sub-"],
os.listdir(self.ft_dir)
)
if isinstance(self.subjects, str):
self.subjects = [self.subjects]
self.subjects = sorted([i for i in self.subjects if i not in self.excl_subjs])
else:
if isinstance(self.incl_subjs, str):
self.incl_subjs = [self.incl_subjs]
self.subjects = [i for i in self.incl_subjs if i not in self.excl_subjs]
utils.verbose(f"Including following subjects: {self.subjects}", self.verbose)
[docs]
@staticmethod
def find_feat_files(input_dir):
# assuming input is FEAT
ft_files = utils.FindFiles(input_dir, extension="filtered_func_data.nii.gz", exclude="._").files
if not isinstance(ft_files, (str, list)):
raise ValueError(f"Could not find files with 'filtered_func_data.nii.gz' in '{input_dir}'")
return ft_files
[docs]
@staticmethod
def find_fmriprep_files(input_dir, space="func", task=None):
all_files = utils.FindFiles(input_dir, extension="desc-preproc_bold.nii.gz", exclude="._").files
if isinstance(space, str):
if space == "func":
fprep_files = utils.get_file_from_substring(
["desc-preproc_bold.nii.gz"],
all_files,
exclude="space-"
)
else:
fprep_files = utils.get_file_from_substring(
[f"space-{space}"],
all_files
)
if not isinstance(fprep_files, (str, list)):
raise ValueError(f"Could not find files with 'desc-preproc_bold.nii.gz' in '{input_dir}'")
else:
raise ValueError(f"space- tag required for fmriprep input, otherwise I don't know which files to take..")
if isinstance(task, (str)):
task = [task]
if isinstance(task, list):
task_files = []
for t in task:
tmp = utils.get_file_from_substring(
[f"task-{t}"],
fprep_files
)
task_files += tmp
return task_files
else:
return fprep_files
[docs]
@staticmethod
def find_pybest_files(input_dir, space=None, task=None):
if isinstance(space, str):
if space in ["fsaverage", "fsnative"]:
ext = "desc-denoised_bold.npy"
pyb_files = utils.FindFiles(
input_dir,
extension=ext,
exclude="._"
).files
pyb_files = utils.get_file_from_substring(
["unzscored"],
pyb_files
)
else:
ext = "desc-pybest_bold.nii.gz"
all_files = utils.FindFiles(
input_dir,
extension=ext,
exclude="._"
).files
if space == "func":
pyb_files = utils.get_file_from_substring(
["masked"],
all_files,
exclude="space-"
)
else:
pyb_files = utils.get_file_from_substring(
["masked", f"space-{space}"],
all_files
)
if not isinstance(pyb_files, (str, list)):
raise ValueError(f"Could not find files with '{ext}' in '{input_dir}'")
if isinstance(task, (str)):
task = [task]
if isinstance(task, list):
task_files = []
for t in task:
tmp = utils.get_file_from_substring(
[f"task-{t}"],
pyb_files
)
task_files += tmp
return task_files
else:
return pyb_files
else:
raise ValueError(f"space- tag required for pybest input, otherwise I don't know which files to take..")
[docs]
def extract_subjects(
self,
**kwargs
):
"""Wrapper around :func:`fmriproc.roi.ExtractSubjects.extract_single_subject` to loop through subjects"""
output = Parallel(n_jobs=self.n_jobs, verbose=True)(
delayed(self.extract_single_subject)(
utils.get_file_from_substring([sub], self.all_files),
subject=sub.split("-")[-1],
verbose=self.verbose,
**kwargs
)
for sub in self.subjects
)
# concat
df = pd.concat(output)
# set indices
idx_list = ["subject", "run", "t"]
for k in ["task", "ses"]:
if k in list(df.columns):
idx_list.insert(1, k)
df.set_index(idx_list, inplace=True)
return df
[docs]
def extract_single_subject(
self,
files,
**kwargs):
"""Wrapper around :class:`fmriproc.roi.ExtractFromROIs` to extract data from a single subject"""
super().__init__(
files,
**kwargs
)
return self.extracted_data.copy()
[docs]
def format_onsets(
subject_list,
project_dir
):
onset = []
for ix,subject in enumerate(subject_list):
subj = subject.split("-")[-1]
onset_dir = opj(project_dir, subject, "ses-1")
if not os.path.exists(onset_dir):
raise FileNotFoundError(f"Directory '{onset_dir}' does not exist; is 'proj_dir' set correctly?")
tsv_files = utils.FindFiles(
onset_dir,
extension="tsv"
).files
if len(tsv_files) == 0:
raise ValueError(f"Could not find tsv-files in '{onset_dir}'")
# sort onset time
tmp_onset = []
for tsv in tsv_files:
# read runID from bids components
comps = utils.split_bids_components(tsv)
run = comps["run"]
onset_df = pd.read_csv(tsv, sep='\t')
selected_columns = onset_df[["onset", "trial_type"]]
onset_df = selected_columns.copy()
onset_df['subject'], onset_df['run'] = subj, int(run)
onset_df = onset_df.rename(columns={"trial_type": "event_type"})
onset_df['onset'] = onset_df['onset'].astype(float)
onset_df['event_type'] = onset_df['event_type'].astype(str)
tmp_onset.append(onset_df)
subj_onset = pd.concat(tmp_onset)
onset.append(subj_onset)
return pd.concat(onset).set_index(['subject', 'run', 'event_type'])
[docs]
class FullExtractionPipeline(ExtractSubjects):
"""FullExtractionPipeline
Given a list of ROIs, extract the timecourses from all subjects' preprocessed data in a level1 feat folder. If these ROIs
are binary, the timecourses will be averaged across this entire area. If the ROIs are values (e.g., z-stat values), the n
(default = 15) highest (default) /lowest (--lowest) values will be selected and averaged. These timecourses are entered in
a deconvolution model using the canonical HRF and its derivatives as basis sets (default). From this model, parameters are
extracted from the individual basis sets as well as the full HRF. Results are plotted and stored in a figure where the
rows denote EVs (stimulus conditions. There's a couple of default settings that you can generally leave alone, but there's
flags to changed them nonetheless.
Parameters
----------
proj_dir: str, optional
Root of the project folder, defaults to os.environ.get("DIR_DATA_HOME"). Mainly necessary for fetching onset files.
order: list, optional
If performing deconvolution, a plot can be generated. This flag sets the order (rows). By default, we'll take the order
in the onset dataframe (unsorted).
peak: int, optional
Deconvolution can sometimes cause exotic HRF shapes. This flag specifically tells the algorithm to select the first
peak while extracting parameters.
dec_kws: dict, optional
Extra parameters for the deconvolution, by default:
.. code-block:: python
dec_defaults = {
"interval": [-2,25],
"basis_sets": "canonical_hrf_with_time_derivative_dispersion",
"TR": 2
}
The format should be as follows (different parameters comma-separated, and parameter-value pair separated by '='):
"--dec <parameter1>=<value1>,<parameter2>=<value2>,<parameterX>=<valueX>"
In case you want to change the interval over which the profiles are deconvolved, use "interval=t1>t2" (separated by '>').
plot_kws: dict, optional
Set parameters for plotting. The idea is similar to the "--dec" flag. The defaults are set to:
.. code-block:: python
plot_defaults = {
"line_width": 3, # thickness of the lines of profiles
"y_dec": 2, # number of decimals on y-axes (ensures correct spacing)
"add_hline": 0, # horizontal line at 0
"sns_offset": 4, # distance between y-axis and first bar in bar plot
"error": "se", # error type (standard error of mean) for bar plot
"cmap": "Set1" # colormap Set1
}
output_dir: str, optional
path to output directory (default = $DIR_DATA_DERIV/roi_extract)
output_base: str, optional
basename for output. Will also be used to create a subfolder in <output_dir>. "_desc-<description>" will
be appended automatically. Defaults to "roi_extract" (very insightful, I know..)
verbose: bool, optional
print progress to the terminal
pos_neg: bool, optional
Switch to different plotting function, by default False.
do_fit: bool, optional
If False, we'll only extract the data and will not perform deconvolution, by default True
"""
def __init__(
self,
proj_dir=None,
order=None,
peak=1,
dec_kws={},
plot_kws={},
output_dir=None,
output_base="roi_extract",
verbose=False,
pos_neg=False,
do_fit=True,
excl_evs=[],
**kwargs
):
self.proj_dir = proj_dir
self.dec_kws = dec_kws
self.plot_kws = plot_kws
self.peak = peak
self.order = order
self.output_dir = output_dir
self.output_base = output_base
self.verbose = verbose
self.pos_neg = pos_neg
self.do_fit = do_fit
self.excl_evs = excl_evs
self.roi_list = None
if "rois" in list(kwargs.keys()):
self.roi_list = kwargs["rois"]
if not isinstance(self.proj_dir, str):
self.proj_dir = os.environ.get("DIR_DATA_HOME")
if not isinstance(self.output_dir, str):
self.output_dir = opj(
self.proj_dir,
"derivatives",
"roi_extract",
self.output_base
)
if not os.path.exists(self.output_dir):
os.makedirs(self.output_dir, exist_ok=True)
# initiate ExtractSubjects class
super().__init__(
verbose=self.verbose,
**kwargs
)
# check whether to do fit
if self.do_fit:
# parse onsets in a dataframe
self.df_onsets = format_onsets(
self.subjects,
self.proj_dir
)
# check order
if not isinstance(self.order, list):
self.order = utils.get_unique_ids(
self.df_onsets,
id="event_type",
sort=False
)
# filter evs
if len(self.excl_evs)>0:
self.order = [i for i in self.order if i not in self.excl_evs]
# derive TR from functional dataframe
self.t = utils.get_unique_ids(self.df_func, id="t")
self.TR = np.diff(self.t)[0]
# define default deconvolution parameters
dec_defaults = {
"interval": [-2,25],
"basis_sets": "canonical_hrf_with_time_derivative",
"verbose": self.verbose,
"TR": self.TR
}
for key,val in dec_defaults.items():
self.dec_kws = utils.update_kwargs(
self.dec_kws,
key,
val
)
## fit
self.run_fitter(**self.dec_kws)
# plot
if not self.pos_neg:
self.generate_plot(**self.plot_kws)
else:
self.generate_plot2(**self.plot_kws)
## write output files
self.write_csv_files()
self.write_settings()
## done
utils.verbose("Done", self.verbose)
[docs]
def run_fitter(
self,
**kwargs
):
self.fitter = fitting.NideconvFitter(
self.df_func,
self.df_onsets,
**kwargs
)
# get deconvolved timecourses
self.fitter.timecourses_condition()
# get subject-specific parameters
self.fitter.parameters_for_tc_subjects(col_name="roi")
# get condition-specific parameters
self.fitter.parameters_for_tc_condition(col_name="roi")
[docs]
def sort_rois(
self,
ddict,
base_color="#ffffff",
fc=1
):
# enforce int
fc = int(fc)
# get columns
cols = list(self.fitter.tc_condition.columns)
# cols 2 list
cdict = {}
color_list = []
roi_list = []
idx_list = []
for key,val in ddict.items():
cdict[key] = []
for c in cols:
if key in c:
cdict[key].append(c)
roi_list += cdict[key]
# create colormaps
color_list += utils.make_between_cm(
base_color,
val,
N=len(cdict[key])+fc,
as_list=True
)[fc:]
for r in roi_list:
idx_list.append(cols.index(r))
return idx_list,roi_list,color_list
[docs]
def sort_dataframe(
self,
df,
idx=None,
cols=None
):
# sort columns or rows
if isinstance(cols, (list,str)):
return df[cols]
else:
subjs = utils.get_unique_ids(df, id="subject")
new = []
for subj in subjs:
subj_df = utils.select_from_df(df, expression=f"subject = {subj}")
evs = utils.get_unique_ids(subj_df, id="event_type")
for ev in evs:
ev_df = utils.select_from_df(subj_df, expression=f"event_type = {ev}")
if "run" in list(subj_df.columns):
run_ids = utils.get_unique_ids(subj_df, id="run")
for run in run_ids:
run_df = utils.select_from_df(ev_df, expression=f"run = {run}")
tmp = run_df.reset_index()
new_df = tmp.iloc[:,idx]
new_df.set_index(list(subj_df.index.names), inplace=True)
new.append(new_df)
else:
tmp = ev_df.reset_index()
new_df = tmp.iloc[idx,:]
new_df.set_index(list(subj_df.index.names), inplace=True)
new.append(new_df)
return pd.concat(new)
[docs]
def generate_plot(
self,
include_pars=[
"magnitude",
"time_to_peak",
"1st_deriv_time_to_peak",
"2nd_deriv_time_to_peak",
"rise_slope",
"positive_area"
],
time_col="time",
write_files=True,
cdict={},
subjects=False,
fc=1,
**kwargs
):
y_size = 4
nrows = 1
if isinstance(include_pars, (str,list)):
if isinstance(include_pars, str):
include_pars = [include_pars]
nrows += len(include_pars)
y_size = 4*(1+len(include_pars))
fig = plt.figure(
figsize=(len(self.order)*4,y_size),
constrained_layout=True,
)
utils.verbose(f"Generating plot..", self.verbose)
sfs = fig.subfigures(nrows=nrows)
axs = {}
for f_ix in range(nrows):
if nrows<2:
sf = sfs
else:
sf = sfs[f_ix]
axs[f"ax{f_ix}"] = sf.subplots(ncols=len(self.order), sharey=True)
for e, ev in enumerate(self.order):
utils.verbose(f" ev-{e} | {ev}", self.verbose)
expr = f"event_type = {ev}"
ev_regr = utils.select_from_df(self.fitter.tc_condition, expression=expr)
if subjects:
use_df = self.fitter.avg_pars_subjects
else:
use_df = self.fitter.pars_condition
ev_pars = utils.select_from_df(use_df, expression=expr)
ax = axs["ax0"][e]
# default cmap
if len(cdict)<1:
if not "color" in list(kwargs.keys()):
kwargs = utils.update_kwargs(
kwargs,
"cmap",
"Set1"
)
else:
kwargs = utils.update_kwargs(
kwargs,
"palette",
kwargs["color"]
)
else:
# get information on how to sort dataframe
t = self.sort_rois(cdict, fc=fc)
# sort rows of parameter dataframe
ev_pars = self.sort_dataframe(
ev_pars,
idx=t[0]
)
# sort columns or profile dataframe
ev_regr = self.sort_dataframe(
ev_regr,
cols=t[1]
)
# add colors to kwargs
kwargs = utils.update_kwargs(
kwargs,
"color",
t[2]
)
# set plotting defaults
plot_defaults = {
"line_width": 1,
"y_dec": 2,
"add_hline": 0,
"error": "se",
"font_size": 24,
"add_points": False,
"points_color": "k"
}
plot_defaults["title_size"] = plot_defaults["font_size"]*1.1
for key,val in plot_defaults.items():
kwargs = utils.update_kwargs(
kwargs,
key,
val
)
bar_lbl = None
y_lbl = None
if e == 0:
y_lbl = "magnitude (%)"
bs = plotting.LazyLine(
list(ev_regr.T.values),
xx=utils.get_unique_ids(ev_regr, id=time_col),
axs=ax,
title = {
"title": ev.replace("_", " "),
"fontweight": "bold"
},
y_label=y_lbl,
**kwargs
)
if isinstance(include_pars, list):
for p_ix,par in enumerate(include_pars):
if e == 0:
unit = "s"
par_lbl = par
if par == "magnitude":
unit = "%"
elif par in ["positive_area","undershoot"]:
unit = "au"
elif "1st_deriv" in par:
unit = "%/s"
par_lbl = "rate of change"
elif "2nd_deriv" in par:
unit = "%/s$^2$"
par_lbl = "acceleration"
bar_lbl = f"{par_lbl.replace('_', ' ')} ({unit})"
b_ax = axs[f"ax{p_ix+1}"][e]
allowed_inputs = [
'magnitude',
'fwhm',
'time_to_peak',
'half_rise_time',
'half_max',
'rise_slope',
'rise_slope_t',
'positive_area',
'undershoot',
'1st_deriv_magnitude',
'1st_deriv_time_to_peak',
'2nd_deriv_magnitude',
'2nd_deriv_time_to_peak',
]
if par not in allowed_inputs:
raise ValueError(f"Specified parameter '{par}' cannot be used. Must be one of {allowed_inputs}")
# copy kwargs, but slight reduce fontsize
bar_kws = {}
for key,val in kwargs.items():
bar_kws[key] = val
for key,val in zip(["font_size","label_size","sns_offset"],[bs.font_size,bs.label_size,4]):
bar_kws = utils.update_kwargs(
bar_kws,
key,
val,
force=True
)
n_rois = utils.get_unique_ids(ev_pars, id="roi", sort=False)
_ = plotting.LazyBar(
ev_pars,
x="roi",
hue="roi",
y=par,
axs=b_ax,
y_label=bar_lbl,
**bar_kws
)
else:
n_rois = list(ev_regr.columns)
if nrows>1:
prof_sf = sfs[0]
else:
prof_sf = sfs
prof_sf.supxlabel("time (s)", fontsize=bs.title_size)
if nrows>1:
sfs[-1].supxlabel("ROIs", fontsize=bs.title_size)
fig.legend(
labels=n_rois,
bbox_to_anchor=(1.01,1),
loc="upper left",
frameon=False,
fontsize=bs.label_size
)
# save figure
if write_files:
fname = opj(self.output_dir, f"{self.output_base}_desc-plot")
utils.verbose(f"Writing plot files: {fname}", self.verbose)
for ext in ["pdf","png","svg"]:
fig.savefig(
f"{fname}.{ext}",
bbox_inches="tight",
facecolor="white",
dpi=300
)
[docs]
def generate_plot2(
self,
use_colors=["#D97F75","#9ABDD4"],
time_col="time",
write_files=True,
cdict={},
subjects=False,
fc=1,
**kwargs
):
y_size = 4
nrows = 1
if isinstance(include_pars, (str,list)):
if isinstance(include_pars, str):
include_pars = [include_pars]
nrows += len(include_pars)
y_size = 4*(1+len(include_pars))
fig = plt.figure(
figsize=(len(self.order)*4,y_size),
constrained_layout=True,
)
utils.verbose(f"Generating plot..", self.verbose)
sfs = fig.subfigures(nrows=nrows)
axs = {}
for f_ix in range(nrows):
if nrows<2:
sf = sfs
else:
sf = sfs[f_ix]
axs[f"ax{f_ix}"] = sf.subplots(ncols=4, sharey=True)
for e,ev in enumerate(self.order):
utils.verbose(f" ev-{e} | {ev}", self.verbose)
expr = f"event_type = {ev}"
ev_regr = utils.select_from_df(self.fitter.tc_condition, expression=expr)
if subjects:
subjs_df = utils.select_from_df(self.fitter.tc_subjects, expression=expr)
subjs_df = ev_regr.groupby(["subject","event_type","t"]).mean()
# set plotting defaults
ax = axs["ax0"][e]
plot_defaults = {
"line_width": 3,
"y_dec": 2,
"add_hline": 0,
"error": "se",
"font_size": 24
}
plot_defaults["title_size"] = plot_defaults["font_size"]*1.1
for key,val in plot_defaults.items():
kwargs = utils.update_kwargs(
kwargs,
key,
val
)
y_lbl = None
if e == 0:
y_lbl = "magnitude (%)"
bs = plotting.LazyPlot(
list(ev_regr.T.values),
xx=utils.get_unique_ids(ev_regr, id=time_col),
axs=ax,
title = {
"title": ev.replace("_", " "),
"fontweight": "bold"
},
y_label=y_lbl,
**kwargs
)
if nrows>1:
prof_sf = sfs[0]
else:
prof_sf = sfs
prof_sf.supxlabel("time (s)", fontsize=bs.title_size)
# save figure
if write_files:
fname = opj(self.output_dir, f"{self.output_base}_desc-plot")
utils.verbose(f"Writing plot files: {fname}", self.verbose)
for ext in ["pdf","png","svg"]:
fig.savefig(
f"{fname}.{ext}",
bbox_inches="tight",
facecolor="white",
dpi=300
)
[docs]
def write_csv_files(self):
utils.verbose(f"Writing csv-files to {self.output_dir}..", self.verbose)
if self.do_fit:
out_dict = {
"hrfs": getattr(self.fitter, "tc_subjects"),
"pars_run": getattr(self.fitter, "pars_subjects"),
"pars_avg": getattr(self.fitter, "avg_pars_subjects"),
"pars_evs": getattr(self.fitter, "pars_condition"),
"tcs": getattr(self, "df_func"),
"onsets": getattr(self, "df_onsets"),
}
else:
out_dict = {
"tcs": getattr(self, "df_func")
}
for key,val in out_dict.items():
val.to_csv(opj(self.output_dir, f"{self.output_base}_desc-{key}.csv"))
[docs]
def write_settings(self):
utils.verbose(f"Generating settings file", self.verbose)
if self.do_fit:
settings = {
"date": datetime.datetime.now(),
"subject": self.subjects,
"rois": self.roi_list,
"fit": self.do_fit,
"TR": self.fitter.TR,
"unit": "seconds",
"basis_sets": self.fitter.basis_sets,
"interval": self.fitter.interval,
"project_dir": self.proj_dir,
"input_dir": self.ft_dir,
"output_dir": self.output_dir,
"output_base": self.output_base
}
else:
settings = {
"date": datetime.datetime.now(),
"subject": self.subjects,
"rois": self.roi_list,
"fit": self.do_fit,
# "TR": self.TR,
# "unit": "seconds",
"project_dir": self.proj_dir,
"input_dir": self.ft_dir,
"output_dir": self.output_dir,
"output_base": self.output_base
}
settings_for_json = json.dumps(
settings,
indent=4,
cls=DateTimeEncoder
)
fname = opj(self.output_dir, f"{self.output_base}_desc-settings.json")
with open(fname, "w") as outfile:
outfile.write(settings_for_json)
[docs]
class ROI():
"""ROI
Generate a mask from FreeSurfer atlases or Label-files. The input can be a string/list of strings representing ROIs
present in FreeSurfer's aparc-files or in the 'label' folder. In case the input should be read from the segmentation
files, set *mask_type="aparc"* (default). If they are label-files, set *mask_type="surf"*. The class rests on functions
from the `cxutils <https://github.com/gjheij/cxutils>`_-toolbox, and will through an error if it is not installed.
Parameters
----------
roi: str, list
String or list of strings representing ROIs in the aparc-files or files from the label-folder as per the output of
FreeSurfer. For instance, *V1_exvivo.thresh*. By default, both hemispheres will be considered, so *?h.* can be omit-
ted.
mask_type: str, optional
Type of segmentation to use, by default "aparc". "aparc" represents the volumetric segmentations, whereas "surf"
will represent the surface based ROIs (e.g., files in 'labels'-folder)
subject: str, optional
FreeSurfer-subject to use, by default "fsaverage"
Returns
----------
np.ndarray
The actual masks will be stored as attribute "self.roi_mask", which can be accesss with :func:`fmriproc.image.ROI.return_mask()`
Example
----------
.. code-block:: python
# get motor cortex from FSAverage
from fmriproc import roi
some_roi = roi.ROI(
"precentral",
mask_type="surf",
subject="fsaverage"
).return_mask()
.. code-block:: python
# get V1 from sub-01
from fmriproc import roi
some_roi = roi.ROI(
"V1_exvivo.thresh",
mask_type="surf",
subject="sub-01"
).return_mask()
.. code-block:: python
# get precuneus from sub-01
from fmriproc import roi
some_roi = roi.ROI(
"inferiorparietal",
mask_type="aparc",
subject="sub-01"
).return_mask()
"""
def __init__(
self,
roi,
mask_type="aparc",
subject="fsaverage",
):
self.roi = roi
self.mask_type = mask_type
self.subject = subject
# force list
if isinstance(self.roi, str):
self.roi = [self.roi]
# make mask from aparc segmentation
if self.mask_type == "aparc":
self.aparc_mask()
else:
# make mask from surf-labels
self.surf_mask()
# merge list of masks
self.merge_masks()
[docs]
def surf_mask(self):
"""Generate mask from label-file using `cxutils <https://github.com/gjheij/cxutils>`_"""
try:
from cxutils import optimal
except ImportError:
print("Could not import cxutils. Please install from https://github.com/gjheij/cxutils")
self.surf_calcs = optimal.SurfaceCalc(subject=self.subject)
# loop through list and merge
self.individual_masks = {}
if isinstance(self.roi, list):
for roi in self.roi:
self.mask_obj = self.surf_calcs.read_fs_label(
self.subject,
fs_label=roi
)
self.individual_masks[roi] = self.surf_calcs.label_to_mask(
subject=self.subject,
rh_arr=self.mask_obj['rh'],
lh_arr=self.mask_obj['lh']
)["whole_roi"]
[docs]
def aparc_mask(self):
"""Generate mask from label-file using :func:`fmriproc.roi.ROI.make_roi_mask()`"""
# read parc data once even for multiple ROIs
self.read_aparc()
self.roi_list = self.read_roi_list()
# loop through list and merge
self.individual_masks = {}
if isinstance(self.roi, list):
for roi in self.roi:
if roi not in self.roi_list:
raise ValueError(f"'{roi}' is not part of the aparc atlas. Options are {self.roi_list}")
self.individual_masks[roi] = self.make_roi_mask(roi=roi)
[docs]
def merge_masks(self):
# merge
if len(self.individual_masks)>1:
self.roi_mask = np.zeros_like(self.individual_masks[self.roi[0]])
for _,val in self.individual_masks.items():
self.roi_mask[val>0] = 1
else:
self.roi_mask = self.individual_masks[self.roi[0]].copy()
[docs]
def read_aparc(self):
"""Generate mask from aparc-file using `cxutils <https://github.com/gjheij/cxutils>`_"""
try:
from cxutils import optimal
except ImportError:
print("Could not import cxutils. Please install from https://github.com/gjheij/cxutils")
# GET VERICES FOR A SPECIFIC ROI
self.parc_data = optimal.SurfaceCalc.read_fs_annot(
subject='fsaverage',
fs_annot="aparc",
hemi="both"
)
[docs]
def read_roi_list(self):
"""Read all rois present in the aparc-file"""
if not hasattr(self, "parc_data"):
self.read_aparc()
roi_list = []
for i in self.parc_data["lh"][-1]:
roi_list.append(i.decode())
roi_list = [i for i in roi_list if i != "unknown"]
return roi_list
[docs]
def make_roi_mask(self, roi=None):
"""Make ROI mask from aparc-file"""
if not isinstance(roi, str):
raise ValueError(f"Please specify an ROI to extract")
tmp = {}
for ii in ['lh', 'rh']:
# get data
parc_hemi = self.parc_data[ii][0]
# get index of specified ROI in list | hemi doesn't matter here
for ix,i in enumerate(self.parc_data[ii][2]):
if roi.encode() in i:
break
tmp[ii] = (parc_hemi == ix).astype(int)
# Concat Hemis
concat_hemis = np.hstack((tmp['lh'],tmp['rh']))
return concat_hemis