#!/usr/bin/env python
"""
track.py -- reconnect localizations into trajectories
"""
# Numeric
import numpy as np
# Dataframes
import pandas as pd
# Distance between two sets of 2D points
from scipy.spatial import distance_matrix
# Hungarian algorithm
from munkres import Munkres
from scipy.optimize import linear_sum_assignment
# Custom utilities
from .helper import connected_components
# Deep copy
from copy import copy
# Filter on the number of spots per frame
from .trajUtils import filter_on_spots_per_frame
from tqdm import tqdm
##################################
## LOW-LEVEL TRACKING UTILITIES ##
##################################
[docs]class ScipyMunkres:
[docs] def compute(self, W):
rows, cols = linear_sum_assignment(W)
return [(i, j) for i, j in zip(rows, cols)]
# hungarian_solver = Munkres()
hungarian_solver = ScipyMunkres()
[docs]def is_duplicates(trajs):
"""
Return True if there are duplicates in a set of Trajectories.
"""
if len(trajs) < 2:
return False
for j in range(len(trajs) - 1):
for i in range(j + 1, len(trajs)):
R = trajs[i].get_slice()[:, :2] == trajs[j].get_slice()[:, :2]
if isinstance(R, bool):
if R:
return True
elif R.all():
return True
else:
pass
return False
[docs]def has_match(trajs_0, trajs_1):
"""
Return True if a trajectory in trajs_0 exactly matches a trajectory
in trajs_1.
"""
for i in range(len(trajs_0)):
for j in range(len(trajs_1)):
R = trajs_0[i].get_slice()[:, :2] == trajs_1[j].get_slice()[:, :2]
if isinstance(R, bool):
if R:
return True
elif R.all():
return True
else:
pass
return False
[docs]class Trajectory:
"""
Convenience class used internally by track_locs().
A Trajectory object specifies a set of indices to
localizations in a large array that are to be reconnected
into trajectories.
It also holds a reference to the original array, so
that it can grab information about its localization when
necessary.
When Trajectories are not reconnected in a given frame,
their blink counter (self.n_blinks) is incremented. When
this exceeds *max_blinks*, the Trajectories are marked
for termination.
The Trajectory class is also convenient to hold associated
information about the tracking problem, such as the
number of competing trajectories and localizations, etc.,
that are returned at the end of tracking.
init
----
start_idx : int, the index of the first
localization in this Trajectory
locs : 2D ndarray, all localizations
subproblem_shape: (int, int), the number of trajs
and locs in the subproblem that
created this trajectory
max_blinks : int, the maximum tolerated number
of gaps in tracking
"""
def __init__(self, start_idx, locs, subproblem_shape, max_blinks=0):
self.indices = [int(start_idx)]
self.locs = locs
self.max_blinks = max_blinks
self.n_blinks = 0
self.active = True
self.subproblem_shapes = [subproblem_shape]
[docs] def add_index(self, idx, subproblem_shape):
"""
Extend this trajectory by one localization.
args
----
idx : int, the index of the localization
in self.locs to add
subproblem_shape : (int, int), the size
of the tracking subproblem in which
this localization was added
"""
self.indices.append(int(idx))
self.subproblem_shapes.append(subproblem_shape)
[docs] def blink(self):
"""
Skip a frame. If a Trajectory has been in blink for
more than *self.n_blinks* frames, it marks itself
for termination by setting self.active = False.
"""
self.n_blinks += 1
if self.n_blinks > self.max_blinks:
self.active = False
[docs] def get_slice(self):
"""
Return the slice of the localization array that
corresponds to this trajectory.
returns
-------
2D ndarray of shape (traj_len, 5)
"""
return self.locs[tuple(self.indices), :]
[docs] def last_pos(self):
"""
Return the last known position of this Trajectory.
returns
-------
(float y, float x), in pixels
"""
return self.locs[self.indices[-1], 2:4]
[docs]def traj_loc_distance(trajs, locs):
"""
Return the distance between each trajectory and each
localization.
args
----
trajs : list of Trajectory
locs : 2D ndarray with columns loc_idx,
frame, y, x, I0
returns
-------
2D ndarray D, where D[i,j] is the distance between
Trajectory i and localization j
"""
return distance_matrix(
np.asarray([t.last_pos() for t in trajs]), locs[:, 2:4]
)
[docs]def diffusion_weight_matrix(
trajs,
locs,
frame_interval=0.00548,
pixel_size_um=0.16,
k_return_from_blink=1.0,
d_max=5.0,
y_diff=0.9,
search_radius=2.5,
d_bound_naive=0.1,
init_cost=50.0,
):
"""
Generate the weight matrix for reconnection between a set of
Trajectories and a set of localizations for the "diffusion"
method.
In this method, the weight of reconnecting trajectory A to
localization B is equal to the negative log likelihood of
trajectory A diffusing to localization B in the relevant
frame interval (equal to *frame_interval* if there are no
gaps). The likelihood is evaluated with a 2D Brownian motion
model.
A weighted combination of two such negative log-likelihoods
is used. The first assumes a diffusion coefficient equal to
the maximum likelihood diffusion coefficient for that
trajectory (using the MSD method). The second assumes a
diffusion coefficient equal to d_max. The relative weights
of the two estimates are set by *y_diff*.
args
----
trajs : list of Trajectory
locs : 2D ndarray, localizations to consider
for connection
frame_interval : float, seconds
pixel_size_um : float, um
k_return_from_blink : float, penalty to return a trajectory
from blinking status
d_max : float, the maximum expected diffusion
coefficient in um^2 s^-1
y_diff : float, the relative influence of the
particle's local history on its estimated
diffusion coefficient
search_radius : float, um
d_bound_naive : float, naive estimate for a particle's
local diffusion coefficient, um^2 s^-1
init_cost : float, static cost to initialize a new
trajectory when reconnections are available
in the search radius
returns
-------
2D ndarray of shape (n_trajs, n_locs), the weights
for reconnection
"""
n_traj = len(trajs)
n_locs = locs.shape[0]
n_dim = n_traj + n_locs
# Unit conversions
search_radius_pxl = search_radius / pixel_size_um
max_var2 = 2.0 * d_max * frame_interval / (pixel_size_um**2)
# The weight matrix
W = np.zeros((n_dim, n_dim), dtype="float64")
# Set the static cost of starting a new trajectory
for li in range(n_locs):
W[n_traj:, li] = init_cost
W[:, n_locs:] = init_cost
# For each trajectory, calculating the weight of
# assignments to each localization
for ti in range(n_traj):
# Last known position of this trajectory
last_pos = trajs[ti].last_pos()
# Penalize blinking trajectories
L_blink = -k_return_from_blink * trajs[ti].n_blinks + np.log(
k_return_from_blink
)
# Distances to each localization
R = traj_loc_distance([trajs[ti]], locs)[0, :]
R2 = R**2
# Estimate the local diffusion coefficient of this
# trajectory.
# If no prior displacements are available, use the
# naive estimate
if len(trajs[ti].indices) == 1:
local_var2 = (
2 * d_bound_naive * frame_interval * (1 + trajs[ti].n_blinks)
)
# Otherwise, use the MSD method
else:
traj_slice = trajs[ti].get_slice()
frames = traj_slice[:, 1].astype("int64")
pos = traj_slice[:, 2:4]
delta_frames = frames[1:] - frames[:-1]
local_var2 = (
0.5
* (
((pos[1:, :] - pos[:-1, :]) ** 2).sum(1) / delta_frames
).mean()
* (1 + trajs[ti].n_blinks)
)
# Log-likelihood of diffusing from last
# known position to each new position
diff = y_diff * (R / local_var2) * np.exp(-R2 / (2 * local_var2)) + (
1 - y_diff
) * (R / max_var2) * np.exp(-R2 / (2 * max_var2))
L_diff = np.log(diff)
L_diff[diff <= 0] = -np.inf
# Make sure we do NOT reconnect trajectories to localizations
# outside of their search radii
L_diff[R > search_radius_pxl] = -np.inf
# Assign reconnection weight
W[ti, :n_locs] = -(L_blink + L_diff)
return W
[docs]def euclidean_weight_matrix(
trajs,
locs,
pixel_size_um=0.16,
scale=1.0,
search_radius=2.5,
init_cost=50.0,
**kwargs
):
"""
Generate the weight matrix for reconnection between
Trajectories and localizations for the "euclidean"
reconnection method.
Here, the weight to reconnect traj I with localization J
is just the distance between the last known position of I
and J, scaled by the constant *scale*.
If J is outside the search radius of I, the weight is
infinite.
The weight to drop *I* or to start a new trajectory from
*J* when other reconnections are available is *init_cost*.
args
----
trajs : list of Trajectory
locs : 2D ndarray, localizations to consider
for connection
pixel_size_um : float, um
scale : float, inflation factor for the distances
search_radius : float, um
init_cost : float, penalty for not performing
available reconnections
kwargs : discarded
returns
-------
2D ndarray of shape (n_trajs, n_locs), the reconnection
weights
"""
n_traj = len(trajs)
n_locs = locs.shape[0]
n_dim = n_traj + n_locs
# Weight matrix
W = np.zeros((n_dim, n_dim), dtype="float64")
# Distance from each trajectory to each localization
distances = traj_loc_distance(trajs, locs)
# Reconnections out of the search radius are impossible
out_of_radius = distances > search_radius / pixel_size_um
distances[out_of_radius] = np.inf
# Rescale if desired
if scale != 1.0:
distances[~out_of_radius] = distances[~out_of_radius] * scale
# Weight of traj:loc reconnection is proportional to Euclidean distance
W[:n_traj, :n_locs] = distances
# Static cost to start new trajectory
W[n_traj:, :] = init_cost
# Penalize reconnecting to nothing
W[:n_traj, n_locs:] = init_cost
return W
######################################
## FRAME-FRAME RECONNECTION METHODS ##
######################################
[docs]def reconnect_conservative(
trajs,
locs,
locs_array,
max_blinks=0,
frame_interval=0.00548,
pixel_size_um=0.16,
**kwargs
):
"""
Only reassign trajs to locs when the assignment is
unambiguous (1 traj, 1 loc within the search radius).
For all other trajectories, terminate them.
For all other locs, start new trajectories.
args
----
trajs : list of Trajectory
locs : 2D ndarray with columns loc_idx, frame,
y, x, I0
max_blinks : int
frame_interval : float
pixel_size_um : float
kwargs : ignored, possibly passed due to upstream
method disambiguation
returns
-------
list of Trajectory
"""
out = []
# A single localization and trajectory pair - assignment
# is unambiguous. This is the only situation where reconnection
# is allowed.
n_trajs = len(trajs)
n_locs = locs.shape[0]
if n_trajs == 1 and n_locs == 1:
trajs[0].add_index(locs[0, 0], (n_trajs, n_locs))
out = trajs
# Multiple localizations and/or trajectories
else:
# Terminate all existing trajectories
for ti in range(n_trajs):
trajs[ti].active = False
out += trajs
# Start new trajectories from all localizatoins
for li in range(n_locs):
out.append(
Trajectory(
locs[li, 0],
locs_array,
(n_trajs, n_locs),
max_blinks=max_blinks,
)
)
return out
WEIGHT_MATRIX_METHODS = {
"diffusion": diffusion_weight_matrix,
"euclidean": euclidean_weight_matrix,
}
[docs]def reconnect_hungarian(
trajs,
locs,
locs_array,
max_blinks=0,
weight_method=None,
min_I0=0.0,
**kwargs
):
"""
Assign Trajectories to localizations by assigning each
possible reconnection a weight, then finding the assignment
that minimizes the summed weights with the Hungarian
algorithm.
args
----
trajs : list of Trajectory
locs : 2D ndarray, localizations to consider
for connection
locs_array : 2D ndarray, all localizations in this
movie
max_blinks : int
weight_method : str, the method to use to generate
the weight matrix
min_I0 : float, minimum intensity to start
a new Trajectory
kwargs : to weight_method
returns
-------
list of Trajectory
"""
out = []
# Get the size of the assignment problem
n_traj = len(trajs)
n_locs = locs.shape[0]
# Unambiguous - only one trajectory and localization
if n_traj == 1 and n_locs == 1:
trajs[0].add_index(locs[0, 0], (n_traj, n_locs))
# Otherwise, solve the assignment problem by finding
# weights and minimizing with the Hungarian algorithm
else:
# Get the weight matrix for reconnection
W = WEIGHT_MATRIX_METHODS[weight_method](trajs, locs, **kwargs)
# Minimize negative log likelihood with
# Hungarian algorithm
for i, j in hungarian_solver.compute(W):
# traj:loc
if (i < n_traj) and (j < n_locs):
trajs[i].add_index(locs[j, 0], (n_traj, n_locs))
# traj:(empty)
elif (i < n_traj) and (j >= n_locs):
trajs[i].blink()
# (empty):loc
elif (j < n_locs) and (i >= n_traj):
if locs[j, 4] >= min_I0:
out.append(
Trajectory(
locs[j, 0],
locs_array,
(n_traj, n_locs),
max_blinks=max_blinks,
)
)
else:
pass
# Combine new and existing trajs
out += trajs
return out
[docs]def reconnect_diffusion(
trajs, locs, locs_array, max_blinks=0, min_I0=0.0, **kwargs
):
"""
Assign Trajectories to localizations on the basis of their
expected probability of diffusion and their blinking status.
Each of the Trajectories is assumed to be a Brownian motion
in 2D. Its diffusion coefficient is evaluated from its
history by MSD if it is greater than length 1, or from
d_bound_naive otherwise.
args
----
trajs : list of Trajectory
locs : 2D ndarray, localizations to consider
for connection
locs_array : 2D ndarray, all localizations in this
movie
max_blinks : int
frame_interval : float, seconds
pixel_size_um : float, um
min_I0 : float, AU
k_return_from_blink : float, penalty to return a trajectory
from blinking status
d_max : float, the maximum expected diffusion
coefficient in um^2 s^-1
y_diff : float, the relative influence of the
particle's local history on its estimated
diffusion coefficient
search_radius : float, um
d_bound_naive : float, naive estimate for a particle's
local diffusion coefficient, um^2 s^-1
returns
-------
list of Trajectory
"""
return reconnect_hungarian(
trajs,
locs,
locs_array,
weight_method="diffusion",
max_blinks=0,
min_I0=min_I0,
**kwargs
)
[docs]def reconnect_euclidean(
trajs, locs, locs_array, max_blinks=0, min_I0=0.0, **kwargs
):
"""
Assign Trajectories to localizations purely by minimizing
the total Trajectory-localization distances.
args
----
trajs : list of Trajectory
locs : 2D ndarray, localizations to consider
for connection
locs_array : 2D ndarray, all localizations in this
movie
max_blinks : int
min_I0 : float, minimum intensity to start a
new trajectory
pixel_size_um : float, um
scale : float, inflation factor for the distances
search_radius : float, um
init_cost : float, cost to start a new trajectory
if reconnections are available
returns
-------
list of Trajectory
"""
return reconnect_hungarian(
trajs,
locs,
locs_array,
weight_method="euclidean",
max_blinks=0,
min_I0=min_I0,
**kwargs
)
########################################
## ALL AVAILABLE RECONNECTION METHODS ##
########################################
METHODS = {
"conservative": reconnect_conservative,
"diffusion": reconnect_diffusion,
"euclidean": reconnect_euclidean,
}
#############################
## MAIN TRACKING FUNCTIONS ##
#############################
[docs]def track(
locs,
method="diffusion",
search_radius=2.5,
pixel_size_um=0.16,
frame_interval=0.00548,
min_I0=0.0,
max_blinks=0,
debug=False,
max_spots_per_frame=None,
reindex_unassigned=True,
**kwargs
):
"""
Given a dataframe with localizations, reconnect into
trajectories.
Each frame-frame reconnection problem is considered
separately and sequentially. For each problem:
1. Figure out which localizations lie within the
the search radii of the current trajectories
2. Identify disconnected "subproblems" in this
trajectory-localization adjacency map
3. Solve all of the subproblems by a method
specified by the *method* kwarg
4. Update the trajectories and proceed to the
next frame
The result is an assignment of each localization to a
trajectory index. Localizations that were not reconnected
into a trajectory for whatever reason are assigned a
trajectory index of -1.
args
----
locs : pandas.DataFrame, set of localizations
method : str, the tracking method. Currently either
"diffusion", "euclidean", or "conservative"
search_radius : float, max jump length in um
pixel_size_um : float, um per pixel
frame_interval : float, seconds
min_I0 : float, only track spots with at least this
intensity
max_blinks : int, number of gaps allowed
debug : bool
max_spots_per_frame
: int, don't track in frames with more than
this number of spots
**kwargs : additional keyword arguments to the tracking
method
returns
-------
pandas.Series, trajectory indices for each localization.
"""
# Filter on the number of spots per frame, if desired
if not max_spots_per_frame is None:
return track_subset(
locs,
[
lambda T: filter_on_spots_per_frame(
T,
max_spots_per_frame=max_spots_per_frame,
filter_kernel=21,
)
],
method=method,
search_radius=search_radius,
pixel_size_um=pixel_size_um,
frame_interval=frame_interval,
min_I0=min_I0,
max_blinks=max_blinks,
debug=debug,
**kwargs
)
# If passed an empty dataframe, do nothing
if locs.empty:
for c in ["trajectory", "subproblem_n_locs", "subproblem_n_traj"]:
locs[c] = []
return locs
# Determine frame limits for tracking
start_frame = int(locs["frame"].min())
stop_frame = int(locs["frame"].max()) + 1
# Get the reconnection method
method_f = METHODS.get(method)
# Sort the localizations by frame (unecessary, but easier
# to interpret)
locs = locs.sort_values(by="frame")
# Assign each localization a unique index
locs["loc_idx"] = np.arange(len(locs))
# Convert locs to ndarray for speed
cols = ["loc_idx", "frame", "y", "x", "I0"]
if "I0" not in locs.columns:
locs["I0"] = 1.0
L = np.asarray(locs[cols])
# Maximum tolerated traj-loc jump distance (search radius)
search_radius_pxl = search_radius / pixel_size_um
# Convenience function: get all of locs from one frame
def get_locs(frame):
return L[L[:, 1] == frame, :]
# Convenience function: in a list of Trajectory, find
# trajectories that have finished
def get_finished(trajs):
_finished = [t for t in trajs if not t.active]
_active = [t for t in trajs if t.active]
return _active, _finished
# Start by grabbing the locs in the first frame and
# initializing Trajectories from each of them
frame_locs = get_locs(start_frame)
active = [
Trajectory(int(i), L, (0, 1), max_blinks) for i in frame_locs[:, 0]
]
# During tracking, Trajectories are tossed between
# three categories: "active", "new", and "completed".
# "active" Trajectories are eligible for reconnection in
# this frame, "new" Trajectories will become active
# Trajectories in the next frame, and "completed" Trajectories
# have been removed from the pool.
new = []
completed = []
for fi in tqdm(
range(start_frame + 1, stop_frame),
total=stop_frame - (start_frame + 1),
):
# # DEBUG
# print("FRAME:\t%d" % fi)
# print("Duplicates in active:\t", is_duplicates(active))
# print("Duplicates in new:\t", is_duplicates(new))
# print("Duplicates in completed:\t", is_duplicates(completed))
# print("Completed trajectories:")
# for i, t in enumerate(completed):
# print("\tTrajectory %d" % i)
# print(t.get_slice()[:,:2])
# print("\n")
# if fi > 7:
# break
frame_locs = get_locs(fi)
# If there are no locs in this frame, set all active
# trajectories into blink
if len(frame_locs.shape) < 2 or frame_locs.shape[0] == 0:
# Increment blink counter
for t in active:
t.blink()
# Find which trajectories are finished
active, done = get_finished(active)
completed += done
# To next frame
continue
# If there are no active trajectories, consider starting
# one from each localization if it passes the intensity
# threshold
elif len(active) == 0:
for i in frame_locs[frame_locs[:, 4] >= min_I0, 0]:
new.append(Trajectory(int(i), L, (0, 1), max_blinks))
active = new
new = []
# To next frame
continue
# Otherwise, there is some combination of active trajectories
# and localizations in this frame.
else:
# Calculate the adjacency graph: which localizations are
# within the search radius of which trajectories?
adj_g = (
traj_loc_distance(active, frame_locs) <= search_radius_pxl
).astype(np.int64)
# Break this graph into subgraphs, each of which represents
# a separate tracking subproblem
(
subgraphs,
Ti,
Li,
traj_singlets,
loc_singlets,
) = connected_components(adj_g)
# # DEBUG - PASSED
# for i in range(len(Ti)):
# for j in [k for k in range(len(Ti)) if k != i]:
# for element in Ti[i]:
# assert element not in Ti[j]
# # DEBUG - PASSED
# for i in range(len(Li)):
# for j in [k for k in range(len(Li)) if k != i]:
# for element in Li[i]:
# assert element not in Li[j]
# # DEBUG - a trajectory cannot be simultaneously in the
# # singlet list and also in a subproblem group - PASSED
# for i in traj_singlets:
# for j in range(len(Ti)):
# assert i not in Ti[j]
# for i in loc_singlets:
# for j in range(len(Li)):
# assert i not in Li[j]
# If a trajectory does not have localizations in its
# search radius, set it into blink
for ti in traj_singlets:
active[ti].blink()
if active[ti].active:
new.append(active[ti])
else:
completed.append(active[ti])
# If a localization has no nearby trajectories, start
# a new trajectory if it passes the intensity threshold
for li in loc_singlets:
if frame_locs[li, 4] >= min_I0:
new.append(
Trajectory(frame_locs[li, 0], L, (0, 1), max_blinks)
)
# If there are both trajectories and localizations in the
# subproblem, reconnect according to the reconnection method
for si, subgraph in enumerate(subgraphs):
# Only one traj and one loc: assignment is unambiguous
if subgraph.shape[0] == 1 and subgraph.shape[1] == 1:
active[Ti[si][0]].add_index(
frame_locs[Li[si][0], 0], (1, 1)
)
new.append(active[Ti[si][0]])
# Otherwise, pass to the reconnection method
else:
in_trajs = [active[i] for i in Ti[si]]
out_trajs = method_f(
[active[i] for i in Ti[si]],
frame_locs[Li[si], :],
L,
max_blinks=max_blinks,
pixel_size_um=pixel_size_um,
frame_interval=frame_interval,
search_radius=search_radius,
**kwargs
)
# Find finished trajectories
not_done, done = get_finished(out_trajs)
completed += done
new += not_done
# For trajs eligible for reconnection in the next frame,
# transfer to *active*
active = new
new = []
# Finish any trajectories still running
completed += active
# Trajectory indices
ids = np.full(L.shape[0], -1, dtype=np.int64)
# Number of competing trajectories and competing localizations
# for the subproblem in which each localization was connected
# (1: no competition)
subproblem_sizes_traj = np.full(L.shape[0], -1, dtype=np.int64)
subproblem_sizes_locs = np.full(L.shape[0], -1, dtype=np.int64)
# For each trajectory, add its information to these arrays
for ti, t in enumerate(completed):
indices = np.asarray(t.indices)
T_size = [t.subproblem_shapes[i][0] for i in range(len(indices))]
L_size = [t.subproblem_shapes[i][1] for i in range(len(indices))]
ids[np.asarray(t.indices)] = ti
subproblem_sizes_traj[np.asarray(t.indices)] = T_size
subproblem_sizes_locs[np.asarray(t.indices)] = L_size
# Assign traj index as a column in the original dataframe
locs["trajectory"] = ids
locs["subproblem_n_traj"] = subproblem_sizes_traj
locs["subproblem_n_locs"] = subproblem_sizes_locs
# For localizations unassigned to any trajectory, assign
# unique trajectory indices
if reindex_unassigned:
max_index = locs["trajectory"].max() + 1
unassigned = locs["trajectory"] == -1
n_unassigned = unassigned.sum()
locs.loc[unassigned, "trajectory"] = np.arange(
max_index, max_index + n_unassigned
)
# If desired, return the Trajectory objects for testing
if debug:
return locs, completed
else:
return locs
[docs]def track_subset(locs, filters, **kwargs):
"""
Run tracking on a subset of trajectories - for instance,
only on localizations in frames that meet a particular
criterion.
*filters* should be a set of functions that take trajectory
dataframes as argument and return a boolean pandas.Series
indicating whether each localization passed or failed the
criterion.
Example for the *filters* argument:
filters = [
lambda locs: filter_on_spots_per_frame(
locs,
max_spots_per_frame=3,
filter_kernel=21
)
]
This would only track localizations belonging to frames with
3 or fewer total localizations in that frame. (After smoothing
with a uniform kernel of width 21 frames, that is.)
Parameters
----------
locs : pandas.DataFrame, localizations
filters : list of functions, the filters
kwargs : keyword arguments to the tracking method
Returns
-------
pandas.DataFrame, trajectories
"""
# Determine which localizations pass the QC filters
filter_pass = np.ones(len(locs), dtype=np.bool)
for f in filters:
filter_pass = np.logical_and(filter_pass, f(locs))
# Track the subset of localizations that pass QC filters
L = locs[filter_pass].copy()
L = track(L, **kwargs)
# Join the results with the original dataframe
for c in [j for j in L.columns if j not in locs.columns]:
locs[c] = L[c]
locs[c] = (locs[c].fillna(-1)).astype(np.int64)
return locs