Ephys/Imaging Automation Pipeline in BRAINCoGS (Developer Guide)

The main goals of the Ephys/Imaging Automation Pipeline in BRAINCoGS are:

  • Automate spike sorting and imaging segmentation for all recordings
  • Centralize/Standardize paths for Recording Data Storage
  • Unify & Register Ephys/Imaging Processing
  • Store processed data in BRAINCoGS Database (DJ)

To accomplish this we developed three tools:

Ephys/Imaging Automation GUI

This mini guide to the automation GUI shows the relationship between the GUI and the database: which tables the GUI reads its values from and which records it writes.

Automation GUI main screen

Ephys Preprocessing (precluster) parameters organization

Main tables

  • u19_pipeline_ephys_element.#pre_cluster_method List of methods (or algorithms) supported for ephys preprocessing.
  • u19_pipeline_ephys_element.pre_cluster_param_set Specific set of parameters (mainly a dictionary) for a given preprocessing method. Multiple sets of parameters can be stored for the same method.
  • u19_pipeline_ephys_element.pre_cluster_param_steps (Ephys) Reference to a set of steps to perform in ephys preprocessing.
  • u19_pipeline_ephys_element.pre_cluster_param_steps__step These records indicate which parameter sets, for the given preprocessing methods, will be executed and in which order.
  • Example, as depicted in the image above:
  1. Suppose precluster_param_steps_name = new_preprocessing_steps_1 (precluster_param_steps_id = 10) is selected for preprocessing.
  2. According to pre_cluster_param_steps__step:
    • paramset_idx = 9 will be executed 1st
    • paramset_idx = 2 will be executed 2nd
    • paramset_idx = 3 will be executed 3rd
  3. Checking pre_cluster_param_set for paramset_idx = 9, 2, 3, we conclude that preprocessing will comprise:
    • Tprime (Tprime ParamSet 1)
    • Catgt (Catgt ParamSet for Towers Task)
    • PreClustMethod1 (PreClusterMethod1 Paramset Mika)
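
The lookup in the example above can be sketched in plain Python. The dictionaries below only stand in for pre_cluster_param_steps__step and pre_cluster_param_set; the step_number field name and the pairing of each paramset_idx with a method are assumptions made for illustration, not the actual schema.

```python
# Stand-in for pre_cluster_param_steps__step: which param sets run, and in which order.
# The "step_number" field name is an assumption for this sketch.
STEP_ROWS = [
    {"precluster_param_steps_id": 10, "step_number": 2, "paramset_idx": 2},
    {"precluster_param_steps_id": 10, "step_number": 1, "paramset_idx": 9},
    {"precluster_param_steps_id": 10, "step_number": 3, "paramset_idx": 3},
]

# Stand-in for pre_cluster_param_set: (method, description) per paramset_idx.
PARAM_SETS = {
    9: ("Tprime", "Tprime ParamSet 1"),
    2: ("Catgt", "Catgt ParamSet for Towers Task"),
    3: ("PreClustMethod1", "PreClusterMethod1 Paramset Mika"),
}


def resolve_preprocessing(steps_id, step_rows, param_sets):
    """Return the (method, description) pairs for steps_id in execution order."""
    rows = [r for r in step_rows if r["precluster_param_steps_id"] == steps_id]
    rows.sort(key=lambda r: r["step_number"])
    return [param_sets[r["paramset_idx"]] for r in rows]


plan = resolve_preprocessing(10, STEP_ROWS, PARAM_SETS)
for position, (method, description) in enumerate(plan, start=1):
    print(f"{position}. {method} ({description})")
```

Sorting on the step order before resolving each paramset_idx is what keeps the execution order independent of the order in which rows happen to be stored.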

Ephys Processing (cluster) parameters organization

  • This structure is simpler than the preprocessing one because no multiple steps are involved: two tables organize the ephys processing parameters.

Main tables

  • u19_pipeline_ephys_element.#clustering_method List of methods (or algorithms) supported for ephys processing

  • u19_pipeline_ephys_element.#clustering_param_set Specific set of parameters (mainly a dictionary) for a given processing method. Multiple sets of parameters can be stored for the same method.

  • Each recording (or to be precise, recording process) can be processed with a different set of parameters. Default parameters are used for the majority of the recordings in BRAINCoGS.

Default parameters for preprocessing and processing

  • As seen in the Automation GUI main screen, u19_recording.#modality stores the default parameters for each modality.
  • As a developer, manually update the default parameters for all modalities whenever the project requires it.
  • The u19_recording.#modality table stores a reference to the default parameters most commonly used for processing ephys and imaging.
  • Main tables that store preprocessing parameters:
    1. Ephys, u19_pipeline_ephys_element.pre_cluster_param_steps: Reference to a set of steps to perform in ephys preprocessing.
    2. Imaging, u19_pipeline_imaging_element.pre_process_param_steps: Reference to a set of steps to perform in imaging preprocessing (no user runs imaging preprocessing at the moment).
  • Main tables that store processing parameters:
    1. Ephys, u19_pipeline_ephys_element.#clustering_param_set: Reference to a set of parameters for the chosen sorting algorithm.
    2. Imaging, u19_pipeline_imaging_element.#processing_param_set: Reference to a set of parameters for the chosen segmentation algorithm.

Equivalent imaging parameter tables:

  • Everything described for the ephys preprocessing and processing tables also applies to their imaging counterparts.

Tables written when a recording is registered:

  • When a new recording is created, three tables are written:

    1. u19_recording.recording: Main table for recordings. A recording_id is created here and identifies the recording throughout the whole process.
    2. u19_recording.recording__behavior_session: Reference to the behavior session that corresponds to this recording.
    3. u19_recording.default_params: Set of parameters chosen for this recording.
  • If there is no behavior session attached to the recording:

    • u19_recording.recording__recording_session: The subject and datetime of the recording are stored in this table as a reference.
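
As a rough illustration of the rule above, the helper below lists which tables receive a row when a recording is registered. The function name and its boolean argument are hypothetical; only the table names come from this guide. It assumes that recording__recording_session takes the place of recording__behavior_session when no behavior session is attached.

```python
def tables_written_on_registration(has_behavior_session):
    """List the tables that receive a row when a recording is registered."""
    # Assumption: the session-reference table depends on whether a behavior
    # session is attached to the recording.
    if has_behavior_session:
        session_table = "u19_recording.recording__behavior_session"
    else:
        session_table = "u19_recording.recording__recording_session"
    return [
        "u19_recording.recording",
        session_table,
        "u19_recording.default_params",
    ]


print(tables_written_on_registration(True))
print(tables_written_on_registration(False))
```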

u19_recording.default_params design:

  • default_params works as a "guide" to know which parameters were chosen for a recording.
  • Explanation of all fields of this table:
  1. recording_id Reference to the recording for which parameters are being selected.
  2. fragment_number Reference to the "fragment" (or job) the parameters apply to. (See the next section for how recordings are split into fragments.)
  3. default_same_preparams_all If default_same_preparams_all = 1 (default case), the same preprocessing parameters are applied to all fragments of the recording.
  4. preprocess_param_steps_id Preprocessing parameter id chosen for this recording-fragment. Taken from u19_recording.#modality by default.
  5. default_same_params_all If default_same_params_all = 1 (default case), the same processing parameters are applied to all fragments of the recording.
  6. paramset_idx Processing parameter id chosen for this recording-fragment. Taken from u19_recording.#modality by default.
  • In the default case (Automation GUI main screen), default_same_preparams_all = 1 and default_same_params_all = 1, so the default parameters are applied to all fragments of the recording.
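
A minimal sketch of how these fields could be read back for one fragment is shown below. It is not the pipeline's code: the rows are plain dictionaries rather than database records, and it assumes that when a "same for all" flag is 1 the value stored in the first row is reused for every fragment.

```python
def params_for_fragment(default_params_rows, fragment_number):
    """Return (preprocess_param_steps_id, paramset_idx) for one fragment."""
    by_fragment = {row["fragment_number"]: row for row in default_params_rows}
    first = default_params_rows[0]
    # Assumption: a flag equal to 1 means the first row's value applies to all fragments.
    pre_row = first if first["default_same_preparams_all"] == 1 else by_fragment[fragment_number]
    proc_row = first if first["default_same_params_all"] == 1 else by_fragment[fragment_number]
    return pre_row["preprocess_param_steps_id"], proc_row["paramset_idx"]


# Default case: one row, both flags set, so every fragment gets the same ids.
default_case = [
    {"recording_id": 1, "fragment_number": 0,
     "default_same_preparams_all": 1, "preprocess_param_steps_id": 10,
     "default_same_params_all": 1, "paramset_idx": 4},
]

# Processing parameters chosen per fragment, preprocessing shared.
per_fragment_case = [
    {"recording_id": 2, "fragment_number": 0,
     "default_same_preparams_all": 1, "preprocess_param_steps_id": 10,
     "default_same_params_all": 0, "paramset_idx": 4},
    {"recording_id": 2, "fragment_number": 1,
     "default_same_preparams_all": 1, "preprocess_param_steps_id": 10,
     "default_same_params_all": 0, "paramset_idx": 7},
]

print(params_for_fragment(default_case, 3))
print(params_for_fragment(per_fragment_case, 1))
```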

Workflow management description

Workflow management code creates and coordinates a set of tasks for every recording registered with the GUI, to make sure each one is fully processed.

Shell script executed as a cronjob for workflow management: (call_cronjob_automatic_job.sh)

Workflow management consists mainly of two classes that handle recordings and recording_processes (recording_processes, or jobs, are the units that recordings are composed of).

  • Ephys recordings are composed of one or more independent probe electrophysiology recordings. Each probe recording corresponds to a job in the workflow management.
  • Calcium imaging recordings are composed of one or more independent field-of-view image stacks. Each field-of-view image stack corresponds to a job in the workflow management.

The class that manages workflow at the recording level is (RecordingHandler)

Main functions and variables in recording workflow manager

  • recording_status_dict in (Params Config file): This dictionary defines the statuses and the corresponding function to execute for each one.
  • pipeline_handler_main in (RecordingHandler): Main function of the recording workflow.
    1. Executes the corresponding function based on status.
    2. Is executed every 30 minutes to check for new recordings to be handled.
    3. Sends notifications for processed and failed functions.
  • modality_preingestion in (RecordingHandler): Main ingestion function from the recording table to the recording_process tables. It makes subcalls depending on the modality of the recording (ephys or imaging).
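
The status-driven design described above can be sketched as a dictionary that maps each status to the function that should run next. Everything in this sketch is hypothetical (status names, function names, record layout); the real recording_status_dict and RecordingHandler are not reproduced here.

```python
# Hypothetical step functions: each one performs the work for a single status.
def preingest(recording):
    recording["jobs_created"] = True


def mark_done(recording):
    recording["finished"] = True


# Hypothetical status table: the function to run and the status that follows it.
STATUS_DICT = {
    0: {"name": "NEW_RECORDING", "function": preingest, "next": 1},
    1: {"name": "PREINGESTED", "function": mark_done, "next": 2},
    2: {"name": "PROCESSED", "function": None, "next": None},
}


def pipeline_handler_sketch(recordings, status_dict):
    """Run one pass: execute the function that matches each recording's status."""
    notifications = []
    for recording in recordings:
        step = status_dict[recording["status"]]
        if step["function"] is None:
            continue  # terminal status, nothing left to do
        try:
            step["function"](recording)
            recording["status"] = step["next"]
            notifications.append((recording["recording_id"], "ok"))
        except Exception as error:  # a failure leaves the status unchanged
            notifications.append((recording["recording_id"], f"failed: {error}"))
    return notifications


recordings = [{"recording_id": 1, "status": 0}, {"recording_id": 2, "status": 2}]
first_pass = pipeline_handler_sketch(recordings, STATUS_DICT)
print(first_pass)
```

Because the handler runs on a schedule, each pass only needs to advance a recording by one step; a recording that fails simply keeps its status and is retried on the next pass.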

Imaging preingestion main steps:

  • imaging_preingestion in (RecordingHandler): Ingestion to the recording_process table for an imaging recording. Gets all FOVs (TIFF stacks) for the recording and assigns a new job to each one, with the corresponding parameters fetched from the selection made in the automation GUI.
  • Make function in AcquiredTiff (AcquiredTiff make function): Population calls to:
  1. u19_imaging_pipeline.AcquiredTiff: Each recording is divided into TIFF splits (e.g. mesoscope recordings contain multiple TIFF stacks that are processed independently).
  2. u19_imaging_pipeline.SyncImagingBehavior: Finds the correspondence between each virtual reality frame in the behavior experiment and the calcium imaging frame in the recording (Code here). Because most users read sync data with MATLAB, this table is populated by the general populate tables cronjob script (populate tables script description).

Ephys preingestion main steps:

  • electrophysiology_preingestion in (RecordingHandler): Ingestion to the recording_process table for an ephys recording. Gets all probes for the recording and assigns a new job to each one, with the corresponding parameters fetched from the selection made in the automation GUI.
  1. Ingest ephys_pipeline.EphysPipelineSession table
  2. Ingest ephys_element.ProbeInsertion table
  3. Ingest ephys_element.EphysRecording table
  4. Ingest ephys_pipeline.BehaviorSync table: Matches iterations in the ephys recording to frames from the Virmen behavior task (Code here) (and here).
  5. For each probe (insertion_number) in EphysSession, insert a Processing (job) in u19_recording_process.Processing
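
Step 5 can be pictured with the sketch below, which builds one job record per probe. The record fields reuse names from u19_recording.default_params; the actual columns of u19_recording_process.Processing are not shown in this guide, and treating fragment_number as equal to insertion_number is an assumption.

```python
def build_processing_jobs(recording_id, insertion_numbers,
                          preprocess_param_steps_id, paramset_idx):
    """Create one job record per probe insertion of an ephys recording."""
    return [
        {
            "recording_id": recording_id,
            # Assumption: the fragment number of a job is the probe's insertion_number.
            "fragment_number": insertion_number,
            "preprocess_param_steps_id": preprocess_param_steps_id,
            "paramset_idx": paramset_idx,
        }
        for insertion_number in sorted(insertion_numbers)
    ]


# A recording with two probes yields two independent jobs.
jobs = build_processing_jobs(recording_id=42, insertion_numbers=[1, 0],
                             preprocess_param_steps_id=10, paramset_idx=4)
for job in jobs:
    print(job)
```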

Main functions and variables in recording_process workflow manager

  • recording_process_status_dict in (Params Config file): This dictionary defines the statuses and the corresponding function to execute for each one.
  • pipeline_handler_main in (RecProcessHandler): Main function of the recording process workflow.
  1. Executes the corresponding function based on status.
  2. Is executed every 30 minutes to check for new recordings to be handled.
  3. Sends notifications for processed and failed functions.
  • transfer check/review in (transfer_check/review): Executes and monitors the Globus transfer from PNI to the Princeton University clusters. (Deprecated)
  • slurm_job_queue/check in (slurm_job_functions): Generates the Slurm file and queues the job in the cluster that will process the recording process. Monitors the job to check whether it has finished.
  • populate_element in (slurm_job_queue): After the processing jobs finish, populates the downstream imaging or ephys element tables from the results file.
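
To make the Slurm step more concrete, the sketch below builds the text of a batch script for one recording process. It is an assumption-laden illustration, not the pipeline's generator: the job name, the log file naming and the final python command are made up. The three exported variable names are the ones main_script.py of BrainCogsEphysSorters reads from the environment, and --job-name, --output and --error are standard sbatch options.

```python
import shlex


def build_slurm_script(recording_process_id, raw_data_directory,
                       processed_data_directory, output_log_dir, error_log_dir):
    """Return the text of a batch script for one recording process (sketch)."""
    job_name = f"job_id_{recording_process_id}"  # hypothetical naming scheme
    lines = [
        "#!/bin/bash",
        f"#SBATCH --job-name={job_name}",
        f"#SBATCH --output={output_log_dir}/{job_name}.log",
        f"#SBATCH --error={error_log_dir}/{job_name}.log",
        "",
        # Environment variables read by main_script.py
        f"export recording_process_id={recording_process_id}",
        f"export raw_data_directory={shlex.quote(raw_data_directory)}",
        f"export processed_data_directory={shlex.quote(processed_data_directory)}",
        "",
        "python main_script.py",  # hypothetical invocation
    ]
    return "\n".join(lines) + "\n"


script = build_slurm_script(12, "user/subject/session", "user/subject/session",
                            "OutputLog", "ErrorLog")
print(script)
```

The generated file would then be submitted with sbatch, and the job id that sbatch reports is what a later check can use to monitor whether the job has finished.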

Collab repositories to handle Ephys/Imaging Processing

BrainCogsEphysSorters

  • BrainCogsEphysSorters is the electrophysiology processing pipeline used by BrainCOGS to preprocess, sort, and post-process Neuropixels recordings. This repository works with parameters defined in previous steps of the Automation Pipeline.

  • Location: The repository currently lives at /mnt/cup/braininit/Shared/repos/AutomaticPipelineProcessing/electrophysiology_processing/BrainCogsEphysSorters

  • System: The repository is installed on g-bcogs-u19proc2.pni.princeton.edu and runs through the Slurm job scheduler.

  • Log locations:

    • ErrorLogs: /mnt/cup/braininit/Shared/repos/AutomaticPipelineProcessing/u19_pipeline/automatic_job/ErrorLog
    • OutputLogs: /mnt/cup/braininit/Shared/repos/AutomaticPipelineProcessing/u19_pipeline/automatic_job/OutputLog
  • The repository acts as a unified orchestration layer around multiple electrophysiology tools:

    • CatGT (preprocessing)
    • Kilosort 2
    • Kilosort 3
    • Kilosort 4
    • IBL Atlas post-processing pipeline

High-Level Workflow

Raw Neuropixels Recording
        │
        ▼
Preprocessing (CatGT)
        │
        ▼
Spike Sorting (Kilosort2/3/4)
        │
        ▼
Partial Cleanup
        │
        ▼
IBL Atlas Conversion
        │
        ▼
Processed Output

Main Components

  1. main_script
  • Coordinates the entire pipeline.
  • File: main_script.py, the main entry point of the repository.
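import os
import pathlib

# config, pw, sw and ppw are modules of this repository (their imports are omitted in this excerpt)

# Get recording process and data directories
recording_process_id = os.environ['recording_process_id']
raw_data_directory = os.environ['raw_data_directory']
processed_data_directory = os.environ['processed_data_directory']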

# Get absolute paths to raw and processed
raw_data_directory = pathlib.Path(config.root_raw_data_dir,raw_data_directory)
processed_data_directory = pathlib.Path(config.root_processed_data_dir,processed_data_directory)

# Execute selected preprocessing steps
new_raw_data_directory = pw.preprocess_main(recording_process_id, raw_data_directory, processed_data_directory)

# Execute selected sorter
sorter_processed_directory = sw.sorter_main(recording_process_id, new_raw_data_directory, processed_data_directory)

# Post process data
pw.post_process_partial_results(recording_process_id, raw_data_directory, processed_data_directory)
ppw.post_process_main(raw_data_directory, processed_data_directory, sorter_processed_directory)

  2. Preprocessing Layer
  • Reads the preprocessing param file to decide which preprocessing steps to perform, then executes them (for now, CatGT is the only preprocessing stage implemented).
  • File: u19_sorting/preprocess_wrappers.py
  • Output result Location: braininit/Data/Processed/electrophysiology/(user)/(subject)/(session_date)_g(session#)/(g#_spikeglx_dir)/(imec#spikeglx_dir)/job_id(jobid)/catGT_output
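def preprocess_main(recording_process_id, raw_data_directory, processed_data_directory):

    # Excerpt: preprocess_param_file is the already opened preprocess_paramset_(id).json file
    preprocess_parameters = json.load(preprocess_param_file)

    # Start from the original raw data; each step may hand a new directory to the next one
    new_raw_data_directory = raw_data_directory

    for this_preparam in preprocess_parameters:
        # Run CatGT when this step's parameters are keyed by the CatGT tool name
        if config.preproc_tools['catgt'] in this_preparam:
            catgt_output_dir = pathlib.Path(processed_data_directory, config.preproc_tools['catgt']+"_output")
            new_raw_data_directory = cat_gt.run_cat_gt(new_raw_data_directory, catgt_output_dir, this_preparam[config.preproc_tools['catgt']])

    return new_raw_data_directory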
  3. Sorting Layer
  • Executes the selected spike sorting algorithm.
  • File: u19_sorting/sorter_wrappers.py
  • Output result Location: braininit/Data/Processed/electrophysiology/(user)/(subject)/(session_date)_g(session#)/(g#_spikeglx_dir)/(imec#spikeglx_dir)/job_id(jobid)/(sorter)_output
sorter = config.sorters_names[process_parameters['clustering_method']]

sorter_processed_directory = pathlib.Path(processed_directory, process_parameters['clustering_method']+'_output')

if sorter == config.sorters_names['kilosort2']:
    Kilosort2.run_Kilosort2(raw_directory, sorter_processed_directory, process_parameters_filename, chanmap_filename)
elif sorter == config.sorters_names['kilosort3']:
    Kilosort3.run_Kilosort3(raw_directory, sorter_processed_directory, process_parameters_filename, chanmap_filename)
elif sorter == config.sorters_names['kilosort4']:
    print('running Kilosort 4 here xxxxxxxx')
    Kilosort4.run_Kilosort4(raw_directory, sorter_processed_directory, process_parameters_filename, chanmap_filename)

  4. Post-Processing
  • Converts sorter outputs into the formats required by downstream analysis pipelines.
  • File: u19_sorting/postprocess_wrappers.py
  • Output result Location: braininit/Data/Processed/electrophysiology/(user)/(subject)/(session_date)_g(session#)/(g#_spikeglx_dir)/(imec#spikeglx_dir)/job_id(jobid)/ibl_data
  # For the moment we just call ibl_data transformation to run atlas
  ibl_atlas_post_processing.run_ibl_atlas_post_processing(raw_data_directory, processed_data_directory, sorter_processed_directory)
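
The three "Output result Location" entries above share the same per-job directory and differ only in the last folder. The sketch below collects them in one place. The helper name and the example arguments are hypothetical; the folder names follow the locations listed above (catGT_output, (sorter)_output, ibl_data).

```python
import pathlib


def job_output_directories(job_directory, sorter_name):
    """Map each pipeline stage to its output folder inside one job directory."""
    job_directory = pathlib.PurePosixPath(job_directory)
    return {
        "preprocessing": job_directory / "catGT_output",
        "sorting": job_directory / f"{sorter_name}_output",
        "post_processing": job_directory / "ibl_data",
    }


# "job_dir" stands for the full .../job_id(jobid) directory of one job.
outputs = job_output_directories("job_dir", "kilosort2")
for stage, directory in outputs.items():
    print(f"{stage}: {directory}")
```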

Design Patterns

  • The repository isolates third-party tools behind wrappers:

    • CatGT
    • Kilosort2
    • Kilosort3
    • Kilosort4
  • This makes it easier to:

    • Replace sorters
    • Add new preprocessing tools
    • Keep a common interface
  • Configuration-driven execution: behavior is controlled entirely by JSON files (created by the Automation Pipeline):

    • preprocess_paramset_(id).json
    • process_paramset_(id).json
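
The wrapper and configuration-driven ideas above can be combined in a small sketch: a registry maps the clustering_method value read from the JSON parameters to a wrapper function. This is an alternative illustration, not the repository's implementation (which selects the sorter with an if/elif chain). The stub functions do not call Kilosort, and the example JSON content is made up; only the clustering_method key and the (method)_output folder naming come from the code excerpts above.

```python
import json


# Placeholder wrappers: they only report what a real wrapper would be asked to do.
def run_kilosort2_stub(raw_directory, output_directory, parameters):
    return f"kilosort2 sorted {raw_directory} into {output_directory}"


def run_kilosort4_stub(raw_directory, output_directory, parameters):
    return f"kilosort4 sorted {raw_directory} into {output_directory}"


# Registry: adding a sorter means adding one entry, the caller stays unchanged.
SORTER_WRAPPERS = {
    "kilosort2": run_kilosort2_stub,
    "kilosort4": run_kilosort4_stub,
}


def run_sorter(process_parameters_json, raw_directory, processed_directory):
    """Pick the wrapper named in the JSON parameters and run it."""
    process_parameters = json.loads(process_parameters_json)
    method = process_parameters["clustering_method"]
    if method not in SORTER_WRAPPERS:
        raise ValueError(f"Unsupported clustering_method: {method!r}")
    output_directory = f"{processed_directory}/{method}_output"
    return SORTER_WRAPPERS[method](raw_directory, output_directory, process_parameters)


example_json = '{"clustering_method": "kilosort4"}'
result = run_sorter(example_json, "raw", "processed")
print(result)
```

Keeping every wrapper behind the same three-argument signature is what makes the sorters interchangeable from the caller's point of view.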