wfcommons.wfinstances

wfcommons.wfinstances.instance

class wfcommons.wfinstances.instance.Instance(input_instance: Path, schema_file: str | None = None, logger: Logger | None = None)

Bases: object

Representation of one execution of one workflow on a set of machines

Instance(input_instance = 'instance.json')
Parameters:
  • input_instance (pathlib.Path) – The JSON instance.

  • schema_file (Optional[str]) – The path to the JSON schema that defines the instance. If no schema file is provided, it will look for a local copy of the WfFormat, and if not available it will fetch the latest schema from the WfFormat schema GitHub repository.

  • logger (Optional[Logger]) – Logger used to record information, warnings, and errors.

draw(output_path: Path | None = None, extension: str | None = 'pdf') None

Produce an image or a PDF file representing the instance.

Parameters:
  • output_path (Optional[pathlib.Path]) – Name of the output file.

  • extension (Optional[str]) – Type of the file extension (pdf, png, or svg).

leaves() List[str]

Get the leaves of the workflow (i.e., the tasks without any successors).

Returns:

List of leaves

Return type:

List[str]

roots() List[str]

Get the roots of the workflow (i.e., the tasks without any predecessors).

Returns:

List of roots

Return type:

List[str]
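The definitions of roots and leaves can be illustrated on a toy graph. The sketch below does not call WfCommons: the `parents` mapping, the helper names, and the task names are hypothetical stand-ins for a parsed instance, used only to show what "no predecessors" and "no successors" mean.

```python
from typing import Dict, List


def find_roots(parents: Dict[str, List[str]]) -> List[str]:
    """Return the tasks that have no predecessors."""
    return sorted(task for task, preds in parents.items() if not preds)


def find_leaves(parents: Dict[str, List[str]]) -> List[str]:
    """Return the tasks that are never a predecessor of another task."""
    has_successor = {pred for preds in parents.values() for pred in preds}
    return sorted(task for task in parents if task not in has_successor)


# Hypothetical diamond-shaped workflow: split -> (align_1, align_2) -> merge.
# Each key is a task; each value lists that task's predecessors.
parents = {
    "split": [],
    "align_1": ["split"],
    "align_2": ["split"],
    "merge": ["align_1", "align_2"],
}

roots = find_roots(parents)
leaves = find_leaves(parents)
print(roots, leaves)
```

In this toy graph `split` is the only root and `merge` is the only leaf.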

write_dot(output_path: Path | None = None) None

Write a dot file of the instance.

Parameters:

output_path (Optional[pathlib.Path]) – The output dot file name (optional).
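For readers unfamiliar with the DOT format, here is a minimal sketch of how a task graph maps to DOT text. It is not the WfCommons implementation: the `to_dot` helper and the `parents` mapping are hypothetical, and the node or edge attributes that `write_dot` actually emits are not specified in this reference.

```python
from typing import Dict, List


def to_dot(parents: Dict[str, List[str]], graph_name: str = "workflow") -> str:
    """Render a task -> predecessors mapping as a DOT digraph string."""
    lines = [f"digraph {graph_name} {{"]
    # Declare every task as a node so isolated tasks still appear.
    for task in parents:
        lines.append(f'  "{task}";')
    # Add one directed edge per (predecessor, task) dependency.
    for task, preds in parents.items():
        for pred in preds:
            lines.append(f'  "{pred}" -> "{task}";')
    lines.append("}")
    return "\n".join(lines)


dot_text = to_dot({"split": [], "merge": ["split"]})
print(dot_text)
```

A file with this content can be rendered by Graphviz tools such as `dot`.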

wfcommons.wfinstances.instance_analyzer

class wfcommons.wfinstances.instance_analyzer.InstanceAnalyzer(logger: Logger | None = None)

Bases: object

Set of tools for analyzing collections of instances.

Parameters:

logger (Optional[Logger]) – Logger used to record information, warnings, and errors (optional).

append_instance(instance: Instance) None

Append a workflow instance object to the instance analyzer.

instance = Instance(input_instance = 'instance.json', schema_file = 'schema.json')
instance_analyzer = InstanceAnalyzer()
instance_analyzer.append_instance(instance)
Parameters:

instance (Instance) – A workflow instance object.

build_summary(tasks_list: List[str], include_raw_data: bool | None = True) Dict[str, Dict[str, Any]]

Analyze the appended instances and produce a summary of the analysis per task prefix.

workflow_tasks = ['sG1IterDecon', 'wrapper_siftSTFByMisfit']
instances_summary = instance_analyzer.build_summary(workflow_tasks, include_raw_data=False)
Parameters:
  • tasks_list (List[str]) – List of workflow task prefixes (e.g., mProject, sol2sanger, add_replace).

  • include_raw_data (Optional[bool]) – Whether to include the raw data in the instance summary.

Returns:

A summary of the analysis of instances in the form of a dictionary in which keys are task prefixes.

Return type:

Dict[str, Dict[str, Any]]
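The idea of a summary keyed by task prefix can be sketched without WfCommons. Everything below is an illustrative assumption: the `records` data, the `summarize` helper, and the `count`/`min`/`max`/`mean` keys are hypothetical, and the actual layout of the dictionary returned by `build_summary` is not described in this reference.

```python
from statistics import mean
from typing import Any, Dict, List, Tuple

# Hypothetical (task name, runtime in seconds) records from several instances.
records: List[Tuple[str, float]] = [
    ("mProject_001", 12.0),
    ("mProject_002", 14.0),
    ("mAdd_001", 3.5),
]


def summarize(
    records: List[Tuple[str, float]], prefixes: List[str]
) -> Dict[str, Dict[str, Any]]:
    """Group runtimes by task-name prefix and compute basic statistics."""
    summary: Dict[str, Dict[str, Any]] = {}
    for prefix in prefixes:
        runtimes = [rt for name, rt in records if name.startswith(prefix)]
        if runtimes:  # skip prefixes that match no task
            summary[prefix] = {
                "count": len(runtimes),
                "min": min(runtimes),
                "max": max(runtimes),
                "mean": mean(runtimes),
            }
    return summary


summary = summarize(records, ["mProject", "mAdd"])
print(summary)
```

The point of the sketch is only the grouping step: tasks such as `mProject_001` and `mProject_002` are pooled under the shared prefix `mProject`.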

generate_all_fit_plots(outfile_prefix: str | None = None) None

Produce fit plots as images for each entry of the summary analysis. For entries in which there is no distribution (i.e., constant value), no plot will be generated.

Parameters:

outfile_prefix (Optional[str]) – Prefix to be attached to each generated plot file name (optional).

generate_fit_plots(instance_element: InstanceElement, outfile_prefix: str | None = None) None

Produce fit plots as images for each entry of an instance element generated by the summary analysis. For entries in which there is no distribution (i.e., constant value), no plot will be generated.

Parameters:
  • instance_element (InstanceElement) – Workflow element for which the fit plots will be generated.

  • outfile_prefix (Optional[str]) – Prefix to be attached to each generated plot file name (optional).
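The analyzer methods above can be combined as follows. This is a usage sketch built only from the signatures listed in this reference; the wrapper function `plot_runtime_fits`, its arguments, and the default prefix `fits` are hypothetical, and the WfCommons imports are done lazily so the module loads even where the package is not installed.

```python
import pathlib
from typing import List


def plot_runtime_fits(
    instance_files: List[pathlib.Path],
    task_prefixes: List[str],
    outfile_prefix: str = "fits",
) -> None:
    """Load instances, build the per-prefix summary, and plot runtime fits."""
    # Lazy imports: WfCommons is only needed when the function is called.
    from wfcommons.wfinstances.instance import Instance
    from wfcommons.wfinstances.instance_analyzer import (
        InstanceAnalyzer,
        InstanceElement,
    )

    analyzer = InstanceAnalyzer()
    for path in instance_files:
        analyzer.append_instance(Instance(input_instance=path))

    # Build the summary first: the fit plots are described as being
    # generated from the entries of the summary analysis.
    analyzer.build_summary(task_prefixes, include_raw_data=False)
    analyzer.generate_fit_plots(
        InstanceElement.RUNTIME, outfile_prefix=outfile_prefix
    )
```

A call might look like `plot_runtime_fits([pathlib.Path('instance.json')], ['sG1IterDecon'])`, assuming that file exists and is a valid instance.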

class wfcommons.wfinstances.instance_analyzer.InstanceElement(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: NoValue

INPUT = ('input', 'Input File Size (bytes)')
OUTPUT = ('output', 'Output File Size (bytes)')
RUNTIME = ('runtime', 'Runtime (s)')

wfcommons.wfinstances.logs.makeflow

class wfcommons.wfinstances.logs.makeflow.MakeflowLogsParser(execution_dir: Path, resource_monitor_logs_dir: Path, description: str | None = None, logger: Logger | None = None)

Bases: LogsParser

Parse a Makeflow execution directory to generate a workflow instance.

Parameters:
  • execution_dir (pathlib.Path) – Makeflow workflow execution directory (contains .mf and .makeflowlog files).

  • resource_monitor_logs_dir (pathlib.Path) – Resource Monitor log files directory.

  • description (Optional[str]) – Workflow instance description.

  • logger (Optional[Logger]) – Logger used to record information, warnings, and errors (optional).

build_workflow(workflow_name: str | None = None) Workflow

Create workflow instance based on the workflow execution logs.

Parameters:

workflow_name (Optional[str]) – The workflow name.

Returns:

A workflow instance object.

Return type:

Workflow
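A minimal usage sketch for the Makeflow parser, using only the constructor arguments and the `build_workflow` method documented above. The wrapper function `makeflow_to_workflow`, its argument names, and the description string are hypothetical; the WfCommons import is lazy so the snippet can be loaded without the package installed.

```python
import pathlib


def makeflow_to_workflow(execution_dir: str, monitor_logs_dir: str, name: str):
    """Parse a Makeflow run and return the resulting Workflow object."""
    # Lazy import: WfCommons is only needed when the function is called.
    from wfcommons.wfinstances.logs.makeflow import MakeflowLogsParser

    parser = MakeflowLogsParser(
        # Directory holding the .mf and .makeflowlog files.
        execution_dir=pathlib.Path(execution_dir),
        # Directory holding the Resource Monitor log files.
        resource_monitor_logs_dir=pathlib.Path(monitor_logs_dir),
        description="Makeflow run parsed with WfCommons",
    )
    return parser.build_workflow(workflow_name=name)
```

The Nextflow and Pegasus parsers follow the same pattern, differing only in their constructor arguments.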

description: str | None
execution_dir: Path
logger: Logger | None
mf_file: Path
mf_log_file: Path
resource_monitor_logs_dir: Path
wms_name: str
wms_url: str | None

wfcommons.wfinstances.logs.nextflow

class wfcommons.wfinstances.logs.nextflow.NextflowLogsParser(execution_dir: Path, description: str | None = None, logger: Logger | None = None)

Bases: LogsParser

Parse a Nextflow execution directory to generate a workflow instance.

Parameters:
  • execution_dir (pathlib.Path) – Nextflow’s execution directory.

  • description (Optional[str]) – Workflow instance description.

  • logger (Optional[Logger]) – Logger used to record information, warnings, and errors (optional).

build_workflow(workflow_name: str | None = None) Workflow

Create workflow instance based on the workflow execution logs.

Parameters:

workflow_name (Optional[str]) – The workflow name.

Returns:

A workflow instance object.

Return type:

Workflow

description: str | None
logger: Logger | None
wms_name: str
wms_url: str | None

wfcommons.wfinstances.logs.pegasus

class wfcommons.wfinstances.logs.pegasus.PegasusLogsParser(submit_dir: Path, description: str | None = None, ignore_auxiliary: bool | None = True, logger: Logger | None = None)

Bases: LogsParser

Parse a Pegasus submit directory to generate a workflow instance.

Parameters:
  • submit_dir (pathlib.Path) – Pegasus submit directory.

  • description (Optional[str]) – Workflow instance description.

  • ignore_auxiliary (Optional[bool]) – Ignore auxiliary jobs.

  • logger (Optional[Logger]) – Logger used to record information, warnings, and errors (optional).

build_workflow(workflow_name: str | None = None) Workflow

Create workflow instance based on the workflow execution logs.

Parameters:

workflow_name (Optional[str]) – The workflow name.

Returns:

A workflow instance object.

Return type:

Workflow

description: str | None
ignore_auxiliary: bool | None
logger: Logger | None
submit_dir: Path
wms_name: str
wms_url: str | None