"""Source code for palmari.processing.image_pipeline."""

from __future__ import annotations
import logging
from typing import Dict, List, Type, Union
import os
import pandas as pd
import pandas as pd
import dask.array as da
import yaml

from ..data_structure.acquisition import Acquisition
from ..data_structure.experiment import Experiment
from .steps import *


class ImagePipeline:
    """A configurable processing pipeline for single-molecule movies.

    The pipeline chains five stages: movie preprocessing, spot detection,
    subpixel localization, localization post-processing and tracking.
    Instances are usually built via :meth:`from_dict` or :meth:`from_yaml`.
    """

    def __init__(
        self,
        name: str,
        movie_preprocessors: List[MoviePreProcessor],
        detector: Detector,
        localizer: SubpixelLocalizer,
        loc_processors: List[LocProcessor],
        tracker: Tracker,
    ):
        self.name = name
        self.movie_preprocessors = movie_preprocessors
        self.detector = detector
        self.localizer = localizer
        self.loc_processors = loc_processors
        self.tracker = tracker
        # Path of the YAML file this pipeline was last loaded from / saved to.
        self.last_storage_path = None

    def __getitem__(self, item):
        # Dict-like read access into the serialized pipeline representation.
        return self.to_dict()[item]
[docs] @classmethod def from_dict(cls, p: Dict): """ Instantiate from a dictionnary. Here's an example Dictionnary which could be passed as an argument : .. code-block:: python3 { "name":"my_pipeline", "movie_preprocessors":[ { "MyPreProcessingClass":{"param1":value,"param2":other_value} }, { "WindowPercentileFilter":{} # If the parameter's dict is empty, default parameters will be used } }], "localizer":{ "DefaultLocalizer":{"threshold_factor":1.5} }, "tracker":{ "UnknownClass":{"bla":bla} # If the class is not found, this will raise an exception # Similarly, if the class provided does not inherit Tracker, an exception will be raised } # If some step is not mentioned (e.g. here, there's nothing about localization processing), then # if it's movie_preprocessors, then no movie_preprocessors will be used (same for loc_processors) # it it's localizer or tracker, then the default classes will be used. } """ def instantiate_from_dict(d): assert len(d) <= 1, "Only one key can be provided" if len(d) == 0: return None cls = list(d.keys())[0] params = d[cls] klass = globals()[cls] instance = klass(**params) return instance def load_from_dict(d, key, default_class): instance = None if key in d: try: instance = instantiate_from_dict(d[key]) except Exception as e: logging.debug(e) raise if instance is None: instance = default_class() return instance def load_list_from_dict(d, key, base_class): instances = [] if key in d: items_list = d[key] for item in items_list: instance = instantiate_from_dict(item) try: assert isinstance(instance, base_class) instances.append(instance) except AssertionError as e: logging.debug(e) return instances # LOAD COMPONENT BY COMPONENT name = p["name"] movie_preprocessors = load_list_from_dict( p, "movie_preprocessors", MoviePreProcessor ) detector = load_from_dict(p, "detector", BaseDetector) localizer = load_from_dict(p, "localizer", MaxLikelihoodLocalizer) loc_processors = load_list_from_dict(p, "loc_processors", LocProcessor) tracker = 
load_from_dict(p, "tracker", ConservativeTracker) return cls( name=name, movie_preprocessors=movie_preprocessors, detector=detector, localizer=localizer, loc_processors=loc_processors, tracker=tracker, )
[docs] @classmethod def from_yaml(cls, file): with open(file, "r") as f: params = dict(yaml.safe_load(f)) res = cls.from_dict(params) res.last_storage_path = str(file) return res
[docs] @classmethod def default_with_name(cls, name: str): return cls.from_dict({"name": name})
@property def available_steps(self): if not hasattr(self, "_available_steps"): all_globals = globals().items() base_classes = { "movie_preprocessors": MoviePreProcessor, "detector": Detector, "localizer": SubpixelLocalizer, "loc_processors": LocProcessor, "tracker": Tracker, } self._available_steps = {} for step_type in base_classes: self._available_steps[step_type] = [] for name, obj in all_globals: try: for step_type, base_class in base_classes.items(): if ( issubclass(obj, base_class) and obj is not base_class ): self._available_steps[step_type].append( (name, obj) ) break except TypeError as e: pass return self._available_steps
[docs] def to_yaml(self, fileName): tp_params = self.to_dict() yaml.dump(tp_params, open(fileName, "w")) self.last_storage_path = str(fileName)
[docs] def to_dict(self) -> dict: res = { "name": self.name, "detector": self.detector.to_dict(), "localizer": self.localizer.to_dict(), "tracker": self.tracker.to_dict(), } if len(self.movie_preprocessors) > 0: res["movie_preprocessors"] = [ p.to_dict() for p in self.movie_preprocessors ] if len(self.loc_processors) > 0: res["loc_processors"] = [p.to_dict() for p in self.loc_processors] return res
[docs] def contains_class(self, step): return self.index_of(step) is not None
[docs] def step_type_of(self, step): d = self.available_steps for step_type, steps in d.items(): for step_tuple in steps: if step_tuple[0] == step: return step_type return None
[docs] def step_class_of(self, step): d = self.available_steps for step_type, steps in d.items(): for step_tuple in steps: if step_tuple[0] == step: return step_tuple[1] return None
[docs] def index_of(self, step): d = self.to_dict() for step_type, steps in d.items(): if step_type == "name": continue if isinstance(steps, list): # Si possible d'avoir plusieurs items, on trouve le rang for step_num, step_dict in enumerate(steps): step_name = list(step_dict.keys())[0] if step_name == step: return step_num else: assert isinstance(steps, dict), steps # Sinon, on renvoie zéro parce qu'il n'y a forcément qu'un step if step in steps: return 0 return None
[docs] def can_be_removed(self, step): return self.contains_class(step) and ( self.step_type_of(step) in ["movie_preprocessors", "loc_processors"] )
[docs] def is_mandatory(self, step): return self.step_type_of(step) not in [ "movie_preprocessors", "loc_processors", ]
[docs] def has_alternatives_to(self, step): step_type = self.step_type_of(step) if step_type is None: return False else: return len(self.available_steps[step_type]) > 1
def __str__(self): desc = "TIF Processing pipeline\n" desc += "-----------------------\n" desc += "Movie preprocessing steps :\n" for i, step in enumerate(self.movie_preprocessors): desc += "%d/%d \t %s\n" % ( i + 1, len(self.movie_preprocessors), step, ) desc += "-----------------------\n" desc += "Detector :\n" desc += "\t %s\n" % self.detector desc += "-----------------------\n" desc += "Localizer :\n" desc += "\t %s\n" % self.localizer desc += "-----------------------\n" desc += "Localization processing steps :\n" for i, step in enumerate(self.loc_processors): desc += "- %d/%d \t %s\n" % (i + 1, len(self.loc_processors), step) desc += "-----------------------\n" desc += "Tracker :\n" desc += "\t %s\n" % self.tracker return desc
[docs] def movie_preprocessing(self, mov: da.Array) -> da.Array: for i, processor in enumerate(self.movie_preprocessors): logging.info( "Preprocessing step %d / %d : %s" % (i, len(self.movie_preprocessors), processor.name) ) mov = processor.preprocess(mov) return mov
[docs] def movie_localization( self, mov: da.Array, DT: float, pixel_size: float ) -> pd.DataFrame: detections = self.detector.movie_detection(mov) locs = self.localizer.movie_localization(mov, detections) locs[["x", "y"]] *= pixel_size locs["t"] = locs["frame"] * DT return locs
[docs] def loc_processing( self, mov: da.Array, # Movie locs: pd.DataFrame, # Previous localizations, with x and y in micrometers pixel_size: float = 1.0, # Pixel size in micrometers ) -> pd.DataFrame: for i, locproc in enumerate(self.loc_processors): logging.info( "Processing locs, step %d / %d : %s" % (i, len(self.loc_processors), locproc.name) ) locs = locproc.process(mov, locs, pixel_size=pixel_size) return locs
[docs] def tracking(self, locs: pd.DataFrame) -> pd.DataFrame: return self.tracker.track(locs)
def _process( self, acq: Acquisition, skip_loc: bool = False, skip_tracking: bool = False, ): # Load the original .tif if skip_tracking and not skip_loc: skip_tracking = False logging.warning( "Can't skip tracking after fresh localization. Overriding to skip_tracking = False" ) original_mov = acq.image.astype(float) if not skip_loc: mov = self.movie_preprocessing(original_mov) locs = self.movie_localization( mov, DT=acq.experiment.DT, pixel_size=acq.experiment.pixel_size ) acq.raw_locs = locs.copy() self.mark_as_localized(acq) if not skip_tracking: acq.experiment.pixel_size locs = self.loc_processing(original_mov, acq.raw_locs.copy()) acq.locs = self.tracking(locs) self.mark_as_tracked(acq)
[docs] def process( self, to_process: Union[Acquisition, Experiment], force_reprocess: bool = False, ): logging.debug("Process %s" % to_process) if isinstance(to_process, Acquisition): # Actually process self._process( to_process, skip_loc=(not force_reprocess) and self.is_already_localized(to_process), skip_tracking=(not force_reprocess) and self.is_already_tracked(to_process), ) elif isinstance(to_process, Experiment): # Actually process, acquisition after acquisition for i, acq_file in enumerate(to_process): acq = Acquisition( acq_file, experiment=to_process, image_pipeline=self ) if i == 0: pipeline_export_path_for_exp = self.exp_params_path(acq) logging.info( "Saving pipeline to %s" % pipeline_export_path_for_exp ) self.to_yaml(pipeline_export_path_for_exp) self.process(acq, force_reprocess=force_reprocess) else: raise BaseException( "to_process shoud be an experiment or an acquisition but is %s" % to_process.__class__.__qualname__ )
[docs] def exp_run_df_path(self, acq: Acquisition) -> str: run_path = os.path.join(acq.experiment.export_folder, self.name) if not os.path.exists(run_path): os.mkdir(run_path) assert os.path.isdir(run_path) exp_run_df_path = os.path.join( acq.experiment.export_folder, self.name, "run_index.csv" ) return exp_run_df_path
[docs] def exp_params_path(self, acq: Acquisition) -> str: run_path = os.path.join(acq.experiment.export_folder, self.name) if not os.path.exists(run_path): os.mkdir(run_path) assert os.path.isdir(run_path) exp_params_path = os.path.join( acq.experiment.export_folder, self.name, "%s.yaml" % self.name ) return exp_params_path
[docs] def is_already_localized(self, acq: Acquisition): p = self.exp_run_df_path(acq=acq) if os.path.exists(p): run_index_df = pd.read_csv(p, index_col=0) try: return run_index_df.loc[acq.ID, "is_localized"] == True except: pass return False
[docs] def is_already_tracked(self, acq: Acquisition): p = self.exp_run_df_path(acq=acq) if os.path.exists(p): run_index_df = pd.read_csv(p, index_col=0) try: return run_index_df.loc[acq.ID, "is_tracked"] == True except: pass return False
[docs] def mark_as_localized(self, acq: Acquisition): p = self.exp_run_df_path(acq) if os.path.exists(p): df = pd.read_csv(p, index_col=0) else: df = pd.DataFrame( columns=["is_localized", "is_tracked"], dtype=bool ) df.loc[acq.ID, "is_localized"] = True df.to_csv(p, index=True)
[docs] def mark_as_tracked(self, acq: Acquisition): p = self.exp_run_df_path(acq) if os.path.exists(p): df = pd.read_csv(p, index_col=0) else: df = pd.DataFrame( columns=["is_localized", "is_tracked"], dtype=bool ) df.loc[acq.ID, "is_tracked"] = True df.to_csv(p, index=True)