# Copyright (C) 2009-2022, Ecole Polytechnique Federale de Lausanne (EPFL) and
# Hospital Center and University of Lausanne (UNIL-CHUV), Switzerland, and CMP3 contributors
# All rights reserved.
#
# This software is distributed under the open-source license Modified BSD.
"""EEG pipeline Class definition."""
import datetime
import nipype.interfaces.io as nio
from nipype import config, logging
import cmp.pipelines.common as cmp_common
from cmp.info import __version__
from cmp.pipelines.common import *
from cmp.stages.eeg.inverse_solution import EEGInverseSolutionStage
from cmp.stages.eeg.loader import EEGLoaderStage
from cmp.stages.eeg.preparer import EEGPreparerStage
from cmtklib.bids.io import (
__cmp_directory__,
__nipype_directory__,
__cartool_directory__,
__eeglab_directory__
)
class GlobalConfig(HasTraits):
    """Global EEG pipeline configurations.

    Attributes
    ----------
    process_type : 'EEG'
        Processing pipeline type

    subjects : traits.List
        List of subjects ID (in the form ``sub-XX``)

    subject : traits.Str
        Subject to be processed (in the form ``sub-XX``)

    subject_session : traits.Str
        Subject session to be processed (in the form ``ses-YY``)
    """

    process_type = Str("EEG")
    subjects = List(trait=Str)
    subject = Str
    subject_session = Str
class EEGPipeline(Pipeline):
    """Class that extends a :class:`Pipeline` and represents the processing pipeline for EEG.

    It is composed of:
        * the EEG preparer stage (:class:`~cmp.stages.eeg.preparer.EEGPreparerStage`)
        * the EEG loader stage (:class:`~cmp.stages.eeg.loader.EEGLoaderStage`)
        * the EEG inverse solution stage
          (:class:`~cmp.stages.eeg.inverse_solution.EEGInverseSolutionStage`)

    See Also
    --------
    cmp.stages.eeg.preparer.EEGPreparerStage
    cmp.stages.eeg.loader.EEGLoaderStage
    cmp.stages.eeg.inverse_solution.EEGInverseSolutionStage
    """

    # Timestamp taken when the class body is evaluated; `process()` refreshes it per run.
    now = datetime.datetime.now().strftime("%Y%m%d_%H%M")
    pipeline_name = Str("EEG_pipeline")
    input_folders = ["anat", "eeg"]
    subject = Str
    subject_directory = Directory
    derivatives_directory = Directory
    # Stage names, in execution order. They must be the keys of `self.stages`
    # (built in `__init__`), where the last stage is registered as
    # "EEGInverseSolution".
    ordered_stage_list = ["EEGPreparer", "EEGLoader", "EEGInverseSolution"]
    global_conf = GlobalConfig()
    config_file = Str
    parcellation_scheme = Str
    atlas_info = Dict()
    eeg_format = Str
    subjects_dir = Str
    flow = Instance(pe.Workflow)
def __init__(self, project_info):
    """Constructor of a `EEGPipeline` object.

    Parameters
    ----------
    project_info : cmp.project.ProjectInfo
        Instance of `CMP_Project_Info` object. The attributes read here are
        ``subject``, ``subjects``, ``subject_sessions``, ``subject_session``,
        ``base_directory`` and ``output_directory``.

    See Also
    --------
    cmp.project.CMP_Project_Info
    """
    self.subject = project_info.subject

    self.global_conf.subjects = project_info.subjects
    self.global_conf.subject = project_info.subject

    if len(project_info.subject_sessions) > 0:
        self.global_conf.subject_session = project_info.subject_session
        self.subject_directory = os.path.join(
            project_info.base_directory, project_info.subject, project_info.subject_session
        )
    else:
        self.global_conf.subject_session = ""
        self.subject_directory = os.path.join(project_info.base_directory, project_info.subject)

    # Resolve the output directory first and only then derive the derivatives
    # directory from it: calling `os.path.abspath()` on a `None` output
    # directory raises a `TypeError`. The fallback is built from
    # `project_info.base_directory` because the base-class constructor has
    # not been called yet at this point.
    if project_info.output_directory is not None:
        output_dir = os.path.abspath(project_info.output_directory)
    else:
        output_dir = os.path.join(project_info.base_directory, "derivatives")
    self.output_directory = output_dir
    self.derivatives_directory = os.path.abspath(output_dir)

    self.stages = {
        "EEGPreparer": EEGPreparerStage(
            bids_dir=project_info.base_directory, output_dir=self.output_directory
        ),
        "EEGLoader": EEGLoaderStage(
            bids_dir=project_info.base_directory, output_dir=self.output_directory
        ),
        "EEGInverseSolution": EEGInverseSolutionStage(
            bids_dir=project_info.base_directory, output_dir=self.output_directory
        ),
    }

    cmp_common.Pipeline.__init__(self, project_info)

    cmp3_dir = os.path.join(self.derivatives_directory, f"cmp-{__version__}")
    self.stages["EEGPreparer"].config.cmp3_dir = cmp3_dir
    self.stages["EEGPreparer"].config.cartool_dir = os.path.join(
        self.derivatives_directory, __cartool_directory__
    )

    # `eeg_format` and `invsol_format` are deliberately NOT propagated between
    # stages here: the configuration file has not been read yet, so they still
    # hold their default values. `create_pipeline_flow()` copies them instead.
    # self.stages['EEGLoader'].config.eeg_format = self.stages['EEGPreparer'].config.eeg_format
    # self.stages['EEGLoader'].config.invsol_format = self.stages['EEGPreparer'].config.invsol_format

    # TODO: Re-integrate the lines below for integration in the GUI
    # self.stages['EEGPreparer'].config.on_trait_change(self.update_eeg_format, 'eeg_format')
    # self.stages['EEGLoader'].config.on_trait_change(self.update_eeg_format, 'eeg_format')
    # self.stages['EEGInverseSolution'].config.on_trait_change(self.update_parcellation_scheme, 'parcellation_scheme')
def check_config(self):
    """Check the pipeline configuration (not implemented for the EEG pipeline).

    Raises
    ------
    NotImplementedError
        Always, as no check has been implemented yet.
    """
    # TODO: To Be Implemented if Necessary
    raise NotImplementedError

# TODO: To Be Implemented if Necessary
# def check_output(self):
#     raise NotImplementedError
def create_datagrabber_node(self, **kwargs):
    """Create the appropriate Nipype DataGrabber node depending on the `parcellation_scheme`.

    This is currently a stub: it only prints the received keyword arguments
    and returns ``None``.

    Parameters
    ----------
    **kwargs
        Keyword arguments intended for the future implementation:

        base_directory : Directory
            Main CMP output directory of a subject
            e.g. ``/output_dir/cmp/sub-XX/(ses-YY)``

        bids_atlas_label : string
            Parcellation atlas label

    Returns
    -------
    datasource : None
        Placeholder for the future Nipype Node with
        :obj:`~nipype.interfaces.io.DataGrabber` interface.
    """
    # TODO: To Be Implemented
    print(kwargs)
    return None
def init_subject_derivatives_dirs(self):
    """Return the paths to Nipype and CMP derivatives folders of a given subject / session.

    The ``eeg_pipeline`` sub-folder of the Nipype derivatives directory is
    created if it does not exist yet.

    Returns
    -------
    cmp_deriv_subject_directory : str
        ``<output_directory>/<cmp directory>/sub-XX(/ses-YY)``

    nipype_deriv_subject_directory : str
        ``<output_directory>/<nipype directory>/sub-XX(/ses-YY)``

    nipype_eeg_pipeline_subject_dir : str
        ``<nipype_deriv_subject_directory>/eeg_pipeline``

    Notes
    -----
    `self.subject` is updated to "sub-<participant_label>_ses-<session_label>"
    when subject has multiple sessions.
    """
    # Drop anything after the first underscore (e.g. a "_ses-YY" suffix
    # appended by a previous call) so the suffix is never appended twice.
    if "_" in self.subject:
        self.subject = self.subject.split("_")[0]

    session = self.global_conf.subject_session
    # Path components identifying the subject, and its session when there is one
    subject_parts = [self.subject] if session == "" else [self.subject, session]

    cmp_deriv_subject_directory = os.path.join(
        self.output_directory, __cmp_directory__, *subject_parts
    )
    nipype_deriv_subject_directory = os.path.join(
        self.output_directory, __nipype_directory__, *subject_parts
    )

    if session != "":
        self.subject = "_".join((self.subject, session))

    nipype_eeg_pipeline_subject_dir = os.path.join(nipype_deriv_subject_directory, "eeg_pipeline")
    # `exist_ok=True` tolerates a directory that already exists (including one
    # created concurrently) while still surfacing genuine failures, such as a
    # permission error, instead of reporting them as "already existing".
    os.makedirs(nipype_eeg_pipeline_subject_dir, exist_ok=True)

    return cmp_deriv_subject_directory, nipype_deriv_subject_directory, nipype_eeg_pipeline_subject_dir
def create_pipeline_flow(self, cmp_deriv_subject_directory, nipype_deriv_subject_directory):
    """Create the workflow of the EEG pipeline.

    Parameters
    ----------
    cmp_deriv_subject_directory : Directory
        Main CMP output directory of a subject
        e.g. ``/output_dir/cmp/sub-XX/(ses-YY)``

    nipype_deriv_subject_directory : Directory
        Intermediate Nipype output directory of a subject
        e.g. ``/output_dir/nipype/sub-XX/(ses-YY)``

    Returns
    -------
    eeg_flow : nipype.pipeline.engine.Workflow
        An instance of :class:`nipype.pipeline.engine.Workflow`.
        It is also stored in ``self.flow``.

    Raises
    ------
    NotImplementedError
        If the extension of the epochs file configured in the preparer stage
        is neither ``.set`` nor ``.fif``.
    """
    datasource = pe.Node(
        interface=util.IdentityInterface(
            fields=[
                "base_directory",
                "subject",
                "cmp_deriv_subject_directory",
                "nipype_deriv_subject_directory",
            ],
            mandatory_inputs=True,
        ),
        name="datasource",
    )
    # NOTE(review): several inputs assigned to `datasource` further down
    # (epochs, EEG_params, roi_ts_file, ...) are not declared in `fields`
    # above - confirm that the node really exposes them as outputs.
    datasource.inputs.base_directory = self.base_directory
    datasource.inputs.subject = self.subject
    datasource.inputs.cmp_deriv_subject_directory = cmp_deriv_subject_directory
    datasource.inputs.nipype_deriv_subject_directory = nipype_deriv_subject_directory

    # Create common_flow
    eeg_flow = pe.Workflow(name="eeg_pipeline", base_dir=os.path.abspath(nipype_deriv_subject_directory))

    # Set a couple of config parameters that were read from the config file
    self.stages["EEGLoader"].config.eeg_format = self.stages["EEGPreparer"].config.eeg_format
    self.stages["EEGLoader"].config.invsol_format = self.stages["EEGPreparer"].config.invsol_format
    self.stages["EEGInverseSolution"].config.invsol_format = self.stages["EEGPreparer"].config.invsol_format

    # Create stages (parameters specified in config file are read and set)
    preparer_flow = self.create_stage_flow("EEGPreparer")
    loader_flow = self.create_stage_flow("EEGLoader")
    invsol_flow = self.create_stage_flow("EEGInverseSolution")

    preparer_config = self.stages["EEGPreparer"].config

    # Read name of eeg file and determine format and derivatives folder name.
    # `splitext` keeps only the last extension, so file names containing more
    # than one dot (or none at all) are handled predictably.
    epochs_fname = preparer_config.epochs
    eeg_format = os.path.splitext(epochs_fname)[1]
    derivatives_folders = {".set": __eeglab_directory__, ".fif": "mne"}
    if eeg_format not in derivatives_folders:
        # Fail early with an explicit message rather than with an unbound
        # `derivatives_folder` a few lines below.
        raise NotImplementedError(
            f"Unsupported EEG epochs file format '{eeg_format}' (file: '{epochs_fname}'); "
            f"supported formats are: {', '.join(sorted(derivatives_folders))}"
        )
    derivatives_folder = derivatives_folders[eeg_format]

    # Define diverse EEG derivatives folder (TODO: Handle sessions)
    derivatives_root = os.path.join(self.base_directory, "derivatives")
    cmp_path_prefix_file = os.path.join(derivatives_root, f"cmp-{__version__}", self.subject, "eeg")
    eeglab_path_prefix_file = os.path.join(derivatives_root, __eeglab_directory__, self.subject, "eeg")
    derivatives_path_prefix_file = os.path.join(derivatives_root, derivatives_folder, self.subject, "eeg")

    preparer_config.eeg_format = eeg_format
    datasource.inputs.epochs = [os.path.join(derivatives_path_prefix_file, epochs_fname)]
    datasource.inputs.EEG_params = preparer_config.EEG_params

    # Read name of parcellation and determine file name
    parcellation_atlas = preparer_config.parcellation["atlas"]
    parcellation_resolution = preparer_config.parcellation["res"]
    print('Parcellation atlas: ', parcellation_atlas)
    print('Parcellation resolution: ', parcellation_resolution)
    if parcellation_atlas == 'lausanne2018':
        parcellation_atlas = 'L2018'

    # The "res-<label>" entity is only part of a file name when a resolution
    # is actually configured; the same rule is applied to every file name
    # built from the parcellation below.
    has_resolution = parcellation_resolution is not None and parcellation_resolution != ""
    atlas_entities = f"atlas-{parcellation_atlas}"
    if has_resolution:
        atlas_entities = f"{atlas_entities}_res-{parcellation_resolution}"

    # NOTE(review): `behav_file` is only defined for the ".set" format but is
    # connected to the preparer stage in both branches below - confirm the
    # expected behaviour for ".fif" inputs.
    if preparer_config.eeg_format == ".set":
        # File with events/behavioral info
        expe_name = datasource.inputs.EEG_params["expe_name"]
        datasource.inputs.behav_file = [
            os.path.join(
                self.base_directory,
                self.subject,
                "eeg",
                f"{self.subject}_task-{expe_name}_events.tsv",
            )
        ]

    # Name of output file (EEG data source level)
    datasource.inputs.epochs_fif_fname = os.path.join(cmp_path_prefix_file, f"{self.subject}_epo.fif")

    # Name of output file (ROI time courses)
    datasource.inputs.roi_ts_file = os.path.join(
        cmp_path_prefix_file, f"{self.subject}_{atlas_entities}_desc-epo_timeseries.npy"
    )

    datasource.inputs.output_query = dict()

    # Data sinker for output
    sinker = pe.Node(nio.DataSink(), name="eeg_datasinker")
    sinker.inputs.base_directory = os.path.abspath(cmp_deriv_subject_directory)

    # Clear previous outputs
    self.clear_stages_outputs()

    # Connections that are identical for the Cartool and the MNE workflows
    preparer_ports = [
        ("epochs", "inputnode.epochs"),
        ("subject", "inputnode.subject"),
        ("behav_file", "inputnode.behav_file"),
        ("parcellation", "inputnode.parcellation"),
        ("epochs_fif_fname", "inputnode.epochs_fif_fname"),
        ("EEG_params", "inputnode.EEG_params"),
        ("output_query", "inputnode.output_query"),
        ("base_directory", "inputnode.bids_dir"),
    ]
    shared_connections = [
        (datasource, loader_flow, [("base_directory", "inputnode.base_directory"),
                                   ("subject", "inputnode.subject")]),
        (preparer_flow, loader_flow, [("outputnode.output_query", "inputnode.output_query"),
                                      ("outputnode.derivative_list", "inputnode.derivative_list")]),
        (invsol_flow, sinker, [("outputnode.roi_ts_file", "eeg.@roi_ts_file")]),
    ]

    invsol_format = preparer_config.invsol_format
    if "Cartool" in invsol_format:
        # atlas image is required
        parcellation = f"{self.subject}_{atlas_entities}_dseg.nii.gz"
        datasource.inputs.parcellation = os.path.join(
            derivatives_root, f"cmp-{__version__}", self.subject, "anat", parcellation
        )
        # fmt: off
        eeg_flow.connect(
            [
                (datasource, preparer_flow, preparer_ports),
                *shared_connections,
                (loader_flow, invsol_flow, [("outputnode.EEG", "inputnode.eeg_ts_file"),
                                            ("outputnode.rois", "inputnode.rois_file"),
                                            ("outputnode.src", "inputnode.src_file"),
                                            ("outputnode.invsol", "inputnode.invsol_file")]),
                (datasource, invsol_flow, [("roi_ts_file", "inputnode.roi_ts_file")]),
                (preparer_flow, invsol_flow, [("outputnode.invsol_params", "inputnode.invsol_params")]),
            ]
        )
        # fmt: on
    elif "mne" in invsol_format:
        # Freesurfer annot files are required
        if has_resolution:
            parcellation = "lausanne2018." + parcellation_resolution
        else:
            parcellation = parcellation_atlas  # aparc
        datasource.inputs.parcellation = parcellation

        # Define names for MNE outputs
        datasource.inputs.noise_cov_fname = os.path.join(cmp_path_prefix_file, f"{self.subject}_noisecov.fif")
        datasource.inputs.trans_fname = os.path.join(cmp_path_prefix_file, f"{self.subject}_trans.fif")
        datasource.inputs.fwd_fname = os.path.join(cmp_path_prefix_file, f"{self.subject}_fwd.fif")
        datasource.inputs.inv_fname = os.path.join(cmp_path_prefix_file, f"{self.subject}_inv.fif")

        # Electrode positions are read from the EEGLAB derivatives folder.
        # The original author flagged this file as coming from a non-standard
        # (Cartool) source that still needs to be fixed.
        datasource.inputs.electrode_positions_file = os.path.join(
            eeglab_path_prefix_file, f"{self.subject}_eeg.xyz"
        )
        # fmt: off
        eeg_flow.connect(
            [
                (datasource, preparer_flow, preparer_ports + [
                    ("electrode_positions_file", "inputnode.electrode_positions_file"),
                ]),
                *shared_connections,
                (datasource, invsol_flow, [("subject", "inputnode.subject"),
                                           ("base_directory", "inputnode.bids_dir"),
                                           ("noise_cov_fname", "inputnode.noise_cov_fname"),
                                           ("trans_fname", "inputnode.trans_fname"),
                                           ("fwd_fname", "inputnode.fwd_fname"),
                                           ("inv_fname", "inputnode.inv_fname"),
                                           ("parcellation", "inputnode.parcellation"),
                                           ("roi_ts_file", "inputnode.roi_ts_file")]),
                (preparer_flow, invsol_flow, [("outputnode.epochs_fif_fname", "inputnode.epochs_fif_fname")]),
                (loader_flow, invsol_flow, [("outputnode.src", "inputnode.src_file"),
                                            ("outputnode.bem", "inputnode.bem_file")]),
            ]
        )
        # fmt: on
    # NOTE(review): any other `invsol_format` leaves the workflow without a
    # single connection - confirm whether that should be reported as an error.

    self.flow = eeg_flow
    return eeg_flow
def process(self):
    """Execute the EEG pipeline workflow and return True if successful.

    The subject derivatives directories are initialized, Nipype logging and
    execution options are configured, then the workflow is built, its graph
    is written to disk and it is run with the ``MultiProc`` plugin using
    ``self.number_of_cores`` processes.

    Returns
    -------
    bool
        ``True`` once the workflow run has returned.
    """
    # Enable the use of the W3C PROV data model to capture and represent provenance in Nipype
    # config.enable_provenance()

    # Process time
    self.now = datetime.datetime.now().strftime("%Y%m%d_%H%M")

    (
        cmp_deriv_subject_directory,
        nipype_deriv_subject_directory,
        nipype_eeg_pipeline_subject_dir,
    ) = self.init_subject_derivatives_dirs()

    # Initialization: start from a fresh log file
    log_file = os.path.join(nipype_eeg_pipeline_subject_dir, "pypeline.log")
    if os.path.isfile(log_file):
        os.unlink(log_file)

    config.update_config(
        {
            "logging": {
                # Same "<nipype subject dir>/eeg_pipeline" folder as the one
                # holding `log_file` above
                "log_directory": nipype_eeg_pipeline_subject_dir,
                "log_to_file": True,
            },
            "execution": {
                "remove_unnecessary_outputs": False,
                "stop_on_first_crash": True,
                "stop_on_first_rerun": False,
                "use_relative_paths": True,
                "crashfile_format": "txt",
            },
        }
    )
    logging.update_logging(config)

    iflogger = logging.getLogger("nipype.interface")
    iflogger.info("**** Processing ****")

    eeg_flow = self.create_pipeline_flow(
        cmp_deriv_subject_directory=cmp_deriv_subject_directory,
        nipype_deriv_subject_directory=nipype_deriv_subject_directory,
    )
    eeg_flow.write_graph(graph2use="colored", format="svg", simple_form=True)

    # Create dictionary of arguments passed to plugin_args
    plugin_args = {
        'maxtasksperchild': 1,
        'n_procs': self.number_of_cores,
        'raise_insufficient': False,
    }
    eeg_flow.run(plugin="MultiProc", plugin_args=plugin_args)

    iflogger.info("**** Processing finished ****")

    return True