#!/usr/bin/env python
"""
trajUtils.py -- functions to compute some common values on
trajectories
"""
# Numeric
import numpy as np
# Uniform filter
from scipy.ndimage import uniform_filter
# DataFrames
import pandas as pd
#########################################
## OPERATIONS ON TRAJECTORY DATAFRAMES ##
#########################################
[docs]def traj_length(trajs):
"""
Compute the number of localizations corresponding to
each trajectory.
Unassigned localizations (with a trajectory index of
-1) are given traj_len = 0.
args
----
trajs : pandas.DataFrame, localizations with the
'trajectory' column
returns
-------
pandas.DataFrame, the same dataframe with "traj_len"
column
"""
# Remove traj_len column if it already exists
if "traj_len" in trajs.columns:
trajs = trajs.drop("traj_len", axis=1)
# Get the lengths
trajs = trajs.join(
trajs.groupby("trajectory").size().rename("traj_len"),
on="trajectory"
)
# Unassigned localizations are given a traj_len of 0
trajs.loc[trajs['trajectory']==-1, 'traj_len'] = 0
return trajs
[docs]def radial_disps(trajs, pixel_size_um=0.16, first_only=False):
"""
Calculate the 2D radial displacement of each jump in every
trajectory.
args
----
trajs : pandas.DataFrame
pixel_size_um : float, size of pixels
first_only : float, only compute the first
displacement of each trajectory
returns
-------
pandas.DataFrame, with the new column 'radial_disp_um'
"""
# If handed an empty dataframe, assign a dead column
if len(trajs) == 0:
trajs['radial_disp_um'] = []
# Order by ascending trajectory, frame
trajs = trajs.sort_values(by=['trajectory', 'frame'])
# Filter out singlets
if not "traj_len" in trajs.columns:
trajs = traj_length(trajs)
F0 = np.logical_and(trajs['trajectory']>=0, trajs['traj_len']>1)
# If no locs remain, return all NaN
if F0.sum() == 0:
trajs['radial_disp_um'] = np.nan
# Format as ndarray for speed
S = np.asarray(trajs.loc[F0, ['trajectory', 'frame', 'y', 'x']])
# Compute Cartesian displacements
D = S[1:,:] - S[:-1,:]
# Add extra row at the end for indexing purposes (there is one
# fewer row in D than in S)
D = np.concatenate((D, np.array([[1, 1, 1, 1]])), axis=0)
# Take all displacements that originate from the same trajectory
# and from a single frame interval
F1 = np.logical_and(D[:,0]==0, D[:,1]==1)
# If no disps remain, return all NaN
if F1.sum() == 0:
trajs['radial_disp_um'] = np.nan
# Compute radial displacements
R = np.full(F0.sum(), np.nan)
R[F1] = np.sqrt((D[F1, 2:]**2).sum(axis=1))
# Assign to the corresponding rows in the original dataframe
trajs.loc[F0, "radial_disp_um"] = R
# If desired, only used the first jump from each trajectory
if first_only:
trajs["_ones"] = 1
trajs["_first_loc"] = trajs.groupby("trajectory")["_ones"].cumsum()==1
trajs.loc[~trajs['_first_loc'], 'radial_disp_um'] = np.nan
trajs = trajs.drop(['_ones', '_first_loc'], axis=1)
# Scale into um
trajs['radial_disp_um'] = trajs['radial_disp_um'] * pixel_size_um
return trajs
[docs]def radial_disp_histograms(trajs, n_intervals=1, pixel_size_um=0.16,
first_only=False, n_gaps=0, bin_size=0.001, max_disp=5.0):
"""
Calculate radial displacement histograms for a set of
trajectories.
args
----
trajs : pandas.DataFrame
n_intervals : int, the maximum number of frames
over which to compute displacements
pixel_size_um : float, size of pixels (um)
first_only : bool, only take the displacements
relative to the first point of
each trajectory
n_gaps : int, the number of gaps allowed
in tracking
bin_size : float, the spatial bin size in um
max_disp : float, maximum displacement to
consider
returns
-------
(
2D ndarray of shape (n_intervals, n_bins), the radial
displacements in each bin for each interval;
1D ndarray of shape (n_bins+1), the edges of the
displacement bins in um
)
"""
# Filter out localizations not assigned to a trajectory, or
# trajectories that are length 1
trajs = traj_length(trajs)
T = trajs.loc[
(trajs['trajectory']>=0) & (trajs['traj_len']>1),
:
].copy()
# Add a column that indicates whether each localization is
# the first in the corresponding trajectory (computed later)
T['first_in_traj'] = 0
# Order the trajectories by ascending frame / trajectory indices.
# Trajectories are grouped together, and frames are ascending within
# each trajectory group
T = T.sort_values(by=['trajectory', 'frame'])
# Take only the columns we care about in an ndarray. The operations
# we will perform with ndarrays are much faster than their pandas
# dataframe equivalents
X = np.asarray(T[['frame', 'trajectory', 'y', 'x', 'first_in_traj']])
# Determine which localizations are the first in their respective
# trajectories
X[0,4] = 1.0
X[1:,4] = ((X[1:,1] - X[:-1,1])>0).astype('float64')
# Set up the binning scheme
bin_edges = np.arange(0.0, max_disp+bin_size, bin_size)
# Set up the output histogram
H = np.zeros((n_intervals, bin_edges.shape[0]-1), dtype=np.int64)
# In order to capture every possible displacement in the presence
# of gaps, we need to check enough delays for the maximum interval
# (n_intervals) with the maximum number of gaps between every
# frame.
n_delays = n_intervals * (n_gaps+1)
for delay in range(1, n_delays+1):
# Take the displacements
D = X[delay:,:] - X[:-delay,:]
# We're only interested in displacements between localizations
# in the same trajectory
in_same_traj = D[:,1] == 0
# Check for displacements matching any of the desired frame
# intervals
for interval in range(1, n_intervals+1):
if first_only:
match = (D[:,0]==interval) & (D[:,4]==-1) & in_same_traj
else:
match = (D[:,0]==interval) & in_same_traj
# Check if there are any displacements that match these
# criteria
if match.sum() > 0:
# Get the radial displacements
disps = np.sqrt((D[match,2:4]**2).sum(axis=1)) * pixel_size_um
# Bin these into a histogram
H_int, _edges = np.histogram(disps, bins=bin_edges)
# Accumulate into the histogram
H[interval-1,:] = H[interval-1,:] + H_int
# Return the histogram and the accompanying set of bin
# definitions
return H, bin_edges
[docs]def get_max_gap(trajs):
"""
Return the maximum gap present in a set of trajectories.
args
----
trajs : pandas.DataFrame
returns
-------
int, the maximum gap present. For instance, 1 means
that trajectories have no gaps (1 frame interval).
"""
trajs = trajs.sort_values(by=['trajectory', 'frame'])
X = np.asarray(trajs[['frame', 'trajectory']])
D = X[1:,:] - X[:-1,:]
in_same_traj = D[:,1]==0
return int(D[in_same_traj, 0].max())
[docs]def get_spots_per_frame(trajs):
"""
Return the number of spots per frame.
Parameters
----------
trajs : pandas.DataFrame, localizations
Returns
-------
pandas.DataFrame, indexed by frame between 0
and the max frame index in *trajs*. The
column encoding the number of spots per
frame is "n_spots_per_frame".
"""
if trajs.empty:
return pd.DataFrame([], columns=["n_spots_per_frame"])
result = pd.DataFrame(
index=pd.Series(np.arange(0, trajs['frame'].max()+1)),
columns=["n_spots_per_frame"]
)
# Calculate the number of spots per frame
result["n_spots_per_frame"] = trajs.groupby("frame").size()
# Account for frames that do not exist in the input dataframe
result["n_spots_per_frame"] = result["n_spots_per_frame"].fillna(0)
# Format as 64-bit integer
result["n_spots_per_frame"] = result["n_spots_per_frame"].astype(np.int64)
return result
[docs]def filter_on_spots_per_frame(trajs, max_spots_per_frame=10,
filter_kernel=21):
"""
Mask a set of localizations by the total number of localizations
in the corresponding frame.
This function does the following:
1. Take a set of localizations.
2. Calculate the number of localizations in each frame.
3. Smooth this signal with a uniform kernel of size
*filter_kernel*.
4. Get the set of frames with total # of spots below
*max_spots_per_frame*.
5. Mark each localization in these frames with *True*,
and otherwise with *False*.
args
----
trajs : pandas.DataFrame, localizations
max_spots_per_frame : int, the maximum number of spots
tolerated per frame
filter_kernel : int, the size of the smoothing
kernel. If *None*, no smoothing is
performed.
returns
-------
pandas.Series with index *trajs.index*. True if the
corresponding localization passed the filter.
"""
# Calculate the number of spots per frame
spots_per_frame = get_spots_per_frame(trajs)
# Smooth the signal, if desired
if not filter_kernel is None:
spots_per_frame = uniform_filter(
spots_per_frame.astype(np.float64),
filter_kernel
)
# Get the set of frames that pass the filter
acceptable = (spots_per_frame <= max_spots_per_frame).nonzero()[0]
return trajs["frame"].isin(acceptable)