wfcommons.wfinstances

wfcommons.wfinstances.schema

class wfcommons.wfinstances.schema.SchemaValidator(schema_file_path: Path | None = None, logger: Logger | None = None)

Bases: object

Validate JSON files against WfCommons schema (WfFormat). If schema file path is not provided, it will look for a local copy of the WfFormat schema, and if not available it will fetch the latest schema from the WfFormat schema GitHub repository.

Parameters:
  • schema_file_path (Optional[pathlib.Path]) – JSON schema file path.

  • logger (Optional[Logger]) – The logger where to log information/warning or errors.

_load_schema(schema_file_path: Path | None = None) <module 'json' from '/home/docs/.asdf/installs/python/3.11.10/lib/python3.11/json/__init__.py'>

Load the schema file. If schema file path is not provided, it will look for a local copy of the WfFormat schema, and if not available it will fetch the latest schema from the GitHub repository.

Parameters:

schema_file_path (Optional[pathlib.Path]) – JSON schema file path.

Returns:

The JSON schema.

Return type:

json

_semantic_validation(data: Dict[str, Any])

Validate the semantics of the JSON workflow execution instance.

Parameters:

data (Dict[str, Any]) – Workflow instance in JSON format.

_syntax_validation(data: Dict[str, Any])

Validate the JSON workflow execution instance against the schema.

Parameters:

data (Dict[str, Any]) – Workflow instance in JSON format.

validate_instance(data: Dict[str, Any]) None

Perform syntax validation against the schema, and semantic validation.

Parameters:

data (Dict[str, Any]) – Workflow instance in JSON format.

wfcommons.wfinstances.instance

class wfcommons.wfinstances.instance.Instance(input_instance: Path, schema_file: str | None = None, logger: Logger | None = None)

Bases: object

Representation of one execution of one workflow on a set of machines

Instance(input_instance = 'instance.json')
Parameters:
  • input_instance (pathlib.Path) – The JSON instance.

  • schema_file (Optional[str]) –

    The path to the JSON schema that defines the instance. If no schema file is provided, it will look for a local copy of the WfFormat, and if not available it will fetch the latest schema from the WfFormat schema GitHub repository.

  • logger (Optional[Logger]) – The logger where to log information/warning or errors.

draw(output_path: Path | None = None, extension: str | None = 'pdf') None

Produce an image or a pdf file representing the instance.

Parameters:
  • output_path (Optional[pathlib.Path]) – Name of the output file.

  • extension (Optional[str]) – Type of the file extension (pdf, png, or svg).

leaves() List[str]

Get the leaves of the workflow (i.e., the tasks without any successors).

Returns:

List of leaves

Return type:

List[str]

roots() List[str]

Get the roots of the workflow (i.e., the tasks without any predecessors).

Returns:

List of roots

Return type:

List[str]

write_dot(output_path: Path | None = None) None

Write a dot file of the instance.

Parameters:

output_path (Optional[pathlib.Path]) – The output dot file name (optional).

wfcommons.wfinstances.instance_analyzer

class wfcommons.wfinstances.instance_analyzer.InstanceAnalyzer(logger: Logger | None = None)

Bases: object

Set of tools for analyzing collections of instances.

Parameters:

logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).

append_instance(instance: Instance) None

Append a workflow instance object to the instance analyzer.

instance = Instance(input_instance = 'instance.json', schema = 'schema.json')
instance_analyzer = InstanceAnalyzer()
instance_analyzer.append_instance(instance)
Parameters:

instance (Instance) – A workflow instance object.

build_summary(tasks_list: List[str], include_raw_data: bool | None = True) Dict[str, Dict[str, Any]]

Analyzes appended instances and produce a summary of the analysis per task prefix.

workflow_tasks = ['sG1IterDecon', 'wrapper_siftSTFByMisfit']
instances_summary = instance_analyzer.build_summary(workflow_tasks, include_raw_data=False)
Parameters:
  • tasks_list (List[str]) – List of workflow tasks prefix (e.g., mProject, sol2sanger, add_replace)

  • include_raw_data (Optional[bool]) – Whether to include the raw data in the instance summary.

Returns:

A summary of the analysis of instances in the form of a dictionary in which keys are task prefixes.

Return type:

Dict[str, Dict[str, Any]]

generate_all_fit_plots(outfile_prefix: str | None = None) None

Produce fit plots as images for each entry of the summary analysis. For entries in which there are no distribution (i.e., constant value), no plot will be generated.

Parameters:

outfile_prefix (Optional[str]) – Prefix to be attached to each generated plot file name (optional).

generate_fit_plots(instance_element: InstanceElement, outfile_prefix: str | None = None) None

Produce fit plots as images for each entry of an instance element generated by the summary analysis. For entries in which there are no distribution (i.e., constant value), no plot will be generated.

Parameters:
  • instance_element (InstanceElement) – Workflow element for which the fit plots will be generated.

  • outfile_prefix (Optional[str]) – Prefix to be attached to each generated plot file name (optional).

class wfcommons.wfinstances.instance_analyzer.InstanceElement(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: NoValue

INPUT = ('input', 'Input File Size (bytes)')
OUTPUT = ('output', 'Input File Size (bytes)')
RUNTIME = ('runtime', 'Runtime (s)')
wfcommons.wfinstances.instance_analyzer._append_file_to_dict(extension: str, dict_obj: Dict[str, Any], file_size: int) None

Add a file size to a file type extension dictionary.

Parameters:
  • extension (str) – File type extension.

  • dict_obj (Dict[str, Any]) – Dictionary of file type extensions.

  • file_size (int) – File size in bytes.

wfcommons.wfinstances.instance_analyzer._best_fit_distribution_for_file(dict_obj, include_raw_data) None

Find the best fit distribution for a file.

Parameters:
  • dict_obj (Dict[str, Any]) – Dictionary of file type extensions.

  • include_raw_data (bool)

wfcommons.wfinstances.instance_analyzer._generate_fit_plots(el: Dict, title: str, xlabel: str, outfile: str, font_size: int | None = None, logger: Logger | None = None) None

Produce a fit plot as an image for an entry of an instance element generated by the summary analysis.

Parameters:
  • el (Dict) – Entry of an instance element generated by the summary analysis.

  • title (str) – Plot title.

  • xlabel (str) – X-axis label.

  • outfile (Optional[int]) – Plot file name.

  • font_size – Size of the font.

  • logger (Logger) – The logger where to log information/warning or errors.

wfcommons.wfinstances.instance_analyzer._json_format_distribution_fit(dist_tuple: Tuple) Dict[str, Any]

Format the best fit distribution data into a dictionary

Parameters:

dist_tuple (Tuple) – Tuple containing best fit distribution name and parameters.

Returns:

Return type:

Dict[str, Any]

wfcommons.wfinstances.logs.abstract_logs_parser

class wfcommons.wfinstances.logs.abstract_logs_parser.LogsParser(wms_name: str, wms_url: str | None = None, description: str | None = None, logger: Logger | None = None)

Bases: ABC

An abstract class of logs parser for creating workflow instances.

Parameters:
  • wms_name (str) – Name of the workflow system.

  • wms_url (Optional[str]) – URL for the workflow system.

  • description (Optional[str]) – Workflow instance description.

  • logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).

_abc_impl = <_abc._abc_data object>
abstract build_workflow(workflow_name: str | None = None) Workflow

Create workflow instance based on the workflow execution logs.

Parameters:

workflow_name (Optional[str]) – The workflow name.

Returns:

A workflow instance object.

Return type:

Workflow

wfcommons.wfinstances.logs.makeflow

class wfcommons.wfinstances.logs.makeflow.MakeflowLogsParser(execution_dir: Path, resource_monitor_logs_dir: Path, description: str | None = None, logger: Logger | None = None)

Bases: LogsParser

Parse Makeflow submit directory to generate workflow instance.

Parameters:
  • execution_dir (pathlib.Path) – Makeflow workflow execution directory (contains .mf and .makeflowlog files).

  • resource_monitor_logs_dir (pathlib.Path) – Resource Monitor log files directory.

  • description (Optional[str]) – Workflow instance description.

  • logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).

_abc_impl = <_abc._abc_data object>
_create_files(files_list: List[str], link: FileLink, task_name: str) List[File]

Create a list of files objects.

Parameters:
  • files_list – list of file names.

  • link – Link type for the files in the list.

  • task_name – Task name.

Rtype files_list:

List[str]

Rtype link:

FileLink

Rtype task_name:

str

Returns:

List of file objects.

Return type:

List[File]

_parse_makeflow_log_file()

Parse the makeflow log file and update workflow task information.

_parse_resource_monitor_logs()

Parse the log files produced by resource monitor

_parse_workflow_file() None

Parse the makeflow workflow file and build the workflow structure.

build_workflow(workflow_name: str | None = None) Workflow

Create workflow instance based on the workflow execution logs.

Parameters:

workflow_name (Optional[str]) – The workflow name.

Returns:

A workflow instance object.

Return type:

Workflow

wfcommons.wfinstances.logs.nextflow

class wfcommons.wfinstances.logs.nextflow.NextflowLogsParser(execution_dir: Path, description: str | None = None, logger: Logger | None = None)

Bases: LogsParser

Parse Nextflow submit directory to generate workflow trace.

Parameters:
  • execution_dir (pathlib.Path) – Nextflow’s execution directory.

  • description (Optional[str]) – Workflow instance description.

  • logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).

_abc_impl = <_abc._abc_data object>
_parse_execution_report_file() None

Parse the Nextflow execution report file and gather the tasks information.

_parse_execution_timeline_file() None

Parse the Nextflow execution timeline file and build the workflow structure.

_read_data(file_format: str) Dict

Read data into a JSON from a file that matches the format.

Parameters:

file_format (str) – File format to be searched

Returns:

Data in JSON format

Return type:

Dict

build_workflow(workflow_name: str | None = None) Workflow

Create workflow trace based on the workflow execution logs.

Parameters:

workflow_name (Optional[str]) – The workflow name.

Returns:

A workflow trace object.

Return type:

Workflow

wfcommons.wfinstances.logs.nextflow._parse_number(number: str)

Format a number.

Parameters:

number (str) – Raw number

Returns:

Formatted number

Return type:

str

wfcommons.wfinstances.logs.nextflow._parse_task_name(task_name: str)

Format the task name.

Parameters:

task_name (str) – Raw task name

Returns:

Formatted task name

Return type:

str

wfcommons.wfinstances.logs.pegasus

class wfcommons.wfinstances.logs.pegasus.PegasusLogsParser(submit_dir: Path, description: str | None = None, ignore_auxiliary: bool | None = True, logger: Logger | None = None)

Bases: LogsParser

Parse Pegasus submit directory to generate workflow instance.

Parameters:
  • submit_dir (pathlib.Path) – Pegasus submit directory.

  • description (Optional[str]) – Workflow instance description.

  • ignore_auxiliary (Optional[bool]) – Ignore auxiliary jobs.

  • logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).

_abc_impl = <_abc._abc_data object>
_fetch_all_files(extension: str, file_name: str | None = '*') List[Path]

Fetch all files from the directory and its hierarchy

Parameters:
  • extension (str) – file extension to be searched for

  • file_name (Optional[str]) – file_name to be searched

Returns:

List of file names that match

Return type:

List[pathlib.Path]

_parse_braindump()

Parse the Pegasus braindump.txt file

_parse_dag()

Parse the DAG file.

_parse_dax()

Parse the DAX file.

_parse_job_output(task)

Parse the kickstart job output file (e.g., .out.000).

Parameters:

task (Task) – Task object.

_parse_job_output_latest(task: Task, output_file_path: Path) None

Parse the kickstart job output file in YAML format (e.g., .out.000).

Parameters:
  • task (Task) – Task object.

  • output_file_path (pathlib.Path) – Output file name.

_parse_job_output_legacy(task: Task, output_file_path: Path) None

Parse the kickstart job output file in XML format (e.g., .out.000).

Parameters:
  • task (Task) – Task object.

  • output_file_path (pathlib.Path) – Output file name.

_parse_meta_file(task_name)

Parse the Pegasus meta file (generated from pegasus-integrity)

Parameters:

task_name (str) – Task file name.

_parse_workflow()

Parse the Workflow file.

build_workflow(workflow_name: str | None = None) Workflow

Create workflow instance based on the workflow execution logs.

Parameters:

workflow_name (Optional[str]) – The workflow name.

Returns:

A workflow instance object.

Return type:

Workflow