WfInstances: Workflow Instances
Workflow execution instances have been widely used to profile and characterize workflow executions, and to build distributions of workflow execution behaviors, which are used to evaluate methods and techniques in simulation or in real conditions.
The WfCommons project targets the analysis of actual workflow execution instances (i.e., the workflow execution profile data and characterizations) in order to build Workflow Recipes of workflow applications. These recipes contain the necessary information for generating synthetic, yet realistic, workflow instances that resemble the structure and distribution of the original workflow executions.
An up-to-date list of workflow execution instances compatible with WfFormat is maintained in the project's GitHub repository.
WfInstances
A workflow execution instance represents an actual execution of a scientific
workflow on a distributed platform (e.g., clouds, grids, HPC, etc.). In the
WfCommons project, an instance is represented in a JSON file following the
schema described in WfFormat. This Python package
provides an instance loader tool for importing workflow execution instances
for analysis. For instance, the code snippet below shows how an instance can
be loaded using the Instance class:
import pathlib
from wfcommons import Instance
input_instance = pathlib.Path('/path/to/instance/file.json')
instance = Instance(input_instance=input_instance)
The Instance class provides a number of
methods for interacting with the workflow instance, including:
draw(): produces an image or a pdf file representing the instance.
leaves(): gets the leaves of the workflow (i.e., the tasks without any successors).
roots(): gets the roots of the workflow (i.e., the tasks without any predecessors).
write_dot(): writes a dot file of the instance.
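To make the notions of roots and leaves concrete, here is a minimal standalone sketch that computes them on a small task graph. It does not use the Instance class; the graph, the task names, and the helpers find_roots and find_leaves are hypothetical and only illustrate the definitions given above.

```python
# Hypothetical task graph: each task maps to the tasks that depend on it.
children = {
    "split": ["align_1", "align_2"],
    "align_1": ["merge"],
    "align_2": ["merge"],
    "merge": [],
}


def find_roots(children):
    """Return the tasks that never appear as a successor of another task."""
    has_parent = {child for succ in children.values() for child in succ}
    return sorted(task for task in children if task not in has_parent)


def find_leaves(children):
    """Return the tasks that have no successors."""
    return sorted(task for task, succ in children.items() if not succ)


print(find_roots(children))   # ['split']
print(find_leaves(children))  # ['merge']
```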
Note
Although the analysis methods are used internally by WfCommons (specifically by WfChef) to build workflow recipes, they can also be used in a standalone manner.
Parsing Workflow Execution Logs
The most common way to obtain workflow instances from actual workflow executions is to parse execution logs. As part of the WfCommons project, we are continuously developing parsers for commonly used workflow management systems. The parsers provided in this Python package automatically scan execution logs to produce instances using WfFormat.
Each parser class is derived from the abstract
LogsParser class. Thus, each
parser provides a
build_workflow()
method.
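The pattern described above (an abstract base class whose subclasses each implement build_workflow()) can be sketched with Python's abc module. This is only an illustration of the design: LogsParserSketch, CsvLogsParserSketch, the "task,runtime" log format, and the dictionary returned here are all hypothetical and are not part of WfCommons.

```python
from abc import ABC, abstractmethod
from typing import Optional


class LogsParserSketch(ABC):
    """Hypothetical stand-in for an abstract log parser base class."""

    @abstractmethod
    def build_workflow(self, workflow_name: Optional[str] = None) -> dict:
        """Turn execution logs into a workflow description."""


class CsvLogsParserSketch(LogsParserSketch):
    """Toy parser for made-up 'task,runtime' log lines."""

    def __init__(self, log_lines):
        self.log_lines = log_lines

    def build_workflow(self, workflow_name=None):
        tasks = []
        for line in self.log_lines:
            name, runtime = line.strip().split(",")
            tasks.append({"name": name, "runtime": float(runtime)})
        # A plain dict stands in for the workflow object in this sketch.
        return {"name": workflow_name, "tasks": tasks}


parser = CsvLogsParserSketch(["split,1.5", "merge,0.7"])
workflow = parser.build_workflow("toy-workflow")
print(workflow["name"], len(workflow["tasks"]))
```

Because the base class declares build_workflow() as abstract, code that consumes parsers can call the same method regardless of which workflow system produced the logs.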
Supported log parsers
SnakemakeLogsParser
ROCrateLogsParser
TaskVineLogsParser
Examples
Below we give basic examples for using the log parsers. Review the specific documentation of each log parser for more details, such as additional parameters that configure the behavior of the log parser.
Makeflow
Makeflow is a workflow system for
executing large complex workflows on clusters, clouds, and grids. The Makeflow
language is similar to traditional “Make”, so if you can write a Makefile, then you
can write a Makeflow. A workflow can be just a few commands chained together, or
it can be a complex application consisting of thousands of tasks. It can have an
arbitrary DAG structure and is not limited to specific patterns. Makeflow is used
on a daily basis to execute complex scientific applications in fields such as data
mining, high energy physics, image processing, and bioinformatics. It has run on
campus clusters, the Open Science Grid, NSF XSEDE machines, NCSA Blue Waters, and
Amazon Web Services. Makeflow logs provide time-stamped event instances from these
executions. The following example shows the analysis of Makeflow execution logs,
stored in a local folder (execution_dir), for a workflow execution using the
MakeflowLogsParser class:
import pathlib
from wfcommons.wfinstances import MakeflowLogsParser
# creating the parser for the Makeflow workflow
execution_dir = pathlib.Path('/path/to/makeflow/execution/dir/blast/chameleon-small-001/')
resource_monitor_logs_dir = pathlib.Path('/path/to/makeflow/resource/monitor/logs/dir')
parser = MakeflowLogsParser(execution_dir=execution_dir,
                            resource_monitor_logs_dir=resource_monitor_logs_dir)
# generating the workflow instance object
workflow = parser.build_workflow('makeflow-workflow-test')
# writing the workflow instance to a JSON file
workflow_path = pathlib.Path('./makeflow-workflow.json')
workflow.write_json(workflow_path)
Note
The MakeflowLogsParser class requires
that Makeflow workflows run with the
Resource Monitor
tool (e.g., execute the workflow using the --monitor=logs option).
Nextflow
Nextflow is a reactive workflow framework and a programming DSL
that eases the writing of data-intensive computational pipelines. It is designed around
the idea that the Linux platform is the lingua franca of data science. Linux provides
many simple but powerful command-line and scripting tools that, when chained together,
facilitate complex data manipulations. Nextflow extends this approach, adding the ability
to define complex program interactions and a high-level parallel computational environment
based on the dataflow programming model. The following example shows the analysis of
Nextflow execution logs, stored in a local folder (execution_dir), for a workflow
execution using the NextflowLogsParser class:
import pathlib
from wfcommons.wfinstances import NextflowLogsParser
# creating the parser for the Nextflow workflow
execution_dir = pathlib.Path('/path/to/nextflow/execution/dir/')
parser = NextflowLogsParser(execution_dir=execution_dir)
# generating the workflow instance object
workflow = parser.build_workflow('nextflow-workflow-test')
# writing the workflow instance to a JSON file
workflow_path = pathlib.Path('./nextflow-workflow.json')
workflow.write_json(workflow_path)
Note
The NextflowLogsParser class assumes
that workflow executions produce an execution_report_*.html file and an
execution_timeline_*.html file.
Pegasus
Pegasus is being used in production to execute workflows
for dozens of high-profile applications in a wide range of scientific domains. Pegasus
provides the necessary abstractions for scientists to create workflows and allows for
transparent execution of these workflows on a range of compute platforms including
clusters, clouds, and national cyberinfrastructures. Workflow execution with Pegasus
includes data management, monitoring, and failure handling, and is managed by HTCondor
DAGMan. Individual workflow tasks are managed by a workload management framework,
HTCondor, which supervises task executions on local and remote resources. Pegasus
logs provide time-stamped event instances from these executions. The following example shows
the analysis of Pegasus execution logs, stored in a local folder (submit_dir), for a
workflow execution using the PegasusLogsParser
class:
import pathlib
from wfcommons.wfinstances import PegasusLogsParser
# creating the parser for the Pegasus workflow
submit_dir = pathlib.Path('/path/to/pegasus/submit/dir/seismology/chameleon-100p-001/')
parser = PegasusLogsParser(submit_dir=submit_dir)
# generating the workflow instance object
workflow = parser.build_workflow('pegasus-workflow-test')
# writing the workflow instance to a JSON file
workflow_path = pathlib.Path('./pegasus-workflow.json')
workflow.write_json(workflow_path)
RO-Crate
RO-Crate is a format for packaging research data so as to promote open and reproducible science. The RO-Crate logs parser processes RO-Crate artifacts created by executing workflows using the Streamflow workflow management system. It may work for RO-Crates generated by other systems, but this has not been tested. It takes as input the path of the RO-Crate directory, which contains the ro-crate-metadata.json file. It generates workflow instances compatible with WfFormat:
import pathlib
from wfcommons.wfinstances import ROCrateLogsParser
execution_dir = pathlib.Path('/path/to/ro-crate/execution/dir/')
parser = ROCrateLogsParser(execution_dir=execution_dir)
workflow = parser.build_workflow('ro-crate-workflow-test')
workflow.write_json(pathlib.Path('./ro-crate-workflow.json'))
Snakemake
Snakemake is a popular and easy-to-use workflow system. The Snakemake logs parser processes execution logs generated by the Snakemake snkmt plugin. It takes as input the path of the directory where all workflow data files reside (inputs and outputs of workflow tasks), and the path of the SQLite database created by the snkmt plugin. It generates workflow instances compatible with WfFormat:
import pathlib
from wfcommons.wfinstances import SnakemakeLogsParser
execution_dir = pathlib.Path('/path/to/snakemake/execution/dir/')
snkmt_db = pathlib.Path('/path/to/snkmt/sqlite/databasefile/')
parser = SnakemakeLogsParser(execution_dir=execution_dir, snkmt_db=snkmt_db)
workflow = parser.build_workflow('snakemake-workflow-test')
workflow.write_json(pathlib.Path('./snakemake-workflow.json'))
TaskVine
TaskVine is a task scheduler for data-intensive dynamic workflows. The TaskVine logs parser translates TaskVine execution logs into workflow instances compatible with WfFormat:
import pathlib
from wfcommons.wfinstances import TaskVineLogsParser
execution_dir = pathlib.Path('/path/to/taskvine/execution/dir/')
parser = TaskVineLogsParser(execution_dir=execution_dir)
workflow = parser.build_workflow('taskvine-workflow-test')
workflow.write_json(pathlib.Path('./taskvine-workflow.json'))
The Instance Analyzer
The InstanceAnalyzer class provides
a number of tools for analyzing collections of workflow execution instances. The
goal of the InstanceAnalyzer is to
perform analyses of one or multiple workflow execution instances, and build
summaries of the analyses per workflow task type prefix.
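As a rough illustration of what grouping by task type prefix means, the sketch below collects task runtimes under the prefix that each task name starts with. The task names, the runtimes, and the group_by_prefix helper are hypothetical; this is not the InstanceAnalyzer implementation.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical (task name, runtime in seconds) pairs from one instance.
task_runtimes = [
    ("sG1IterDecon_ID0000001", 0.42),
    ("sG1IterDecon_ID0000002", 0.58),
    ("wrapper_siftSTFByMisfit_ID0000003", 1.20),
]
prefixes = ["sG1IterDecon", "wrapper_siftSTFByMisfit"]


def group_by_prefix(task_runtimes, prefixes):
    """Collect runtimes under the first prefix that each task name starts with."""
    groups = defaultdict(list)
    for name, runtime in task_runtimes:
        for prefix in prefixes:
            if name.startswith(prefix):
                groups[prefix].append(runtime)
                break
    return dict(groups)


groups = group_by_prefix(task_runtimes, prefixes)
for prefix, runtimes in groups.items():
    print(prefix, min(runtimes), max(runtimes), round(mean(runtimes), 3))
```

Tasks whose names match none of the given prefixes are simply left out of the groups in this sketch.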
Warning
Although any workflow execution instance represented as an
Instance object (i.e., compatible with
WfFormat) can be appended to the
InstanceAnalyzer, we strongly
recommend that only instances of a single workflow application type be
appended to an analyzer object. You may, however, create a separate analyzer
object for each workflow application.
The append_instance() method
allows you to include instances for analysis. The
build_summary() method
processes all appended instances. The method fits probability distributions
to each data series to find the best one (i.e., the one that minimizes the mean
squared error) for the analyzed data. The method returns
a summary of the analysis of instances in the form of a Python dictionary object in
which keys are task prefixes (provided when invoking the method) and values
describe the best probability distribution fit for tasks’ runtime, and input and
output data file sizes. The code excerpt below shows an example of an analysis
summary showing the best fit probability distribution for runtime of the
individuals tasks (1000Genome workflow):
"individuals": {
    "runtime": {
        "min": 48.846,
        "max": 192.232,
        "distribution": {
            "name": "skewnorm",
            "params": [
                11115267.652937062,
                -2.9628504044929433e-05,
                56.03957070238482
            ]
        }
    },
    ...
}
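A summary shaped like the excerpt above can be traversed with ordinary dictionary operations. The sketch below assumes that every metric entry carrying a fit has a "distribution" key with "name" and "params", exactly as in the runtime entry shown; entries with a different layout are skipped. The describe_fits helper is hypothetical.

```python
# Summary entry copied from the excerpt above; other metrics are omitted.
summary = {
    "individuals": {
        "runtime": {
            "min": 48.846,
            "max": 192.232,
            "distribution": {
                "name": "skewnorm",
                "params": [
                    11115267.652937062,
                    -2.9628504044929433e-05,
                    56.03957070238482,
                ],
            },
        },
    },
}


def describe_fits(summary):
    """Return one (task prefix, metric, distribution name, n params) row per fit."""
    rows = []
    for prefix, metrics in summary.items():
        for metric, stats in metrics.items():
            # Skip entries that do not follow the layout assumed in this sketch.
            if not isinstance(stats, dict) or "distribution" not in stats:
                continue
            dist = stats["distribution"]
            rows.append((prefix, metric, dist["name"], len(dist["params"])))
    return rows


for row in describe_fits(summary):
    print(row)
```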
Workflow analysis summaries are used by WfChef to develop Workflow Recipes, which are in turn used to generate realistic synthetic workflow instances.
Probability distribution fits can also be plotted by using the
generate_fit_plots() or
generate_all_fit_plots()
methods – plots will be saved as png files.
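The idea of selecting the distribution that minimizes the mean squared error can be sketched with the standard library alone. This is a simplified stand-in, not the method used by build_summary(): it assumes three candidate distributions whose parameters are estimated from the sample moments, and it measures the error between each candidate CDF and the empirical CDF. The best_fit helper and the runtimes are hypothetical.

```python
import math
from statistics import mean, pstdev


def normal_cdf(x, mu, sigma):
    return 0.5 * (1.0 + math.erf((x - mu) / (sigma * math.sqrt(2.0))))


def uniform_cdf(x, low, high):
    return min(1.0, max(0.0, (x - low) / (high - low)))


def exponential_cdf(x, rate):
    return 1.0 - math.exp(-rate * x) if x > 0 else 0.0


def best_fit(samples):
    """Return (name, mse) of the candidate whose CDF is closest to the empirical CDF.

    Assumes positive samples that are not all identical.
    """
    data = sorted(samples)
    n = len(data)
    mu, sigma = mean(data), pstdev(data)
    candidates = {
        "norm": lambda x: normal_cdf(x, mu, sigma),
        "uniform": lambda x: uniform_cdf(x, data[0], data[-1]),
        "expon": lambda x: exponential_cdf(x, 1.0 / mu),
    }
    scores = {}
    for name, cdf in candidates.items():
        # Empirical CDF evaluated at the midpoint of each step.
        errors = [(cdf(x) - (i + 0.5) / n) ** 2 for i, x in enumerate(data)]
        scores[name] = sum(errors) / n
    name = min(scores, key=scores.get)
    return name, scores[name]


# Hypothetical runtimes (seconds), evenly spread between 10 and 20.
runtimes = [10.0 + 0.5 * i for i in range(21)]
name, mse = best_fit(runtimes)
print(name, mse)
```

For evenly spread data such as this, the uniform candidate yields the smallest error; a real analysis would consider many more candidate distributions.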
Examples
The following example shows the analysis of a set of instances of a Seismology
workflow, stored in a local folder. In this example, we seek the best probability
distribution fit for two task prefixes of the Seismology workflow
(sG1IterDecon and wrapper_siftSTFByMisfit), and generate all fit
plots (runtime, and input and output files) into the fits folder using
seismology as a prefix for each generated plot:
import pathlib
from wfcommons import Instance, InstanceAnalyzer
# obtaining list of instance files in the folder
INSTANCES_PATH = pathlib.Path('/path/to/some/instance/folder/')
instance_files = [f for f in INSTANCES_PATH.glob('*') if f.is_file()]
# creating the instance analyzer object
analyzer = InstanceAnalyzer()
# appending instance files to the instance analyzer
for instance_file in instance_files:
    # glob() already yields paths that include INSTANCES_PATH
    instance = Instance(input_instance=instance_file)
    analyzer.append_instance(instance)
# list of workflow task name prefixes to be analyzed in each instance
workflow_tasks = ['sG1IterDecon', 'wrapper_siftSTFByMisfit']
# building the instance summary
instances_summary = analyzer.build_summary(workflow_tasks, include_raw_data=True)
# generating all fit plots (runtime, and input and output files)
analyzer.generate_all_fit_plots(outfile_prefix='fits/seismology')