# Copyright (C) 2009-2021, Ecole Polytechnique Federale de Lausanne (EPFL) and
# Hospital Center and University of Lausanne (UNIL-CHUV), Switzerland, and CMP3 contributors
# All rights reserved.
#
# This software is distributed under the open-source license Modified BSD.
"""Module that defines CMTK Utility functions."""
import os
from os import path as op
import warnings
from glob import glob
# import pickle
import gzip
import json
import networkx as nx
import numpy as np
warnings.simplefilter("ignore")
[docs]class BColors:
"""Utility class for color unicode."""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
[docs]def print_warning(message):
"""Print yellow-colored warning message
Parameters
----------
message : string
The string of the message to be printed
"""
print(BColors.WARNING + message + BColors.ENDC)
[docs]def print_error(message):
"""Print red-colored error message
Parameters
----------
message : string
The string of the message to be printed
"""
print(BColors.FAIL + message + BColors.ENDC)
[docs]def print_blue(message):
"""Print blue-colored message
Parameters
----------
message : string
The string of the message to be printed
"""
print(BColors.OKBLUE + message + BColors.ENDC)
[docs]def load_graphs(output_dir, subjects, parcellation_scheme, weight):
"""Return a dictionary of connectivity matrices (graph adjacency matrices).
Still in development
Parameters
----------
output_dir : string
Output/derivatives directory
subjects : list
List of subject
parcellation_scheme : ['NativeFreesurfer','Lausanne2008','Lausanne2018']
Parcellation scheme
weight : ['number_of_fibers','fiber_density',...]
Edge metric to extract from the graph
Returns
-------
connmats: dict
Dictionary of connectivity matrices
"""
if parcellation_scheme == 'Lausanne2008':
bids_atlas_label = 'L2008'
elif parcellation_scheme == 'Lausanne2018':
bids_atlas_label = 'L2018'
elif parcellation_scheme == 'NativeFreesurfer':
bids_atlas_label = 'Desikan'
if parcellation_scheme == 'NativeFreesurfer':
for subj in subjects:
subj_dir = os.path.join(output_dir, subj)
subj_session_dirs = glob(op.join(subj_dir, "ses-*"))
subj_sessions = ['ses-{}'.format(subj_session_dir.split("-")[-1])
for subj_session_dir in subj_session_dirs]
if len(subj_sessions) > 0: # Session structure
for subj_session in subj_sessions:
conn_derivatives_dir = op.join(
output_dir, 'cmp', subj, subj_session, 'connectivity')
# Extract the connectivity matrix
# self.subject+'_label-'+bids_atlas_label+'_desc-scale5_conndata-snetwork_connectivity'
connmat_fname = op.join(conn_derivatives_dir,
'{}_{}_label-{}_conndata-snetwork_connectivity.gpickle'.format(subj,
subj_session,
bids_atlas_label))
connmat_gp = nx.read_gpickle(connmat_fname)
connmat = nx.to_numpy_matrix(
connmat_gp, weight=weight, dtype=np.float32)
else:
# For each parcellation scale
for scale in np.arange(1, 6):
for subj in subjects:
subj_dir = os.path.join(output_dir, subj)
subj_session_dirs = glob(op.join(subj_dir, "ses-*"))
subj_sessions = ['ses-{}'.format(subj_session_dir.split("-")[-1]) for subj_session_dir in
subj_session_dirs]
if len(subj_sessions) > 0: # Session structure
for subj_session in subj_sessions:
conn_derivatives_dir = op.join(
output_dir, 'cmp', subj, subj_session, 'connectivity')
# Extract the connectivity matrix
# self.subject+'_label-'+bids_atlas_label+'_desc-scale5_conndata-snetwork_connectivity'
connmat_fname = op.join(conn_derivatives_dir,
'{}_{}_label-{}_desc-scale{}_conndata-snetwork_connectivity.gpickle'.format(
subj, subj_session, bids_atlas_label, scale))
connmat_gp = nx.read_gpickle(connmat_fname)
connmat = nx.to_numpy_matrix(
connmat_gp, weight=weight, dtype=np.float32)
# TODO: finalize condition and append all conmat to a list
return connmat
[docs]def length(xyz, along=False):
"""Euclidean length of track line.
Parameters
----------
xyz : array-like shape (N,3)
array representing x,y,z of N points in a track
along : bool, optional
If True, return array giving cumulative length along track,
otherwise (default) return scalar giving total length.
Returns
-------
L : scalar or array shape (N-1,)
scalar in case of `along` == False, giving total length, array if
`along` == True, giving cumulative lengths.
Examples
--------
>>> xyz = np.array([[1,1,1],[2,3,4],[0,0,0]])
>>> expected_lens = np.sqrt([1+2**2+3**2, 2**2+3**2+4**2])
>>> length(xyz) == expected_lens.sum()
True
>>> len_along = length(xyz, along=True)
>>> np.allclose(len_along, expected_lens.cumsum())
True
>>> length([])
0
>>> length([[1, 2, 3]])
0
>>> length([], along=True)
array([0])
"""
xyz = np.asarray(xyz)
if xyz.shape[0] < 2:
if along:
return np.array([0])
return 0
dists = np.sqrt((np.diff(xyz, axis=0) ** 2).sum(axis=1))
if along:
return np.cumsum(dists)
return np.sum(dists)
[docs]def magn(xyz, n=1):
"""Returns the vector magnitude
Parameters
----------
xyz : vector
Input vector
n : int
Tile by `n` if `n>1` before return
"""
mag = np.sum(xyz ** 2, axis=1) ** 0.5
imag = np.where(mag == 0)
mag[imag] = np.finfo(float).eps
if n > 1:
return np.tile(mag, (n, 1)).T
return mag.reshape(len(mag), 1)
[docs]def mean_curvature(xyz):
"""Calculates the mean curvature of a curve.
Parameters
------------
xyz : array-like shape (N,3)
array representing x,y,z of N points in a curve
Returns
-----------
m : float
float representing the mean curvature
Examples
--------
Create a straight line and a semi-circle and print their mean curvatures
>>> from dipy.tracking import metrics as tm
>>> import numpy as np
>>> x=np.linspace(0,1,100)
>>> y=0*x
>>> z=0*x
>>> xyz=np.vstack((x,y,z)).T
>>> m=tm.mean_curvature(xyz) #mean curvature straight line
>>> theta=np.pi*np.linspace(0,1,100)
>>> x=np.cos(theta)
>>> y=np.sin(theta)
>>> z=0*x
>>> xyz=np.vstack((x,y,z)).T
>>> m=tm.mean_curvature(xyz) #mean curvature for semi-circle
"""
xyz = np.asarray(xyz)
n_pts = xyz.shape[0]
if n_pts == 0:
raise ValueError('xyz array cannot be empty')
dxyz = np.gradient(xyz)[0]
ddxyz = np.gradient(dxyz)[0]
# Curvature
k = magn(np.cross(dxyz, ddxyz), 1) / (magn(dxyz, 1) ** 3)
return np.mean(k)
[docs]def get_pipeline_dictionary_outputs(datasink_report, local_output_dir=None, debug=False):
"""Read the Nipype datasink report and return a dictionary of pipeline outputs.
Parameters
----------
datasink_report : string
Path to the datasink report
local_output_dir : string
Local output / derivatives directory
debug : bool
If `True`, print output dictionary
Returns
-------
dict_outputs : dict
Dictionary of pipeline outputs
"""
# Read rst report of a datasink node
with open(datasink_report) as fp:
while True:
line = fp.readline()
if not line:
break
# Extract line containing listing of node outputs and stop
if "_outputs :" in line:
str_outputs = line.strip()
prefix = '* _outputs : '
str_outputs = str.replace(str_outputs, prefix, "")
str_outputs = str.replace(str_outputs, "\'", "\"")
str_outputs = str.replace(str_outputs, "<undefined>", "\"\"")
# Update from BIDS App /output_dir to local output directory
# specified by local_output_dir
if local_output_dir is not None:
str_outputs = str.replace(str_outputs, "/output_dir", local_output_dir)
break
# Convert the extracted JSON-structured string to a dictionary
dict_outputs = json.loads("{}".format(str_outputs))
if debug:
print("Dictionary of datasink outputs: {}".format(dict_outputs))
return dict_outputs
[docs]def get_node_dictionary_outputs(node_report, local_output_dir=None, debug=False):
"""Read the Nipype node report and return a dictionary of node outputs.
Parameters
----------
node_report : string
Path to node report
local_output_dir : string
Local output / derivatives directory
debug : bool
If `True`, print output dictionary
Returns
-------
dict_outputs : dict
dictionary of outputs extracted from node execution report
"""
# Read rst report of a datasink node
with open(node_report) as fp:
while True:
line = fp.readline()
if not line:
break
# Extract line containing listing of node outputs and stop
if "_outputs :" in line:
str_outputs = line.strip()
prefix = '* _outputs : '
str_outputs = str.replace(str_outputs, prefix, "")
str_outputs = str.replace(str_outputs, "\'", "\"")
# Update from BIDS App /output_dir to local output directory
# specified by local_output_dir
if local_output_dir is not None:
str_outputs = str.replace(str_outputs, "/output_dir", local_output_dir)
break
# Convert the extracted JSON-structured string to a dictionary
dict_outputs = json.loads("{}".format(str_outputs))
if debug:
print("Dictionary of node outputs: {}".format(dict_outputs))
return dict_outputs