Module Reference

Documentation for Python modules in the Lung Modelling repository.

workflow_manager

class lung_modelling.workflow_manager.DatasetLocator(root, rel_primary, rel_derivative, rel_pooled_primary, rel_pooled_derivative)[source]

A helper class to convert paths for SPARC datasets between absolute and relative, and to store primary and derivative directories.
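The path conversion can be pictured with a minimal stand-in. This is an illustrative sketch only: the real DatasetLocator also tracks pooled primary and derivative directories, and the method names used here (`abs_primary`, `to_relative`) are assumptions, not the actual API.

```python
from dataclasses import dataclass
from pathlib import Path

@dataclass
class PathConverter:
    """Illustrative stand-in for DatasetLocator (method names are assumptions)."""
    root: Path
    rel_primary: Path
    rel_derivative: Path

    def abs_primary(self, rel_dir: Path) -> Path:
        # Absolute path of a source directory under the primary folder
        return self.root / self.rel_primary / rel_dir

    def to_relative(self, absolute: Path) -> Path:
        # Path relative to the dataset root
        return absolute.relative_to(self.root)
```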

class lung_modelling.workflow_manager.EachItemTask(name, config)[source]

A class providing an interface for tasks that should be applied to each item in a list of sources.

abstract static initialize(dataloc, dataset_config, task_config)[source]

A method called before multiprocessing to load data required by all items. It should only load data, to avoid multiple simultaneous file accesses by the work function; it should not create or modify any files. TODO: this should also receive dirs_list, so the task knows which directories are in use (e.g., if data must be looked up for all subjects).

abstract static work(source_directory_primary, source_directory_derivative, output_directory, dataset_config, task_config, initialize_result=None)[source]

The work function of an implementation of this class. Defines work to be done on the set of sources specified by the source directory parameters. This function should not access files outside these directories, because it is run in parallel for all workflow sources.

Results of the work should be saved in the output directory. The return type should be a list of Path objects representing the files created.

Parameters:
  • source_directory_primary (Path) – Absolute path of the source directory in the primary folder of the dataset

  • source_directory_derivative (Path) – Absolute path of the source directory in the derivative folder of the dataset

  • output_directory (Path) – Absolute path of the directory in which to save results of the work function

  • dataset_config (DictConfig) – Config relating to the entire dataset

  • task_config (DictConfig) – Task specific config

  • initialize_result – Result of the task's initialize method, if any

Returns:

list of Path objects representing the files created.

Return type:

list[Path]
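The shape of an implementation can be sketched with a hypothetical task. `SmoothMeshTask` and everything it does are invented for illustration; a real task would subclass EachItemTask and do actual processing, but the contract is the same: read only from the given source directories, write only to the output directory, and return the created files as a list of Path objects.

```python
from pathlib import Path

class SmoothMeshTask:
    """Hypothetical EachItemTask-style work function (illustrative sketch)."""

    @staticmethod
    def work(source_directory_primary: Path, source_directory_derivative: Path,
             output_directory: Path, dataset_config=None, task_config=None,
             initialize_result=None) -> list[Path]:
        # Write results only into output_directory...
        output_directory.mkdir(parents=True, exist_ok=True)
        out_file = output_directory / "result.txt"
        out_file.write_text("done")
        # ...and return the list of files created.
        return [out_file]
```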

apply_to_dataset(dataloc, dataset_config, dirs_list, initialize_result=None, mpool=None, show_progress=False)[source]

Apply work function to each subject in a dataset, with optional multiprocessing.

Parameters:
  • dataloc (DatasetLocator) – Dataset locator for the dataset

  • dataset_config (DictConfig) – Config relating to the entire dataset

  • dirs_list – List of relative paths to the source directories

  • initialize_result – Result of the task's initialize method, if any

  • mpool – Optional multiprocessing pool

  • show_progress – Option to show progress

apply_initialize(dataloc, dataset_config)[source]

Run the task's initialize method for the dataset.

Parameters:
  • dataloc (DatasetLocator) – Dataset locator for the dataset

  • dataset_config (DictConfig) – Config relating to the entire dataset

class lung_modelling.workflow_manager.AllItemsTask(name, config)[source]

A class providing an interface for tasks that should be applied to all items in a list of sources at once.

abstract static work(dataloc, dirs_list, output_directory, dataset_config, task_config)[source]

The work function of an implementation of this class. Defines work to be done on the full set of sources specified by dirs_list.

Results of the work should be saved in the output directory. The return type should be a list of Path objects representing the files created.

Parameters:
  • dataloc (DatasetLocator) – Dataset locator for the dataset

  • dirs_list (list) – List of relative paths to the source directories

  • output_directory (Path) – Absolute path of the directory in which to save results of the work function

  • dataset_config (DictConfig) – Config relating to the entire dataset

  • task_config (DictConfig) – Task specific config

Returns:

list of Path objects representing the files created.

Return type:

list[Path]
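An AllItemsTask-style work function sees every source directory at once, which suits pooling or aggregation steps. The following is a hypothetical sketch (the task name and its behaviour are invented); it assumes each entry of dirs_list is a tuple whose first element is the directory path, matching the `list[tuple[Path, list, list]]` return type of gather_directories below.

```python
from pathlib import Path

class CombineResultsTask:
    """Hypothetical AllItemsTask-style work function (illustrative sketch)."""

    @staticmethod
    def work(dataloc, dirs_list, output_directory: Path,
             dataset_config=None, task_config=None) -> list[Path]:
        # Operates on all source directories at once, e.g. pooling
        # per-subject results into a single summary file.
        output_directory.mkdir(parents=True, exist_ok=True)
        summary = output_directory / "summary.txt"
        summary.write_text("\n".join(str(d) for d, _, _ in dirs_list))
        return [summary]
```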

apply_to_dataset(dataloc, dataset_config, dirs_list)[source]

Apply work function to all items in a dataset at once.

Parameters:
  • dataloc (DatasetLocator) – Dataset locator for the dataset

  • dataset_config (DictConfig) – Config relating to the entire dataset

  • dirs_list (list) – List of relative paths to the source directories

lung_modelling.workflow_manager.initialize(dataset_root, task_config, show_progress=True)[source]

Initialization task for processing a dataset directory structure. This runs first when WorkflowManager.run_workflow is called. It loads the dataset config and runs gather_directories, which gathers the list of source directories on which tasks will act.

Parameters:
  • dataset_root (Path) – Root directory of the dataset

  • task_config (DictConfig) –

    Configuration dict for this task

    params:

    dataset_config_filename:

    Filename for the dataset configuration file. This must be directly inside the dataset_root directory.

    use_directory_index:

    Option to use a pre-built index of the source directory instead of iterating through it with os.walk.

    skip_dirs:

    List of glob strings matching directories to skip. The whole path relative to dataset_root is tested, so slashes can be included to specify the depth to match. This takes precedence over select_dirs.

    select_dirs:

    List of glob strings matching directories to select. If empty, all valid source directories are selected; otherwise, only valid source directories matching one of these are selected.

  • show_progress – Option to show progress

Returns:

  • dataloc – DatasetLocator object

  • dataset_config – DatasetConfig object

  • dirs_list – Record of data folders with list of files

Return type:

Tuple[DatasetLocator, DictConfig, list]

lung_modelling.workflow_manager.gather_directories(primary_root, data_folder_depth, skip_dirs=None, select_dirs=None, index_list=None, show_progress=True)[source]

Gather directories with files to process. Walks through the dataset directory finding data folders at a specified depth.

Parameters:
  • primary_root (Path) – Root of the source directory

  • data_folder_depth – Depth of the data folder in the source directory

  • skip_dirs – List of globs indicating directories to skip

  • select_dirs – List of globs indicating directories to select. Only directories that match the data folder depth are selected. If this is None, all directories are selected.

  • index_list – Optional pre-constructed index of the file system

  • show_progress – Option to show progress

Returns:

dirs_list – Record of data folders with list of files

Return type:

list[tuple[Path, list, list]]
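The skip/select glob behaviour described above can be sketched as a predicate on a directory's path relative to the dataset root. This is an assumption about the logic, not the actual implementation; note that skip_dirs takes precedence over select_dirs, as stated in the initialize documentation.

```python
from fnmatch import fnmatch
from pathlib import Path

def directory_selected(rel_dir: Path, skip_dirs=None, select_dirs=None) -> bool:
    """Sketch of the skip/select glob logic (illustrative, not the real code)."""
    rel = rel_dir.as_posix()  # whole path relative to the root is tested
    if skip_dirs and any(fnmatch(rel, pattern) for pattern in skip_dirs):
        return False  # skip_dirs takes precedence over select_dirs
    if select_dirs:
        return any(fnmatch(rel, pattern) for pattern in select_dirs)
    return True  # empty select_dirs selects everything not skipped
```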

lung_modelling.workflow_manager.exception_monitor(func, callback, logger)[source]

Decorator to catch exceptions from multiprocessing tasks

Parameters:
  • func – Function to wrap

  • callback – Callback to run if exception occurs

  • logger – Loguru logger reference needed for logging from multiprocessing
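A minimal sketch of such a wrapper, assuming the logger exposes an `.error` method (as Loguru's does) and that the callback receives the caught exception; the real implementation's logging details may differ.

```python
import functools

def exception_monitor(func, callback, logger):
    """Sketch: wrap func so exceptions are logged and passed to a callback."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # In a worker process, exceptions would otherwise be swallowed
            logger.error(f"Task raised an exception: {exc!r}")
            callback(exc)
    return wrapper
```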

lung_modelling.workflow_manager.args_unpacker(func)[source]

Decorator to unpack tuple of arguments from multiprocessing methods such as imap

Parameters:

func – Function to wrap
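Pool methods such as imap pass a single item per call, so a multi-argument work function needs its argument tuple unpacked. A minimal sketch of such a decorator:

```python
import functools

def args_unpacker(func):
    """Sketch: adapt func(*args) to accept one tuple, as imap supplies."""
    @functools.wraps(func)
    def wrapper(args):
        return func(*args)
    return wrapper
```

For example, `pool.imap(args_unpacker(work), [(a1, b1), (a2, b2)])` calls `work(a1, b1)` and `work(a2, b2)`.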

lung_modelling.workflow_manager.log_workflow(dataloc, cfg, task_config, task_name, results)[source]

Workflow task for logging the result of a workflow. Runs after all tasks are complete in WorkflowManager.run_workflow.

Parameters:
  • dataloc (DatasetLocator) – Dataset locator

  • cfg (DictConfig) – DatasetConfig object

  • task_config (DictConfig) – Configuration dict for this task. No parameters are currently in use for this task_config.

  • task_name – Name of this task

  • results – Results of workflow tasks to log

lung_modelling.workflow_manager.get_unique_file_name(path)[source]

Get a unique file name by appending a suffix to the original name.

Parameters:

path (Path) – Original file name

Returns:

path – New unique file name
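One common way to implement this is to append an incrementing counter until the name is unused. The suffix format below (`name_1`, `name_2`, ...) is an assumption; the real implementation may choose a different addition.

```python
from pathlib import Path

def get_unique_file_name(path: Path) -> Path:
    """Sketch: append _1, _2, ... to the stem until the name is unused."""
    candidate = path
    counter = 0
    while candidate.exists():
        counter += 1
        candidate = path.with_name(f"{path.stem}_{counter}{path.suffix}")
    return candidate
```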

lung_modelling.workflow_manager.print_results(results)[source]

Print the results dict to the logger.

Parameters:

results (dict) –

class lung_modelling.workflow_manager.WorkflowManager(dataset_root, cfg, mpool=None, show_progress=False)[source]

A class to manage workflows consisting of a list of tasks to run on a collection of data. This class is designed to operate on data organized in the SPARC Data Structure (https://doi.org/10.1101/2021.02.10.430563). Units of data to operate on are organized in a directory tree.

register_task(task)[source]

Register a task that can be run by the workflow manager. Registered task names must be unique and cannot be “initialize” or “logging”. The task name determines which config dict is passed to the task when it is run.

Parameters:

task (EachItemTask | AllItemsTask) – Task to register

run_workflow()[source]

Run all registered tasks, then log the results. Tasks run in the order in which they were registered.