wfcommons.wfbench
wfcommons.wfbench.bench
- class wfcommons.wfbench.bench.WorkflowBenchmark(recipe: Type[WfChefWorkflowRecipe], num_tasks: int, logger: Logger | None = None)
Bases: object
Generate a workflow benchmark instance based on a workflow recipe (WfChefWorkflowRecipe).
- Parameters:
recipe (Type[WfChefWorkflowRecipe]) – A workflow recipe.
num_tasks (int) – Total number of tasks in the benchmark workflow.
logger (Optional[Logger]) – The logger used to log information, warnings, or errors.
- _add_input_files(output_files: Dict[str, Dict[str, str]], data: int | Dict[str, str]) None
Add input files when input data was provided by the user.
- Parameters:
output_files (Dict[str, Dict[str, str]])
data (Union[int, Dict[str, str]])
- _add_output_files(output_files: int | Dict[str, Dict[str, int]]) None
Add output files when input data was provided by the user.
- Parameters:
output_files (Union[int, Dict[str, Dict[str, int]]])
- _calculate_input_files()
Calculate the total number of files needed. This method is used if the user provides the total data footprint.
- _create_data_footprint(data: int | Dict[str, str] | None, save_dir: Path) None
Create the tasks’ data footprint, provided as individual input data sizes (JSON file).
- _creating_lock_files(lock_files_folder: Path | None) Tuple[Path, Path]
Create the lock files.
- _generate_data_for_root_nodes(save_dir: Path, data: int | Dict[str, str]) None
Generate workflow’s input data for root nodes based on user’s input.
- Parameters:
save_dir (pathlib.Path)
data (Union[int, Dict[str, str]])
- _generate_task_cpu_params(task: Task, percent_cpu: float | Dict[str, float], cpu_work: int | Dict[str, int], lock_files_folder: Path | None, cores: Path | None, lock: Path | None) List[str]
Set the CPU arguments if a CPU benchmark is requested.
- _generate_task_gpu_params(task: Task, gpu_work: int | Dict[str, int]) List[str]
Set the GPU arguments if a GPU benchmark is requested.
- _output_files(data: Dict[str, str]) Dict[str, Dict[str, int]]
Calculate, for each task, the total number of output files needed. This method is used when the user specifies the input file sizes.
- Parameters:
data (Dict[str, str])
- Return type:
Dict[str, Dict[str, int]]
- _rename_files_to_wfbench_format() List[File]
Rename the files in the workflow to the wfbench format.
- Returns:
A list of the input files that need to be generated (with their new names).
- Return type:
List[File]
- _set_argument_parameters(task: Task, percent_cpu: float | Dict[str, float], cpu_work: int | Dict[str, int], gpu_work: int | Dict[str, int], time: int | None, mem: float | None, lock_files_folder: Path | None, cores: Path | None, lock: Path | None, rundir: Path | None) None
Set the parameters for the arguments section of the JSON instance.
- create_benchmark(save_dir: Path, percent_cpu: float | Dict[str, float] = 0.6, cpu_work: int | Dict[str, int] = None, gpu_work: int | Dict[str, int] = None, time: int | None = None, data: int | Dict[str, str] | None = None, mem: float | None = None, lock_files_folder: Path | None = None, regenerate: bool | None = True, rundir: Path | None = None) Path
Create a workflow benchmark.
- Parameters:
save_dir (pathlib.Path) – Folder to generate the workflow benchmark JSON instance and input data files.
percent_cpu (Union[float, Dict[str, float]]) – The percentage of CPU threads.
cpu_work (Union[int, Dict[str, int]]) – CPU work per workflow task.
gpu_work (Union[int, Dict[str, int]]) – GPU work per workflow task.
time (Optional[int]) – Time limit for running each task (in seconds).
data (Optional[Union[int, Dict[str, str]]]) – Dictionary of input file sizes per workflow task type, or the total workflow data footprint (in MB).
mem (Optional[float]) – Maximum amount of memory consumption per task (in MB).
lock_files_folder (Optional[pathlib.Path])
regenerate (Optional[bool]) – Whether to regenerate the workflow tasks.
rundir (Optional[pathlib.Path]) – Alternative directory in which the files should be created/saved.
- Returns:
The path to the workflow benchmark JSON instance.
- Return type:
pathlib.Path
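The percent_cpu, cpu_work, and gpu_work parameters each accept either a single value applied to every task or a dictionary keyed by task type. The following minimal sketch shows one way such a value might be resolved for a single task. The helper resolve_task_param, its fallback to a default for task types missing from the dictionary, and the task type names are illustrative assumptions, not WfCommons code.

```python
from typing import Dict, Optional, Union

Number = Union[int, float]


def resolve_task_param(
    value: Union[Number, Dict[str, Number], None],
    task_type: str,
    default: Optional[Number] = None,
) -> Optional[Number]:
    """Return the value of a scalar-or-dict parameter for one task type.

    Hypothetical helper for illustration; not part of WfCommons.
    """
    if value is None:
        # Parameter not supplied (e.g. cpu_work=None): use the fallback.
        return default
    if isinstance(value, dict):
        # Per-task-type dictionary: look up this task type, else the fallback.
        return value.get(task_type, default)
    # A scalar applies uniformly to every task.
    return value


# Task type names below are illustrative only.
print(resolve_task_param(0.6, "blastall"))                                      # 0.6
print(resolve_task_param({"blastall": 100, "split_fasta": 10}, "split_fasta"))  # 10
print(resolve_task_param({"blastall": 100}, "cat", default=50))                 # 50
```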
- create_benchmark_from_input_file(save_dir: Path, input_file: Path, lock_files_folder: Path | None = None, rundir: Path | None = None) Path
Create a workflow benchmark from an input file.
- Parameters:
save_dir (pathlib.Path) – Folder to generate the workflow benchmark JSON instance and input data files.
input_file (pathlib.Path)
lock_files_folder (Optional[pathlib.Path])
rundir (Optional[pathlib.Path]) – Alternative directory in which the files should be created/saved.
- Returns:
The path to the workflow benchmark JSON instance.
- Return type:
pathlib.Path
- create_benchmark_from_synthetic_workflow(save_dir: Path, workflow: Workflow, percent_cpu: float | Dict[str, float] = 0.6, cpu_work: int | Dict[str, int] = None, gpu_work: int | Dict[str, int] = None, time: int | None = None, mem: float | None = None, lock_files_folder: Path | None = None, rundir: Path | None = None) Path
Create a workflow benchmark from a synthetic workflow.
- Parameters:
save_dir (pathlib.Path) – Folder to generate the workflow benchmark JSON instance and input data files.
workflow (Workflow) – The (synthetic) workflow to use as a benchmark.
percent_cpu (Union[float, Dict[str, float]]) – The maximum percentage of CPU threads.
cpu_work (Union[int, Dict[str, int]]) – Maximum CPU work per workflow task.
gpu_work (Union[int, Dict[str, int]]) – Maximum GPU work per workflow task.
time (Optional[int]) – Time limit for running each task (in seconds).
mem (Optional[float]) – Maximum amount of memory consumption per task (in MB).
lock_files_folder (Optional[pathlib.Path])
rundir (Optional[pathlib.Path]) – Alternative directory in which the files should be created/saved.
- Returns:
The path to the workflow benchmark JSON instance.
- Return type:
pathlib.Path
- generate_input_file(path: Path) None
Generate an input file in which the CPU percentage, CPU work, GPU work, and data size can be customized.
- Parameters:
path (pathlib.Path)
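generate_input_file produces a file for the user to customize, which create_benchmark_from_input_file can then consume. Its schema is not documented in this section, so the sketch below invents one purely to illustrate the write, edit, and read round trip: a JSON object with one entry per task type whose keys mirror the create_benchmark parameters. The function names and the layout are hypothetical, not the file WfCommons actually writes.

```python
import json
import pathlib
import tempfile
from typing import Any, Dict, List


def write_input_template(path: pathlib.Path, task_types: List[str]) -> None:
    """Write a customization template with one entry per task type.

    The layout is a hypothetical stand-in for the file WfCommons generates.
    """
    template: Dict[str, Dict[str, Any]] = {
        # Keys mirror create_benchmark's parameter names (assumed schema).
        task_type: {"percent_cpu": 0.6, "cpu_work": 100, "gpu_work": None, "data": 10}
        for task_type in task_types
    }
    path.write_text(json.dumps(template, indent=2))


def read_input_template(path: pathlib.Path) -> Dict[str, Dict[str, Any]]:
    """Load the (possibly user-edited) template back into a dictionary."""
    return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as tmp:
    input_file = pathlib.Path(tmp) / "input.json"
    write_input_template(input_file, ["split_fasta", "blastall"])
    # A user would edit input_file at this point, e.g. raising cpu_work for one task type.
    config = read_input_template(input_file)

print(sorted(config))  # ['blastall', 'split_fasta']
```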
- run(json_path: Path, save_dir: Path) None
Run the benchmark workflow locally (for test purposes only).
- Parameters:
json_path (pathlib.Path)
save_dir (pathlib.Path)
- wfcommons.wfbench.bench.assigning_correct_files(task: Dict[str, str]) List[str]
- wfcommons.wfbench.bench.clean_entry(entry)
- wfcommons.wfbench.bench.cleanup_sys_files() None
Remove files that have already been used.
- wfcommons.wfbench.bench.generate_sys_data(num_files: int, tasks: Dict[str, int], save_dir: Path) None
Generate workflow’s input data
- Parameters:
num_files (int) – number of each file to be generated.
tasks (Dict[str, int]) – Dictionary with the name of the tasks and their data sizes.
save_dir (pathlib.Path) – Folder to generate the workflow benchmark’s input data files.
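generate_sys_data is documented as creating, in save_dir, num_files files for each entry of tasks, sized according to that entry. A minimal sketch of that behaviour follows. The function name, the <task>_input_<i>.txt naming scheme, and the interpretation of sizes as bytes are assumptions.

```python
import pathlib
import tempfile
from typing import Dict


def generate_data_sketch(num_files: int, tasks: Dict[str, int], save_dir: pathlib.Path) -> None:
    """Create num_files files per task, each exactly `size` bytes long.

    Hypothetical sketch: the naming scheme and byte units are assumptions.
    """
    save_dir.mkdir(parents=True, exist_ok=True)
    for task_name, size in tasks.items():
        for i in range(num_files):
            path = save_dir / f"{task_name}_input_{i}.txt"  # hypothetical naming scheme
            # Write `size` zero bytes so the file has exactly the requested size.
            path.write_bytes(b"\0" * size)


with tempfile.TemporaryDirectory() as tmp:
    out_dir = pathlib.Path(tmp)
    generate_data_sketch(2, {"split_fasta": 1024, "blastall": 10}, out_dir)
    sizes = {p.name: p.stat().st_size for p in out_dir.iterdir()}

print(len(sizes))  # 4
```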
wfcommons.wfbench.translator.nextflow
- class wfcommons.wfbench.translator.nextflow.NextflowTranslator(workflow: Workflow | Path, logger: Logger | None = None)
Bases: Translator
A WfFormat parser for creating Nextflow workflow applications.
- Parameters:
workflow (Union[Workflow, pathlib.Path]) – Workflow benchmark object or path to the workflow benchmark JSON instance.
logger (Logger) – The logger used to log information, warnings, or errors (optional).
- _add_abstract_task_definition(abstract_task_name: str, physical_tasks: List[Task]) None
Add an abstract task to the workflow, considering its physical tasks.
- Parameters:
abstract_task_name (str) – the name of the abstract task
physical_tasks (List[Task]) – a list of physical tasks for this abstract task
- _add_call_to_abstract_task(abstract_task_name: str, physical_tasks: List[Task]) None
- _create_file_task_mappings(output_file_path: Path) None
- _create_task_args_map(output_file_path: Path, abstract_task_name: str, physical_tasks: List[Task]) None
- _determine_abstract_relations() None
Determines the abstract tasks that will be used for the Nextflow definition of the workflow.
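_determine_abstract_relations groups physical tasks into the abstract tasks used in the Nextflow definition. Assuming, for illustration, that each physical task's category names its abstract task, the grouping and the resulting abstract-level dependencies could be derived as below. The plain dictionaries stand in for the workflow's Task objects, and abstract_relations is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def abstract_relations(
    category: Dict[str, str],       # physical task name -> abstract task (assumed: its category)
    parents: Dict[str, List[str]],  # physical task name -> parent task names
) -> Tuple[Dict[str, List[str]], Dict[str, Set[str]]]:
    """Group physical tasks by category and lift their edges to the abstract level.

    Hypothetical sketch, not the NextflowTranslator implementation.
    """
    members: Dict[str, List[str]] = defaultdict(list)
    abstract_parents: Dict[str, Set[str]] = defaultdict(set)
    for task, cat in category.items():
        members[cat].append(task)
        for parent in parents.get(task, []):
            parent_cat = category[parent]
            if parent_cat != cat:
                # An edge between tasks of different categories becomes an abstract edge.
                abstract_parents[cat].add(parent_cat)
    return dict(members), dict(abstract_parents)


category = {"split_1": "split", "blast_1": "blast", "blast_2": "blast", "cat_1": "cat"}
parents = {"blast_1": ["split_1"], "blast_2": ["split_1"], "cat_1": ["blast_1", "blast_2"]}
members, abstract_parents = abstract_relations(category, parents)
print(members["blast"])                 # ['blast_1', 'blast_2']
print(sorted(abstract_parents["cat"]))  # ['blast']
```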
- _determine_input_output() None
Determines the inputs and outputs for the physical and abstract tasks.
- _is_resource_arg(arg: str) bool
- _write_map_file(map_dict: Dict, map_name: str, output_file_path: Path) None
- translate(output_file_path: Path) None
Translate a workflow benchmark description (WfFormat) into a Nextflow workflow application.
- Parameters:
output_file_path (pathlib.Path) – The name of the output file (e.g., workflow.nf).
- valid_task_name(original_task_name: str) str
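Nextflow process names are expected to be valid identifiers, so task names containing characters such as hyphens or dots must be rewritten before use. The rule valid_task_name applies is not documented here; this sketch assumes every non-identifier character becomes an underscore and that names starting with a digit receive a hypothetical task_ prefix.

```python
import re


def valid_task_name_sketch(original_task_name: str) -> str:
    """Turn an arbitrary task name into an identifier-safe name (hypothetical rule)."""
    # Replace every character that is not an ASCII letter, digit, or underscore.
    name = re.sub(r"\W", "_", original_task_name, flags=re.ASCII)
    # Identifiers cannot start with a digit (or be empty), so add a hypothetical prefix.
    if not name or name[0].isdigit():
        name = "task_" + name
    return name


print(valid_task_name_sketch("split-fasta.00001"))  # split_fasta_00001
print(valid_task_name_sketch("1st_step"))           # task_1st_step
```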
- wfcommons.wfbench.translator.nextflow.human_readable_memory(mem_bytes: int) str
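human_readable_memory turns a byte count into a string, presumably for a Nextflow memory directive such as memory '2 GB'. The sketch below assumes 1024-based units and picks the largest unit that keeps the value at or above 1; the exact output format of the real function may differ.

```python
def human_readable_memory_sketch(mem_bytes: int) -> str:
    """Format a byte count with the largest fitting unit (1 KB = 1024 B assumed)."""
    units = ["B", "KB", "MB", "GB", "TB"]
    value = float(mem_bytes)
    for unit in units[:-1]:
        if value < 1024:
            break  # current unit keeps the value below 1024
        value /= 1024
    else:
        unit = units[-1]  # divided past GB, so express the value in TB
    # Keep at most two decimals and drop trailing zeros ("2.00" -> "2").
    text = f"{value:.2f}".rstrip("0").rstrip(".")
    return f"{text} {unit}"


print(human_readable_memory_sketch(512))          # 512 B
print(human_readable_memory_sketch(2 * 1024**3))  # 2 GB
print(human_readable_memory_sketch(1610612736))   # 1.5 GB
```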
wfcommons.wfbench.translator.pegasus
- class wfcommons.wfbench.translator.pegasus.PegasusTranslator(workflow: Workflow | Path, logger: Logger | None = None)
Bases: Translator
A WfFormat parser for creating Pegasus workflow applications.
- Parameters:
workflow (Union[Workflow, pathlib.Path]) – Workflow benchmark object or path to the workflow benchmark JSON instance.
logger (Logger) – The logger used to log information, warnings, or errors (optional).
- _add_task(task_name: str, parent_task: str | None = None, tasks_priorities: Dict[str, int] | None = None) None
Add a task and its dependencies to the workflow.
- Parameters:
task_name (str) – name of the task
parent_task (Optional[str]) – name of the parent task
tasks_priorities (Optional[Dict[str, int]]) – Priorities to be assigned to tasks.
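The recursive pattern described for _add_task (add a task, then its dependencies, passing the current task as parent_task) can be sketched independently of the Pegasus API. Here a dictionary of added tasks and a set of edges stand in for the Pegasus workflow object; walking from a task to its children and defaulting missing priorities to 0 are assumptions.

```python
from typing import Dict, List, Optional, Set, Tuple


def add_task_sketch(
    task_name: str,
    children: Dict[str, List[str]],
    added: Dict[str, int],
    edges: Set[Tuple[str, str]],
    parent_task: Optional[str] = None,
    tasks_priorities: Optional[Dict[str, int]] = None,
) -> None:
    """Add task_name once, record the parent edge, then recurse into its children.

    Hypothetical sketch; `added` maps each task to its (assumed) priority.
    """
    if parent_task is not None:
        edges.add((parent_task, task_name))  # dependency: parent -> child
    if task_name in added:
        return  # already reached through another parent; only the edge was new
    priorities = tasks_priorities or {}
    added[task_name] = priorities.get(task_name, 0)  # assumed default priority 0
    for child in children.get(task_name, []):
        add_task_sketch(child, children, added, edges, task_name, tasks_priorities)


children = {"split": ["blast_1", "blast_2"], "blast_1": ["cat"], "blast_2": ["cat"]}
added: Dict[str, int] = {}
edges: Set[Tuple[str, str]] = set()
add_task_sketch("split", children, added, edges, tasks_priorities={"blast_2": 10})
print(added)  # {'split': 0, 'blast_1': 0, 'cat': 0, 'blast_2': 10}
print(sorted(edges))
```

Because each task is added only once while every parent edge is still recorded, a task with several parents (cat here) appears a single time with all of its dependencies.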
- translate(output_folder: Path, tasks_priorities: Dict[str, int] | None = None) None
Translate a workflow benchmark description (WfFormat) into a Pegasus workflow application.
- Parameters:
output_folder (pathlib.Path) – The path to the folder in which the workflow benchmark will be generated.
tasks_priorities (Optional[Dict[str, int]]) – Priorities to be assigned to tasks.
wfcommons.wfbench.translator.swift_t
- class wfcommons.wfbench.translator.swift_t.SwiftTTranslator(workflow: Workflow | Path, stress_path: Path = PosixPath('stress-ng'), logger: Logger | None = None)
Bases: Translator
A WfFormat parser for creating Swift/T workflow applications.
- Parameters:
workflow (Union[Workflow, pathlib.Path]) – Workflow benchmark object or path to the workflow benchmark JSON instance.
stress_path (pathlib.Path) – Path to the stress-ng command.
logger (Logger) – The logger used to log information, warnings, or errors (optional).
- _add_tasks(category: str) None
Add all tasks for a specific category.
- Parameters:
category (str) – category name
- _find_categories_list(task_name: str, parent_task: str | None = None) None
Find the list of task categories ordered by task dependencies.
- Parameters:
task_name (str) – name of the task
parent_task (Optional[str]) – name of the parent task
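_find_categories_list orders task categories by task dependencies, so that each category can be emitted after the categories it depends on. A sketch using the standard library's graphlib.TopologicalSorter (Python 3.9+) follows. It assumes the category-level graph is acyclic, since categories that depend on each other would raise graphlib.CycleError; ordered_categories and the input dictionaries are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def ordered_categories(category: Dict[str, str], parents: Dict[str, List[str]]) -> List[str]:
    """Return categories so that every category follows the ones it depends on.

    Hypothetical sketch, not the SwiftTTranslator implementation.
    """
    # Map each category to the set of categories it depends on (its predecessors).
    graph: Dict[str, Set[str]] = {cat: set() for cat in category.values()}
    for task, task_parents in parents.items():
        for parent in task_parents:
            if category[parent] != category[task]:
                graph[category[task]].add(category[parent])
    # static_order() yields each node only after all of its predecessors.
    return list(TopologicalSorter(graph).static_order())


category = {"split_1": "split", "blast_1": "blast", "blast_2": "blast", "cat_1": "cat"}
parents = {"blast_1": ["split_1"], "blast_2": ["split_1"], "cat_1": ["blast_1", "blast_2"]}
print(ordered_categories(category, parents))  # ['split', 'blast', 'cat']
```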
- translate(output_folder: Path) None
Translate a workflow benchmark description (WfFormat) into a Swift/T workflow application.
- Parameters:
output_folder (pathlib.Path) – The path to the folder in which the workflow benchmark will be generated.
wfcommons.wfbench.translator.taskvine
- class wfcommons.wfbench.translator.taskvine.TaskVineTranslator(workflow: Workflow | Path, logger: Logger | None = None)
Bases: Translator
A WfFormat parser for creating TaskVine workflow applications.
- Parameters:
workflow (Union[Workflow, pathlib.Path]) – Workflow benchmark object or path to the workflow benchmark JSON instance.
logger (Logger) – The logger used to log information, warnings, or errors (optional).
- _add_level_tasks(tasks_list: list[str]) list[str]
Add all tasks from a level in the workflow.
- Parameters:
tasks_list (list[str]) – list of tasks in the level
- Returns:
List of next level tasks
- Return type:
list[str]
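_add_level_tasks processes one level of the workflow and returns the tasks of the next level. A minimal breadth-first sketch of that idea follows, assuming a task joins the next level only once all of its parents have been added. The function signature and the dictionaries are hypothetical stand-ins for the translator's internal state.

```python
from typing import Dict, List, Set


def add_level_tasks_sketch(
    tasks_list: List[str],
    children: Dict[str, List[str]],
    parents: Dict[str, List[str]],
    added: Set[str],
) -> List[str]:
    """Mark one level's tasks as added and return the tasks that become ready next.

    Hypothetical sketch, not the TaskVineTranslator implementation.
    """
    added.update(tasks_list)
    next_level: List[str] = []
    for task in tasks_list:
        for child in children.get(task, []):
            # A child is ready only when every one of its parents has been added.
            if child not in added and child not in next_level and all(
                p in added for p in parents.get(child, [])
            ):
                next_level.append(child)
    return next_level


children = {"split": ["blast_1", "blast_2"], "blast_1": ["cat"], "blast_2": ["cat"]}
parents = {"blast_1": ["split"], "blast_2": ["split"], "cat": ["blast_1", "blast_2"]}
added: Set[str] = set()
level = ["split"]
levels = []
while level:
    levels.append(level)
    level = add_level_tasks_sketch(level, children, parents, added)
print(levels)  # [['split'], ['blast_1', 'blast_2'], ['cat']]
```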
- _add_task(task_name: str, parent_task: str | None = None) list[str]
Add a task and its dependencies to the workflow.
- Parameters:
task_name (str) – name of the task
parent_task (Optional[str]) – name of the parent task
- Returns:
List of children tasks
- Return type:
list[str]
- translate(output_folder: Path) None
Translate a workflow benchmark description (WfFormat) into a TaskVine workflow application.
- Parameters:
output_folder (pathlib.Path) – The path to the folder in which the workflow benchmark will be generated.