wfcommons.wfbench

wfcommons.wfbench.bench

class wfcommons.wfbench.bench.WorkflowBenchmark(recipe: Type[WfChefWorkflowRecipe], num_tasks: int, logger: Logger | None = None)

Bases: object

Generate a workflow benchmark instance based on a workflow recipe (WfChefWorkflowRecipe).

Parameters:
  • recipe (Type[WfChefWorkflowRecipe]) – A workflow recipe.

  • num_tasks (int) – Total number of tasks in the benchmark workflow.

  • logger (Optional[Logger]) – The logger used to record information, warnings, and errors.

create_benchmark(save_dir: Path, percent_cpu: float | Dict[str, float] = 0.6, cpu_work: int | Dict[str, int] = None, gpu_work: int | Dict[str, int] = None, time: int | None = None, data: int | Dict[str, str] | None = None, mem: float | None = None, lock_files_folder: Path | None = None, regenerate: bool | None = True) Path

Create a workflow benchmark.

Parameters:
  • save_dir (pathlib.Path) – Folder to generate the workflow benchmark JSON instance and input data files.

  • percent_cpu (Union[float, Dict[str, float]]) – The percentage of CPU threads, expressed as a fraction (default: 0.6).

  • cpu_work (Union[int, Dict[str, int]]) – CPU work per workflow task.

  • gpu_work (Union[int, Dict[str, int]]) – GPU work per workflow task.

  • time (Optional[int]) – Time limit for running each task (in seconds).

  • data (Optional[Union[int, Dict[str, str]]]) – Dictionary of input file sizes per workflow task type, or the total workflow data footprint (in MB).

  • mem (Optional[float]) – Maximum amount of memory consumption per task (in MB).

  • lock_files_folder (Optional[pathlib.Path]) –

  • regenerate (Optional[bool]) – Whether to regenerate the workflow tasks (default: True).

Returns:

The path to the workflow benchmark JSON instance.

Return type:

pathlib.Path
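The call sequence documented above can be sketched as follows. This is a minimal sketch rather than the project's reference usage: the helper names `benchmark_options` and `build_benchmark` are hypothetical, the recipe class is left to the caller, and the `cpu_work` and `data` values are arbitrary illustrations. The wfcommons import is deferred so that the option helper can be exercised on its own.

```python
from pathlib import Path
from typing import Any, Dict


def benchmark_options(cpu_work: int = 100, data_mb: int = 10) -> Dict[str, Any]:
    """Collect keyword arguments matching the documented create_benchmark signature."""
    return {
        "percent_cpu": 0.6,    # documented default value
        "cpu_work": cpu_work,  # illustrative value, not a recommendation
        "data": data_mb,       # total workflow data footprint, in MB
    }


def build_benchmark(recipe: Any, num_tasks: int, save_dir: Path) -> Path:
    """Create a benchmark and return the path to its JSON instance."""
    # Imported lazily so this module loads even where wfcommons is not installed.
    from wfcommons.wfbench.bench import WorkflowBenchmark

    benchmark = WorkflowBenchmark(recipe=recipe, num_tasks=num_tasks)
    return benchmark.create_benchmark(save_dir, **benchmark_options())
```

Keeping the options in a plain dictionary makes it easy to log or reuse the same settings across several recipes.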

create_benchmark_from_input_file(save_dir: Path, input_file: Path, lock_files_folder: Path | None = None) Path

Create a workflow benchmark from an input file.

Parameters:
  • save_dir (pathlib.Path) – Folder to generate the workflow benchmark JSON instance and input data files.

  • input_file (pathlib.Path) –

  • lock_files_folder (Optional[pathlib.Path]) –

Returns:

The path to the workflow benchmark JSON instance.

Return type:

pathlib.Path

create_benchmark_from_synthetic_workflow(save_dir: Path, workflow: Workflow, percent_cpu: float | Dict[str, float] = 0.6, cpu_work: int | Dict[str, int] = None, gpu_work: int | Dict[str, int] = None, time: int | None = None, mem: float | None = None, lock_files_folder: Path | None = None) Path

Create a workflow benchmark from a synthetic workflow.

Parameters:
  • save_dir (pathlib.Path) – Folder to generate the workflow benchmark JSON instance and input data files.

  • workflow (Workflow) – The (synthetic) workflow to use as a benchmark.

  • percent_cpu (Union[float, Dict[str, float]]) – The maximum percentage of CPU threads.

  • cpu_work (Union[int, Dict[str, int]]) – Maximum CPU work per workflow task.

  • gpu_work (Union[int, Dict[str, int]]) – Maximum GPU work per workflow task.

  • time (Optional[int]) – Time limit for running each task (in seconds).

  • mem (Optional[float]) – Maximum amount of memory consumption per task (in MB).

  • lock_files_folder (Optional[pathlib.Path]) –

Returns:

The path to the workflow benchmark JSON instance.

Return type:

pathlib.Path

generate_input_file(path: Path) None

Generate an input file in which the CPU percentage, CPU work, GPU work, and data size can be customized.

Parameters:

path (pathlib.Path) –
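One way to combine `generate_input_file` with `create_benchmark_from_input_file` is sketched below. The function name `benchmark_from_customized_input` and its `customize` hook are hypothetical, and the layout of the generated input file is not described in this reference, so the sketch leaves any editing to the caller. The `benchmark` argument is assumed to be a `WorkflowBenchmark` instance.

```python
from pathlib import Path
from typing import Any, Callable, Optional


def benchmark_from_customized_input(
    benchmark: Any,
    save_dir: Path,
    input_file: Path,
    customize: Optional[Callable[[Path], None]] = None,
) -> Path:
    """Generate an input file, optionally edit it, then build the benchmark."""
    # Step 1: write the customizable input file.
    benchmark.generate_input_file(input_file)
    # Step 2: let the caller adjust the file in place (hypothetical hook).
    if customize is not None:
        customize(input_file)
    # Step 3: create the benchmark from the (possibly edited) file.
    return benchmark.create_benchmark_from_input_file(save_dir, input_file)
```

Passing the benchmark object in, instead of constructing it inside the function, keeps the sketch independent of any particular recipe.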

run(json_path: Path, save_dir: Path) None

Run the benchmark workflow locally (for test purposes only).

Parameters:
  • json_path (pathlib.Path) –

  • save_dir (pathlib.Path) –

wfcommons.wfbench.bench.assigning_correct_files(task: Dict[str, str]) List[str]

wfcommons.wfbench.bench.cleanup_sys_files() None

Remove files already used.

wfcommons.wfbench.bench.generate_sys_data(num_files: int, file_total_size: int, task_name: List[str], save_dir: Path) None

Generate the workflow’s input data.

Parameters:
  • num_files (int) –

  • file_total_size (int) –

  • save_dir (pathlib.Path) – Folder to generate the workflow benchmark’s input data files.

wfcommons.wfbench.translator.nextflow

class wfcommons.wfbench.translator.nextflow.NextflowTranslator(workflow: Workflow | Path, logger: Logger | None = None)

Bases: Translator

A WfFormat parser for creating Nextflow workflow applications.

Parameters:
  • workflow (Union[Workflow, pathlib.Path]) – Workflow benchmark object or path to the workflow benchmark JSON instance.

  • logger (Logger) – The logger used to record information, warnings, and errors (optional).

translate(output_file_path: Path) None

Translate a workflow benchmark description (WfFormat) into a Nextflow workflow application.

Parameters:

output_file_path (pathlib.Path) – The name of the output file (e.g., workflow.py).

valid_task_name(original_task_name: str) str

wfcommons.wfbench.translator.nextflow.human_readable_memory(mem_bytes: int) str

wfcommons.wfbench.translator.pegasus

class wfcommons.wfbench.translator.pegasus.PegasusTranslator(workflow: Workflow | Path, logger: Logger | None = None)

Bases: Translator

A WfFormat parser for creating Pegasus workflow applications.

Parameters:
  • workflow (Union[Workflow, pathlib.Path]) – Workflow benchmark object or path to the workflow benchmark JSON instance.

  • logger (Logger) – The logger used to record information, warnings, and errors (optional).

translate(output_file_name: Path, tasks_priorities: Dict[str, int] | None = None) None

Translate a workflow benchmark description (WfFormat) into a Pegasus workflow application.

Parameters:
  • output_file_name (pathlib.Path) – The name of the output file (e.g., workflow.py).

  • tasks_priorities (Optional[Dict[str, int]]) – Priorities to be assigned to tasks.
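The `tasks_priorities` argument can be prepared ahead of the call, as in the sketch below. Both helper names are hypothetical. This reference does not state what the dictionary keys identify or whether larger values mean higher priority, so the sketch assumes keys are task names and larger integers rank first; check both assumptions before relying on it.

```python
from pathlib import Path
from typing import Any, Dict, List, Optional


def priorities_by_order(task_names: List[str]) -> Dict[str, int]:
    """Give earlier names a larger integer (illustrative policy only)."""
    total = len(task_names)
    return {name: total - index for index, name in enumerate(task_names)}


def translate_to_pegasus(
    workflow: Any,
    output_file: Path,
    priorities: Optional[Dict[str, int]] = None,
) -> None:
    """Translate a benchmark into a Pegasus workflow application."""
    # Imported lazily so this module loads even where wfcommons is not installed.
    from wfcommons.wfbench.translator.pegasus import PegasusTranslator

    translator = PegasusTranslator(workflow)
    # Note the keyword: Pegasus uses output_file_name, not output_file_path.
    translator.translate(output_file_name=output_file, tasks_priorities=priorities)
```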

wfcommons.wfbench.translator.swift_t

class wfcommons.wfbench.translator.swift_t.SwiftTTranslator(workflow: Workflow | Path, work_dir: Path, stress_path: Path = PosixPath('stress-ng'), logger: Logger | None = None)

Bases: Translator

A WfFormat parser for creating Swift/T workflow applications.

Parameters:
  • workflow (Union[Workflow, pathlib.Path]) – Workflow benchmark object or path to the workflow benchmark JSON instance.

  • work_dir (pathlib.Path) – Path to the workflow working directory.

  • stress_path (pathlib.Path) – Path to the stress-ng command.

  • logger (Logger) – The logger used to record information, warnings, and errors (optional).

translate(output_file_path: Path) None

Translate a workflow benchmark description (WfFormat) into a Swift/T workflow application.

Parameters:

output_file_path (pathlib.Path) – The path of the output file (e.g., workflow.swift).
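A possible way to supply the constructor's `stress_path` argument is sketched below. The helper names are hypothetical. Looking the command up on `PATH` with `shutil.which` is an assumption about how one might locate stress-ng, not something the translator is documented to do; when nothing is found, the sketch falls back to the bare command name, which matches the documented default.

```python
import shutil
from pathlib import Path
from typing import Any


def resolve_stress_path(command: str = "stress-ng") -> Path:
    """Return the full path of the command if it is on PATH, else the bare name."""
    found = shutil.which(command)
    return Path(found) if found else Path(command)


def translate_to_swift_t(workflow: Any, work_dir: Path, output_file: Path) -> None:
    """Translate a benchmark into a Swift/T workflow application."""
    # Imported lazily so this module loads even where wfcommons is not installed.
    from wfcommons.wfbench.translator.swift_t import SwiftTTranslator

    translator = SwiftTTranslator(
        workflow,
        work_dir=work_dir,
        stress_path=resolve_stress_path(),
    )
    translator.translate(output_file_path=output_file)
```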