# Copyright (C) 2009-2021, Ecole Polytechnique Federale de Lausanne (EPFL) and
# Hospital Center and University of Lausanne (UNIL-CHUV), Switzerland, and CMP3 contributors
# All rights reserved.
#
# This software is distributed under the open-source license Modified BSD.
"""Functional pipeline Class definition."""
import datetime
import shutil
import nipype.interfaces.io as nio
from nipype import config, logging
from nipype.interfaces.utility import Merge
from cmp.pipelines.common import *
from cmp.stages.connectome.fmri_connectome import ConnectomeStage
from cmp.stages.functional.functionalMRI import FunctionalMRIStage
from cmp.stages.preprocessing.fmri_preprocessing import PreprocessingStage
from cmp.stages.registration.registration import RegistrationStage
[docs]class Global_Configuration(HasTraits):
"""Global pipeline configurations.
Attributes
----------
process_type : 'fMRI'
Processing pipeline type
imaging_model : 'fMRI'
Imaging model used by `RegistrationStage`
"""
process_type = Str('fMRI')
imaging_model = Str
[docs]class fMRIPipeline(Pipeline):
"""Class that extends a :class:`Pipeline` and represents the processing pipeline for structural MRI.
It is composed of:
* the preprocessing stage that can perform slice timing correction, deskiping and motion correction
* the registration stage that co-registered the anatomical T1w scan to the mean BOLD image
and projects the parcellations to the native fMRI space
* the extra-preprocessing stage (FunctionalMRIStage) that can perform nuisance regression
and bandpass filtering
* the connectome stage that extracts the time-series of each parcellation ROI and
computes the Pearson's correlation coefficient between ROI time-series to create
the functional connectome.
See Also
--------
cmp.stages.preprocessing.fmri_preprocessing.PreprocessingStage
cmp.stages.registration.registration.RegistrationStage
cmp.stages.functional.functionalMRI.FunctionalMRIStage
cmp.stages.connectome.fmri_connectome.ConnectomeStage
"""
now = datetime.datetime.now().strftime("%Y%m%d_%H%M")
pipeline_name = Str("fMRI_pipeline")
input_folders = ['anat', 'func']
seg_tool = Str
subject = Str
subject_directory = Directory
derivatives_directory = Directory
ordered_stage_list = ['Preprocessing',
'Registration',
'FunctionalMRI',
'Connectome']
global_conf = Global_Configuration()
config_file = Str
parcellation_scheme = Str
atlas_info = Dict()
subjects_dir = Str
subject_id = Str
def __init__(self, project_info):
"""Constructor of a `fMRIPipeline` object.
Parameters
----------
project_info: cmp.project.CMP_Project_Info
Instance of `CMP_Project_Info` object.
See Also
--------
cmp.project.CMP_Project_Info
"""
self.subjects_dir = project_info.freesurfer_subjects_dir
self.subject_id = project_info.freesurfer_subject_id
self.global_conf.subjects = project_info.subjects
self.global_conf.subject = project_info.subject
if len(project_info.subject_sessions) > 0:
self.global_conf.subject_session = project_info.subject_session
self.subject_directory = os.path.join(project_info.base_directory,
project_info.subject,
project_info.subject_session)
else:
self.global_conf.subject_session = ''
self.subject_directory = os.path.join(project_info.base_directory,
project_info.subject)
self.derivatives_directory = os.path.abspath(project_info.output_directory)
if project_info.output_directory is not None:
self.output_directory = os.path.abspath(project_info.output_directory)
else:
self.output_directory = os.path.join(self.base_directory, "derivatives")
self.stages = {'Preprocessing': PreprocessingStage(bids_dir=project_info.base_directory,
output_dir=self.output_directory),
'Registration': RegistrationStage(pipeline_mode="fMRI",
fs_subjects_dir=project_info.freesurfer_subjects_dir,
fs_subject_id=os.path.basename(project_info.freesurfer_subject_id),
bids_dir=project_info.base_directory,
output_dir=self.output_directory),
'FunctionalMRI': FunctionalMRIStage(bids_dir=project_info.base_directory,
output_dir=self.output_directory),
'Connectome': ConnectomeStage(bids_dir=project_info.base_directory,
output_dir=self.output_directory)}
Pipeline.__init__(self, project_info)
self.subject = project_info.subject
self.stages['FunctionalMRI'].config.on_trait_change(self.update_nuisance_requirements, 'global_nuisance')
self.stages['FunctionalMRI'].config.on_trait_change(self.update_nuisance_requirements, 'csf')
self.stages['FunctionalMRI'].config.on_trait_change(self.update_nuisance_requirements, 'wm')
self.stages['Connectome'].config.on_trait_change(self.update_scrubbing, 'apply_scrubbing')
def _subject_changed(self, new):
""""Update subject in the connectome stage configuration when ``subject`` is updated.
Parameters
----------
new : string
New value.
"""
self.stages['Connectome'].config.subject = new
[docs] def update_registration(self):
"""Configure the list of registration tools."""
if 'Nonlinear (FSL)' in self.stages['Registration'].config.registration_mode_trait:
self.stages['Registration'].config.registration_mode_trait = ['Linear (FSL)', 'BBregister (FS)',
'Nonlinear (FSL)']
else:
self.stages['Registration'].config.registration_mode_trait = [
'Linear (FSL)', 'BBregister (FS)']
[docs] def update_nuisance_requirements(self):
"""Update nuisance requirements.
Configure the registration to apply the estimated transformation to multiple segmentation masks
depending on the Nuisance correction steps performed.
"""
self.stages['Registration'].config.apply_to_eroded_brain = self.stages['FunctionalMRI'].config.global_nuisance
self.stages['Registration'].config.apply_to_eroded_csf = self.stages['FunctionalMRI'].config.csf
self.stages['Registration'].config.apply_to_eroded_wm = self.stages['FunctionalMRI'].config.wm
[docs] def update_scrubbing(self):
"""Update to precompute or inputs for scrubbing during the FunctionalMRI stage."""
self.stages['FunctionalMRI'].config.scrubbing = self.stages['Connectome'].config.apply_scrubbing
[docs] def define_custom_mapping(self, custom_last_stage):
"""Define the pipeline to be executed until a specific stages.
Not used yet by CMP3.
Parameters
----------
custom_last_stage : string
Last stage to execute. Valid values are: "Preprocessing",
"Registration", "FunctionalMRI" and "Connectome".
"""
# start by disabling all stages
for stage in self.ordered_stage_list:
self.stages[stage].enabled = False
# enable until selected one
for stage in self.ordered_stage_list:
print('Enable stage : %s' % stage)
self.stages[stage].enabled = True
if stage == custom_last_stage:
break
[docs] def check_config(self):
"""Check if the fMRI pipeline parameters is properly configured.
Returns
-------
message : string
String that is empty if success, otherwise it contains the error message
"""
if self.stages['FunctionalMRI'].config.motion is True and self.stages[
'Preprocessing'].config.motion_correction is False:
return (
'\n\tMotion signal regression selected but no motion correction set.\t\n\t'
'Please activate motion correction in the preprocessing configuration window,\n\t'
'or disable the motion signal regression in the functional configuration window.\t\n')
if self.stages['Connectome'].config.apply_scrubbing is True and self.stages[
'Preprocessing'].config.motion_correction is False:
return (
'\n\tScrubbing applied but no motion correction set.\t\n\t'
'Please activate motion correction in the preprocessing configutation window,\n\t'
'or disable scrubbing in the connectome configuration window.\t\n')
return ''
[docs] def create_pipeline_flow(self, cmp_deriv_subject_directory, nipype_deriv_subject_directory):
"""Create the pipeline workflow.
Parameters
----------
cmp_deriv_subject_directory : Directory
Main CMP output directory of a subject
e.g. ``/output_dir/cmp/sub-XX/(ses-YY)``
nipype_deriv_subject_directory : Directory
Intermediate Nipype output directory of a subject
e.g. ``/output_dir/nipype/sub-XX/(ses-YY)``
Returns
-------
fMRI_flow : nipype.pipeline.engine.Workflow
An instance of :class:`nipype.pipeline.engine.Workflow`
"""
if self.parcellation_scheme == 'Lausanne2008':
bids_atlas_label = 'L2008'
elif self.parcellation_scheme == 'Lausanne2018':
bids_atlas_label = 'L2018'
elif self.parcellation_scheme == 'NativeFreesurfer':
bids_atlas_label = 'Desikan'
# Data sinker for output
sinker = pe.Node(nio.DataSink(), name="func_sinker")
sinker.inputs.base_directory = os.path.join(cmp_deriv_subject_directory)
if self.parcellation_scheme == 'NativeFreesurfer':
sinker.inputs.substitutions = [
('eroded_brain_registered.nii.gz', self.subject +
'_space-meanBOLD_desc-eroded_label-brain_dseg.nii.gz'),
('eroded_csf_registered.nii.gz', self.subject +
'_space-meanBOLD_desc-eroded_label-CSF_dseg.nii.gz'),
('wm_mask_registered.nii.gz', self.subject +
'_space-meanBOLD_label-WM_dseg.nii.gz'),
('eroded_wm_registered.nii.gz', self.subject +
'_space-meanBOLD_desc-eroded_label-WM_dseg.nii.gz'),
('fMRI_despike_st_mcf.nii.gz_mean_reg.nii.gz',
self.subject + '_meanBOLD.nii.gz'),
('fMRI_despike_st_mcf.nii.gz.par', self.subject + '_motion.par'),
('FD.npy', self.subject + '_desc-scrubbing_FD.npy'),
('DVARS.npy', self.subject + '_desc-scrubbing_DVARS.npy'),
('fMRI_bandpass.nii.gz', self.subject +
'_desc-bandpass_task-rest_bold.nii.gz'),
(self.subject + '_atlas-' + bids_atlas_label + '_dseg_flirt.nii.gz',
self.subject + '_space-meanBOLD_atlas-' + bids_atlas_label + '_dseg.nii.gz'),
('connectome_freesurferaparc', self.subject +
'_atlas-Desikan_conndata-network_connectivity'),
('averageTimeseries_freesurferaparc',
self.subject + '_atlas-Desikan_timeseries'),
]
else:
sinker.inputs.substitutions = [
('eroded_brain_registered.nii.gz', self.subject +
'_space-meanBOLD_desc-eroded_label-brain_dseg.nii.gz'),
('wm_mask_registered.nii.gz', self.subject +
'_space-meanBOLD_label-WM_dseg.nii.gz'),
('eroded_csf_registered.nii.gz', self.subject +
'_space-meanBOLD_desc-eroded_label-CSF_dseg.nii.gz'),
('eroded_wm_registered.nii.gz', self.subject +
'_space-meanBOLD_desc-eroded_label-WM_dseg.nii.gz'),
('fMRI_despike_st_mcf.nii.gz_mean_reg.nii.gz',
self.subject + '_meanBOLD.nii.gz'),
('fMRI_despike_st_mcf.nii.gz.par', self.subject + '_motion.tsv'),
('FD.npy', self.subject + '_desc-scrubbing_FD.npy'),
('DVARS.npy', self.subject + '_desc-scrubbing_DVARS.npy'),
('fMRI_bandpass.nii.gz', self.subject +
'_desc-bandpass_task-rest_bold.nii.gz'),
(self.subject + '_atlas-' + bids_atlas_label + '_dseg_flirt.nii.gz',
self.subject + '_space-meanBOLD_atlas-' + bids_atlas_label + '_dseg.nii.gz'),
(self.subject + '_atlas-' + bids_atlas_label + '_res-scale1_dseg_flirt.nii.gz',
self.subject + '_space-meanBOLD_atlas-' + bids_atlas_label + '_res-scale1_dseg.nii.gz'),
(self.subject + '_atlas-' + bids_atlas_label + '_res-scale2_dseg_flirt.nii.gz',
self.subject + '_space-meanBOLD_atlas-' + bids_atlas_label + '_res-scale2_dseg.nii.gz'),
(self.subject + '_atlas-' + bids_atlas_label + '_res-scale3_dseg_flirt.nii.gz',
self.subject + '_space-meanBOLD_atlas-' + bids_atlas_label + '_res-scale3_dseg.nii.gz'),
(self.subject + '_atlas-' + bids_atlas_label + '_res-scale4_dseg_flirt.nii.gz',
self.subject + '_space-meanBOLD_atlas-' + bids_atlas_label + '_res-scale4_dseg.nii.gz'),
(self.subject + '_atlas-' + bids_atlas_label + '_res-scale5_dseg_flirt.nii.gz',
self.subject + '_space-meanBOLD_atlas-' + bids_atlas_label + '_res-scale5_dseg.nii.gz'),
('connectome_freesurferaparc', self.subject +
'_label-Desikan_conndata-network_connectivity'),
('connectome_scale1',
self.subject + '_atlas-' + bids_atlas_label + '_res-scale1_conndata-network_connectivity'),
('connectome_scale2',
self.subject + '_atlas-' + bids_atlas_label + '_res-scale2_conndata-network_connectivity'),
('connectome_scale3',
self.subject + '_atlas-' + bids_atlas_label + '_res-scale3_conndata-network_connectivity'),
('connectome_scale4',
self.subject + '_atlas-' + bids_atlas_label + '_res-scale4_conndata-network_connectivity'),
('connectome_scale5',
self.subject + '_atlas-' + bids_atlas_label + '_res-scale5_conndata-network_connectivity'),
('averageTimeseries_scale1', self.subject + '_atlas-' +
bids_atlas_label + '_res-scale1_timeseries'),
('averageTimeseries_scale2', self.subject + '_atlas-' +
bids_atlas_label + '_res-scale2_timeseries'),
('averageTimeseries_scale3', self.subject + '_atlas-' +
bids_atlas_label + '_res-scale3_timeseries'),
('averageTimeseries_scale4', self.subject + '_atlas-' +
bids_atlas_label + '_res-scale4_timeseries'),
('averageTimeseries_scale5', self.subject + '_atlas-' +
bids_atlas_label + '_res-scale5_timeseries'),
]
# Data import
datasource = pe.Node(interface=nio.DataGrabber(
outfields=['fMRI', 'T1', 'T2', 'aseg', 'brain', 'brain_mask', 'wm_mask_file', 'wm_eroded', 'brain_eroded',
'csf_eroded', 'roi_volume_s1', 'roi_volume_s2', 'roi_volume_s3', 'roi_volume_s4',
'roi_volume_s5', 'roi_graphml_s1', 'roi_graphml_s2', 'roi_graphml_s3', 'roi_graphml_s4',
'roi_graphml_s5']), name='datasource')
datasource.inputs.base_directory = cmp_deriv_subject_directory
datasource.inputs.template = '*'
datasource.inputs.raise_on_empty = False
# datasource.inputs.field_template = dict(fMRI='fMRI.nii.gz',T1='T1.nii.gz',T2='T2.nii.gz')
if self.parcellation_scheme == 'NativeFreesurfer':
datasource.inputs.field_template = dict(
fMRI='func/' + self.subject + '_task-rest_desc-cmp_bold.nii.gz',
T1='anat/' + self.subject + '_desc-head_T1w.nii.gz',
T2='anat/' + self.subject + '_T2w.nii.gz',
aseg='anat/' + self.subject + '_desc-aseg_dseg.nii.gz',
brain='anat/' + self.subject + '_desc-brain_T1w.nii.gz',
brain_mask='anat/' + self.subject + '_desc-brain_mask.nii.gz',
wm_mask_file='anat/' + self.subject + '_label-WM_dseg.nii.gz',
wm_eroded='anat/' + self.subject + '_label-WM_desc-eroded_dseg.nii.gz',
brain_eroded='anat/' + self.subject + '_label-brain_desc-eroded_dseg.nii.gz',
csf_eroded='anat/' + self.subject + '_label-CSF_desc-eroded_dseg.nii.gz',
roi_volume_s1='anat/' + self.subject + '_atlas-Desikan_dseg.nii.gz',
roi_volume_s2='anat/irrelevant.nii.gz',
roi_volume_s3='anat/irrelevant.nii.gz',
roi_volume_s4='anat/irrelevant.nii.gz',
roi_volume_s5='anat/irrelevant.nii.gz',
roi_graphml_s1='anat/' + self.subject + '_atlas-Desikan_dseg.graphml',
roi_graphml_s2='anat/irrelevant.graphml',
roi_graphml_s3='anat/irrelevant.graphml',
roi_graphml_s4='anat/irrelevant.graphml',
roi_graphml_s5='anat/irrelevant.graphml')
else:
datasource.inputs.field_template = dict(
fMRI='func/' + self.subject + '_task-rest_desc-cmp_bold.nii.gz',
T1='anat/' + self.subject + '_desc-head_T1w.nii.gz',
T2='anat/' + self.subject + '_T2w.nii.gz',
aseg='anat/' + self.subject + '_desc-aseg_dseg.nii.gz',
brain='anat/' + self.subject + '_desc-brain_T1w.nii.gz',
brain_mask='anat/' + self.subject + '_desc-brain_mask.nii.gz',
wm_mask_file='anat/' + self.subject + '_label-WM_dseg.nii.gz',
wm_eroded='anat/' + self.subject + '_label-WM_desc-eroded_dseg.nii.gz',
brain_eroded='anat/' + self.subject + '_label-brain_desc-eroded_dseg.nii.gz',
csf_eroded='anat/' + self.subject + '_label-CSF_desc-eroded_dseg.nii.gz',
roi_volume_s1='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale1_dseg.nii.gz',
roi_volume_s2='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale2_dseg.nii.gz',
roi_volume_s3='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale3_dseg.nii.gz',
roi_volume_s4='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale4_dseg.nii.gz',
roi_volume_s5='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale5_dseg.nii.gz',
roi_graphml_s1='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale1_dseg.graphml',
roi_graphml_s2='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale2_dseg.graphml',
roi_graphml_s3='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale3_dseg.graphml',
roi_graphml_s4='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale4_dseg.graphml',
roi_graphml_s5='anat/' + self.subject + '_atlas-' +
bids_atlas_label + '_res-scale5_dseg.graphml')
datasource.inputs.sort_filelist = False
# Clear previous outputs
self.clear_stages_outputs()
# Create fMRI flow
fMRI_flow = pe.Workflow(name='fMRI_pipeline', base_dir=os.path.abspath(
nipype_deriv_subject_directory))
fMRI_inputnode = pe.Node(interface=util.IdentityInterface(
fields=["fMRI", "T1", "T2", "subjects_dir", "subject_id",
"wm_mask_file", "roi_volumes", "roi_graphMLs",
"wm_eroded", "brain_eroded", "csf_eroded"]),
name="inputnode")
fMRI_inputnode.inputs.parcellation_scheme = self.parcellation_scheme
fMRI_inputnode.inputs.atlas_info = self.atlas_info
fMRI_inputnode.subjects_dir = self.subjects_dir
fMRI_inputnode.subject_id = os.path.basename(self.subject_id)
fMRI_outputnode = pe.Node(interface=util.IdentityInterface(
fields=["connectivity_matrices"]),
name="outputnode")
fMRI_flow.add_nodes([fMRI_inputnode, fMRI_outputnode])
merge_roi_volumes = pe.Node(
interface=Merge(5), name='merge_roi_volumes')
merge_roi_graphmls = pe.Node(
interface=Merge(5), name='merge_roi_graphmls')
def remove_non_existing_scales(roi_volumes):
"""Returns a list which do not contained any empty element.
Parameters
----------
roi_volumes : list
A list of output parcellations that might contain empty element
in the case of the monoscale Desikan scheme for instance
Returns
-------
out_roi_volumes : list
The list with no empty element
"""
out_roi_volumes = []
for vol in roi_volumes:
if vol is not None:
out_roi_volumes.append(vol)
return out_roi_volumes
fMRI_flow.connect([
(datasource, merge_roi_volumes,
[("roi_volume_s1", "in1"), ("roi_volume_s2", "in2"), ("roi_volume_s3", "in3"),
("roi_volume_s4", "in4"), ("roi_volume_s5", "in5")])
])
fMRI_flow.connect([
(datasource, merge_roi_graphmls,
[("roi_graphml_s1", "in1"), ("roi_graphml_s2", "in2"), ("roi_graphml_s3", "in3"),
("roi_graphml_s4", "in4"), ("roi_graphml_s5", "in5")])
])
fMRI_flow.connect([
(datasource, fMRI_inputnode,
[("fMRI", "fMRI"), ("T1", "T1"), ("T2", "T2"), ("aseg", "aseg"),
("wm_mask_file", "wm_mask_file"), ("brain_eroded", "brain_eroded"),
("wm_eroded", "wm_eroded"), ("csf_eroded", "csf_eroded")]),
(merge_roi_volumes, fMRI_inputnode, [(("out", remove_non_existing_scales), "roi_volumes")]),
(merge_roi_graphmls, fMRI_inputnode, [(("out", remove_non_existing_scales), "roi_graphMLs")]),
])
if self.stages['Preprocessing'].enabled:
preproc_flow = self.create_stage_flow("Preprocessing")
fMRI_flow.connect([
(fMRI_inputnode, preproc_flow, [("fMRI", "inputnode.functional")]),
(preproc_flow, sinker, [("outputnode.mean_vol", "func.@mean_vol")]),
])
if self.stages['Registration'].enabled:
reg_flow = self.create_stage_flow("Registration")
fMRI_flow.connect([
(fMRI_inputnode, reg_flow, [('T1', 'inputnode.T1')]),
(fMRI_inputnode, reg_flow, [('T2', 'inputnode.T2')]),
(preproc_flow, reg_flow, [('outputnode.mean_vol', 'inputnode.target')]),
(fMRI_inputnode, reg_flow,
[('wm_mask_file', 'inputnode.wm_mask'),
('roi_volumes', 'inputnode.roi_volumes'),
('brain_eroded', 'inputnode.eroded_brain'),
('wm_eroded', 'inputnode.eroded_wm'),
('csf_eroded', 'inputnode.eroded_csf')]),
(reg_flow, sinker, [('outputnode.wm_mask_registered_crop', 'anat.@registered_wm'),
('outputnode.roi_volumes_registered_crop',
'anat.@registered_roi_volumes'),
('outputnode.eroded_wm_registered_crop',
'anat.@eroded_wm'),
('outputnode.eroded_csf_registered_crop',
'anat.@eroded_csf'),
('outputnode.eroded_brain_registered_crop', 'anat.@eroded_brain')]),
])
if self.stages['FunctionalMRI'].enabled:
func_flow = self.create_stage_flow("FunctionalMRI")
fMRI_flow.connect([
(preproc_flow, func_flow, [
('outputnode.functional_preproc', 'inputnode.preproc_file')]),
(reg_flow, func_flow, [('outputnode.wm_mask_registered_crop',
'inputnode.registered_wm'),
('outputnode.roi_volumes_registered_crop',
'inputnode.registered_roi_volumes'),
('outputnode.eroded_wm_registered_crop',
'inputnode.eroded_wm'),
('outputnode.eroded_csf_registered_crop',
'inputnode.eroded_csf'),
('outputnode.eroded_brain_registered_crop',
'inputnode.eroded_brain')]),
(func_flow, sinker, [('outputnode.func_file', 'func.@func_file'),
("outputnode.FD", "func.@FD"),
("outputnode.DVARS", "func.@DVARS")]),
])
if self.stages['FunctionalMRI'].config.scrubbing or self.stages['FunctionalMRI'].config.motion:
fMRI_flow.connect([
(preproc_flow, func_flow, [("outputnode.par_file", "inputnode.motion_par_file")]),
(preproc_flow, sinker, [("outputnode.par_file", "func.@motion_par_file")])
])
if self.stages['Connectome'].enabled:
self.stages['Connectome'].config.subject = self.global_conf.subject
con_flow = self.create_stage_flow("Connectome")
fMRI_flow.connect([
(fMRI_inputnode, con_flow, [('parcellation_scheme',
'inputnode.parcellation_scheme'),
('roi_graphMLs',
'inputnode.roi_graphMLs')]),
(func_flow, con_flow,
[('outputnode.func_file', 'inputnode.func_file'),
("outputnode.FD", "inputnode.FD"),
("outputnode.DVARS", "inputnode.DVARS")]),
(reg_flow, con_flow, [
("outputnode.roi_volumes_registered_crop", "inputnode.roi_volumes_registered")]),
(con_flow, fMRI_outputnode, [
("outputnode.connectivity_matrices", "connectivity_matrices")]),
(con_flow, sinker, [
("outputnode.connectivity_matrices", "func.@connectivity_matrices")]),
(con_flow, sinker, [
("outputnode.avg_timeseries", "func.@avg_timeseries")])
])
return fMRI_flow
[docs] def process(self):
"""Executes the fMRI pipeline workflow and returns True if successful."""
# Enable the use of the the W3C PROV data model to capture and represent provenance in Nipype
# config.enable_provenance()
# Process time
self.now = datetime.datetime.now().strftime("%Y%m%d_%H%M")
if '_' in self.subject:
self.subject = self.subject.split('_')[0]
if self.global_conf.subject_session == '':
cmp_deriv_subject_directory = os.path.join(
self.output_directory, "cmp", self.subject)
nipype_deriv_subject_directory = os.path.join(
self.output_directory, "nipype", self.subject)
else:
cmp_deriv_subject_directory = os.path.join(self.output_directory,
"cmp",
self.subject,
self.global_conf.subject_session)
nipype_deriv_subject_directory = os.path.join(self.output_directory,
"nipype",
self.subject,
self.global_conf.subject_session)
self.subject = "_".join((self.subject, self.global_conf.subject_session))
if not os.path.exists(os.path.join(nipype_deriv_subject_directory, "fMRI_pipeline")):
try:
os.makedirs(os.path.join(
nipype_deriv_subject_directory, "fMRI_pipeline"))
except os.error:
print("%s was already existing" % os.path.join(
nipype_deriv_subject_directory, "fMRI_pipeline"))
# Initialization
if os.path.isfile(os.path.join(nipype_deriv_subject_directory, "fMRI_pipeline", "pypeline.log")):
os.unlink(os.path.join(nipype_deriv_subject_directory,
"fMRI_pipeline", "pypeline.log"))
config.update_config(
{'logging': {'log_directory': os.path.join(nipype_deriv_subject_directory,
"fMRI_pipeline"),
'log_to_file': True},
'execution': {'remove_unnecessary_outputs': False,
'stop_on_first_crash': True,
'stop_on_first_rerun': False,
'use_relative_paths': True,
'crashfile_format': "txt"}
})
logging.update_logging(config)
iflogger = logging.getLogger('nipype.interface')
iflogger.info("**** Processing ****")
flow = self.create_pipeline_flow(cmp_deriv_subject_directory=cmp_deriv_subject_directory,
nipype_deriv_subject_directory=nipype_deriv_subject_directory)
flow.write_graph(graph2use='colored', format='svg', simple_form=False)
if self.number_of_cores != 1:
flow.run(plugin='MultiProc',
plugin_args={'n_procs': self.number_of_cores})
else:
flow.run()
iflogger.info("**** Processing finished ****")
return True