#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Functions dealing with metrics aggregation (mean, std, etc.) across slices and/or vertebral levels
# TODO: when mask is empty, raise a specific message instead of throwing "Weights sum to zero..."
import os
import numpy as np
import math
import operator
import functools
import csv
import datetime
import logging
from spinalcordtoolbox.template import get_slices_from_vertebral_levels, get_vertebral_level_from_slice
from spinalcordtoolbox.image import Image
from spinalcordtoolbox.utils import __version__, parse_num_list_inv


class Metric:
    """
    Associate a data array with a label, for use in dictionaries of metrics.
    """
def __init__(self, data=None, label=''):
"""
:param data: ndarray
:param label: str
"""
self.data = data
self.label = label


class LabelStruc:
    """
    Hold label information: id, name, associated file name, and MAP cluster membership.
    """
def __init__(self, id, name, filename=None, map_cluster=None):
self.id = id
self.name = name
self.filename = filename
self.map_cluster = map_cluster


def func_bin(data, mask, map_clusters=None):
"""
Get the average of data after binarizing the input mask
:param data: nd-array: input data
:param mask: (n+1)d-array: input mask
:param map_clusters: not used
    :return: Tuple of (weighted average within the binarized mask, None)
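
    Example (a minimal doctest sketch; the arrays are illustrative assumptions):

        >>> import numpy as np
        >>> data = np.array([[1., 2.], [3., 4.]])
        >>> mask = np.array([[0.2, 0.6], [0.9, 0.4]])  # soft mask; values >= 0.5 are kept
        >>> float(func_bin(data, mask)[0])
        2.5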
"""
# Binarize mask
mask_bin = np.where(mask >= 0.5, 1, 0)
# run weighted average
return func_wa(data, mask_bin)


def func_max(data, mask=None, map_clusters=None):
"""
Get the max of an array
:param data: nd-array: input data
:param mask: not used
:param map_clusters: not used
    :return: Tuple of (max of data, None)
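
    Example (a minimal doctest sketch):

        >>> import numpy as np
        >>> float(func_max(np.array([1., 5., 3.]))[0])
        5.0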
"""
return np.max(data), None


def func_map(data, mask, map_clusters):
"""
Compute maximum a posteriori (MAP) by aggregating the last dimension of mask according to a clustering method
defined by map_clusters
:param data: nd-array: input data
:param mask: (n+1)d-array: input mask. Note: this mask should include ALL labels to satisfy the necessary condition for\
ML-based estimation, i.e., at each voxel, the sum of all labels (across the last dimension) equals the probability\
to be inside the tissue. For example, for a pixel within the spinal cord, the sum of all labels should be 1.
:param map_clusters: list of list of int: Each sublist corresponds to a cluster of labels where ML estimation will\
be performed to provide the prior beta_0 for MAP estimation.
:return: float: beta corresponding to the first label (beta[0])
:return: nd-array: matrix of all beta
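
    Example (a minimal doctest sketch with two non-overlapping labels, each in its own cluster; the values are
    illustrative assumptions, chosen so that the ML prior already explains the data and MAP returns it unchanged):

        >>> import numpy as np
        >>> data = np.array([1., 2., 3., 4.])
        >>> mask = np.stack([[1., 1., 0., 0.], [0., 0., 1., 1.]], axis=-1)  # shape (4, 2): one column per label
        >>> beta0, beta = func_map(data, mask, map_clusters=[[0], [1]])
        >>> float(beta0)
        1.5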
"""
# Check number of labels and map_clusters
    assert mask.shape[-1] == len(map_clusters), "mask and map_clusters must have the same number of labels"
# Iterate across all labels (excluding the first one) and generate cluster labels. Examples of input/output:
# [[0], [0], [0], [1], [2], [0]] --> [0, 0, 0, 1, 2, 0]
# [[0, 1], [0], [0], [1], [2]] --> [0, 0, 0, 0, 1]
# [[0, 1], [0], [1], [2], [3]] --> [0, 0, 0, 1, 2]
possible_clusters = [map_clusters[0]]
id_clusters = [0] # this one corresponds to the first cluster
for i_cluster in map_clusters[1:]: # skip the first
found_index = False
        for possible_cluster in possible_clusters:
            if i_cluster[0] in possible_cluster:
                id_clusters.append(possible_clusters.index(possible_cluster))
                found_index = True
                break  # stop at the first match to avoid appending several indices for a single label
        if not found_index:
            possible_clusters.append(i_cluster)
            id_clusters.append(possible_clusters.index(i_cluster))  # index of the cluster that was just appended
# Sum across each clustered labels, then concatenate to generate mask_clusters
# mask_clusters has dimension: x, y, z, n_clustered_labels, with n_clustered_labels being equal to the number of
# clusters that need to be estimated for ML method. Let's assume:
# label_struc = [
# LabelStruc(id=0, map_cluster=0),
# LabelStruc(id=1, map_cluster=0),
# LabelStruc(id=2, map_cluster=0),
# LabelStruc(id=3, map_cluster=1),
# LabelStruc(id=4, map_cluster=2),
# ]
#
# Examples of scenario below for ML estimation:
# labels_id_user = [0,1], mask_clusters = [np.sum(label[0:2]), label[3], label[4]]
# labels_id_user = [3], mask_clusters = [np.sum(label[0:2]), label[3], label[4]]
    # labels_id_user = [0,1,2,3], mask_clusters = [np.sum(label[0:3]), label[4]]
mask_l = []
    for i_cluster in sorted(set(id_clusters)):  # sorted, so that column i of mask_clusters matches cluster id i
# Get label indices for given cluster
id_label_cluster = [i for i in range(len(id_clusters)) if i_cluster == id_clusters[i]]
# Sum all labels for this cluster
mask_l.append(np.expand_dims(np.sum(mask[..., id_label_cluster], axis=(mask.ndim - 1)), axis=(mask.ndim - 1)))
mask_clusters = np.concatenate(mask_l, axis=(mask.ndim-1))
# Run ML estimation for each clustered labels
_, beta_cluster = func_ml(data, mask_clusters)
# MAP estimation:
# y [nb_vox x 1]: measurements vector (to which weights are applied)
# x [nb_vox x nb_labels]: linear relation between the measurements y
# beta_0 [nb_labels]: A priori values estimated per cluster using ML.
    # beta [nb_labels] = beta_0 + (Xt . X + I)^(-1) . Xt . (y - X . beta_0): The estimated metric value in each label
#
# Note: for simplicity we consider that sigma_noise = sigma_label
n_vox = functools.reduce(operator.mul, data.shape, 1)
y = np.reshape(data, n_vox)
x = np.reshape(mask, (n_vox, mask.shape[mask.ndim-1]))
beta_0 = [beta_cluster[id_clusters[i_label]] for i_label in range(mask.shape[-1])]
beta = beta_0 + np.dot(np.linalg.pinv(np.dot(x.T, x) + np.diag(np.ones(mask.shape[-1]))),
np.dot(x.T,
(y - np.dot(x, beta_0))))
return beta[0], beta


def func_ml(data, mask, map_clusters=None):
"""
Compute maximum likelihood (ML) for the first label of mask.
:param data: nd-array: input data
:param mask: (n+1)d-array: input mask. Note: this mask should include ALL labels to satisfy the necessary condition\
for ML-based estimation, i.e., at each voxel, the sum of all labels (across the last dimension) equals the\
probability to be inside the tissue. For example, for a pixel within the spinal cord, the sum of all labels should\
be 1.
    :param map_clusters: not used
    :return: Tuple of (beta corresponding to the first label, nd-array of all beta)
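
    Example (a minimal doctest sketch; two non-overlapping labels, values are illustrative assumptions):

        >>> import numpy as np
        >>> data = np.array([1., 2., 3., 4.])
        >>> mask = np.stack([[1., 1., 0., 0.], [0., 0., 1., 1.]], axis=-1)  # shape (4, 2): one column per label
        >>> float(func_ml(data, mask)[0])  # mean of data within the first label
        1.5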
"""
# TODO: support weighted least square
# reshape as 1d vector (for data) and 2d vector (for mask)
n_vox = functools.reduce(operator.mul, data.shape, 1)
# ML estimation:
# y: measurements vector (to which weights are applied)
# x: linear relation between the measurements y
# beta [nb_labels] = (Xt . X)^(-1) . Xt . y: The estimated metric value in each label
y = np.reshape(data, n_vox) # [nb_vox x 1]
x = np.reshape(mask, (n_vox, mask.shape[mask.ndim-1]))
beta = np.dot(np.linalg.pinv(np.dot(x.T, x)), np.dot(x.T, y))
return beta[0], beta


def func_std(data, mask=None, map_clusters=None):
"""
Compute standard deviation
    :param data: nd-array: input data
    :param mask: (n+1)d-array: input mask. If None, a uniform mask (all ones) is assumed.
    :param map_clusters: not used
    :return: Tuple of (weighted standard deviation, None)
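
    Example (a minimal doctest sketch with uniform weights):

        >>> import numpy as np
        >>> round(func_std(np.array([1., 2., 3., 4.]), np.ones(4))[0], 3)
        1.118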
"""
    # Mask is optional: default to uniform weights
    if mask is None:
        mask = np.ones(data.shape)
    # Check if mask has an additional dimension (in case it is a label). If so, select the first label
    if mask.ndim == data.ndim + 1:
        mask = mask[..., 0]
average, _ = func_wa(data, np.expand_dims(mask, axis=mask.ndim))
variance = np.average((data - average) ** 2, weights=mask)
return math.sqrt(variance), None


def func_sum(data, mask=None, map_clusters=None):
"""
Compute sum
:param data: nd-array: input data
:param mask: not used
:param map_clusters: not used
    :return: Tuple of (sum of data, None)
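
    Example (a minimal doctest sketch):

        >>> import numpy as np
        >>> float(func_sum(np.array([1., 2., 3.]))[0])
        6.0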
"""
    return np.sum(data), None


def func_wa(data, mask=None, map_clusters=None):
"""
Compute weighted average
    :param data: nd-array: input data
    :param mask: (n+1)d-array: input mask. If None, a uniform mask (all ones) is assumed.
    :param map_clusters: not used
    :return: Tuple of (weighted average, None)
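
    Example (a minimal doctest sketch; the weights are illustrative assumptions):

        >>> import numpy as np
        >>> float(func_wa(np.array([1., 2., 3.]), np.array([0., 1., 1.]))[0])
        2.5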
"""
    # Mask is optional: default to uniform weights
    if mask is None:
        mask = np.ones(data.shape)
    # Check if mask has an additional dimension (in case it is a label). If so, select the first label
    if mask.ndim == data.ndim + 1:
        mask = mask[..., 0]
return np.average(data, weights=mask), None


def aggregate_per_slice_or_level(metric, mask=None, slices=[], levels=[], perslice=None, perlevel=False,
                                 vert_level=None, group_funcs=(('MEAN', func_wa),), map_clusters=None):
"""
The aggregation will be performed along the last dimension of 'metric' ndarray.
:param metric: Class Metric(): data to aggregate.
:param mask: Class Metric(): mask to use for aggregating the data. Optional.
:param slices: List[int]: Slices to aggregate metric from. If empty, select all slices.
:param levels: List[int]: Vertebral levels to aggregate metric from. It has priority over "slices".
    :param perslice: Bool or None: Aggregate per slice (True) or across slices (False). If None, it is set
        automatically: True when neither slices nor levels are specified, False otherwise.
:param Bool perlevel: Aggregate per level (True) or across levels (False). Has priority over "perslice".
:param vert_level: Vertebral level. Could be either an Image or a file name.
    :param tuple group_funcs: Name and function to apply on metric. Example: (('MEAN', func_wa),). Note: the function
        has special requirements in terms of i/o. See the definition of func_wa and use it as a template.
:param map_clusters: list of list of int: See func_map()
:return: Aggregated metric
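
    Example (a minimal doctest sketch, assuming a 1D metric along z and no mask):

        >>> import numpy as np
        >>> metric = Metric(data=np.array([2., 4., 6.]), label='CSA')
        >>> agg = aggregate_per_slice_or_level(metric, slices=[0, 1, 2], perslice=False)
        >>> float(agg[(0, 1, 2)]['MEAN(CSA)'])
        4.0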
"""
if vert_level:
# Assumption: vert_level image will only ever be 3D or 4D
vert_level_slices = Image(vert_level).change_orientation('RPI').data.shape[2]
        # Get slices ('z') from the metric regardless of whether it is 1D [z], 3D [x, y, z] or 4D [x, y, z, t]
metric_slices = metric.data.shape[2] if len(metric.data.shape) >= 3 else metric.data.shape[0]
if vert_level_slices != metric_slices:
            raise ValueError(f"Shape mismatch between vertfile [{vert_level_slices}] and metric [{metric_slices}]. "
f"Please verify that your vertfile has the same number of slices as your input image, "
f"and that your metric is RPI/LPI oriented.")
# If user neither specified slices nor levels, set perslice=True, otherwise, the output will likely contain nan
# because in many cases the segmentation does not span the whole I-S dimension.
if perslice is None:
if not slices and not levels:
perslice = True
else:
perslice = False
# if slices is empty, select all available slices from the metric
ndim = metric.data.ndim
if not slices:
slices = range(metric.data.shape[ndim-1])
# aggregation based on levels
if levels:
im_vert_level = Image(vert_level).change_orientation('RPI')
# slicegroups = [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
slicegroups = [tuple(get_slices_from_vertebral_levels(im_vert_level, level)) for level in levels]
if perlevel:
# vertgroups = [(2,), (3,), (4,)]
vertgroups = [tuple([level]) for level in levels]
elif perslice:
# slicegroups = [(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,)]
slicegroups = [tuple([i]) for i in functools.reduce(operator.concat, slicegroups)] # reduce to individual tuple
# vertgroups = [(2,), (2,), (2,), (3,), (3,), (3,), (4,), (4,), (4,)]
vertgroups = [tuple([get_vertebral_level_from_slice(im_vert_level, i[0])]) for i in slicegroups]
# output aggregate metric across levels
else:
# slicegroups = [(0, 1, 2, 3, 4, 5, 6, 7, 8)]
slicegroups = [tuple([val for sublist in slicegroups for val in sublist])] # flatten into single tuple
# vertgroups = [(2, 3, 4)]
vertgroups = [tuple([level for level in levels])]
# aggregation based on slices
else:
vertgroups = None
if perslice:
# slicegroups = [(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,)]
            slicegroups = [tuple([i_slice]) for i_slice in slices]  # avoid shadowing the built-in 'slice'
else:
# slicegroups = [(0, 1, 2, 3, 4, 5, 6, 7, 8)]
slicegroups = [tuple(slices)]
agg_metric = dict((slicegroup, dict()) for slicegroup in slicegroups)
# loop across slice group
for slicegroup in slicegroups:
# add level info
if vertgroups is None:
agg_metric[slicegroup]['VertLevel'] = None
else:
agg_metric[slicegroup]['VertLevel'] = vertgroups[slicegroups.index(slicegroup)]
# Loop across functions (e.g.: MEAN, STD)
for (name, func) in group_funcs:
try:
data_slicegroup = metric.data[..., slicegroup] # selection is done in the last dimension
if mask is not None:
mask_slicegroup = mask.data[..., slicegroup, :]
agg_metric[slicegroup]['Label'] = mask.label
# For size calculation, only the first index [0] is relevant (See spinalcordtoolbox/issues/3216)
agg_metric[slicegroup]['Size [vox]'] = np.sum(mask_slicegroup[..., 0])
else:
mask_slicegroup = np.ones(data_slicegroup.shape)
# Ignore nonfinite values
                i_nonfinite = np.where(~np.isfinite(data_slicegroup))
data_slicegroup[i_nonfinite] = 0.
# TODO: the lines below could probably be done more elegantly
if mask_slicegroup.ndim == data_slicegroup.ndim + 1:
arr_tmp_concat = []
for i in range(mask_slicegroup.shape[-1]):
arr_tmp = np.reshape(mask_slicegroup[..., i], data_slicegroup.shape)
arr_tmp[i_nonfinite] = 0.
arr_tmp_concat.append(np.expand_dims(arr_tmp, axis=(mask_slicegroup.ndim-1)))
mask_slicegroup = np.concatenate(arr_tmp_concat, axis=(mask_slicegroup.ndim-1))
else:
mask_slicegroup[i_nonfinite] = 0.
# Make sure the number of pixels to extract metrics is not null
if mask_slicegroup.sum() == 0:
result = None
else:
# Run estimation
result, _ = func(data_slicegroup, mask_slicegroup, map_clusters)
# check if nan
if np.isnan(result):
result = None
# here we create a field with name: FUNC(METRIC_NAME). Example: MEAN(CSA)
agg_metric[slicegroup]['{}({})'.format(name, metric.label)] = result
except Exception as e:
logging.warning(e)
agg_metric[slicegroup]['{}({})'.format(name, metric.label)] = str(e)
return agg_metric


def check_labels(indiv_labels_ids, selected_labels):
    """
    Check the consistency of the labels asked by the user.
# convert strings to int
list_ids_of_labels_of_interest = list(map(int, indiv_labels_ids))
if selected_labels:
# Remove redundant values
list_ids_of_labels_of_interest = [i_label for n, i_label in enumerate(selected_labels) if i_label not in selected_labels[:n]]
        # Check that the selected labels are available in the atlas
        if not set(list_ids_of_labels_of_interest).issubset(set(indiv_labels_ids)):
            logging.error('At least one of the selected labels ({}) is not available according to the label '
                          'list from the text file in the atlas folder.'.format(list_ids_of_labels_of_interest))
return list_ids_of_labels_of_interest


def diff_between_list_or_int(l1, l2):
"""
Return list l1 minus the elements in l2
Examples:\
([1, 2, 3], 1) --> [2, 3]\
    ([1, 2, 3], [1, 2]) --> [3]
:param l1: a list of int
:param l2: could be a list or an int
    :return: list of int: l1 without the elements of l2
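
    Example (doctest form of the examples above):

        >>> diff_between_list_or_int([1, 2, 3], 1)
        [2, 3]
        >>> diff_between_list_or_int([1, 2, 3], [1, 2])
        [3]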
"""
if isinstance(l2, int):
l2 = [l2]
return [x for x in l1 if x not in l2]


def make_a_string(item):
    """
    Convert tuple or list or None to a string. Important: elements in tuple or list are separated with ; (not ,)
    for compatibility with csv.
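
    Example (a minimal doctest sketch):

        >>> make_a_string((2, 3, 4))
        '2;3;4'
        >>> make_a_string(None)
        'None'
    """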
    if isinstance(item, (tuple, list)):
        return ';'.join([str(i) for i in item])
elif item is None:
return 'None'
else:
return item


def merge_dict(dict_in):
"""
    Merge the n dictionaries contained under the root keys of dict_in (one per metric) into a single dictionary
    indexed by slicegroup.

    .. code-block:: python

        dict_in = {
            'area': {(0,): {'Level': 0, 'Mean(area)': 0.5}, (1,): {'Level': 1, 'Mean(area)': 0.2}},
            'angle_RL': {(0,): {'Level': 0, 'Mean(angle_RL)': 15}, (1,): {'Level': 1, 'Mean(angle_RL)': 12}},
        }
        dict_merged = {
            (0,): {'Level': 0, 'Mean(area)': 0.5, 'Mean(angle_RL)': 15},
            (1,): {'Level': 1, 'Mean(area)': 0.2, 'Mean(angle_RL)': 12},
        }
:param dict_in: input dict.
    :return: merged dict, indexed by slicegroup, with all metrics at the same level
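
    Example (a minimal doctest sketch):

        >>> dict_in = {'area': {(0,): {'Level': 0, 'Mean(area)': 0.5}},
        ...            'angle_RL': {(0,): {'Level': 0, 'Mean(angle_RL)': 15}}}
        >>> merge_dict(dict_in)[(0,)]['Mean(angle_RL)']
        15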
"""
    dict_merged = {}
    metrics = list(dict_in.keys())
    # Fetch the first parent key (metric), then loop across children keys (slicegroup)
    dict_first_metric = dict_in[metrics[0]]
    # Loop across children keys: slicegroup = [(0,), (1,), ...]
    for slicegroup in dict_first_metric.keys():
        # Initialize with the entry from the first metric (copy, to avoid mutating dict_in)
        dict_merged[slicegroup] = dict(dict_first_metric[slicegroup])
        # Loop across the remaining metrics
        for metric in metrics[1:]:
            dict_merged[slicegroup].update(dict_in[metric][slicegroup])
return dict_merged


def save_as_csv(agg_metric, fname_out, fname_in=None, append=False):
"""
Write metric structure as csv. If field 'error' exists, it will add a specific column.
:param agg_metric: output of aggregate_per_slice_or_level()
    :param fname_out: output filename (a .csv extension is expected; it is not added automatically).
:param fname_in: input file to be listed in the csv file (e.g., segmentation file which produced the results).
    :param append: Bool: Append results at the end of the file (if it exists) instead of overwriting it.
:return:
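
    Example (a minimal sketch; the file names are illustrative assumptions, and the doctest is skipped because it
    writes to disk):

        >>> agg = aggregate_per_slice_or_level(Metric(data=np.array([2., 4.]), label='area'))  # doctest: +SKIP
        >>> save_as_csv(agg, 'metrics.csv', fname_in='seg.nii.gz')  # doctest: +SKIP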
"""
# Item sorted in order for display in csv output
# list_item = ['VertLevel', 'Label', 'MEAN', 'WA', 'BIN', 'ML', 'MAP', 'STD', 'MAX']
# TODO: The thing below is ugly and needs to be fixed, but this is the only solution I found to order the columns
# without refactoring the code with OrderedDict.
list_item = ['Label', 'Size [vox]', 'MEAN(area)', 'STD(area)', 'MEAN(angle_AP)', 'STD(angle_AP)', 'MEAN(angle_RL)',
'STD(angle_RL)', 'MEAN(diameter_AP)', 'STD(diameter_AP)', 'MEAN(diameter_RL)', 'STD(diameter_RL)',
'MEAN(eccentricity)', 'STD(eccentricity)', 'MEAN(orientation)', 'STD(orientation)',
'MEAN(solidity)', 'STD(solidity)', 'SUM(length)', 'WA()', 'BIN()', 'ML()', 'MAP()', 'STD()', 'MAX()']
# TODO: if append=True but file does not exist yet, raise warning and set append=False
# write header (only if append=False)
if not append or not os.path.isfile(fname_out):
with open(fname_out, 'w') as csvfile:
header = ['Timestamp', 'SCT Version', 'Filename', 'Slice (I->S)', 'VertLevel']
            agg_metric_key = next(iter(agg_metric.values()))  # keys of the first entry define the columns
for item in list_item:
for key in agg_metric_key:
if item in key:
header.append(key)
break
writer = csv.DictWriter(csvfile, fieldnames=header)
writer.writeheader()
# populate data
with open(fname_out, 'a') as csvfile:
spamwriter = csv.writer(csvfile, delimiter=',')
for slicegroup in sorted(agg_metric.keys()):
line = list()
line.append(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # Timestamp
line.append(__version__) # SCT Version
line.append(fname_in) # file name associated with the results
line.append(parse_num_list_inv(slicegroup)) # list all slices in slicegroup
line.append(parse_num_list_inv(agg_metric[slicegroup]['VertLevel'])) # list vertebral levels
            agg_metric_key = next(iter(agg_metric.values()))  # same key set as used for the header
for item in list_item:
for key in agg_metric_key:
if item in key:
line.append(str(agg_metric[slicegroup][key]))
break
spamwriter.writerow(line)