#!/usr/bin/env python
"""
core.py -- high-level user functions for running
filtering, detection, subpixel localization, and
tracking sequentially on the same datasets
"""
# File paths
import os
from glob import glob
# Progress bar
from tqdm import tqdm
# Dataframes
import pandas as pd
# Parallelization
import dask
from dask.diagnostics import ProgressBar
# File readers and filterers
from .chunkFilter import ChunkFilter
# Core detection function
from .findSpots import detect
# Core localization function
from .subpixel import localize_frame
# Core tracking function
from .track import track
def localize_file(path, out_csv=None, progress_bar=True, **kwargs):
    """
    Run filtering, detection, and subpixel localization on
    a single image movie. This does NOT perform tracking.

    args
    ----
        path         :  str, path to the image file
        out_csv      :  str, path to save file, if
                        desired
        progress_bar :  bool, show a progress bar
        kwargs       :  configuration. The "detect" and "localize"
                        sections are required and are passed to the
                        detection and localization functions. The
                        "filter" section is optional and is passed
                        to the image file reader.

    returns
    -------
        pandas.DataFrame, the localizations, with a "frame" column
            giving the frame index of each localization (offset by
            the "start" setting of the filter section, if any)

    raises
    ------
        AssertionError, if *path* is not an existing file
        KeyError, if the "detect" or "localize" section is missing
        ValueError, from pandas.concat if the reader yields no frames
    """
    # Make sure the file exists. This is raised explicitly rather than
    # with an `assert` statement so that the check is not stripped when
    # Python runs with the -O flag.
    if not os.path.isfile(path):
        raise AssertionError(
            "quot.__main__.localize_file: file %s does not exist" % path
        )

    # If the config does not contain a "filter" section,
    # don't worry about it
    filter_kwargs = kwargs.get("filter", {})

    # Look up the required sections before opening the file so that
    # an incomplete configuration fails immediately
    detect_kwargs = kwargs['detect']
    localize_kwargs = kwargs['localize']

    # Open an image file reader with some filtering
    # settings, if desired
    with ChunkFilter(path, **filter_kwargs) as f:
        frames = enumerate(f)
        if progress_bar:
            frames = tqdm(frames)

        locs = []
        for frame_idx, frame in frames:

            # Find spots in this image frame
            detections = detect(frame, **detect_kwargs)

            # Localize spots to subpixel resolution and record
            # which frame they came from
            frame_locs = localize_frame(frame, detections, **localize_kwargs)
            locs.append(frame_locs.assign(frame=frame_idx))

        locs = pd.concat(locs, ignore_index=True, sort=False)

        # Adjust for start index: the enumeration above starts from
        # zero regardless of the "start" setting given to the reader
        locs['frame'] += filter_kwargs.get('start', 0)

    # Save to a file, if desired
    if out_csv is not None:
        locs.to_csv(out_csv, index=False)

    return locs
def track_file(path, out_csv=None, progress_bar=True, **kwargs):
    """
    Filter, detect, localize, and track the spots in a single
    movie.

    args
    ----
        path         :  str, path to the image file
        out_csv      :  str, where to write the trajectories as a
                        CSV; nothing is written if None
        progress_bar :  bool, show a progress bar during
                        localization
        kwargs       :  configuration, forwarded whole to
                        localize_file; its "track" section is
                        passed to the tracking function

    returns
    -------
        pandas.DataFrame, the localizations after tracking
    """
    # Filtering + detection + localization. The intermediate
    # localizations are never written to disk, only the tracked result.
    detections = localize_file(
        path,
        out_csv=None,
        progress_bar=progress_bar,
        **kwargs
    )

    # Connect localizations between frames
    trajs = track(detections, **kwargs['track'])

    # Nothing to save
    if out_csv is None:
        return trajs

    trajs.to_csv(out_csv, index=False)
    return trajs
def track_files(paths, num_workers=4, save=True, out_dir=None, **kwargs):
    """
    Run tracking on several files using parallelization.

    args
    ----
        paths       :  list of str, paths to image files to track
        num_workers :  int, the number of workers to use. With 1,
                       the files are processed single-threaded;
                       otherwise the dask "processes" scheduler
                       is used.
        save        :  bool, save the output to CSVs files. The names
                       for these CSVs are generated from the names of
                       the corresponding image files.
        out_dir     :  str, output directory. If None, each CSV is
                       written next to its image file.
        kwargs      :  tracking configuration, as read with
                       quot.io.read_config

    returns
    -------
        list of pandas.DataFrame, the tracking results for each
            file, in the same order as *paths*. A file that could
            not be analyzed contributes an empty list instead of a
            DataFrame.
    """
    # Create the output directory if it does not already exist.
    # makedirs also creates missing parent directories, and exist_ok
    # avoids failing if another process creates it first.
    if out_dir is not None:
        os.makedirs(out_dir, exist_ok=True)

    def get_out_csv(path):
        """Return the CSV path for one image file, or None if not saving."""
        if not save:
            return None
        if out_dir is None:
            return "{}_trajs.csv".format(os.path.splitext(path)[0])
        return os.path.join(
            out_dir,
            "{}_trajs.csv".format(os.path.splitext(os.path.basename(path))[0])
        )

    # Tracking function for one file with lazy evaluation
    @dask.delayed
    def driver(path):
        try:
            return track_file(
                path,
                out_csv=get_out_csv(path),
                progress_bar=False,
                **kwargs
            )
        except Exception as e:
            # Best effort: one bad file should not abort the whole batch
            print("WARNING: Failed to analyze file {} due to exception:".format(path))
            print(e)
            return []

    # Run localization and tracking on each file
    tasks = [driver(path) for path in paths]
    scheduler = "single-threaded" if num_workers == 1 else "processes"
    with ProgressBar():
        # dask.compute returns the evaluated results as a tuple; the
        # delayed tasks themselves are not updated in place
        results = dask.compute(*tasks, scheduler=scheduler,
            num_workers=num_workers)

    return list(results)
def track_directory(path, ext='.nd2', num_workers=4, save=True, contains=None,
    out_dir=None, **kwargs):
    """
    Find all image files in a directory and run
    localization and tracking.

    args
    ----
        path        :  str, path to directory
        ext         :  str, image file extension
        num_workers :  int, the number of workers
                       to use
        save        :  bool, save the results to CSV
                       files
        contains    :  str, a substring that all image file
                       names are required to contain
        out_dir     :  str, directory for output CSV files
        kwargs      :  configuration

    returns
    -------
        None. If *save* is set, the output of tracking is saved
            to files ending in "_trajs.csv", either in *out_dir*
            or, if *out_dir* is None, alongside the image files.

    raises
    ------
        AssertionError, if *path* is not an existing directory
    """
    # The module imports only the glob() function, so bring in
    # escape() locally
    from glob import escape as glob_escape

    # Make sure the directory exists. This is raised explicitly rather
    # than with an `assert` statement so that the check is not stripped
    # when Python runs with the -O flag.
    if not os.path.isdir(path):
        raise AssertionError(
            "quot.__main__.track_directory: directory %s does not exist" % path
        )

    # Find all image files in this directory. The directory part is
    # escaped so that characters like "[" in its name are matched
    # literally instead of being read as glob wildcards.
    pattern = os.path.join(glob_escape(path), "*%s" % ext)
    image_paths = glob(pattern)

    # Only include image files with a substring, if specified
    if contains is not None:
        image_paths = [j for j in image_paths if contains in os.path.basename(j)]

    # Run tracking and localization
    track_files(image_paths, num_workers=num_workers, save=save,
        out_dir=out_dir, **kwargs)
def retrack_file(path, out_csv=None, **kwargs):
    """
    Read localizations or trajectories from a CSV and (re)run
    tracking on them to rebuild the trajectories.

    args
    ----
        path    :  str, path to a *trajs.csv file
        out_csv :  str, where to write the new trajectories as a
                   CSV; nothing is written if None
        kwargs  :  tracking configuration, passed directly to the
                   tracking function

    returns
    -------
        pandas.DataFrame, the reconnected localizations
    """
    # Load the existing table and reconnect it in one step
    trajs = track(pd.read_csv(path), **kwargs)

    # Write the result only when a destination was given
    if out_csv is not None:
        trajs.to_csv(out_csv, index=False)

    return trajs
def retrack_files(paths, out_suffix=None, num_workers=1, **kwargs):
    """
    Rerun tracking on each of a set of CSV files and save the
    resulting trajectories to CSVs.

    Without an *out_suffix*, each result is written back to the
    file it was read from, overwriting it.

    args
    ----
        paths       :  list of str, a set of CSV files encoding
                       trajectories
        out_suffix  :  str, the suffix used to build each output
                       path as "<input path without extension>_
                       <out_suffix>". ".csv" is appended if the
                       suffix does not already contain it. If
                       *None*, the input file is overwritten.
        num_workers :  int, the number of workers to use. With 1,
                       the files are processed single-threaded;
                       otherwise the dask "processes" scheduler
                       is used.
        kwargs      :  configuration; its "track" section is
                       passed to retrack_file
    """
    # Avoid redundant extensions
    needs_extension = out_suffix is not None and ".csv" not in out_suffix
    if needs_extension:
        out_suffix = "{}.csv".format(out_suffix)

    @dask.delayed
    def task(fn):
        """
        Retrack one file and write the result.
        """
        if out_suffix is None:
            # Overwrite the input file
            target = fn
        else:
            target = "{}_{}".format(os.path.splitext(fn)[0], out_suffix)
        retrack_file(fn, out_csv=target, **kwargs["track"])

    # Run retracking on all files
    tasks = [task(fn) for fn in paths]
    scheduler = "processes" if num_workers != 1 else "single-threaded"
    with ProgressBar():
        dask.compute(*tasks, num_workers=num_workers, scheduler=scheduler)