.. _generating-workflow-benchmarks-label: WfBench: Workflow Benchmarks ============================ **WfBench** generates realistic workflow benchmark specifications that can be translated into runnable benchmarks for current workflow systems. It produces tasks with tunable performance characteristics (CPU, memory, and I/O usage) and realistic dependency structures derived from production workflows. Benchmark generation is twofold: first, a specification is produced in the :ref:`json-format-label`; then, that specification is translated into executable benchmark code for a target workflow system. Generating Workflow Benchmark Specifications -------------------------------------------- The :class:`~wfcommons.wfbench.bench.WorkflowBenchmark` class uses recipes of workflows (as described in :ref:`workflow-recipe-generator-label`) for generating workflow benchmarks with an arbitrary number of tasks:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark # create a workflow benchmark object to generate specifications based on a recipe benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) # generate a specification based on performance characteristics path = benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) In the example above, the workflow benchmark generator first invokes the WfChef recipe to generate a task graph. Once the task graph has been generated, each task is set to be an instance of the workflow task benchmark. For each task, the following values for the parameters of the workflow task benchmark can be specified: - :code:`cpu_work`: CPU work per workflow task. The :code:`cpu-benchmark` executable (compiled C++) calculates an increasingly precise value of π up until the specified total amount of computation (cpu_work) has been performed. - :code:`data`: Individual data volumes for each task in a way that is coherent with respect to task data dependencies (in the form of a dictionary of input size files per workflow task type). Alternatively, a total data footprint (in MB) can be defined, i.e., the sum of the sizes of all data files read/written by workflow tasks, in which case uniform I/O volumes are computed for each workflow task benchmark. - :code:`percent_cpu`: The fraction of the computation's instructions that correspond to non-memory operations. Generate from synthetic workflow instances ++++++++++++++++++++++++++++++++++++++++++ WfCommons also allows you to convert synthetic workflow instances into benchmarks directly. The generated benchmark will have exactly the same structure as the synthetic workflow instance:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark # create a synthetic workflow instance with 500 tasks or use one that you already have workflow = BlastRecipe.from_num_tasks(500).build_workflow() # create a workflow benchmark object to generate specifications based on a recipe benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) # generate a specification based on performance characteristics and the structure of the synthetic workflow instance path = benchmark.create_benchmark_from_synthetic_workflow(pathlib.Path("/tmp/"), workflow, cpu_work=100, percent_cpu=0.6) This is useful when you want to generate a benchmark with a specific structure or when you want benchmarks with the more detailed structure provided by WfChef workflow generation. Translating Specifications into Benchmark Code ---------------------------------------------- WfCommons provides a collection of translators that turn benchmark specifications into runnable workflow code. All translators inherit from :class:`~wfcommons.wfbench.translator.abstract_translator.Translator` and accept either a :class:`~wfcommons.common.workflow.Workflow` object or a path to a benchmark specification in :ref:`json-format-label`. Supported translators (alphabetical) ++++++++++++++++++++++++++++++++++++ - Airflow - Bash - CWL - Dask - Makeflow - Nextflow - Parsl - Pegasus - PyCOMPSs - Swift/T - TaskVine .. warning:: WfBench leverages :code:`stress-ng` (https://github.com/ColinIanKing/stress-ng) to execute memory-intensive threads. Ensure :code:`stress-ng` is installed on all worker nodes. Airflow +++++++ `Apache Airflow `_ is a platform for authoring, scheduling, and monitoring workflows as code. Use the Airflow translator to produce DAGs that can be executed by an Airflow scheduler:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, AirflowTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=200) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) translator = AirflowTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./airflow-wf/")) Bash ++++ The Bash translator generates a simple, runnable shell workflow for quick local validation and debugging:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, BashTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=100) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=50, data=5, percent_cpu=0.7) translator = BashTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./bash-wf/")) CWL +++ `CWL `_ is a community standard for describing command-line tools and workflows. The CWL translator emits portable CWL definitions:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, CWLTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=150) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=80, data=8, percent_cpu=0.6) translator = CWLTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./cwl-wf/")) Dask ++++ `Dask `_ is an open-source library for parallel computing in Python. It supports local execution, HPC schedulers, and cloud environments:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, DaskTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) translator = DaskTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./dask-wf/")) Makeflow ++++++++ `Makeflow `_ targets large, DAG-shaped workflows on clusters, grids, and clouds. The translator emits Makeflow workflows:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, MakeflowTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=200) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) translator = MakeflowTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./makeflow-wf/")) Nextflow ++++++++ `Nextflow `_ enables portable, reproducible workflows across local, HPC, and cloud environments:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, NextflowTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) translator = NextflowTranslator( benchmark.workflow, use_subworkflows=False, max_tasks_per_subworkflow=1000, ) translator.translate(output_folder=pathlib.Path("./nextflow-wf/")) If you want to split large workflows across multiple Nextflow module files, enable subworkflows and set the maximum number of tasks per module. This produces a ``modules/`` directory plus a top-level ``workflow.nf`` that includes and runs the modules sequentially:: translator = NextflowTranslator( benchmark.workflow, use_subworkflows=True, max_tasks_per_subworkflow=250, ) translator.translate(output_folder=pathlib.Path("./nextflow-wf/")) .. warning:: Nextflow does not support tasks with iterations (tasks that depend on another instance of the same abstract task). Translation fails for workflows that include iterations. .. note:: If you plan to run Nextflow on an HPC system using Slurm, we **strongly recommend** using the `HyperQueue `_ executor. HyperQueue efficiently distributes workflow tasks across all allocated compute nodes, improving scalability and resource utilization. The :class:`~wfcommons.wfbench.translator.nextflow.NextflowTranslator` class includes functionality to automatically generate a Slurm script template for running the workflow on HPC systems. Parsl +++++ `Parsl `_ is a parallel scripting library for Python. The translator emits a Parsl workflow suitable for local or distributed execution:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, ParslTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=200) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) translator = ParslTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./parsl-wf/")) Pegasus +++++++ `Pegasus `_ orchestrates complex scientific workflows on clusters, grids, and clouds by mapping tasks onto distributed resources:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, PegasusTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=0.6) translator = PegasusTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./pegasus-wf/")) .. warning:: Pegasus uses `HTCondor `_ to orchestrate tasks. By default, HTCondor does not implement CPU affinity for program threads. To enable CPU affinity, specify :code:`lock_files_folder` when using :meth:`~wfcommons.wfbench.bench.WorkflowBenchmark.create_benchmark`. PyCOMPSs ++++++++ `PyCOMPSs `_ is a programming model and runtime for parallel Python applications on distributed infrastructures:: import pathlib from wfcommons import CyclesRecipe from wfcommons.wfbench import WorkflowBenchmark, PyCompssTranslator benchmark = WorkflowBenchmark(recipe=CyclesRecipe, num_tasks=200) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=500, data=1000, percent_cpu=0.8) translator = PyCompssTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./pycompss-wf/")) Swift/T +++++++ `Swift/T `_ is a workflow system for HPC environments, designed to scale to large task graphs:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, SwiftTTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) benchmark.create_benchmark(pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=1.0) translator = SwiftTTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./swift-t-wf/")) TaskVine ++++++++ `TaskVine `_ is a task scheduler for data-intensive dynamic workflows across HPC clusters, GPU clusters, and clouds:: import pathlib from wfcommons import BlastRecipe from wfcommons.wfbench import WorkflowBenchmark, TaskVineTranslator benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=500) benchmark.create_benchmark(save_dir=pathlib.Path("/tmp/"), cpu_work=100, data=10, percent_cpu=1.0) translator = TaskVineTranslator(benchmark.workflow) translator.translate(output_folder=pathlib.Path("./taskvine-wf/")) WfBench will generate a folder containing the TaskVine workflow :code:`taskvine_workflow.py`, workflow input data (:code:`./taskvine-wf/data/`), workflow binaries (:code:`./taskvine-wf/bin/`), and the Poncho package specification (:code:`./taskvine-wf/taskvine_poncho.json`). .. warning:: This TaskVine workflow requires :code:`stress-ng` to be installed and accessible in the system's :code:`$PATH` where the manager runs.