#!/usr/bin/env python
""" -- low-level utilities

# Paths
import os
from glob import glob 

# Numeric
import numpy as np 

# Dataframes
import pandas as pd 

# Special functions
from scipy.special import erf 

# Image processing utilities
from scipy import ndimage as ndi 

# Load *.mat format files
from scipy import io as sio 

# Caching
from functools import lru_cache 

# Warnings, to filter out warnings for a potentially dangerous
# division in rs() that is subsequent corrected
import warnings 

# Profiling
from time import time 

from .plot import wireframe_overlay

[docs]def time_f(f): """ Decorator. When executing the decorated function, also print the time to execute. args ---- f : function returns ------- output of f """ def g(*args, **kwargs): t0 = time() r = f(*args, **kwargs) t1 = time() print("%f seconds" % (t1-t0)) return r return g
############# ## GENERAL ## #############
[docs]def assign_methods(methods): """ Decorator that designates a wrapper function for other methods. Adds a new kwarg to the decorated function ("method") that looks up the corresponding method in the *methods* dict. Example: def add(x, y): return x + y def mult(x, y): return x * y METHODS = {'add': add, 'mult': mult} do_operation = assign_methods(METHODS)(lambda a, b: None) This would be called: do_operation(2, 3, method='add') # is 5 do_operation(2, 3, method='mult') # is 6 args ---- methods : dict that keys strings to methods returns ------- function, wrapper for the methods """ def dec(f): def apply_method(*args, method=None, **kwargs): print(method) method_f = methods.get(method) if method_f is None: raise KeyError("Method %s not found; available methods are %s" % ( method, ", ".join(methods.keys()))) return method_f(*args, **kwargs) return apply_method return dec
[docs]def stable_divide_array(N, D, zero=0.001): """ Divide array N by array D, setting zeros in D to *zero*. Assumes nonzero D. args ---- N, D : ndarrays of the same shape zero : float returns ------- ndarray of the same shape as N and D """ return N/np.clip(D, zero, np.inf)
[docs]def stable_divide_float(N, D, inf=0.0): """ Divide float N by float D, returning *inf* if *D* is zero. args ---- N, D : float inf : float returns ------- float """ return (inf if D==0.0 else N/D)
##################### ## FILE CONVERSION ## #####################
[docs]def tracked_mat_to_csv(path, out_csv=None, pixel_size_um=0.16): """ Convert a file from *Tracked.mat format, a MATLAB-based format for trajectories to a DataFrame format. args ---- path : str, path to a *Tracked.mat file out_csv : str, file to save the result to pixel_size_um: float, the size of pixels in um returns ------- pandas.DataFrame, the contents of the *Tracked.mat file as a DataFrame """ f = sio.loadmat(path)['trackedPar'] if len(f.shape) == 2 and f.shape[0] == 1: f = f[0,:] # Total number of localizations in the file n_locs = sum([f[i][0].shape[0] for i in range(len(f))]) # Output dataframe df = pd.DataFrame(index=np.arange(n_locs), columns=["y_um", "x_um", "frame", "trajectory", "time"]) # Trajectory indices df["trajectory"] = np.concatenate([[i for j in range(f[i][0].shape[0])] for i in range(len(f))]) # XY positions of each localization df[["y_um", "x_um"]] = np.concatenate([f[i][0] for i in range(len(f))], axis=0) # Frame index (correct for MATLAB off-by-1) df["frame"] = np.concatenate([f[i][1][:,0] for i in range(len(f))]) - 1 # Timepoint in seconds df["time"] = np.concatenate([f[i][2][:,0] for i in range(len(f))]) # Convert XY positions to pixels and correct for the MATLAB # off-by-1 df[["y", "x"]] = (df[["y_um", "x_um"]] / pixel_size_um) - 1 # Assign other columns that may be expected by other utilities df["error_flag"] = 0.0 if not out_csv is None: df.to_csv(out_csv, index=False) return df
[docs]def load_tracks_dir(dirname, suffix="trajs.csv", start_frame=0, min_track_len=1): """ Load all of the trajectory files in a target directory into a single pandas.DataFrame. args ---- dirname : str, directory containing track CSVs suffix : str, extension for the track CSVs start_frame : int, exclude all trajectories before this frame min_track_len : int, the minimum trajectory length to include returns ------- pandas.DataFrame """ # Find target files if os.path.isdir(dirname): target_csvs = glob(os.path.join(dirname, "*{}".format(suffix))) if len(target_csvs) == 0: raise RuntimeError("quot.helper.load_tracks_dir: could not find " \ "trajectory CSVs in directory {}".format(dirname)) elif os.path.isfile(dirname): target_csvs = [dirname] # Concatenate trajectories tracks = [pd.read_csv(j) for j in target_csvs] tracks = concat_tracks(*tracks) # Exclude points before the start frame tracks = tracks[tracks["frame"] >= start_frame] # Exclude trajectories that are too short tracks = track_length(tracks) if min_track_len > 1: tracks = tracks[tracks["track_length"] >= min_track_len] return tracks
############### ## DETECTION ## ###############
[docs]def pad(I, H, W, mode='ceil'): """ Pad an array with zeroes around the edges, placing the original array in the center. args ---- I : 2D ndarray, image to be padded H : int, height of final image W : int, width of final image mode : str, either 'ceil' or 'floor'. 'ceil' yields convolution kernels that function similarly to scipy.ndimage.uniform_filter. returns ------- 2D ndarray, shape (H, W) """ H_in, W_in = I.shape out = np.zeros((H, W)) if mode == "ceil": hc = np.ceil(H / 2 - H_in / 2).astype(int) wc = np.ceil(W / 2 - W_in / 2).astype(int) elif mode == "floor": hc = np.floor(H / 2 - H_in / 2).astype(int) wc = np.floor(W / 2 - W_in / 2).astype(int) out[hc : hc + H_in, wc : wc + W_in] = I return out
[docs]def label_spots(binary_img, intensity_img=None, mode="max"): """ Find continuous nonzero objects in a binary image, returning the coordinates of the spots. If the objects are larger than a single pixel, then to find the central pixel do 1. use the center of mass (if mode == 'centroid') 2. use the brightest pixel (if mode == 'max') 3. use the mean position of the binary spot (if img_int is not specified) args ---- binary_img : 2D ndarray (YX), dtype bool intensity_img : 2D ndarray (YX) mode : str, 'max' or 'centroid' returns ------- 2D ndarray (n_spots, 2), dtype int64, the Y and X coordinate of each spot """ # Find and label every nonzero object img_lab, n = ndi.label(binary_img, structure=np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]])) index = np.arange(1, n + 1) # Find the centers of the spots if intensity_img is None: positions = np.asarray(ndi.center_of_mass(binary_img, labels=img_lab, index=index)) elif mode == "max": positions = np.asarray(ndi.maximum_position(intensity_img, labels=img_lab, index=index)) elif mode == "centroid": positions = np.asarray(ndi.center_of_mass(intensity_img, labels=img_lab, index=index)) return positions.astype("int64")
[docs]def threshold_image(I, t=200.0, return_filt=False, mode='max'): """ Take all spots in an image above a threshold *t* and return their approximate centers. If *return_filt* is set, the function also returns the raw image and binary image. This is useful as a back door when writing GUIs. args ---- I : 2D ndarray, image t : float, threshold return_filt : bool mode : str, either 'max' or 'centroid' returns ------- If *return_filt* is set: ( 2D ndarray, same as I; 2D ndarray, binary image; 2D ndarray, shape (n_spots, 2), the spot coordinates ) else 2D ndarray of shape (n_spots, 2), the spot coordinates """ I_bin = I > t pos = label_spots(I_bin, intensity_img=I, mode=mode) if return_filt: return I, I_bin, pos else: return pos
[docs]def hollow_box_var(I, w0=15, w1=9): """ Calculate the local variance of every point in a 2D image, returning another 2D image composed of the variances. Variances are calculated in a "hollow box" region around each point. The box is a square kernel of width *w0*; the hollow central region is width *w1*. For instance, if w0 = 5 and w1 = 3, the kernel would be 1 1 1 1 1 1 0 0 0 1 1 0 0 0 1 1 0 0 0 1 1 1 1 1 1 args ---- I : 2D ndarray, image w0 : int, width of the kernel w1 : int, width of central region returns ------- 2D ndarray """ # Make the hollow box kernel hw0 = w0 // 2 hw1 = w1 // 2 kernel = np.ones((w0, w0), dtype='float64') kernel[hw0-hw1:hw0+hw1+1, hw0-hw1:hw0+hw1+1] = 0 kernel = kernel / kernel.sum() # Calculate variance kernel_rft = np.fft.rfft2(pad(kernel, *I.shape)) I_mean = np.fft.fftshift(np.fft.irfft2(np.fft.rfft2(I) * kernel_rft, s=I.shape)) I2_mean = np.fft.fftshift(np.fft.irfft2(np.fft.rfft2(I**2) * kernel_rft, s=I.shape)) return I2_mean - (I_mean**2)
########################### ## SUBPIXEL LOCALIZATION ## ########################### ## LOW-LEVEL SUBPIXEL LOCALIZATION FUNCTIONS
[docs]def ring_mean(I): """ Take the mean of the outer ring of pixels in a 2D array. args ---- I : 2D ndarray (YX) returns ------- float, mean """ return sum([ I[0,:-1].mean(), I[:-1,-1].mean(), I[-1,1:].mean(), I[1:,0].mean(), ])/4.0
[docs]def ring_var(I, ddof=0): """ Take the variance of the outer ring of pixels in a 2D ndarray. args ---- I : 2D ndarray, image ddof : int, delta degrees of freedom returns ------- float, variance estimate """ return np.concatenate([ I[0,1:], I[1:,-1], I[-1,:-1], I[:-1,0] ]).var(ddof=ddof)
[docs]def I0_is_crazy(I0, max_I0=10000): """ Determine whether the intensity estimate in subpixel localization looks crazy. For many methods, this is often the first thing to diverge. args ---- I0 : float, the intensity estimate for subpixel fitting max_I0 : float, the maximum tolerated intensity returns ------- bool, True if crazy """ return (I0>=0) and (I0<=max_I0)
[docs]def estimate_I0(I, y, x, bg, sigma=1.0): """ Estimate the integrated intensity of a 2D integrated Gaussian PSF. This model has four parameters - y, x, I0, and bg - so with y, x, and bg constrained, we can solve for I0 directly. In this method, we use the brightest pixel in the image to solve for I0 given the PSF model. This is easy but often an overestimate in the presence of Poisson noise. args ---- I : 2D ndarray, the PSF image y, x : float, center of PSF in y and x bg : float, estimated background intensity per pixel (AU) sigma : float, 2D integrated Gaussian PSF radius returns ------- float, estimate for I0 """ # Get the index of the brightest pixel ym, xm = np.unravel_index(np.argmax(I), I.shape) # Evaluate the intensity estimate return stable_divide_float( I[ym,xm] - bg, int_gauss_psf_1d(ym, y, sigma=sigma) * \ int_gauss_psf_1d(xm, x, sigma=sigma), inf=np.nan )
[docs]def estimate_I0_multiple_points(I, y, x, bg, sigma=1.0): """ Estimate the integrated intensity of a 2D integrated Gaussian PSF. This model has four parameters - y, x, I0, and bg - so with y, x, and bg constrained, we can solve for I0 directly. In this method, we use the intensities at the 3x3 central points in the image window to obtain nine estimates for I0, then average them for the final estimate. This is more accurate than estimate_I0() but is slower. Probably overkill for a first guess. args ---- I : 2D ndarray, the PSF image y, x : float, center of PSF in y and x bg : float, estimated background intensity per pixel (AU) sigma : float, 2D integrated Gaussian PSF radius returns ------- float, estimate for I0 """ # Get the three central points in the spot window # and their coordinates wy, wx = I.shape hwy = wy//2 hwx = wx//2 Y, X = indices(wy, wx) sy = slice(hwy-1, hwy+2) sx = slice(hwx-1, hwx+2) obs_I = I[sy, sx].ravel() cen_y = Y[sy, sx].ravel() cen_x = X[sy, sx].ravel() return stable_divide_array( obs_I - bg, int_gauss_psf_1d(cen_y, y, sigma=sigma) * \ int_gauss_psf_1d(cen_x, x, sigma=sigma) ).mean()
[docs]def amp_from_I(I0, sigma=1.0): """ Given a 2D Gaussian PSF, return the PSF peak amplitude given the intensity `I0`. `I0` is equal to the PSF integrated above background, while `amp` is equal to the PSF evaluated at its maximum. args ---- I0 : float, intensity estimate sigma : float, width of Gaussian returns ------- float, amplitude estimate """ return I0/(2*np.pi*(sigma**2))
[docs]def estimate_snr(I, I0): """ Estimate the signal-to-noise ratio of a PSF, given an estimate *I0* for its intensity. args ---- I : 2D ndarray, the PSF image I0 : float, intensity estimate returns ------- float, SNR estimate """ return stable_divide_float( amp_from_I(I0)**2, ring_var(I, ddof=1), inf=np.inf)
[docs]def invert_hessian(H, ridge=0.0001): """ Invert a Hessian with ridge regularization to stabilize the inversion. args ---- H : 2D ndarray, shape (n, n) ridge : float, regularization term returns ------- 2D ndarray, shape (n, n), the inverted Hessian """ D = np.diag(np.ones(H.shape[0])*ridge) while 1: try: H_inv = np.linalg.inv(H-D) return H_inv except (ZeroDivisionError, np.linalg.linalg.LinAlgError): D *= 10 continue
[docs]def check_2d_gauss_fit(img_shape, pars, max_I0=10000): """ Check whether the fit parameters for a 2D symmetric Gaussian with static sigma are sane. This includes: - Are the y and x coordinates inside the PSF window? - Are there negative intensities? - Are there crazily high intensities? args ---- img_shape : (int, int), the PSF image shape pars : 1D ndarray, (y, x, I0, bg), the fit parameters max_I0 : float, maximum tolerated intensity returns ------- bool, True if the fit passes the checks """ return all([ pars[0]>=0, pars[1]>=0, pars[0]<img_shape[0], pars[1]<img_shape[1], pars[2]>=0, pars[2]<=max_I0, pars[3]>=0 ])
[docs]@lru_cache(maxsize=1) def psf_int_proj_denom(sigma): """ Convenience function to produce the denominator for the Gaussian functions in int_gauss_psf_1d(). args ---- sigma : float returns ------- float, np.sqrt(2 * (sigma**2)) """ return np.sqrt(2*(sigma**2))
[docs]@lru_cache(maxsize=1) def indices(size_y, size_x): """ Convenience wrapper for np.indices. args ---- size_y : int, the number of indices in the y direction size_x : int, the number of indices in the x direction returns ------- ( 2D ndarray, the Y indices of each pixel; 2D ndarray, the X indices of each pixel ) """ return np.indices((size_y, size_x))
[docs]@lru_cache(maxsize=1) def indices_1d(size_y, size_x): """ Cached convenience function to generate two sets of 1D indices. args ---- size_y : int, the number of indices in the y direction size_x ; int, the number of indices in the x direction returns ------- ( 1D ndarray, the Y indices of each pixel; 1D ndarray, the X indices of each pixel ) """ return np.arange(size_y).astype('float64'), \ np.arange(size_x).astype('float64')
[docs]def int_gauss_psf_1d(Y, yc, sigma=1.0): """ Return a 2D integrated Gaussian PSF with unit intensity, projected onto one of the axes. args ---- Y : 1D ndarray, coordinates along the axis yc : float, PSF center sigma : float, Gaussian sigma returns ------- 1D ndarray, the intensities projected along each row of pixels """ S = psf_int_proj_denom(sigma) return 0.5*(erf((Y+0.5-yc)/S) - \ erf((Y-0.5-yc)/S))
[docs]def int_gauss_psf_2d(size_y, size_x, yc, xc, I0, sigma=1.0): """ Return a 2D integrated Gaussian PSF with intensity I0. Does not include a background term. args ---- size_y : int, the number of pixels in the y direction size_x : int, the number of pixels in the x direction yc : float, center of PSF in y xc : float, center of PSF in x sigma : float, PSF width returns ------- 2D ndarray of shape (size_y, size_x), the PSF """ Y, X = indices_1d(size_y, size_x) return I0*np.outer( int_gauss_psf_1d(Y, yc, sigma=sigma), int_gauss_psf_1d(X, xc, sigma=sigma), )
[docs]def int_gauss_psf_deriv_1d(Y, yc, sigma=1.0): """ Evaluate the derivative of an integrated Gaussian PSF model with unit intensity projected onto 1 axis with respect to its axis variable. args ---- Y : 1D ndarray, the pixel indices at which to evaluate the PSF yc : float, the spot center sigma : float, Gaussian sigma returns ------- 1D ndarray, the derivative of the projection with respect to the axis variable at each pixel """ A, B = psf_point_proj_denom(sigma) return (np.exp(-(Y-0.5-yc)**2 / A) - \ np.exp(-(Y+0.5-yc)**2 / A)) / B
[docs]@lru_cache(maxsize=1) def psf_point_proj_denom(sigma): """ Convenience function to produce the denominator for the Gaussian functions in psf_point_psf_1d(). args ---- sigma : float, Gaussian sigma returns ------- float, float """ S = 2*(sigma**2) return S, np.sqrt(np.pi*S)
[docs]def point_gauss_psf_1d(Y, yc, sigma=1.0): """ Evaluate a 1D point Gaussian with unit intensity at a set of points. args ---- Y : 1D ndarray, coordinates along the axis yc : float, PSF center sigma : float, Gaussian sigma returns ------- 1D ndarray, the intensities projected along each row of pixels """ A, B = psf_point_proj_denom(sigma) return np.exp(-(Y-yc)**2 / A) / B
[docs]def point_gauss_psf_2d(size_y, size_x, yc, xc, I0, sigma=1.0): """ Return a 2D pointwise-evaluated Gaussian PSF with intensity I0. args ---- size_y : int, size of the PSF subwindow in y size_x : int, size of the PSF subwindow in x yc : float, PSF center in y xc : float, PSF center in x I0 : float, PSF intensity sigma : float, Gaussian sigma returns ------- 2D ndarray, the PSF model """ Y, X = indices_1d(size_y, size_x) return I0*np.outer( point_gauss_psf_1d(Y, yc, sigma=sigma), point_gauss_psf_1d(X, xc, sigma=sigma), )
[docs]def rs(psf_image): """ Localize the center of a PSF using the radial symmetry method. Originally conceived by the criminally underrated Parasarathy R Nature Methods 9, pgs 724–726 (2012). args ---- psf_image : 2D ndarray, PSF subwindow returns ------- float y estimate, float x estimate """ # Get the size of the image frame and build # a set of pixel indices to match N, M = psf_image.shape N_half = N // 2 M_half = M // 2 ym, xm = np.mgrid[:N-1, :M-1] ym = ym - N_half + 0.5 xm = xm - M_half + 0.5 # Calculate the diagonal gradients of intensities across each # corner of 4 pixels dI_du = psf_image[:N-1, 1:] - psf_image[1:, :M-1] dI_dv = psf_image[:N-1, :M-1] - psf_image[1:, 1:] # Smooth the image to reduce the effect of noise, at the cost # of a little resolution fdu = ndi.uniform_filter(dI_du, 3) fdv = ndi.uniform_filter(dI_dv, 3) dI2 = (fdu ** 2) + (fdv ** 2) with warnings.catch_warnings(): warnings.simplefilter('ignore') m = -(fdv + fdu) / (fdu - fdv) # For pixel values that blow up, instead set them to a very # high float m[np.isinf(m)] = 9e9 b = ym - m * xm sdI2 = dI2.sum() ycentroid = (dI2 * ym).sum() / sdI2 xcentroid = (dI2 * xm).sum() / sdI2 w = dI2 / np.sqrt((xm - xcentroid)**2 + (ym - ycentroid)**2) # Correct nan / inf values w[np.isnan(m)] = 0 b[np.isnan(m)] = 0 m[np.isnan(m)] = 0 # Least-squares analytical solution to the point of # maximum radial symmetry, given the slopes at each # edge of 4 pixels wm2p1 = w / ((m**2) + 1) sw = wm2p1.sum() smmw = ((m**2) * wm2p1).sum() smw = (m * wm2p1).sum() smbw = (m * b * wm2p1).sum() sbw = (b * wm2p1).sum() det = (smw ** 2) - (smmw * sw) xc = (smbw*sw - smw*sbw)/det yc = (smbw*smw - smmw*sbw)/det # Adjust coordinates so that they're relative to the # edge of the image frame yc = (yc + (N + 1) / 2.0) - 1 xc = (xc + (M + 1) / 2.0) - 1 return yc, xc
[docs]def fit_ls_int_gaussian(img, guess, sigma=1.0, ridge=0.0001, max_iter=20, damp=0.3, convergence=1.0e-4, divergence=1.0): """ Given an observed spot, estimate the maximum likelihood parameters for a 2D integrated Gaussian PSF sampled with normally-distributed noise. Here, we use a Levenberg- Marquardt procedure to find the ML parameters. The model parameters are (y, x, I0, bg), where y, x is the spot center, I0 is the integrated intensity, and bg is the average background per pixel. The function also returns several parameters that are useful for quality control on the fits. args ---- img : 2D ndarray, the observed PSF guess : 1D ndarray of shape (4), the initial guess for y, x, I0, and bg sigma : float, Gaussian PSF width ridge : float, initial regularization term for inversion of the Hessian max_iter: int, the maximum number of iterations tolerated damp : float, damping factor for update vector. Larger means faster convergence but also more unstable. convergence : float, the criterion for fit convergence. Only y and x are tested for convergence. divergence : float, the criterion for fit divergence. Fitting is terminated when this is reached. CURRENTLY NOT IMPLEMENTED. returns ------- ( 1D ndarray, the final parameter estimate; 1D ndarray, the estimated error in each parameter (square root of the inverse Hessian diagonal); float, the Hessian determinant; float, the root mean squared deviation of the final PSF model from the observed PSF; int, the number of iterations ) """ n_pixels = img.shape[0] * img.shape[1] # 1D indices for each pixel index_y, index_x = indices_1d(*img.shape) # Update vector update = np.ones(4, dtype='float64') # Current parameter set pars = guess.copy() # Continue improving guess until convergence iter_idx = 0 while iter_idx<max_iter and (np.abs(update[:2])>convergence).any(): # Evaluate the PSF model and its derivatives projected # onto each of the two axes proj_y = int_gauss_psf_1d(index_y, pars[0], sigma=sigma) proj_x = int_gauss_psf_1d(index_x, pars[1], sigma=sigma) # Evaluate the derivatives of the model with respect to # the Y and X axies dproj_y_dy = int_gauss_psf_deriv_1d(index_y, pars[0], sigma=sigma) dproj_x_dx = int_gauss_psf_deriv_1d(index_x, pars[1], sigma=sigma) # Evaluate the model Jacobian J = np.asarray([ (pars[2] * np.outer(dproj_y_dy, proj_x)).ravel(), (pars[2] * np.outer(proj_y, dproj_x_dx)).ravel(), (np.outer(proj_y, proj_x)).ravel(), np.ones(n_pixels) ]).T # Evaluate the model model = pars[2] * J[:,2] + pars[3] # Estimate the noise variance devs = img.ravel() - model sig2 = (devs**2).mean() # Get the gradient of the log-likelihood grad = (J.T * devs).sum(axis=1) / sig2 # Evaluate and invert the Hessian H_inv = invert_hessian(-J.T@J/sig2, ridge=ridge) # Get the update vector update[:] = -H_inv @ grad # Apply the update to parameter estimate pars = pars + damp * update iter_idx += 1 # Estimate the error with the inverse Hessian err = np.sqrt(np.diagonal(-H_inv)) # Evaluate the Hessian determinant, which is # also useful as a metric of error return pars, err, np.linalg.det(H_inv), np.sqrt(sig2), iter_idx
[docs]def fit_ls_point_gaussian(img, guess, sigma=1.0, ridge=0.0001, max_iter=20, damp=0.3, convergence=1.0e-4, divergence=1.0): """ Given an observed spot, estimate the maximum likelihood parameters for a 2D pointwise-evaluated Gaussian PSF sampled with normally-distributed noise. This function uses a Levenberg-Marquardt procedure to find the ML parameters. The underlying model for the pointwise-evaluated and integrated Gaussian PSFs is identical - a symmetric 2D Gaussian. The distinction is how they handle sampling on discrete pixels: - the point Gaussian takes the value on each pixel to be equal to the Gaussian evaluated at the center of the pixel - the integrated Gaussian takes the value on each pixel to be equal to the Gaussian integrated across the whole area of that pixel Integrated Gaussians are more accurate and less prone to edge biases, but the math is slightly more complicated and fitting is slower as a result. The model parameters are (y, x, I0, bg), where y, x is the spot center, I0 is the integrated intensity, and bg is the average background per pixel. The function also returns several parameters that are useful for quality control on the fits. args ---- img : 2D ndarray, the observed PSF guess : 1D ndarray of shape (4), the initial guess for y, x, I0, and bg sigma : float, Gaussian PSF width ridge : float, initial regularization term for inversion of the Hessian max_iter: int, the maximum number of iterations tolerated damp : float, damping factor for update vector. Larger means faster convergence but also more unstable. convergence : float, the criterion for fit convergence. Only y and x are tested for convergence. divergence : float, the criterion for fit divergence. Fitting is terminated when this is reached. CURRENTLY NOT IMPLEMENTED. returns ------- ( 1D ndarray, the final parameter estimate; 1D ndarray, the estimated error in each parameter (square root of the inverse Hessian diagonal); float, the Hessian determinant; float, the root mean squared deviation of the final PSF model from the observed PSF; int, the number of iterations ) """ S2 = sigma**2 n_pixels = img.shape[0] * img.shape[1] # 1D indices for each pixel index_y, index_x = indices_1d(*img.shape) # Update vector update = np.ones(4, dtype='float64') # Current parameter set pars = guess.copy() # Continue improving guess until convergence iter_idx = 0 while iter_idx<max_iter and (np.abs(update[:2])>convergence).any(): # Evaluate a unit intensity Gaussian projected onto each # axis psf_1d_y = point_gauss_psf_1d(index_y, pars[0], sigma=sigma) psf_1d_x = point_gauss_psf_1d(index_x, pars[1], sigma=sigma) # Evaluate the derivatives of the model with respect to # the Y and X axes dpsf_1d_y_dy = (index_y-pars[0]) * psf_1d_y / S2 dpsf_1d_x_dx = (index_x-pars[1]) * psf_1d_x / S2 # Evaluate the model Jacobian J = np.asarray([ (pars[2] * np.outer(dpsf_1d_y_dy, psf_1d_x)).ravel(), (pars[2] * np.outer(psf_1d_y, dpsf_1d_x_dx)).ravel(), (np.outer(psf_1d_y, psf_1d_x)).ravel(), np.ones(n_pixels) ]).T # Evaluate the model model = pars[2] * J[:,2] + pars[3] # Estimate the noise variance devs = img.ravel() - model sig2 = (devs**2).mean() # Get the gradient of the log-likelihood under a # normally-distributed noise model grad = (J.T * devs).sum(axis=1) / sig2 # Evaluate and invert the Hessian H_inv = invert_hessian(-J.T@J/sig2, ridge=ridge) # Get the update vector update[:] = -H_inv @ grad # Apply the update to parameter estimate pars = pars + damp * update iter_idx += 1 # Estimate the error with the inverse Hessian err = np.sqrt(np.diagonal(-H_inv)) # Evaluate the Hessian determinant, which is # also useful as a metric of error return pars, err, np.linalg.det(H_inv), np.sqrt(sig2), iter_idx
[docs]def fit_poisson_int_gaussian(img, guess, sigma=1.0, ridge=0.0001, max_iter=20, damp=0.3, convergence=1.0e-4, divergence=1.0): """ Given an observed spot, estimate the maximum likelihood parameters for a 2D integrated Gaussian PSF model sampled with Poisson noise, using a Levenberg-Marquardt procedure. While LM with Poisson noise is a little slower than LS routines, it is the most accurate model for the noise on EMCCD cameras. The model parameters are (y, x, I0, bg), where y, x is the spot center, I0 is the integrated intensity, and bg is the average background per pixel. The function also returns several parameters that are useful for quality control on the fits. args ---- img : 2D ndarray, the observed PSF guess : 1D ndarray of shape (4), the initial guess for y, x, I0, and bg sigma : float, Gaussian PSF width ridge : float, initial regularization term for inversion of the Hessian max_iter: int, the maximum number of iterations tolerated damp : float, damping factor for update vector. Larger means faster convergence but also more unstable. convergence : float, the criterion for fit convergence. Only y and x are tested for convergence. divergence : float, the criterion for fit divergence. Fitting is terminated when this is reached. CURRENTLY NOT IMPLEMENTED. returns ------- ( 1D ndarray, the final parameter estimate; 1D ndarray, the estimated error in each parameter (square root of the inverse Hessian diagonal); float, the Hessian determinant; float, the root mean squared deviation of the final PSF model from the observed PSF; int, the number of iterations ) """ n_pixels = img.shape[0] * img.shape[1] img_ravel = img.ravel() # 1D indices for each pixel index_y, index_x = indices_1d(*img.shape) # Update vector update = np.ones(4, dtype='float64') # Current parameter set pars = guess.copy() # Instantiate the Hessian H = np.empty((4, 4), dtype='float64') # Continue improving guess until convergence iter_idx = 0 while iter_idx<max_iter and (np.abs(update[:2])>convergence).any(): # Evaluate the PSF model and its derivatives projected # onto each of the two axes proj_y = int_gauss_psf_1d(index_y, pars[0], sigma=sigma) proj_x = int_gauss_psf_1d(index_x, pars[1], sigma=sigma) # Evaluate the derivatives of the model with respect to # the Y and X axies dproj_y_dy = int_gauss_psf_deriv_1d(index_y, pars[0], sigma=sigma) dproj_x_dx = int_gauss_psf_deriv_1d(index_x, pars[1], sigma=sigma) # Evaluate the model Jacobian J = np.asarray([ (pars[2] * np.outer(dproj_y_dy, proj_x)).ravel(), (pars[2] * np.outer(proj_y, dproj_x_dx)).ravel(), (np.outer(proj_y, proj_x)).ravel(), np.ones(n_pixels) ]).T # Evaluate the model model = pars[2] * J[:,2] + pars[3] # if debug: # print('iter: %d' % iter_idx) # wireframe_overlay(img, model.reshape(img.shape)) # Get the gradient of the log-likelihood grad = (J.T * ((img_ravel/model) - 1.0)).sum(axis=1) # Evaluate and invert the Hessian H_factor = -(img_ravel / (model**2)) for i in range(4): for j in range(i, 4): H[i,j] = (H_factor * J[:,i] * J[:,j]).sum() if j > i: H[j,i] = H[i,j] H_inv = invert_hessian(H, ridge=ridge) # Get the update vector update[:] = -H_inv @ grad # Apply the update to parameter estimate pars = pars + damp * update iter_idx += 1 # if debug: # print('final:') # wireframe_overlay(img, model.reshape(img.shape)) # Estimate the error with the inverse Hessian err = np.sqrt(np.diagonal(-H_inv)) # Get the root mean squared deviation of the final # model from the observed spot rmse = ((model - img_ravel)**2).mean() # Evaluate the Hessian determinant, which is # also useful as a metric of error return pars, err, np.linalg.det(H_inv), rmse, iter_idx
######################## ## TRACKING UTILITIES ## ########################
[docs]def connected_components(semigraph): """ Find independent connected subgraphs in a bipartite graph (aka semigraph) by a floodfill procedure. The semigraph is a set of edges betwen two sets of nodes- one set corresponding to each y-index, and the other corresponding to each x-index. Edges can only exist between a y-node and an x-node, so the semigraph is conveniently represented as a binary (0/1 values) 2D matrix. The goal of this function is to identify independent subgraphs in the original semigraph. If I start on a y-node or x-node for one of these independent subgraphs and walk along edges, I can reach any point in that subgraph but no points in other subgraphs. Each subgraph can itself be represented by a 2D ndarray along with a reference to the specific y-nodes and x-dnoes in the original semigraph corresponding to that subgraph. For the purposes of tracking, it is also convenient to separate y-nodes and x-nodes that are NOT connected by any edges to other nodes. These are returned as separate ndarrays: y_without_x and x_without_y. Parameters ---------- semigraph : 2D ndarray, with only 0 and 1 values Returns ------- ( subgraphs : list of 2D ndarray subgraph_y_indices : list of 1D ndarray, the set of y-nodes corresponding to each subgraph subgraph_x_indices : list of 1D ndarray, the set of x-nodes corresponding to each subgraph y_without_x : 1D ndarray, the y-nodes that are not connected to any x-nodes x_without_y : 1D ndarray, the x-nodes that are not connected to any y-nodes ) """ # The set of all y-nodes (corresponding to y-indices in the semigraph) y_indices = np.arange(semigraph.shape[0]).astype("int64") # The set of all x-nodes (corresponding to x-indices in the semigraph) x_indices = np.arange(semigraph.shape[1]).astype("int64") # Find y-nodes that don't connect to any x-node, # and vice versa where_y_without_x = semigraph.sum(axis=1) == 0 where_x_without_y = semigraph.sum(axis=0) == 0 y_without_x = y_indices[where_y_without_x] x_without_y = x_indices[where_x_without_y] # Consider the remaining nodes, which have at least one edge # to a node of the other class semigraph = semigraph[~where_y_without_x, :] semigraph = semigraph[:, ~where_x_without_y] y_indices = y_indices[~where_y_without_x] x_indices = x_indices[~where_x_without_y] # For the remaining nodes, keep track of (1) the subgraphs # encoding connected components, (2) the set of original y-indices # corresponding to each subgraph, and (3) the set of original x- # indices corresponding to each subgraph subgraphs = [] subgraph_y_indices = [] subgraph_x_indices = [] # Work by iteratively removing independent subgraphs from the # graph. The list of nodes still remaining are kept in # *unassigned_y* and *unassigned_x* unassigned_y, unassigned_x = (semigraph == 1).nonzero() # The current index is used to floodfill the graph with that # integer. It is incremented as we find more independent subgraphs. current_idx = 2 # While we still have unassigned nodes while len(unassigned_y) > 0: # Start the floodfill somewhere with an unassigned y-node semigraph[unassigned_y[0], unassigned_x[0]] = current_idx # Keep going until subsequent steps of the floodfill don't # pick up additional nodes prev_nodes = 0 curr_nodes = 1 while curr_nodes != prev_nodes: # Only floodfill along existing edges in the graph where_y, where_x = (semigraph == current_idx).nonzero() # Assign connected nodes to the same subgraph index semigraph[where_y, :] *= current_idx semigraph[semigraph > current_idx] = current_idx semigraph[:, where_x] *= current_idx semigraph[semigraph > current_idx] = current_idx # Correct for re-finding the same nodes and multiplying # them more than once (implemented in the line above) # semigraph[semigraph > current_idx] = current_idx # Update the node counts in this subgraph prev_nodes = curr_nodes curr_nodes = (semigraph == current_idx).sum() current_idx += 1 # Get the local indices of the y-nodes and x-nodes (in the context # of the remaining graph) where_y = np.unique(where_y) where_x = np.unique(where_x) # Use the local indices to pull this subgraph out of the # main graph subgraph = semigraph[where_y, :] subgraph = subgraph[:, where_x] # Save the subgraph if not (subgraph.shape[0] == 0 and subgraph.shape[0] == 0): subgraphs.append(subgraph) # Get the original y-nodes and x-nodes that were used in this # subgraph subgraph_y_indices.append(y_indices[where_y]) subgraph_x_indices.append(x_indices[where_x]) # Update the list of unassigned y- and x-nodes unassigned_y, unassigned_x = (semigraph == 1).nonzero() return subgraphs, subgraph_y_indices, subgraph_x_indices, y_without_x, x_without_y
[docs]def concat_tracks(*tracks): """ Join some trajectory dataframes together into a larger dataframe, while preserving uniqe trajectory indices. args ---- tracks : pandas.DataFrame with the "trajectory" column returns ------- pandas.DataFrame, the concatenated trajectories """ n = len(tracks) # Sort the tracks dataframes by their size. The only important thing # here is that if at least one of the tracks dataframes is nonempty, # we need to put that one first. df_lens = [len(t) for t in tracks] try: tracks = [t for _, t in sorted(zip(df_lens, tracks))][::-1] except ValueError: pass # Iteratively concatenate each dataframe to the first while # incrementing the trajectory index as necessary out = tracks[0].assign(dataframe_index=0) c_idx = out["trajectory"].max() + 1 for t in range(1, n): # Get the next set of trajectories and keep track of the origin # dataframe new = tracks[t].assign(dataframe_index=t) # Ignore negative trajectory indices (facilitating a user filter) new.loc[new["trajectory"]>=0, "trajectory"] += c_idx # Increment the total number of trajectories c_idx = new["trajectory"].max() + 1 # Concatenate out = pd.concat([out, new], ignore_index=True, sort=False) return out
[docs]def track_length(tracks): """ Generate a new column with the trajectory length in frames. args ---- tracks : pandas.DataFrame returns ------- pandas.DataFrame, the input dataframe with a new column "track_length" """ if "track_length" in tracks.columns: tracks = tracks.drop("track_length", axis=1) tracks = tracks.join( tracks.groupby("trajectory").size().rename("track_length"), on="trajectory" ) return tracks
####################### ## MASKING UTILITIES ## #######################
[docs]def get_edges(bin_img): """ Given a binary image that is False outside of an object and True inside of it, return another binary image that is True for points in the original image that border a False pixel, and False otherwise. args ---- bin_img : 2D ndarray, dtype bool returns ------- 2D ndarray, dtype bool, shape shape as bin_img """ assert bin_img.dtype == np.bool, "get_edges: must be bool input" out = np.zeros(bin_img.shape, dtype="bool") # Find the edges of the binary mask out[1:, :] = out[1:, :] | (bin_img[1:, :] & ~bin_img[:-1, :]) out[:, 1:] = out[:, 1:] | (bin_img[:, 1:] & ~bin_img[:, :-1]) out[:-1, :] = out[:-1, :] | (bin_img[:-1, :] & ~bin_img[1:, :]) out[:, :-1] = out[:, :-1] | (bin_img[:, :-1] & ~bin_img[:, 1:]) # Figure out where the object in the binary image intersects # the edge of the image frame out[0, :] = out[0, :] | bin_img[0, :] out[:, 0] = out[:, 0] | bin_img[:, 0] out[-1, :] = out[-1, :] | bin_img[-1, :] out[:, -1] = out[:, -1] | bin_img[:, -1] return out
[docs]def get_ordered_mask_points(mask, max_points=100): """ Given the edges of a two-dimensional binary mask, construct a line around the mask. args ---- mask : 2D ndarray, dtype bool, mask edges as returned by get_edges max_points : int, the maximum number of points tolerated in the final mask. If the number of points exceeds this, the points are repeatedly downsampled until there are fewer than max_points. returns ------- 2D ndarray of shape (n_points, 2), the points belonging to this ROI """ # Get the X and Y coordinates of all points in the mask edge points = np.asarray(mask.nonzero()).T # Keep track of which points we've included so far included = np.zeros(points.shape[0], dtype=np.bool) # Start at the first point ordered_points = np.zeros(points.shape, dtype=points.dtype) ordered_points[0,:] = points[0,:] included[0] = True # Index of the current point c = 0 midx = 0 # Find the closest point to the current point while c < points.shape[0]-1: # Compute distances to every other point distances = np.sqrt(((points[midx,:]-points)**2).sum(axis=1)) # Set included points to impossible distances distances[included] = np.inf # Among the points not yet included in *ordered_points*, # choose the one closest to the current point midx = np.argmin(distances) # Add this point to the set of ordered points ordered_points[c+1,:] = points[midx, :] # Mark this point as included included[midx] = True # Increment the current point counter c += 1 # Downsample until there are fewer than max_points while ordered_points.shape[0] > max_points: ordered_points = ordered_points[::2,:] return ordered_points