# Source code for cmp.pipelines.common

# Copyright (C) 2009-2022, Ecole Polytechnique Federale de Lausanne (EPFL) and
# Hospital Center and University of Lausanne (UNIL-CHUV), Switzerland, and CMP3 contributors
# All rights reserved.
#
#  This software is distributed under the open-source license Modified BSD.

"""Definition of common parent classes for pipelines."""

import os
import threading
import time

from traits.api import *

import nipype.pipeline.engine as pe
import nipype.interfaces.utility as util
from nipype.interfaces.base import File, Directory

from cmtklib.bids.io import __nipype_directory__


class Pipeline(HasTraits):
    """Parent class that extends `HasTraits` and represents a processing pipeline.

    It is extended by the various pipeline classes.

    This base class reads ``self.stages`` (a mapping of stage name to stage
    object) and ``self.pipeline_name`` but defines neither of them, so
    subclasses have to provide both before calling :meth:`__init__`.

    See Also
    --------
    cmp.pipelines.anatomical.anatomical.AnatomicalPipeline
    cmp.pipelines.diffusion.diffusion.DiffusionPipeline
    cmp.pipelines.functional.fMRI.fMRIPipeline
    """

    # Information common to project_info
    base_directory = Directory
    output_directory = Directory
    root = Property
    subject = "sub-01"
    last_date_processed = Str
    last_stage_processed = Str

    # Number-of-cores settings
    number_of_cores = 1

    anat_flow = None

    # -- Property Implementations ---------------------------------------------
    @property_depends_on("base_directory")
    def _get_root(self):
        """Getter of the ``root`` property, derived from ``base_directory``."""
        return File(path=self.base_directory)

    def __init__(self, project_info):
        """Copy the project settings and set the directory of every stage.

        Each stage directory is laid out as
        ``<base_directory>/derivatives/<nipype dir>/<subject>[/<session>]/<pipeline_name>/<stage name>``
        where the session level is only present when
        ``project_info.subject_session`` is non-empty.

        Parameters
        ----------
        project_info : object
            Object providing the ``base_directory``, ``number_of_cores`` and
            ``subject_session`` attributes read here.
        """
        # NOTE(review): ``HasTraits.__init__`` is not invoked here - confirm
        # that this is intentional before relying on trait listeners.
        self.base_directory = project_info.base_directory
        self.number_of_cores = project_info.number_of_cores

        # The session level is the same for all stages: decide once whether it
        # is part of the path. A truthiness test (rather than `!= ""`) also
        # keeps a `None` session out of `os.path.join`, which would raise.
        subject_session = project_info.subject_session
        session_parts = [subject_session] if subject_session else []

        for stage in self.stages.values():
            stage.stage_dir = os.path.join(
                self.base_directory,
                "derivatives",
                __nipype_directory__,
                self.subject,
                *session_parts,
                self.pipeline_name,
                stage.name,
            )

    def create_stage_flow(self, stage_name):
        """Create the sub-workflow of a processing stage.

        The workflow is named after the stage and contains an ``inputnode`` and
        an ``outputnode`` identity node, exposing ``stage.inputs`` and
        ``stage.outputs`` respectively, which are handed to
        ``stage.create_workflow()`` to be wired to the stage's own nodes.

        Parameters
        ----------
        stage_name : str
            Stage name, used as key in ``self.stages``

        Returns
        -------
        flow : nipype.pipeline.engine.Workflow
            Created stage sub-workflow
        """
        stage = self.stages[stage_name]
        flow = pe.Workflow(name=stage.name)
        inputnode = pe.Node(
            interface=util.IdentityInterface(fields=stage.inputs), name="inputnode"
        )
        outputnode = pe.Node(
            interface=util.IdentityInterface(fields=stage.outputs), name="outputnode"
        )
        flow.add_nodes([inputnode, outputnode])
        stage.create_workflow(flow, inputnode, outputnode)
        return flow

    def fill_stages_outputs(self):
        """Update processing stage output list for visual inspection.

        Only enabled stages are updated.
        """
        for stage in self.stages.values():
            if stage.enabled:
                stage.define_inspect_outputs()

    def clear_stages_outputs(self):
        """Clear processing stage outputs.

        Resets the inspection outputs of every enabled stage to their
        "not available" state. Files on disk are not touched.
        """
        for stage in self.stages.values():
            if stage.enabled:
                stage.inspect_outputs_dict = {}
                stage.inspect_outputs = ["Outputs not available"]
                # NOTE: removing the `result_*.pklz` files found under
                # `stage.stage_dir` (to clear them from the visualisation
                # drop-down list) used to be sketched here as commented-out
                # code and is currently not performed.