# Source code for cmp.bidsappmanager.pipelines.anatomical.anatomical
# Copyright (C) 2009-2022, Ecole Polytechnique Federale de Lausanne (EPFL) and
# Hospital Center and University of Lausanne (UNIL-CHUV), Switzerland, and CMP3 contributors
# All rights reserved.
#
# This software is distributed under the open-source license Modified BSD.
"""Anatomical pipeline UI Class definition."""
import os
import glob
import shutil
from traits.api import *
from traitsui.api import *
from traitsui.qt4.extra.qt_view import QtView
from pyface.ui.qt4.image_resource import ImageResource
# Own imports
from cmtklib.bids.io import __cmp_directory__, __nipype_directory__, __freesurfer_directory__
from cmp.bidsappmanager.stages.segmentation.segmentation import SegmentationStageUI
from cmp.bidsappmanager.stages.parcellation.parcellation import ParcellationStageUI
from cmp.pipelines.anatomical.anatomical import (
AnatomicalPipeline,
)
from cmtklib.util import return_button_style_sheet
class AnatomicalPipelineUI(AnatomicalPipeline):
    """Class that extends the :class:`~cmp.pipelines.anatomical.anatomical.AnatomicalPipeline` with graphical components.

    Attributes
    ----------
    segmentation : traits.ui.Button
        Button to open the window for configuration or quality inspection
        of the segmentation stage depending on the ``view_mode``

    parcellation : traits.ui.Button
        Button to open the window for configuration or quality inspection
        of the parcellation stage depending on the ``view_mode``

    view_mode : ['config_view', 'inspect_outputs_view']
        Variable used to control the display of either (1) the configuration
        or (2) the quality inspection of stage of the pipeline

    pipeline_group : traitsUI panel
        Panel defining the layout of the buttons of the stages with corresponding images

    traits_view : QtView
        QtView that includes the ``pipeline_group`` panel

    See also
    ---------
    cmp.pipelines.anatomical.anatomical.AnatomicalPipeline
    """

    # One button per stage; pressing it fires ``_<name>_fired``.
    segmentation = Button()
    parcellation = Button()

    # Name of the stage view that the buttons open; defaults to "config_view".
    view_mode = Enum("config_view", ["config_view", "inspect_outputs_view"])

    # Two horizontally centered image buttons stacked vertically
    # (segmentation on top, parcellation below).
    pipeline_group = VGroup(
        HGroup(
            spring,
            UItem(
                "segmentation",
                style="custom",
                width=222,
                height=129,
                resizable=False,
                style_sheet=return_button_style_sheet(
                    ImageResource("segmentation").absolute_path
                ),
            ),
            spring,
            show_labels=False,
            label="",
        ),
        HGroup(
            spring,
            UItem(
                "parcellation",
                style="custom",
                width=222,
                height=129,
                resizable=False,
                style_sheet=return_button_style_sheet(
                    ImageResource("parcellation").absolute_path
                ),
            ),
            spring,
            show_labels=False,
            label="",
        ),
        spring,
        springy=True,
    )

    traits_view = QtView(Include("pipeline_group"))
def __init__(self, project_info):
    """Constructor of the AnatomicalPipelineUI class.

    Builds the UI versions of the segmentation and parcellation stages,
    assigns each of them its nipype working directory and connects the
    traits shared between the pipeline and its stages.

    Parameters
    -----------
    project_info : cmp.project.ProjectInfo
        CMP_Project_Info object that stores general information
        such as the BIDS root and output directories (see
        :class:`cmp.project.CMP_Project_Info` for more details)

    See also
    ---------
    cmp.pipelines.anatomical.AnatomicalPipeline.__init__
    """
    AnatomicalPipeline.__init__(self, project_info)

    # Identifier of the processed dataset entry: "<subject>" or "<subject>_<session>"
    if len(project_info.subject_sessions) > 0:
        subject_session = self.global_conf.subject_session
        subject_id = "_".join((self.subject, subject_session))
    else:
        subject_session = ""
        subject_id = self.subject

    # Keyword arguments common to both stage constructors
    shared_stage_kwargs = dict(
        subject=self.subject,
        session=subject_session,
        bids_dir=project_info.base_directory,
        output_dir=project_info.output_directory,
    )
    self.stages = {
        "Segmentation": SegmentationStageUI(**shared_stage_kwargs),
        "Parcellation": ParcellationStageUI(
            pipeline_mode="Diffusion", **shared_stage_kwargs
        ),
    }

    # The session level is inserted in the stage directory only when one is set
    session_level = (
        [project_info.subject_session]
        if project_info.subject_session != ""
        else []
    )
    for stage_ui in self.stages.values():
        stage_ui.stage_dir = os.path.join(
            self.base_directory,
            "derivatives",
            __nipype_directory__,
            self.subject,
            *session_level,
            self.pipeline_name,
            stage_ui.name,
        )

    self._init_and_add_listeners_to_stage_traits_ui(subject_id=subject_id)
def _init_and_add_listeners_to_stage_traits_ui(self, subject_id):
    """Initialize and add listeners to traits that are shared between the pipeline and the different stages.

    Parameters
    ----------
    subject_id : str
        Last path component appended to the Freesurfer subjects directory
        to form ``freesurfer_subject_id``
    """
    seg_config = self.stages["Segmentation"].config

    # Both Freesurfer locations live under <output_directory>/<freesurfer dir>
    freesurfer_root = os.path.join(self.output_directory, __freesurfer_directory__)
    seg_config.freesurfer_subjects_dir = freesurfer_root
    seg_config.freesurfer_subject_id = os.path.join(freesurfer_root, subject_id)

    # Read the trait back so the printed value is the one actually stored
    print(f'Freesurfer subjects directory: {seg_config.freesurfer_subjects_dir}')

    # Keep segmentation tool and parcellation scheme in sync in both directions
    seg_config.on_trait_change(self._update_parcellation, "seg_tool")
    self.stages["Parcellation"].config.on_trait_change(
        self._update_segmentation, "parcellation_scheme"
    )
def _update_parcellation(self):
"""Update self.stages['Parcellation'].config.parcellation_scheme when ``seg_tool`` is updated."""
if self.stages["Segmentation"].config.seg_tool == "Custom segmentation":
self.stages["Parcellation"].config.parcellation_scheme = "Custom"
self.stages["Parcellation"].config.parcellation_scheme_editor = ["Custom"]
else:
self.stages["Parcellation"].config.parcellation_scheme = "Lausanne2018"
self.stages["Parcellation"].config.parcellation_scheme_editor = [
"NativeFreesurfer", "Lausanne2018", "Custom"
]
def _update_segmentation(self):
"""Update self.stages['Segmentation'].config.seg_tool when ``parcellation_scheme`` is updated."""
if self.stages["Parcellation"].config.parcellation_scheme == "Custom":
self.stages["Segmentation"].config.seg_tool = "Custom segmentation"
else:
self.stages["Segmentation"].config.seg_tool = "Freesurfer"
def _segmentation_fired(self, info):
"""Method that displays the window for the segmentation stage.
The window changed accordingly to the value of ``view_mode`` to be
in configuration or quality inspection mode.
Parameters
-----------
info : traits.ui.Button
The segmentation button object
"""
self.stages["Segmentation"].configure_traits(view=self.view_mode)
def _parcellation_fired(self, info):
"""Method that displays the window for the parcellation stage.
The window changed accordingly to the value of ``view_mode`` to be
in configuration or quality inspection mode.
Parameters
-----------
info : traits.ui.Button
The parcellation button object
"""
self.stages["Parcellation"].configure_traits(view=self.view_mode)
def check_input(self, layout):
    """Method that checks if inputs of the anatomical pipeline are available in the datasets.

    The T1w image of the subject (and session, if any) is looked up in the
    BIDS layout and copied to the ``anat`` folder of the CMP derivatives if
    it is not there yet. When the parcellation scheme is "Custom", the custom
    parcellation, brain mask, tissue masks and aparc+aseg files are looked up
    as well and every missing one is reported.

    Parameters
    -----------
    layout : bids.BIDSLayout
        BIDSLayout object used to query

    Returns
    -------
    valid_inputs : bool
        True if all inputs of the anatomical pipeline are available
    """
    print("**** Check Inputs ****")

    seg_config = self.stages["Segmentation"].config
    parc_config = self.stages["Parcellation"].config
    subject_session = self.global_conf.subject_session

    # BIDS entity values are queried without their "sub-" / "ses-" prefix
    subjid = self.subject.split("-")[1]
    sessid = subject_session.split("-")[1] if subject_session != "" else None

    def _first_match(**entities):
        """Return the full path of the first file matching ``entities``, or None."""
        matches = layout.get(subject=subjid, session=sessid, **entities)
        if len(matches) > 0:
            return os.path.join(matches[0].dirname, matches[0].filename)
        return None

    valid_inputs = False
    types = layout.get_modalities()

    # Use the full path (dirname + filename), like every other lookup of this
    # method; the bare filename would only resolve from the file's own folder.
    T1_file = _first_match(suffix="T1w", extension="nii.gz")
    if T1_file is None:
        error(
            message="T1w image not found for subject %s, session %s."
            % (subjid, subject_session),
            title="Error",
            buttons=["OK", "Cancel"],
            parent=None,
        )
        return False
    print(T1_file)

    t1_available = "T1w" in types and os.path.isfile(T1_file)

    if t1_available:
        print("T1_file found: %s" % T1_file)
        # Copy the T1w image to derivatives / cmp / subject [/ session] / anat
        session_level = [subject_session] if subject_session != "" else []
        out_T1_file = os.path.join(
            self.derivatives_directory, __cmp_directory__,
            self.subject, *session_level, "anat",
            "_".join([self.subject, *session_level]) + "_T1w.nii.gz",
        )
        if not os.path.isfile(out_T1_file):
            # The anat folder may not exist yet the first time a subject is checked
            os.makedirs(os.path.dirname(out_T1_file), exist_ok=True)
            shutil.copy(src=T1_file, dst=out_T1_file)
        msg = "Inputs check finished successfully. \nAnatomical data (T1w) available."
        print(f'\tINFO: {msg}')
        valid_inputs = True
    else:
        msg = (
            " * No anatomical data (T1w) available in folder "
            + os.path.join(self.base_directory, self.subject, 'anat')
            + "!\n"
        )
        print(f'\tERROR: Missing required inputs: {msg}')
        error(
            message=f"Missing required inputs:\n{msg} Please see documentation for more details.",
            title="Error",
            buttons=["OK", "Cancel"],
            parent=None,
        )

    if parc_config.parcellation_scheme == "Custom":
        custom_parc = parc_config.custom_parcellation
        custom_brainmask = seg_config.custom_brainmask
        custom_gm_mask = seg_config.custom_gm_mask
        custom_wm_mask = seg_config.custom_wm_mask
        custom_csf_mask = seg_config.custom_csf_mask
        custom_aparcaseg = seg_config.custom_aparcaseg

        # Add custom BIDS derivatives directories to the BIDSLayout.
        # Keep only unique names (in a stable order) to make the BIDSLayout happy.
        custom_derivatives_dirnames = list(dict.fromkeys(
            custom.get_toolbox_derivatives_dir()
            for custom in (
                custom_parc, custom_brainmask, custom_gm_mask,
                custom_wm_mask, custom_csf_mask, custom_aparcaseg,
            )
        ))
        for custom_derivatives_dirname in custom_derivatives_dirnames:
            if custom_derivatives_dirname not in layout.derivatives:
                print(f" * Add custom_derivatives_dirname: {custom_derivatives_dirname}")
                layout.add_derivatives(
                    os.path.join(self.base_directory, 'derivatives', custom_derivatives_dirname)
                )

        parc_entities = dict(
            suffix=custom_parc.suffix,
            atlas=custom_parc.atlas,
            resolution=custom_parc.resolution,
        )
        # (name used in the log, description used in the error message,
        #  custom file specification, BIDS entities of the query)
        required_custom_files = [
            ("custom_parc_file", "Custom parcellation image", custom_parc,
             dict(extension="nii.gz", **parc_entities)),
            ("custom_parc_tsv_file", "Custom parcellation", custom_parc,
             dict(extension="tsv", **parc_entities)),
            ("custom_brainmask_file", "Custom brain mask", custom_brainmask,
             dict(suffix=custom_brainmask.suffix, extension="nii.gz",
                  desc=custom_brainmask.desc)),
            ("custom_gm_mask_file", "Custom gray matter mask", custom_gm_mask,
             dict(suffix=custom_gm_mask.suffix, extension="nii.gz",
                  label=custom_gm_mask.label)),
            ("custom_wm_mask_file", "Custom white matter mask", custom_wm_mask,
             dict(suffix=custom_wm_mask.suffix, extension="nii.gz",
                  label=custom_wm_mask.label)),
            ("custom_csf_mask_file", "Custom CSF mask", custom_csf_mask,
             dict(suffix=custom_csf_mask.suffix, extension="nii.gz",
                  label=custom_csf_mask.label)),
            ("custom_aparcaseg_file", "Custom Freesurfer's aparc+aseg", custom_aparcaseg,
             dict(suffix=custom_aparcaseg.suffix, extension="nii.gz",
                  desc=custom_aparcaseg.desc)),
        ]

        # NOTE(review): the parcellation image and its TSV are each treated as
        # required on their own (the previous check only failed when BOTH were
        # missing and never reported a missing image) - confirm with the
        # custom parcellation specification.
        missing_custom = []
        for log_name, description, custom, entities in required_custom_files:
            custom_file = _first_match(**entities)
            print("... %s : %s" % (
                log_name, custom_file if custom_file is not None else "NotFound"
            ))
            if custom_file is None:
                missing_custom.append(f' * {description} ({custom}) not found\n')

        if missing_custom:
            valid_inputs = False
            msg = "".join(missing_custom)
            error(
                message=f"Missing required custom inputs:\n{msg} Please see documentation for more details.",
                title="Error",
                buttons=["OK", "Cancel"],
                parent=None,
            )

    return valid_inputs
def check_output(self):
    """Method that checks if outputs of the anatomical pipeline are available.

    Looks in the ``anat`` folder of the CMP derivatives for the head, brain,
    brain mask and white matter mask images of the subject and for its
    ``*_T1w_parc_scale*.nii.gz`` parcellation files. Each missing file is
    printed and reported in an error dialog.

    Returns
    --------
    valid_output : bool
        True if all outputs are found

    error_message : string
        Message in case there is an error (the last one raised,
        empty if none)
    """
    subject_session = self.global_conf.subject_session
    subject = self.subject
    derivatives_root = os.path.join(
        self.base_directory, "derivatives", __cmp_directory__
    )

    # Resolve the anat folder and the "<subject>[_<session>]" file prefix
    if subject_session == "":
        anat_deriv_subject_directory = os.path.join(
            derivatives_root, self.subject, "anat"
        )
    elif subject_session not in subject:
        anat_deriv_subject_directory = os.path.join(
            derivatives_root, subject, subject_session, "anat"
        )
        subject = "_".join((subject, subject_session))
    else:
        # The session is already part of the subject label: strip it for the folder
        anat_deriv_subject_directory = os.path.join(
            derivatives_root, subject.split("_")[0], subject_session, "anat"
        )

    # Head, brain, brain mask and white matter mask, checked in this order
    expected_files = [
        os.path.join(anat_deriv_subject_directory, subject + file_suffix)
        for file_suffix in (
            "_T1w_head.nii.gz",
            "_T1w_brain.nii.gz",
            "_T1w_brainmask.nii.gz",
            "_T1w_class-WM.nii.gz",
        )
    ]
    roiv_files = glob.glob(
        anat_deriv_subject_directory + "/" + subject + "_T1w_parc_scale*.nii.gz"
    )

    error_message = ""
    all_expected_found = True
    for expected_file in expected_files:
        if os.path.isfile(expected_file):
            continue
        all_expected_found = False
        error_message = (
            "Missing anatomical output file %s . Please re-run the anatomical pipeline"
            % expected_file
        )
        print(error_message)
        error(
            message=error_message,
            title="Error",
            buttons=["OK", "Cancel"],
            parent=None,
        )

    # NOTE(review): glob only lists paths it found, so both counts normally
    # agree (also when nothing matched) - confirm this is the intended check.
    n_listed = len(roiv_files)
    n_present = sum(1 for roiv_file in roiv_files if os.path.isfile(roiv_file))
    roivs_available = n_listed == n_present
    if not roivs_available:
        error_message = (
            "Missing %g/%g anatomical parcellation output files. Please re-run the anatomical pipeline"
            % (n_listed - n_present, n_listed)
        )
        print(error_message)
        error(
            message=error_message,
            title="Error",
            buttons=["OK", "Cancel"],
            parent=None,
        )

    valid_output = all_expected_found and roivs_available
    if valid_output:
        print("valid deriv/anat output")

    return valid_output, error_message