# Copyright (C) 2017-2019, Brain Communication Pathways Sinergia Consortium, Switzerland
# All rights reserved.
#
# This software is distributed under the open-source license Modified BSD.
"""Definitions of classes and functions for building Connectome Mapper reports
Ongoing...
References
----------
Based on fMRIprep v1.3.2 report builder.
"""
try:
from pathlib import Path
except ImportError:
print('Error: missing pathlib')
import json
import re
import jinja2
from nipype.utils.filemanip import copyfile
from pkg_resources import resource_filename as pkgrf
# from niworkflows.utils.misc import read_crashfile
class Element(object):
    """
    Smallest building block of a report: a named item with an optional title.

    Parameters
    ----------
    name : str
        Identifier of the element.
    title : str, optional
        Title of the element (``None`` when not given).
    """

    def __init__(self, name, title=None):
        # Both values are stored untouched as public attributes.
        self.title = title
        self.name = name
class Reportlet(Element):
    """
    A reportlet has title, description and a list of graphical components

    Parameters
    ----------
    name : str
        Identifier of the reportlet.
    file_pattern : str or compiled pattern, optional
        Regular expression used to select the source files of this
        reportlet. It is compiled with ``re.compile`` and stored in
        ``self.file_pattern``; when omitted, ``self.file_pattern`` is ``None``.
    title : str, optional
        Title of the reportlet.
    description : str, optional
        Description of the reportlet.
    raw : bool, optional
        Stored as-is in ``self.raw`` (``False`` by default).
    """

    def __init__(self, name, file_pattern=None, title=None, description=None, raw=False):
        # Let the base class store ``name`` and ``title`` so that all
        # report elements are initialised the same way.
        super(Reportlet, self).__init__(name, title=title)
        # ``re.compile(None)`` raises TypeError, which made the declared
        # default unusable; only compile when a pattern was actually given.
        if file_pattern is None:
            self.file_pattern = None
        else:
            self.file_pattern = re.compile(file_pattern)
        self.description = description
        # Filled in later by whoever indexes the files matching the pattern.
        self.source_files = []
        self.contents = []
        self.raw = raw
class SubReport(Element):
    """
    A section of a Report, holding an ordered list of reportlets.

    Parameters
    ----------
    name : str
        Identifier of the section.
    reportlets : iterable, optional
        Initial items of the section; they are copied into a new list.
    title : str, optional
        Title of the section (empty string by default).
    """

    def __init__(self, name, reportlets=None, title=''):
        self.name, self.title = name, title
        # Always own a fresh list so the caller's container is never shared.
        items = []
        if reportlets:
            items.extend(reportlets)
        self.reportlets = items
        # Starts flat; this flag is a plain attribute that other code may set.
        self.isnested = False
class Report(object):
    """
    The full report object

    On construction the JSON configuration is loaded and the reportlet
    directory is indexed; ``generate_report`` then renders the HTML file.

    Parameters
    ----------
    path : str or path-like
        Directory holding the reportlets of one subject. Its last component
        must look like ``sub-<label>``.
    config : str
        Path to a JSON file with a ``"sections"`` entry.
    out_dir : str or path-like
        Output directory; files are written below ``<out_dir>/cmp``.
    run_uuid : str
        Identifier of the run, used to locate
        ``<out_dir>/nipype/<subject>/log/<run_uuid>``.
    out_filename : str, optional
        Name of the HTML file written by ``generate_report``.
    sentry_sdk : object, optional
        When truthy, crashes are reported through its ``push_scope``,
        ``add_breadcrumb`` and ``capture_message`` callables.
    """

    # Upper bound on the length of the message sent to sentry.
    _MAX_MESSAGE_LENGTH = 8192
    # String extras longer than this are split into numbered chunks.
    _EXTRA_CHUNK_SIZE = 16384
    # Known error snippets grouped under pre-specified fingerprints;
    # the first fingerprint with a snippet found in the traceback wins.
    _KNOWN_FINGERPRINTS = (
        ('permission-denied', ("PermissionError: [Errno 13] Permission denied",)),
        ('memory-error', ("MemoryError", "Cannot allocate memory")),
        ('reconall-already-running', (
            "ERROR: it appears that recon-all is already running",)),
        ('no-disk-space', (
            "OSError: [Errno 28] No space left on device",
            "[Errno 122] Disk quota exceeded")),
        ('sigkill', ("Return code: 137",)),
        ('keyboard-interrupt', ("KeyboardInterrupt",)),
    )

    def __init__(self, path, config, out_dir, run_uuid, out_filename='report.html',
                 sentry_sdk=None):
        self.root = path
        self.sections = []
        self.errors = []
        self.out_dir = Path(out_dir)
        self.out_filename = out_filename
        self.run_uuid = run_uuid
        self.sentry_sdk = sentry_sdk
        # Loading the configuration also indexes the reportlets.
        self._load_config(config)

    def _load_config(self, config):
        """Read the JSON file ``config`` and index its ``"sections"`` entry."""
        with open(config, 'r') as configfh:
            config = json.load(configfh)

        self.index(config['sections'])

    def index(self, config):
        """
        Collect the files of every configured reportlet into ``self.sections``.

        HTML files matching a reportlet pattern are read into memory; SVG
        files are copied to ``<out_dir>/cmp/<subject>/figures`` and referenced
        by their path relative to ``<out_dir>/cmp``. Crash files of the
        current run, if any, are indexed afterwards.

        Parameters
        ----------
        config : list of dict
            One entry per section, each with a ``'name'``, a ``'reportlets'``
            list of keyword arguments for ``Reportlet`` and an optional
            ``'title'``.

        Raises
        ------
        ValueError
            If the last component of ``self.root`` is not ``sub-<label>``.
        """
        fig_dir = 'figures'
        # ``Path.name`` copes with trailing separators and path-like roots,
        # which ``str.split('/')[-1]` did not.
        subject_dir = Path(self.root).name
        subject_match = re.search(
            '^(?P<subject_id>sub-[a-zA-Z0-9]+)$', subject_dir)
        if subject_match is None:
            # Fail with an explicit message instead of an AttributeError
            # on ``None.group()``.
            raise ValueError(
                'Reportlet directory "%s" does not end with a "sub-<label>" '
                'folder' % self.root)
        subject = subject_match.group()

        svg_dir = self.out_dir / 'cmp' / subject / fig_dir
        svg_dir.mkdir(parents=True, exist_ok=True)
        # Sorted so that reportlet contents come in a deterministic order.
        reportlet_list = sorted(
            str(f) for f in Path(self.root).glob('**/*.*'))

        for subrep_cfg in config:
            reportlets = []
            for reportlet_cfg in subrep_cfg['reportlets']:
                rlet = Reportlet(**reportlet_cfg)
                for src in reportlet_list:
                    if not rlet.file_pattern.search(src):
                        continue

                    ext = src.split('.')[-1]
                    contents = None
                    if ext == 'html':
                        with open(src) as fp:
                            contents = fp.read().strip()
                    elif ext == 'svg':
                        fbase = Path(src).name
                        copyfile(src, str(svg_dir / fbase),
                                 copy=True, use_hardlink=True)
                        contents = str(Path(subject) / fig_dir / fbase)

                    # Other extensions and empty HTML files are ignored.
                    if contents:
                        rlet.source_files.append(src)
                        rlet.contents.append(contents)

                if rlet.source_files:
                    reportlets.append(rlet)

            if reportlets:
                sub_report = SubReport(
                    subrep_cfg['name'], reportlets=reportlets,
                    title=subrep_cfg.get('title'))
                self.sections.append(order_by_run(sub_report))

        error_dir = self.out_dir / "nipype" / subject / 'log' / self.run_uuid
        if error_dir.is_dir():
            self.index_error_dir(error_dir)

    def index_error_dir(self, error_dir):
        """
        Crawl subjects crash directory for the corresponding run, report to sentry, and
        populate self.errors.

        Parameters
        ----------
        error_dir : pathlib.Path
            Directory searched for files matching ``crash*.*``.
        """
        for crashfile in error_dir.glob('crash*.*'):
            # NOTE(review): ``read_crashfile`` is not defined in the visible
            # part of this module (the niworkflows import that would provide
            # it is commented out); unless it is supplied elsewhere this call
            # raises NameError -- confirm and restore a provider.
            crash_info = read_crashfile(str(crashfile))
            if self.sentry_sdk:
                self._report_crash_to_sentry(crash_info)

            self.errors.append(crash_info)

    def _report_crash_to_sentry(self, crash_info):
        """Send one crash record (a dict with 'node' and 'traceback') to sentry."""
        traceback = crash_info['traceback']
        tb_lines = traceback.split('\n')

        with self.sentry_sdk.push_scope() as scope:
            node_name = crash_info['node'].split('.')[-1]
            # last line is probably most informative summary
            gist = tb_lines[-1]

            # Skip the first line and every following indented line; the
            # exception text starts at the first non-indented line. Blank
            # lines are skipped too (indexing ``line[0]`` on them used to
            # raise IndexError).
            exception_text_start = 1
            for line in tb_lines[1:]:
                if line and not line[0].isspace():
                    break
                exception_text_start += 1
            exception_text = '\n'.join(tb_lines[exception_text_start:])

            scope.set_tag("node_name", node_name)

            chunk_size = self._EXTRA_CHUNK_SIZE
            for k, v in list(crash_info.items()):
                if k == 'inputs':
                    scope.set_extra(k, dict(v))
                elif isinstance(v, str) and len(v) > chunk_size:
                    # Long strings are sent as numbered chunks.
                    chunks = [v[i:i + chunk_size]
                              for i in range(0, len(v), chunk_size)]
                    for i, chunk in enumerate(chunks):
                        scope.set_extra('%s_%02d' % (k, i), chunk)
                else:
                    scope.set_extra(k, v)
            scope.level = 'fatal'

            # Group common events with pre specified fingerprints
            fingerprint = ''
            issue_title = node_name + ': ' + gist
            for known_fingerprint, error_snippets in self._KNOWN_FINGERPRINTS:
                if any(snippet in traceback for snippet in error_snippets):
                    fingerprint = issue_title = known_fingerprint
                    break

            message = issue_title + '\n\n'
            # Keep only the tail of the exception text that still fits.
            # A non-positive budget must add nothing: ``text[-0:]`` would
            # append everything and a positive index would slice from the
            # wrong end.
            budget = self._MAX_MESSAGE_LENGTH - len(message)
            if budget > 0:
                message += exception_text[-budget:]

            if fingerprint:
                self.sentry_sdk.add_breadcrumb(
                    message=fingerprint, level='fatal')
            else:
                # remove file paths
                fingerprint = re.sub(r"(/[^/ ]*)+/?", '', message)
                # remove words containing numbers
                fingerprint = re.sub(
                    r"([a-zA-Z]*[0-9]+[a-zA-Z]*)+", '', fingerprint)
                # adding the return code if it exists
                for line in message.split('\n'):
                    if line.startswith("Return code"):
                        fingerprint += line
                        break

            scope.fingerprint = [fingerprint]
            self.sentry_sdk.capture_message(message, 'fatal')

    def generate_report(self):
        """
        Render the HTML report and write it to ``<out_dir>/cmp/<out_filename>``.

        Citation boilerplate found in ``<out_dir>/cmp/logs`` (``CITATION.html``,
        ``CITATION.md``, ``CITATION.tex``) is passed to the template together
        with the indexed sections and errors.

        Returns
        -------
        int
            Number of entries in ``self.errors``.

        Raises
        ------
        IndexError
            If ``CITATION.html`` has no ``<body>`` element or ``CITATION.tex``
            has no ``document`` environment.
        """
        logs_path = self.out_dir / 'cmp' / 'logs'

        # Entries are (index, format label, html text); the index is the
        # position of the entry in the list.
        boilerplate = []

        html_file = logs_path / 'CITATION.html'
        if html_file.exists():
            text = html_file.read_text(encoding='UTF-8')
            body = re.compile(
                '<body>(.*?)</body>',
                re.DOTALL | re.IGNORECASE).findall(text)[0].strip()
            boilerplate.append(
                (len(boilerplate), 'HTML',
                 '<div class="boiler-html">%s</div>' % body))

        md_file = logs_path / 'CITATION.md'
        if md_file.exists():
            text = '<pre>%s</pre>\n' % md_file.read_text(encoding='UTF-8')
            boilerplate.append((len(boilerplate), 'Markdown', text))

        tex_file = logs_path / 'CITATION.tex'
        if tex_file.exists():
            text = tex_file.read_text(encoding='UTF-8')
            text = re.compile(
                r'\\begin{document}(.*?)\\end{document}',
                re.DOTALL | re.IGNORECASE).findall(text)[0].strip()
            text = '<pre>%s</pre>\n' % text
            text += '<h3>Bibliography</h3>\n'
            text += '<pre>%s</pre>\n' % Path(
                pkgrf('cmp', 'data/boilerplate.bib')).read_text(encoding='UTF-8')
            boilerplate.append((len(boilerplate), 'LaTeX', text))

        searchpath = pkgrf('cmp', '/')
        env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(searchpath=searchpath),
            trim_blocks=True, lstrip_blocks=True
        )
        report_tpl = env.get_template('viz/report.tpl')
        report_render = report_tpl.render(sections=self.sections, errors=self.errors,
                                          boilerplate=boilerplate)

        # Write out report
        (self.out_dir / 'cmp' /
         self.out_filename).write_text(report_render, encoding='UTF-8')
        return len(self.errors)
def order_by_run(subreport):
    """
    Regroup the reportlets of a sub-report by acquisition (session/task/run...).

    Reportlets with exactly one (non-empty) source file are kept as they are.
    Any other reportlet is split into one single-file reportlet per source
    file, and these are gathered in nested ``SubReport`` objects keyed by the
    name returned by ``generate_name_title``. Files for which no name can be
    derived are dropped.

    Parameters
    ----------
    subreport
        Object with a ``reportlets`` list; it is modified in place.

    Returns
    -------
    subreport
        The same object, with ``reportlets`` replaced by the kept reportlets
        followed by the nested sub-reports sorted by name, and ``isnested``
        set to ``True`` when at least one nested sub-report was created.
    """
    kept = []
    groups = {}

    for rlet in subreport.reportlets:
        sources = rlet.source_files
        if len(sources) == 1 and sources[0]:
            kept.append(rlet)
            continue

        for src, body in zip(sources, rlet.contents):
            run_name, run_title = generate_name_title(src)
            if not (src and run_name):
                continue

            # One-file clone of the parent reportlet.
            single = Reportlet(
                name=rlet.name, title=rlet.title, file_pattern=rlet.file_pattern,
                description=rlet.description, raw=rlet.raw)
            single.contents.append(body)
            single.source_files.append(src)

            # Create the nested section lazily, on first use of its name.
            try:
                group = groups[run_name]
            except KeyError:
                group = groups[run_name] = SubReport(run_name, title=run_title)
            group.reportlets.append(single)

    if groups:
        kept.extend(groups[key] for key in sorted(groups))
        subreport.isnested = True

    subreport.reportlets = kept
    return subreport
def generate_name_title(filename):
    """
    Derive a short name and a readable title from a ``sub-<label>...`` file name.

    Only the base name of ``filename`` is inspected. The optional
    ``_ses-``, ``_task-``, ``_acq-``, ``_rec-`` and ``_run-`` entities that
    directly follow ``sub-<label>`` (in that order) are used.

    Parameters
    ----------
    filename : str or path-like
        Path of the file.

    Returns
    -------
    name : str or None
        Found entities joined as ``ses-X_task-Y...`` (possibly empty), or
        ``None`` when the base name does not start with ``sub-<label>``.
    title : str or None
        Found entities spelled out as `` Session: X Task: Y...`` (possibly
        empty), or ``None`` when the base name does not match.
    """
    # (regex group, prefix used in the name, prefix used in the title)
    entity_table = (
        ('session_id', "_ses-", " Session: "),
        ('task_id', "_task-", " Task: "),
        ('acq_id', "_acq-", " Acquisition: "),
        ('rec_id', "_rec-", " Reconstruction: "),
        ('run_id', "_run-", " Run: "),
    )
    pattern = re.compile('^sub-(?P<subject_id>[a-zA-Z0-9]+)(_ses-(?P<session_id>[a-zA-Z0-9]+))?'
                         '(_task-(?P<task_id>[a-zA-Z0-9]+))?(_acq-(?P<acq_id>[a-zA-Z0-9]+))?'
                         '(_rec-(?P<rec_id>[a-zA-Z0-9]+))?(_run-(?P<run_id>[a-zA-Z0-9]+))?')

    match = pattern.search(Path(filename).name)
    if match is None:
        return None, None
    entities = match.groupdict()

    name_parts = []
    title_parts = []
    for group, name_prefix, title_prefix in entity_table:
        value = entities[group]
        if value:
            name_parts.append(name_prefix + value)
            title_parts.append(title_prefix + value)

    # The name loses its leading underscore; the title keeps its leading space.
    return ''.join(name_parts).strip('_'), ''.join(title_parts)
def run_reports(reportlets_dir, out_dir, subject_label, run_uuid, sentry_sdk=None):
    """
    Build the HTML report of a single subject.

    The reportlets are looked up in ``<reportlets_dir>/cmp/sub-<subject_label>``
    and the report is written as ``sub-<subject_label>.html`` by
    ``Report.generate_report``, using the packaged ``viz/config.json``.

    Parameters
    ----------
    reportlets_dir : str or path-like
        Directory that contains the ``cmp/sub-<subject_label>`` reportlets.
    out_dir : str or path-like
        Output directory handed over to ``Report``.
    subject_label : str
        Subject label, without the ``sub-`` prefix.
    run_uuid : str
        Identifier of the run, handed over to ``Report``.
    sentry_sdk : object, optional
        Handed over to ``Report``.

    Returns
    -------
    The value returned by ``Report.generate_report``.

    Examples
    --------
    >>> import os
    >>> from shutil import copytree
    >>> from tempfile import TemporaryDirectory
    >>> filepath = os.path.dirname(os.path.realpath(__file__))
    >>> test_data_path = os.path.realpath(os.path.join(filepath,
    ...                                   '../data/tests/work'))
    >>> curdir = os.getcwd()
    >>> tmpdir = TemporaryDirectory()
    >>> os.chdir(tmpdir.name)
    >>> data_dir = copytree(test_data_path, os.path.abspath('work'))
    >>> os.makedirs('out/fmriprep', exist_ok=True)
    >>> run_reports(os.path.abspath('work/reportlets'),
    ...             os.path.abspath('out'),
    ...             '01', 'madeoutuuid')
    0
    >>> os.chdir(curdir)
    >>> tmpdir.cleanup()
    """
    subject_dir = "sub-%s" % subject_label
    reportlet_path = str(Path(reportlets_dir) / 'cmp' / subject_dir)

    report = Report(
        reportlet_path,
        pkgrf('cmp', 'viz/config.json'),
        out_dir,
        run_uuid,
        'sub-{}.html'.format(subject_label),
        sentry_sdk=sentry_sdk)
    return report.generate_report()
def generate_reports(subject_list, output_dir, work_dir, run_uuid, sentry_sdk=None):
    """
    Run ``run_reports`` for every subject of ``subject_list``.

    Reportlets are expected in ``<work_dir>/reportlets``. When at least one
    report signals errors, a message listing the affected participants is
    logged as an error on the ``'cli'`` logger.

    Parameters
    ----------
    subject_list : iterable of str
        Subject labels passed one by one to ``run_reports``.
    output_dir : str or path-like
        Output directory passed to ``run_reports``.
    work_dir : str or path-like
        Working directory containing the ``reportlets`` folder.
    run_uuid : str
        Identifier of the run, passed to ``run_reports``.
    sentry_sdk : object, optional
        Passed to ``run_reports``.

    Returns
    -------
    Sum of the values returned by ``run_reports`` (0 when nothing failed).
    """
    reports_dir = str(Path(work_dir) / 'reportlets')

    report_errors = []
    for subject_label in subject_list:
        report_errors.append(
            run_reports(reports_dir, output_dir, subject_label, run_uuid=run_uuid,
                        sentry_sdk=sentry_sdk))

    errno = sum(report_errors)
    if not errno:
        return errno

    # Only reached on failure, hence the local import.
    import logging
    failed = ['%s (%d)' % (subid, err)
              for subid, err in zip(subject_list, report_errors) if err]
    error_list = ', '.join(failed)
    logging.getLogger('cli').error(
        'Preprocessing did not finish successfully. Errors occurred while processing '
        'data from participants: %s. Check the HTML reports for details.' %
        error_list)
    return errno