Source code for spinalcordtoolbox.moco

#!/usr/bin/env python
# -*- coding: utf-8
# Tools for motion correction (moco)
# Authors: Karun Raju, Tanguy Duval, Julien Cohen-Adad

# TODO: Inform user if soft mask is used
# TODO: no need to pass absolute image path-- makes it difficult to read
# TODO: check the status of spline()
# TODO: check the status of combine_matrix()
# TODO: params for ANTS: CC/MI, shrink fact, nb_it
# TODO: ants: explore optin  --float  for faster computation

from copy import deepcopy
import sys
import os
from shutil import copyfile
import glob
import math
import time
import functools
import operator
import csv

import numpy as np
import scipy.interpolate

from spinalcordtoolbox.image import Image, add_suffix, generate_output_file
from spinalcordtoolbox.utils.shell import display_viewer_syntax, get_interpolation
from spinalcordtoolbox.utils.sys import sct_progress_bar, run_proc, printv
from spinalcordtoolbox.utils.fs import tmp_create, extract_fname, rmtree, copy

# FIXME don't import from scripts in API
from spinalcordtoolbox.scripts import sct_dmri_separate_b0_and_dwi
from spinalcordtoolbox.scripts.sct_convert import convert
from spinalcordtoolbox.scripts.sct_image import split_data, concat_data, multicomponent_split
from spinalcordtoolbox.scripts import sct_apply_transfo


[docs]class ParamMoco: """ Class with a bunch of moco-specific parameters """ # The constructor def __init__(self, is_diffusion=None, group_size=1, metric='MeanSquares', smooth='1'): """ :param is_diffusion: Bool: If True, data will be treated as diffusion-MRI data (process slightly differs) :param group_size: int: Number of images averaged for 'dwi' method. :param metric: {MeanSquares, MI, CC}: metric to use for registration :param smooth: str: Smoothing sigma in mm # TODO: make it int """ self.is_diffusion = is_diffusion self.debug = 0 self.fname_data = '' self.fname_bvecs = '' self.fname_bvals = '' self.fname_target = '' self.fname_mask = '' self.path_out = '' self.mat_final = '' self.todo = '' self.group_size = group_size self.spline_fitting = 0 self.remove_temp_files = 1 self.verbose = 1 self.plot_graph = 0 self.suffix = '_moco' self.poly = '2' # degree of polynomial function for moco self.smooth = smooth self.gradStep = '1' # gradientStep for searching algorithm self.iter = '10' # number of iterations self.metric = metric self.sampling = 'None' # sampling rate used for registration metric; 'None' means use 'dense sampling' self.interp = 'spline' # nn, linear, spline self.min_norm = 0.001 self.swapXY = 0 self.num_target = '0' self.suffix_mat = None # '0GenericAffine.mat' or 'Warp.nii.gz' depending which transfo algo is used self.bval_min = 100 # in case user does not have min bvalues at 0, set threshold (where csf disapeared). self.iterAvg = 1 # iteratively average target image for more robust moco self.is_sagittal = False # if True, then split along Z (right-left) and register each 2D slice (vs. 3D volume) self.output_motion_param = True # if True, the motion parameters are outputted # update constructor with user's parameters def update(self, param_user): # list_objects = param_user.split(',') for object in param_user: if len(object) < 2: printv('ERROR: Wrong usage.', 1, type='error') obj = object.split('=') setattr(self, obj[0], obj[1])
[docs]def copy_mat_files(nt, list_file_mat, index, folder_out, param): """ Copy mat file from the grouped folder to the final folder (will be used by all individual ungrouped volumes) :param nt: int: Total number of volumes in native 4d data :param list_file_mat: list of list: File name of transformations :param index: list: Index to associate a given matrix file with a 3d volume (from the 4d native data) :param param: Param class :param folder_out: str: Output folder :return: None """ # create final mat folder try: os.makedirs(folder_out) except FileExistsError: pass # Loop across registration matrices and copy to mat_final folder # First loop is accross z. If axial orientation, there is only one z (i.e., len(file_mat)=1) for iz in range(len(list_file_mat)): # Second loop is across ALL volumes of the input dmri dataset (corresponds to its 4th dimension: time) for it in range(nt): # Check if this index corresponds to a volume listed in the index list if it in index: file_mat = list_file_mat[iz][index.index(it)] fsrc = os.path.join(file_mat + param.suffix_mat) # Build final transfo file name file_mat_final = os.path.basename(file_mat)[:-9] + str(iz).zfill(4) + 'T' + str(it).zfill(4) fdest = os.path.join(folder_out, file_mat_final + param.suffix_mat) copyfile(fsrc, fdest)
[docs]def moco_wrapper(param): """ Wrapper that performs motion correction. :param param: ParamMoco class :return: None """ file_data = 'data.nii' # corresponds to the full input data (e.g. dmri or fmri) file_data_dirname, file_data_basename, file_data_ext = extract_fname(file_data) file_b0 = 'b0.nii' file_datasub = 'datasub.nii' # corresponds to the full input data minus the b=0 scans (if param.is_diffusion=True) file_datasubgroup = 'datasub-groups.nii' # concatenation of the average of each file_datasub file_mask = 'mask.nii' file_moco_params_csv = 'moco_params.tsv' file_moco_params_x = 'moco_params_x.nii.gz' file_moco_params_y = 'moco_params_y.nii.gz' ext_data = '.nii.gz' # workaround "too many open files" by slurping the data # TODO: check if .nii can be used mat_final = 'mat_final/' # ext_mat = 'Warp.nii.gz' # warping field # Start timer start_time = time.time() printv('\nInput parameters:', param.verbose) printv(' Input file ............ ' + param.fname_data, param.verbose) printv(' Group size ............ {}'.format(param.group_size), param.verbose) # Get full path # param.fname_data = os.path.abspath(param.fname_data) # param.fname_bvecs = os.path.abspath(param.fname_bvecs) # if param.fname_bvals != '': # param.fname_bvals = os.path.abspath(param.fname_bvals) # Extract path, file and extension # path_data, file_data, ext_data = extract_fname(param.fname_data) # path_mask, file_mask, ext_mask = extract_fname(param.fname_mask) path_tmp = tmp_create(basename="moco") # Copying input data to tmp folder printv('\nCopying input data to tmp folder and convert to nii...', param.verbose) convert(param.fname_data, os.path.join(path_tmp, file_data)) if param.fname_mask != '': convert(param.fname_mask, os.path.join(path_tmp, file_mask), verbose=param.verbose) # Update field in param (because used later in another function, and param class will be passed) param.fname_mask = file_mask # Build absolute output path and go to tmp folder curdir = os.getcwd() path_out_abs = os.path.abspath(param.path_out) os.chdir(path_tmp) # Get dimensions of data printv('\nGet dimensions of data...', param.verbose) im_data = Image(file_data) nx, ny, nz, nt, px, py, pz, pt = im_data.dim printv(' ' + str(nx) + ' x ' + str(ny) + ' x ' + str(nz), param.verbose) # Get orientation printv('\nData orientation: ' + im_data.orientation, param.verbose) if im_data.orientation[2] in 'LR': param.is_sagittal = True printv(' Treated as sagittal') elif im_data.orientation[2] in 'IS': param.is_sagittal = False printv(' Treated as axial') else: param.is_sagittal = False printv('WARNING: Orientation seems to be neither axial nor sagittal. Treated as axial.') printv("\nSet suffix of transformation file name, which depends on the orientation:") if param.is_sagittal: param.suffix_mat = '0GenericAffine.mat' printv("Orientation is sagittal, suffix is '{}'. The image is split across the R-L direction, and the " "estimated transformation is a 2D affine transfo.".format(param.suffix_mat)) else: param.suffix_mat = 'Warp.nii.gz' printv("Orientation is axial, suffix is '{}'. The estimated transformation is a 3D warping field, which is " "composed of a stack of 2D Tx-Ty transformations".format(param.suffix_mat)) # Adjust group size in case of sagittal scan if param.is_sagittal and param.group_size != 1: printv('For sagittal data group_size should be one for more robustness. Forcing group_size=1.', 1, 'warning') param.group_size = 1 if param.is_diffusion: # Identify b=0 and DWI images index_b0, index_dwi, nb_b0, nb_dwi = \ sct_dmri_separate_b0_and_dwi.identify_b0(param.fname_bvecs, param.fname_bvals, param.bval_min, param.verbose) # check if dmri and bvecs are the same size if not nb_b0 + nb_dwi == nt: printv( '\nERROR in ' + os.path.basename(__file__) + ': Size of data (' + str(nt) + ') and size of bvecs (' + str( nb_b0 + nb_dwi) + ') are not the same. Check your bvecs file.\n', 1, 'error') sys.exit(2) # ================================================================================================================== # Prepare data (mean/groups...) # ================================================================================================================== # Split into T dimension printv('\nSplit along T dimension...', param.verbose) im_data_split_list = split_data(im_data, 3) for im in im_data_split_list: x_dirname, x_basename, x_ext = extract_fname(im.absolutepath) im.absolutepath = os.path.join(x_dirname, x_basename + ".nii.gz") im.save() if param.is_diffusion: # Merge and average b=0 images printv('\nMerge and average b=0 data...', param.verbose) im_b0_list = [] for it in range(nb_b0): im_b0_list.append(im_data_split_list[index_b0[it]]) im_b0 = concat_data(im_b0_list, 3).save(file_b0, verbose=0) # Average across time im_b0.mean(dim=3).save(add_suffix(file_b0, '_mean')) n_moco = nb_dwi # set number of data to perform moco on (using grouping) index_moco = index_dwi # If not a diffusion scan, we will motion-correct all volumes else: n_moco = nt index_moco = list(range(0, nt)) nb_groups = int(math.floor(n_moco / param.group_size)) # Generate groups indexes group_indexes = [] for iGroup in range(nb_groups): group_indexes.append(index_moco[(iGroup * param.group_size):((iGroup + 1) * param.group_size)]) # add the remaining images to a new last group (in case the total number of image is not divisible by group_size) nb_remaining = n_moco % param.group_size # number of remaining images if nb_remaining > 0: nb_groups += 1 group_indexes.append(index_moco[len(index_moco) - nb_remaining:len(index_moco)]) _, file_dwi_basename, file_dwi_ext = extract_fname(file_datasub) # Group data list_file_group = [] for iGroup in sct_progress_bar(range(nb_groups), unit='iter', unit_scale=False, desc="Merge within groups", ascii=False, ncols=80): # get index index_moco_i = group_indexes[iGroup] n_moco_i = len(index_moco_i) # concatenate images across time, within this group file_dwi_merge_i = os.path.join(file_dwi_basename + '_' + str(iGroup) + ext_data) im_dwi_list = [] for it in range(n_moco_i): im_dwi_list.append(im_data_split_list[index_moco_i[it]]) im_dwi_out = concat_data(im_dwi_list, 3).save(file_dwi_merge_i, verbose=0) # Average across time list_file_group.append(os.path.join(file_dwi_basename + '_' + str(iGroup) + '_mean' + ext_data)) im_dwi_out.mean(dim=3).save(list_file_group[-1]) # Merge across groups printv('\nMerge across groups...', param.verbose) # file_dwi_groups_means_merge = 'dwi_averaged_groups' fname_dw_list = [] for iGroup in range(nb_groups): fname_dw_list.append(list_file_group[iGroup]) im_dw_list = [Image(fname) for fname in fname_dw_list] concat_data(im_dw_list, 3).save(file_datasubgroup, verbose=0) # Cleanup del im, im_data_split_list # ================================================================================================================== # Estimate moco # ================================================================================================================== # Initialize another class instance that will be passed on to the moco() function param_moco = deepcopy(param) if param.is_diffusion: # Estimate moco on b0 groups printv('\n-------------------------------------------------------------------------------', param.verbose) printv(' Estimating motion on b=0 images...', param.verbose) printv('-------------------------------------------------------------------------------', param.verbose) param_moco.file_data = 'b0.nii' # Identify target image if index_moco[0] != 0: # If first DWI is not the first volume (most common), then there is a least one b=0 image before. In that # case select it as the target image for registration of all b=0 param_moco.file_target = os.path.join(file_data_dirname, file_data_basename + '_T' + str(index_b0[index_moco[0] - 1]).zfill( 4) + ext_data) else: # If first DWI is the first volume, then the target b=0 is the first b=0 from the index_b0. param_moco.file_target = os.path.join(file_data_dirname, file_data_basename + '_T' + str(index_b0[0]).zfill(4) + ext_data) # Run moco param_moco.path_out = '' param_moco.todo = 'estimate_and_apply' param_moco.mat_moco = 'mat_b0groups' file_mat_b0, _ = moco(param_moco) # Estimate moco across groups printv('\n-------------------------------------------------------------------------------', param.verbose) printv(' Estimating motion across groups...', param.verbose) printv('-------------------------------------------------------------------------------', param.verbose) param_moco.file_data = file_datasubgroup param_moco.file_target = list_file_group[0] # target is the first volume (closest to the first b=0 if DWI scan) param_moco.path_out = '' param_moco.todo = 'estimate_and_apply' param_moco.mat_moco = 'mat_groups' file_mat_datasub_group, _ = moco(param_moco) # Spline Regularization along T if param.spline_fitting: # TODO: fix this scenario (haven't touched that code for a while-- it is probably buggy) raise NotImplementedError() # spline(mat_final, nt, nz, param.verbose, np.array(index_b0), param.plot_graph) # ================================================================================================================== # Apply moco # ================================================================================================================== # If group_size>1, assign transformation to each individual ungrouped 3d volume if param.group_size > 1: file_mat_datasub = [] for iz in range(len(file_mat_datasub_group)): # duplicate by factor group_size the transformation file for each it # example: [mat.Z0000T0001Warp.nii] --> [mat.Z0000T0001Warp.nii, mat.Z0000T0001Warp.nii] for group_size=2 file_mat_datasub.append( functools.reduce(operator.iconcat, [[i] * param.group_size for i in file_mat_datasub_group[iz]], [])) else: file_mat_datasub = file_mat_datasub_group # Copy transformations to mat_final folder and rename them appropriately copy_mat_files(nt, file_mat_datasub, index_moco, mat_final, param) if param.is_diffusion: copy_mat_files(nt, file_mat_b0, index_b0, mat_final, param) # Apply moco on all dmri data printv('\n-------------------------------------------------------------------------------', param.verbose) printv(' Apply moco', param.verbose) printv('-------------------------------------------------------------------------------', param.verbose) param_moco.file_data = file_data param_moco.file_target = list_file_group[0] # reference for reslicing into proper coordinate system param_moco.path_out = '' # TODO not used in moco() param_moco.mat_moco = mat_final param_moco.todo = 'apply' file_mat_data, im_moco = moco(param_moco) # copy geometric information from header # NB: this is required because WarpImageMultiTransform in 2D mode wrongly sets pixdim(3) to "1". im_moco.header = im_data.header im_moco.save(verbose=0) # Average across time if param.is_diffusion: # generate b0_moco_mean and dwi_moco_mean args = ['-i', im_moco.absolutepath, '-bvec', param.fname_bvecs, '-a', '1', '-v', '0'] if not param.fname_bvals == '': # if bvals file is provided args += ['-bval', param.fname_bvals] fname_b0, fname_b0_mean, fname_dwi, fname_dwi_mean = sct_dmri_separate_b0_and_dwi.main(argv=args) else: fname_moco_mean = add_suffix(im_moco.absolutepath, '_mean') im_moco.mean(dim=3).save(fname_moco_mean) # Extract and output the motion parameters (doesn't work for sagittal orientation) printv('Extract motion parameters...') if param.output_motion_param: if param.is_sagittal: printv('Motion parameters cannot be generated for sagittal images.', 1, 'warning') else: files_warp_X, files_warp_Y = [], [] moco_param = [] for fname_warp in file_mat_data[0]: # Cropping the image to keep only one voxel in the XY plane im_warp = Image(fname_warp + param.suffix_mat) im_warp.data = np.expand_dims(np.expand_dims(im_warp.data[0, 0, :, :, :], axis=0), axis=0) # These three lines allow to generate one file instead of two, containing X, Y and Z moco parameters #fname_warp_crop = fname_warp + '_crop_' + ext_mat # files_warp.append(fname_warp_crop) # im_warp.save(fname_warp_crop) # Separating the three components and saving X and Y only (Z is equal to 0 by default). im_warp_XYZ = multicomponent_split(im_warp) fname_warp_crop_X = fname_warp + '_crop_X_' + param.suffix_mat im_warp_XYZ[0].save(fname_warp_crop_X) files_warp_X.append(fname_warp_crop_X) fname_warp_crop_Y = fname_warp + '_crop_Y_' + param.suffix_mat im_warp_XYZ[1].save(fname_warp_crop_Y) files_warp_Y.append(fname_warp_crop_Y) # Calculating the slice-wise average moco estimate to provide a QC file moco_param.append([np.mean(np.ravel(im_warp_XYZ[0].data)), np.mean(np.ravel(im_warp_XYZ[1].data))]) # These two lines allow to generate one file instead of two, containing X, Y and Z moco parameters # im_warp = [Image(fname) for fname in files_warp] # im_warp_concat = concat_data(im_warp, dim=3) # im_warp_concat.save('fmri_moco_params.nii') # Concatenating the moco parameters into a time series for X and Y components. im_warp_X = [Image(fname) for fname in files_warp_X] im_warp_concat = concat_data(im_warp_X, dim=3) im_warp_concat.save(file_moco_params_x) im_warp_Y = [Image(fname) for fname in files_warp_Y] im_warp_concat = concat_data(im_warp_Y, dim=3) im_warp_concat.save(file_moco_params_y) # Writing a TSV file with the slicewise average estimate of the moco parameters. Useful for QC with open(file_moco_params_csv, 'wt') as out_file: tsv_writer = csv.writer(out_file, delimiter='\t') tsv_writer.writerow(['X', 'Y']) for mocop in moco_param: tsv_writer.writerow([mocop[0], mocop[1]]) # Generate output files printv('\nGenerate output files...', param.verbose) fname_moco = os.path.join(path_out_abs, add_suffix(os.path.basename(param.fname_data), param.suffix)) generate_output_file(im_moco.absolutepath, fname_moco) if param.is_diffusion: generate_output_file(fname_b0_mean, add_suffix(fname_moco, '_b0_mean')) generate_output_file(fname_dwi_mean, add_suffix(fname_moco, '_dwi_mean')) else: generate_output_file(fname_moco_mean, add_suffix(fname_moco, '_mean')) if os.path.exists(file_moco_params_csv): generate_output_file(file_moco_params_x, os.path.join(path_out_abs, file_moco_params_x), squeeze_data=False) generate_output_file(file_moco_params_y, os.path.join(path_out_abs, file_moco_params_y), squeeze_data=False) generate_output_file(file_moco_params_csv, os.path.join(path_out_abs, file_moco_params_csv)) # Delete temporary files if param.remove_temp_files == 1: printv('\nDelete temporary files...', param.verbose) rmtree(path_tmp, verbose=param.verbose) # come back to working directory os.chdir(curdir) # display elapsed time elapsed_time = time.time() - start_time printv('\nFinished! Elapsed time: ' + str(int(np.round(elapsed_time))) + 's', param.verbose) display_viewer_syntax( [os.path.join(param.path_out, add_suffix(os.path.basename(param.fname_data), param.suffix)), param.fname_data], mode='ortho,ortho')
[docs]def moco(param): """ Main function that performs motion correction. :param param: :return: """ # retrieve parameters file_data = param.file_data file_target = param.file_target folder_mat = param.mat_moco # output folder of mat file todo = param.todo suffix = param.suffix verbose = param.verbose # other parameters file_mask = 'mask.nii' printv('\nInput parameters:', param.verbose) printv(' Input file ............ ' + file_data, param.verbose) printv(' Reference file ........ ' + file_target, param.verbose) printv(' Polynomial degree ..... ' + param.poly, param.verbose) printv(' Smoothing kernel ...... ' + param.smooth, param.verbose) printv(' Gradient step ......... ' + param.gradStep, param.verbose) printv(' Metric ................ ' + param.metric, param.verbose) printv(' Sampling .............. ' + param.sampling, param.verbose) printv(' Todo .................. ' + todo, param.verbose) printv(' Mask ................. ' + param.fname_mask, param.verbose) printv(' Output mat folder ..... ' + folder_mat, param.verbose) try: os.makedirs(folder_mat) except FileExistsError: pass # Get size of data printv('\nData dimensions:', verbose) im_data = Image(param.file_data) nx, ny, nz, nt, px, py, pz, pt = im_data.dim printv((' ' + str(nx) + ' x ' + str(ny) + ' x ' + str(nz) + ' x ' + str(nt)), verbose) # copy file_target to a temporary file printv('\nCopy file_target to a temporary file...', verbose) file_target = "target.nii.gz" convert(param.file_target, file_target, verbose=0) # Check if user specified a mask if not param.fname_mask == '': # Check if this mask is soft (i.e., non-binary, such as a Gaussian mask) im_mask = Image(param.fname_mask) if not np.array_equal(im_mask.data, im_mask.data.astype(bool)): # If it is a soft mask, multiply the target by the soft mask. im = Image(file_target) im_masked = im.copy() im_masked.data = im.data * im_mask.data im_masked.save(verbose=0) # silence warning about file overwritting # If scan is sagittal, split src and target along Z (slice) if param.is_sagittal: dim_sag = 2 # TODO: find it # z-split data (time series) im_z_list = split_data(im_data, dim=dim_sag, squeeze_data=False) file_data_splitZ = [] for im_z in im_z_list: im_z.save(verbose=0) file_data_splitZ.append(im_z.absolutepath) # z-split target im_targetz_list = split_data(Image(file_target), dim=dim_sag, squeeze_data=False) file_target_splitZ = [] for im_targetz in im_targetz_list: im_targetz.save(verbose=0) file_target_splitZ.append(im_targetz.absolutepath) # z-split mask (if exists) if not param.fname_mask == '': im_maskz_list = split_data(Image(file_mask), dim=dim_sag, squeeze_data=False) file_mask_splitZ = [] for im_maskz in im_maskz_list: im_maskz.save(verbose=0) file_mask_splitZ.append(im_maskz.absolutepath) # initialize file list for output matrices file_mat = np.empty((nz, nt), dtype=object) # axial orientation else: file_data_splitZ = [file_data] # TODO: make it absolute like above file_target_splitZ = [file_target] # TODO: make it absolute like above # initialize file list for output matrices file_mat = np.empty((1, nt), dtype=object) # deal with mask if not param.fname_mask == '': convert(param.fname_mask, file_mask, squeeze_data=False, verbose=0) im_maskz_list = [Image(file_mask)] # use a list with single element # Loop across file list, where each file is either a 2D volume (if sagittal) or a 3D volume (otherwise) # file_mat = tuple([[[] for i in range(nt)] for i in range(nz)]) file_data_splitZ_moco = [] printv('\nRegister. Loop across Z (note: there is only one Z if orientation is axial)') for file in file_data_splitZ: iz = file_data_splitZ.index(file) # Split data along T dimension # printv('\nSplit data along T dimension.', verbose) im_z = Image(file) list_im_zt = split_data(im_z, dim=3) file_data_splitZ_splitT = [] for im_zt in list_im_zt: im_zt.save(verbose=0) file_data_splitZ_splitT.append(im_zt.absolutepath) # file_data_splitT = file_data + '_T' # Motion correction: initialization index = np.arange(nt) file_data_splitT_num = [] file_data_splitZ_splitT_moco = [] failed_transfo = [0 for i in range(nt)] # Motion correction: Loop across T for indice_index in sct_progress_bar(range(nt), unit='iter', unit_scale=False, desc="Z=" + str(iz) + "/" + str(len(file_data_splitZ) - 1), ascii=False, ncols=80): # create indices and display stuff it = index[indice_index] file_mat[iz][it] = os.path.join(folder_mat, "mat.Z") + str(iz).zfill(4) + 'T' + str(it).zfill(4) file_data_splitZ_splitT_moco.append(add_suffix(file_data_splitZ_splitT[it], '_moco')) # deal with masking (except in the 'apply' case, where masking is irrelevant) input_mask = None if not param.fname_mask == '' and not param.todo == 'apply': # Check if mask is binary if np.array_equal(im_maskz_list[iz].data, im_maskz_list[iz].data.astype(bool)): # If it is, pass this mask into register() to be used input_mask = im_maskz_list[iz] else: # If not, do not pass this mask into register() because ANTs cannot handle non-binary masks. # Instead, multiply the input data by the Gaussian mask. im = Image(file_data_splitZ_splitT[it]) im_masked = im.copy() im_masked.data = im.data * im_maskz_list[iz].data im_masked.save(verbose=0) # silence warning about file overwritting # run 3D registration failed_transfo[it] = register(param, file_data_splitZ_splitT[it], file_target_splitZ[iz], file_mat[iz][it], file_data_splitZ_splitT_moco[it], im_mask=input_mask) # average registered volume with target image # N.B. use weighted averaging: (target * nb_it + moco) / (nb_it + 1) if param.iterAvg and indice_index < 10 and failed_transfo[it] == 0 and not param.todo == 'apply': im_targetz = Image(file_target_splitZ[iz]) data_targetz = im_targetz.data data_mocoz = Image(file_data_splitZ_splitT_moco[it]).data data_targetz = (data_targetz * (indice_index + 1) + data_mocoz) / (indice_index + 2) im_targetz.data = data_targetz im_targetz.save(verbose=0) # Replace failed transformation with the closest good one fT = [i for i, j in enumerate(failed_transfo) if j == 1] gT = [i for i, j in enumerate(failed_transfo) if j == 0] for it in range(len(fT)): abs_dist = [np.abs(gT[i] - fT[it]) for i in range(len(gT))] if not abs_dist == []: index_good = abs_dist.index(min(abs_dist)) printv(' transfo #' + str(fT[it]) + ' --> use transfo #' + str(gT[index_good]), verbose) # copy transformation copy(file_mat[iz][gT[index_good]] + 'Warp.nii.gz', file_mat[iz][fT[it]] + 'Warp.nii.gz') # apply transformation sct_apply_transfo.main(argv=['-i', file_data_splitZ_splitT[fT[it]], '-d', file_target, '-w', file_mat[iz][fT[it]] + 'Warp.nii.gz', '-o', file_data_splitZ_splitT_moco[fT[it]], '-x', param.interp]) else: # exit program if no transformation exists. printv('\nERROR in ' + os.path.basename(__file__) + ': No good transformation exist. Exit program.\n', verbose, 'error') sys.exit(2) # Merge data along T file_data_splitZ_moco.append(add_suffix(file, suffix)) if todo != 'estimate': im_data_splitZ_splitT_moco = [Image(fname) for fname in file_data_splitZ_splitT_moco] im_out = concat_data(im_data_splitZ_splitT_moco, 3) im_out.absolutepath = file_data_splitZ_moco[iz] im_out.save(verbose=0) # If sagittal, merge along Z if param.is_sagittal: # TODO: im_out.dim is incorrect: Z value is one im_data_splitZ_moco = [Image(fname) for fname in file_data_splitZ_moco] im_out = concat_data(im_data_splitZ_moco, 2) dirname, basename, ext = extract_fname(file_data) path_out = os.path.join(dirname, basename + suffix + ext) im_out.absolutepath = path_out im_out.save(verbose=0) return file_mat, im_out
[docs]def register(param, file_src, file_dest, file_mat, file_out, im_mask=None): """ Register two images by estimating slice-wise Tx and Ty transformations, which are regularized along Z. This function uses ANTs' isct_antsSliceRegularizedRegistration. :param param: :param file_src: :param file_dest: :param file_mat: :param file_out: :param im_mask: Image of mask, could be 2D or 3D :return: """ # TODO: deal with mask # initialization failed_transfo = 0 # by default, failed matrix is 0 (i.e., no failure) do_registration = True # get metric radius (if MeanSquares, CC) or nb bins (if MI) if param.metric == 'MI': metric_radius = '16' else: metric_radius = '4' file_out_concat = file_out kw = dict() im_data = Image(file_src) # TODO: pass argument to use antsReg instead of opening Image each time # register file_src to file_dest if param.todo == 'estimate' or param.todo == 'estimate_and_apply': # If orientation is sagittal, use antsRegistration in 2D mode # Note: the parameter --restrict-deformation is irrelevant with affine transfo if param.sampling == 'None': # 'None' sampling means 'fully dense' sampling # see https://github.com/ANTsX/ANTs/wiki/antsRegistration-reproducibility-issues sampling = param.sampling else: # param.sampling should be a float in [0,1], and means the # samplingPercentage that chooses a subset of points to # estimate from. We always use 'Regular' (evenly-spaced) # mode, though antsRegistration offers 'Random' as well. # Be aware: even 'Regular' is not fully deterministic: # > Regular includes a random perturbation on the grid sampling # - https://github.com/ANTsX/ANTs/issues/976#issuecomment-602313884 sampling = 'Regular,' + param.sampling if im_data.orientation[2] in 'LR': cmd = ['isct_antsRegistration', '-d', '2', '--transform', 'Affine[%s]' % param.gradStep, '--metric', param.metric + '[' + file_dest + ',' + file_src + ',1,' + metric_radius + ',' + sampling + ']', '--convergence', param.iter, '--shrink-factors', '1', '--smoothing-sigmas', param.smooth, '--verbose', '1', '--output', '[' + file_mat + ',' + file_out_concat + ']'] cmd += get_interpolation('isct_antsRegistration', param.interp) if im_mask is not None: # if user specified a mask, make sure there are non-null voxels in the image before running the registration if np.count_nonzero(im_mask.data): cmd += ['--masks', im_mask.absolutepath] else: # Mask only contains zeros. Copying the image instead of estimating registration. copy(file_src, file_out_concat, verbose=0) do_registration = False # TODO: create affine mat file with identity, in case used by -g 2 # 3D mode else: cmd = ['isct_antsSliceRegularizedRegistration', '--polydegree', param.poly, '--transform', 'Translation[%s]' % param.gradStep, '--metric', param.metric + '[' + file_dest + ',' + file_src + ',1,' + metric_radius + ',' + sampling + ']', '--iterations', param.iter, '--shrinkFactors', '1', '--smoothingSigmas', param.smooth, '--verbose', '1', '--output', '[' + file_mat + ',' + file_out_concat + ']'] cmd += get_interpolation('isct_antsSliceRegularizedRegistration', param.interp) if im_mask is not None: cmd += ['--mask', im_mask.absolutepath] # run command if do_registration: kw.update(dict(is_sct_binary=True)) # reducing the number of CPU used for moco (see issue #201 and #2642) env = {**os.environ, **{"ITK_GLOBAL_DEFAULT_NUMBER_OF_THREADS": "1"}} status, output = run_proc(cmd, verbose=1 if param.verbose == 2 else 0, env=env, **kw) elif param.todo == 'apply': sct_apply_transfo.main(argv=['-i', file_src, '-d', file_dest, '-w', file_mat + param.suffix_mat, '-o', file_out_concat, '-x', param.interp, '-v', '0']) # check if output file exists # Note (from JCA): In the past, i've tried to catch non-zero output from ANTs function (via the 'status' variable), # but in some OSs, the function can fail while outputing zero. So as a pragmatic approach, I decided to go with # the "output file checking" approach, which is 100% sensitive. if not os.path.isfile(file_out_concat): # printv(output, verbose, 'error') printv('WARNING in ' + os.path.basename(__file__) + ': No output. Maybe related to improper calculation of ' 'mutual information. Either the mask you provided is ' 'too small, or the subject moved a lot. If you see too ' 'many messages like this try with a bigger mask. ' 'Using previous transformation for this volume (if it' 'exists).', param.verbose, 'warning') failed_transfo = 1 # If sagittal, copy header (because ANTs screws it) and add singleton in 3rd dimension (for z-concatenation) if im_data.orientation[2] in 'LR' and do_registration: im_out = Image(file_out_concat) im_out.header = im_data.header im_out.data = np.expand_dims(im_out.data, 2) im_out.save(file_out, verbose=0) # return status of failure return failed_transfo
def spline(folder_mat, nt, nz, verbose, index_b0=[], graph=0): printv('\n\n\n------------------------------------------------------------------------------', verbose) printv('Spline Regularization along T: Smoothing Patient Motion...', verbose) file_mat = [[[] for i in range(nz)] for i in range(nt)] for it in range(nt): for iz in range(nz): file_mat[it][iz] = os.path.join(folder_mat, "mat.T") + str(it) + '_Z' + str(iz) + '.txt' # Copying the existing Matrices to another folder old_mat = os.path.join(folder_mat, "old") try: os.makedirs(old_mat) except FileExistsError: pass # TODO for mat in glob.glob(os.path.join(folder_mat, '*.txt')): copy(mat, old_mat) printv('\nloading matrices...', verbose) X = [[[] for i in range(nt)] for i in range(nz)] Y = [[[] for i in range(nt)] for i in range(nz)] X_smooth = [[[] for i in range(nt)] for i in range(nz)] Y_smooth = [[[] for i in range(nt)] for i in range(nz)] for iz in range(nz): for it in range(nt): file = open(file_mat[it][iz]) Matrix = np.loadtxt(file) file.close() X[iz][it] = Matrix[0, 3] Y[iz][it] = Matrix[1, 3] # Generate motion splines printv('\nGenerate motion splines...', verbose) T = np.arange(nt) if graph: import pylab as pl for iz in range(nz): spline = scipy.interpolate.UnivariateSpline(T, X[iz][:], w=None, bbox=[None, None], k=3, s=None) X_smooth[iz][:] = spline(T) if graph: pl.plot(T, X_smooth[iz][:], label='spline_smoothing') pl.plot(T, X[iz][:], marker='*', linestyle='None', label='original_val') if len(index_b0) != 0: T_b0 = [T[i_b0] for i_b0 in index_b0] X_b0 = [X[iz][i_b0] for i_b0 in index_b0] pl.plot(T_b0, X_b0, marker='D', linestyle='None', color='k', label='b=0') pl.title('X') pl.grid() pl.legend() pl.show() spline = scipy.interpolate.UnivariateSpline(T, Y[iz][:], w=None, bbox=[None, None], k=3, s=None) Y_smooth[iz][:] = spline(T) if graph: pl.plot(T, Y_smooth[iz][:], label='spline_smoothing') pl.plot(T, Y[iz][:], marker='*', linestyle='None', label='original_val') if len(index_b0) != 0: T_b0 = [T[i_b0] for i_b0 in index_b0] Y_b0 = [Y[iz][i_b0] for i_b0 in index_b0] pl.plot(T_b0, Y_b0, marker='D', linestyle='None', color='k', label='b=0') pl.title('Y') pl.grid() pl.legend() pl.show() # Storing the final Matrices printv('\nStoring the final Matrices...', verbose) for iz in range(nz): for it in range(nt): file = open(file_mat[it][iz]) Matrix = np.loadtxt(file) file.close() Matrix[0, 3] = X_smooth[iz][it] Matrix[1, 3] = Y_smooth[iz][it] file = open(file_mat[it][iz], 'w') np.savetxt(file_mat[it][iz], Matrix, fmt="%s", delimiter=' ', newline='\n') file.close() printv('\n...Done. Patient motion has been smoothed', verbose) printv('------------------------------------------------------------------------------\n', verbose)