wfcommons.wfinstances
wfcommons.wfinstances.schema
- class wfcommons.wfinstances.schema.SchemaValidator(schema_file_path: Optional[pathlib.Path] = None, logger: Optional[logging.Logger] = None)
Bases:
object
Validate JSON files against the WfCommons schema (WfFormat). If no schema file path is provided, the validator looks for a local copy of the WfFormat schema and, if none is available, fetches the latest schema from the WfFormat schema GitHub repository.
- Parameters
schema_file_path (Optional[pathlib.Path]) – JSON schema file path.
logger (Optional[Logger]) – The logger where to log information/warning or errors.
- _load_schema(schema_file_path: Optional[pathlib.Path] = None) json
Load the schema file. If no schema file path is provided, the method looks for a local copy of the WfFormat schema and, if none is available, fetches the latest schema from the GitHub repository.
- Parameters
schema_file_path (Optional[pathlib.Path]) – JSON schema file path.
- Returns
The JSON schema.
- Return type
json
- _semantic_validation(data: Dict[str, Any])
Validate the semantics of the JSON workflow execution instance.
- Parameters
data (Dict[str, Any]) – Workflow instance in JSON format.
- _syntax_validation(data: Dict[str, Any])
Validate the JSON workflow execution instance against the schema.
- Parameters
data (Dict[str, Any]) – Workflow instance in JSON format.
- validate_instance(data: Dict[str, Any]) None
Perform syntax validation against the schema, and semantic validation.
- Parameters
data (Dict[str, Any]) – Workflow instance in JSON format.
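The two validation passes described above can be illustrated with a minimal, self-contained sketch. It does not use the WfFormat schema or the library's code: the toy document layout (`name`, `tasks`, `parents`) and the function names are assumptions made only to show why a syntax pass (shape and types) is followed by a semantic pass (cross-references a schema cannot express).

```python
from typing import Any, Dict, List


def syntax_errors(data: Dict[str, Any]) -> List[str]:
    """Structural checks: required keys and value types (toy layout, not WfFormat)."""
    errors = []
    if not isinstance(data.get("name"), str):
        errors.append("'name' must be a string")
    tasks = data.get("tasks")
    if not isinstance(tasks, list):
        errors.append("'tasks' must be a list")
        return errors
    for i, task in enumerate(tasks):
        if not isinstance(task, dict) or not isinstance(task.get("name"), str):
            errors.append(f"tasks[{i}] must be an object with a string 'name'")
        elif not isinstance(task.get("parents", []), list):
            errors.append(f"tasks[{i}].parents must be a list")
    return errors


def semantic_errors(data: Dict[str, Any]) -> List[str]:
    """Cross-reference checks; assumes the syntax pass already succeeded."""
    names = [task["name"] for task in data["tasks"]]
    errors = []
    if len(names) != len(set(names)):
        errors.append("task names must be unique")
    known = set(names)
    for task in data["tasks"]:
        for parent in task.get("parents", []):
            if parent not in known:
                errors.append(
                    f"task '{task['name']}' references unknown parent '{parent}'"
                )
    return errors


def validate(data: Dict[str, Any]) -> None:
    """Run the syntax pass first; run the semantic pass only on well-formed data."""
    errors = syntax_errors(data)
    if not errors:
        errors = semantic_errors(data)
    if errors:
        raise ValueError("; ".join(errors))
```

Running the semantic pass only after the syntax pass succeeds keeps the second pass free of defensive type checks.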
wfcommons.wfinstances.instance
- class wfcommons.wfinstances.instance.Instance(input_instance: pathlib.Path, schema_file: Optional[str] = None, logger: Optional[logging.Logger] = None)
Bases:
object
Representation of one execution of one workflow on a set of machines.
Instance(input_instance = 'instance.json')
- Parameters
input_instance (pathlib.Path) – The JSON instance.
schema_file (Optional[str]) –
The path to the JSON schema that defines the instance. If no schema file is provided, the constructor looks for a local copy of the WfFormat schema and, if none is available, fetches the latest schema from the WfFormat schema GitHub repository.
logger (Optional[Logger]) – The logger where to log information/warning or errors.
- draw(output_path: Optional[pathlib.Path] = None, extension: Optional[str] = 'pdf') None
Produce an image or a pdf file representing the instance.
- Parameters
output_path (Optional[pathlib.Path]) – Name of the output file.
extension (Optional[str]) – Type of the file extension (pdf, png, or svg).
- leaves() List[str]
Get the leaves of the workflow (i.e., the tasks without any successors).
- Returns
List of leaves
- Return type
List[str]
- roots() List[str]
Get the roots of the workflow (i.e., the tasks without any predecessors).
- Returns
List of roots
- Return type
List[str]
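To make the definitions of roots and leaves concrete, here is a small sketch over a plain dictionary that maps each task name to the names of its predecessors. This representation and the function names are assumptions for illustration; the library's `Instance` class holds its own internal graph.

```python
from typing import Dict, List


def roots(parents: Dict[str, List[str]]) -> List[str]:
    """Tasks without any predecessors."""
    return [task for task, preds in parents.items() if not preds]


def leaves(parents: Dict[str, List[str]]) -> List[str]:
    """Tasks that never appear as a predecessor of another task."""
    has_successor = {pred for preds in parents.values() for pred in preds}
    return [task for task in parents if task not in has_successor]


# Hypothetical diamond-shaped workflow: split -> (left, right) -> merge
workflow = {
    "split": [],
    "left": ["split"],
    "right": ["split"],
    "merge": ["left", "right"],
}
print(roots(workflow))   # ['split']
print(leaves(workflow))  # ['merge']
```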
- write_dot(output_path: Optional[pathlib.Path] = None) None
Write a dot file of the instance.
- Parameters
output_path (Optional[pathlib.Path]) – The output dot file name (optional).
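As a rough illustration of what a dot file for a workflow contains, the sketch below serializes a task-to-predecessors dictionary into Graphviz DOT text. The dictionary layout and the helper names `to_dot` and `write_dot_sketch` are hypothetical; the actual output of `write_dot` may differ in node attributes and layout.

```python
from pathlib import Path
from typing import Dict, List


def to_dot(parents: Dict[str, List[str]], graph_name: str = "workflow") -> str:
    """Render a task -> predecessors mapping as a Graphviz digraph."""
    lines = [f"digraph {graph_name} {{"]
    # Declare every task so isolated tasks still appear in the drawing.
    for task in parents:
        lines.append(f'  "{task}";')
    # One edge per dependency, pointing from predecessor to successor.
    for task, preds in parents.items():
        for pred in preds:
            lines.append(f'  "{pred}" -> "{task}";')
    lines.append("}")
    return "\n".join(lines) + "\n"


def write_dot_sketch(parents: Dict[str, List[str]], output_path: Path) -> None:
    """Write the DOT text to output_path."""
    output_path.write_text(to_dot(parents))
```

A file produced this way can be rendered with the Graphviz `dot` command, for example `dot -Tpdf workflow.dot -o workflow.pdf`.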
wfcommons.wfinstances.instance_analyzer
- class wfcommons.wfinstances.instance_analyzer.InstanceAnalyzer(logger: Optional[logging.Logger] = None)
Bases:
object
Set of tools for analyzing collections of instances.
- Parameters
logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).
- append_instance(instance: wfcommons.wfinstances.instance.Instance) None
Append a workflow instance object to the instance analyzer.
instance = Instance(input_instance = 'instance.json', schema_file = 'schema.json')
instance_analyzer = InstanceAnalyzer()
instance_analyzer.append_instance(instance)
- Parameters
instance (Instance) – A workflow instance object.
- build_summary(tasks_list: List[str], include_raw_data: Optional[bool] = True) Dict[str, Dict[str, Any]]
Analyze the appended instances and produce a summary of the analysis per task prefix.
workflow_tasks = ['sG1IterDecon', 'wrapper_siftSTFByMisfit']
instances_summary = instance_analyzer.build_summary(workflow_tasks, include_raw_data=False)
- Parameters
tasks_list (List[str]) – List of workflow task prefixes (e.g., mProject, sol2sanger, add_replace).
include_raw_data (Optional[bool]) – Whether to include the raw data in the instance summary.
- Returns
A summary of the analysis of instances in the form of a dictionary in which keys are task prefixes.
- Return type
Dict[str, Dict[str, Any]]
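The idea of a summary keyed by task prefix can be sketched as follows. This is not the library's implementation: the real analysis also covers file sizes and distribution fitting, whereas this sketch only groups runtimes by prefix and computes basic statistics. The input layout (a list of `(task_name, runtime)` pairs), the function name, and the summary keys are assumptions.

```python
from statistics import mean
from typing import Any, Dict, List, Tuple


def summarize_runtimes(
    tasks: List[Tuple[str, float]],
    prefixes: List[str],
    include_raw_data: bool = True,
) -> Dict[str, Dict[str, Any]]:
    """Group task runtimes by name prefix and summarize each group."""
    summary: Dict[str, Dict[str, Any]] = {}
    for prefix in prefixes:
        runtimes = [runtime for name, runtime in tasks if name.startswith(prefix)]
        if not runtimes:
            continue  # no task matched this prefix
        entry: Dict[str, Any] = {
            "count": len(runtimes),
            "min": min(runtimes),
            "max": max(runtimes),
            "mean": mean(runtimes),
        }
        if include_raw_data:
            entry["data"] = runtimes
        summary[prefix] = entry
    return summary
```

Dropping the raw data keeps the summary small when many instances have been appended, which is the trade-off the `include_raw_data` flag exposes.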
- generate_all_fit_plots(outfile_prefix: Optional[str] = None) None
Produce fit plots as images for each entry of the summary analysis. For entries that have no distribution (i.e., a constant value), no plot is generated.
- Parameters
outfile_prefix (Optional[str]) – Prefix to be attached to each generated plot file name (optional).
- generate_fit_plots(instance_element: wfcommons.wfinstances.instance_analyzer.InstanceElement, outfile_prefix: Optional[str] = None) None
Produce fit plots as images for each entry of an instance element generated by the summary analysis. For entries that have no distribution (i.e., a constant value), no plot is generated.
- Parameters
instance_element (InstanceElement) – Workflow element for which the fit plots will be generated.
outfile_prefix (Optional[str]) – Prefix to be attached to each generated plot file name (optional).
- class wfcommons.wfinstances.instance_analyzer.InstanceElement(value)
Bases:
wfcommons.utils.NoValue
An enumeration.
- INPUT = ('input', 'Input File Size (bytes)')
- OUTPUT = ('output', 'Input File Size (bytes)')
- RUNTIME = ('runtime', 'Runtime (s)')
- wfcommons.wfinstances.instance_analyzer._append_file_to_dict(extension: str, dict_obj: Dict[str, Any], file_size: int) None
Add a file size to a file type extension dictionary.
- Parameters
extension (str) – File type extension.
dict_obj (Dict[str, Any]) – Dictionary of file type extensions.
file_size (int) – File size in bytes.
- wfcommons.wfinstances.instance_analyzer._best_fit_distribution_for_file(dict_obj, include_raw_data) None
Find the best fit distribution for a file.
- Parameters
dict_obj (Dict[str, Any]) – Dictionary of file type extensions.
include_raw_data (bool) – Whether to include the raw data in the instance summary.
- wfcommons.wfinstances.instance_analyzer._generate_fit_plots(el: Dict, title: str, xlabel: str, outfile: str, font_size: Optional[int] = None, logger: Optional[logging.Logger] = None) None
Produce a fit plot as an image for an entry of an instance element generated by the summary analysis.
- Parameters
el (Dict) – Entry of an instance element generated by the summary analysis.
title (str) – Plot title.
xlabel (str) – X-axis label.
outfile (str) – Plot file name.
font_size (Optional[int]) – Size of the font.
logger (Logger) – The logger where to log information/warning or errors.
- wfcommons.wfinstances.instance_analyzer._json_format_distribution_fit(dist_tuple: Tuple) Dict[str, Any]
Format the best fit distribution data into a dictionary.
- Parameters
dist_tuple (Tuple) – Tuple containing best fit distribution name and parameters.
- Returns
The best fit distribution name and parameters formatted as a dictionary.
- Return type
Dict[str, Any]
wfcommons.wfinstances.logs.abstract_logs_parser
- class wfcommons.wfinstances.logs.abstract_logs_parser.LogsParser(wms_name: str, wms_url: Optional[str] = None, description: Optional[str] = None, logger: Optional[logging.Logger] = None)
Bases:
abc.ABC
An abstract class of logs parser for creating workflow instances.
- Parameters
wms_name (str) – Name of the workflow system.
wms_url (Optional[str]) – URL for the workflow system.
description (Optional[str]) – Workflow instance description.
logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).
- abstract build_workflow(workflow_name: Optional[str] = None) wfcommons.common.workflow.Workflow
Create workflow instance based on the workflow execution logs.
- Parameters
workflow_name (Optional[str]) – The workflow name.
- Returns
A workflow instance object.
- Return type
Workflow
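The abstract-parser pattern described above can be sketched without the library. The classes below are stand-ins: `SketchLogsParser` mirrors the documented constructor parameters, while `ToyLogsParser`, its log line format, and the dictionary it returns in place of a `Workflow` object are hypothetical.

```python
import logging
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class SketchLogsParser(ABC):
    """Stand-in for an abstract logs parser; subclasses supply build_workflow."""

    def __init__(
        self,
        wms_name: str,
        wms_url: Optional[str] = None,
        description: Optional[str] = None,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        self.wms_name = wms_name
        self.wms_url = wms_url
        self.description = description
        self.logger = logger or logging.getLogger(__name__)

    @abstractmethod
    def build_workflow(self, workflow_name: Optional[str] = None) -> Dict[str, List[str]]:
        """Return a task -> predecessors mapping built from execution logs."""


class ToyLogsParser(SketchLogsParser):
    """Parses hypothetical log lines of the form 'task' or 'task after parent'."""

    def __init__(self, log_lines: List[str]) -> None:
        super().__init__(wms_name="toy")
        self.log_lines = log_lines

    def build_workflow(self, workflow_name: Optional[str] = None) -> Dict[str, List[str]]:
        workflow: Dict[str, List[str]] = {}
        for line in self.log_lines:
            child, _, parent = line.partition(" after ")
            workflow.setdefault(child.strip(), [])
            if parent:
                workflow[child.strip()].append(parent.strip())
        return workflow
```

Because `build_workflow` is abstract, instantiating the base class directly raises `TypeError`; only concrete parsers such as the toy one can be created.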
wfcommons.wfinstances.logs.makeflow
- class wfcommons.wfinstances.logs.makeflow.MakeflowLogsParser(execution_dir: pathlib.Path, resource_monitor_logs_dir: pathlib.Path, description: Optional[str] = None, logger: Optional[logging.Logger] = None)
Bases:
wfcommons.wfinstances.logs.abstract_logs_parser.LogsParser
Parse Makeflow submit directory to generate workflow instance.
- Parameters
execution_dir (pathlib.Path) – Makeflow workflow execution directory (contains .mf and .makeflowlog files).
resource_monitor_logs_dir (pathlib.Path) – Resource Monitor log files directory.
description (Optional[str]) – Workflow instance description.
logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).
- _create_files(files_list: List[str], link: wfcommons.common.file.FileLink, task_name: str) List[wfcommons.common.file.File]
Create a list of file objects.
- Parameters
files_list (List[str]) – List of file names.
link (FileLink) – Link type for the files in the list.
task_name (str) – Task name.
- Returns
List of file objects.
- Return type
List[File]
- _parse_makeflow_log_file()
Parse the makeflow log file and update workflow task information.
- _parse_resource_monitor_logs()
Parse the log files produced by the resource monitor.
- _parse_workflow_file() None
Parse the makeflow workflow file and build the workflow structure.
- build_workflow(workflow_name: Optional[str] = None) wfcommons.common.workflow.Workflow
Create workflow instance based on the workflow execution logs.
- Parameters
workflow_name (Optional[str]) – The workflow name.
- Returns
A workflow instance object.
- Return type
Workflow
- description: Optional[str]
- logger: Optional[logging.Logger]
- wms_name: str
- wms_url: Optional[str]
wfcommons.wfinstances.logs.nextflow
- class wfcommons.wfinstances.logs.nextflow.NextflowLogsParser(execution_dir: pathlib.Path, description: Optional[str] = None, logger: Optional[logging.Logger] = None)
Bases:
wfcommons.wfinstances.logs.abstract_logs_parser.LogsParser
Parse Nextflow submit directory to generate workflow trace.
- Parameters
execution_dir (pathlib.Path) – Nextflow’s execution directory.
description (Optional[str]) – Workflow instance description.
logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).
- _parse_execution_report_file() None
Parse the Nextflow execution report file and gather task information.
- _parse_execution_timeline_file() None
Parse the Nextflow execution timeline file and build the workflow structure.
- _read_data(file_format: str) Dict
Read data into a JSON from a file that matches the format.
- Parameters
file_format (str) – File format to be searched
- Returns
Data in JSON format
- Return type
Dict
- build_workflow(workflow_name: Optional[str] = None) wfcommons.common.workflow.Workflow
Create workflow trace based on the workflow execution logs.
- Parameters
workflow_name (Optional[str]) – The workflow name.
- Returns
A workflow trace object.
- Return type
Workflow
- description: Optional[str]
- logger: Optional[logging.Logger]
- wms_name: str
- wms_url: Optional[str]
- wfcommons.wfinstances.logs.nextflow._parse_number(number: str)
Format a number.
- Parameters
number (str) – Raw number
- Returns
Formatted number
- Return type
str
- wfcommons.wfinstances.logs.nextflow._parse_task_name(task_name: str)
Format the task name.
- Parameters
task_name (str) – Raw task name
- Returns
Formatted task name
- Return type
str
wfcommons.wfinstances.logs.pegasus
- class wfcommons.wfinstances.logs.pegasus.PegasusLogsParser(submit_dir: pathlib.Path, description: Optional[str] = None, ignore_auxiliary: Optional[bool] = True, legacy: Optional[bool] = False, logger: Optional[logging.Logger] = None)
Bases:
wfcommons.wfinstances.logs.abstract_logs_parser.LogsParser
Parse Pegasus submit directory to generate workflow instance.
- Parameters
submit_dir (pathlib.Path) – Pegasus submit directory.
description (Optional[str]) – Workflow instance description.
ignore_auxiliary (Optional[bool]) – Ignore auxiliary jobs.
legacy (Optional[bool]) – Whether the submit directory is from a Pegasus 4.x version.
logger (Optional[Logger]) – The logger where to log information/warning or errors (optional).
- _fetch_all_files(extension: str, file_name: Optional[str] = '*') List[pathlib.Path]
Fetch all matching files from the directory and its hierarchy.
- Parameters
extension (str) – File extension to be searched for.
file_name (Optional[str]) – File name to be searched for.
- Returns
List of file names that match
- Return type
List[pathlib.Path]
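A recursive search of this kind can be sketched with `pathlib.Path.rglob`. The function below is an illustration under assumptions, not the parser's code: it takes the directory as an explicit argument and builds the glob pattern as `<file_name>.<extension>`, which may differ from how the library combines the two parameters.

```python
from pathlib import Path
from typing import List


def fetch_all_files(directory: Path, extension: str, file_name: str = "*") -> List[Path]:
    """Recursively collect files matching <file_name>.<extension> under directory."""
    pattern = f"{file_name}.{extension}"
    # rglob walks the directory and all of its subdirectories.
    return sorted(path for path in directory.rglob(pattern) if path.is_file())
```

For example, `fetch_all_files(Path("submit"), "out")` would return every `.out` file below a hypothetical `submit` directory, in sorted order.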
- _parse_braindump()
Parse the Pegasus braindump.txt file.
- _parse_dag()
Parse the DAG file.
- _parse_dax()
Parse the DAX file.
- _parse_job_output(task)
Parse the kickstart job output file (e.g., .out.000).
- Parameters
task (Task) – Task object.
- _parse_job_output_latest(task: wfcommons.common.task.Task, output_file_path: pathlib.Path) None
Parse the kickstart job output file in YAML format (e.g., .out.000).
- Parameters
task (Task) – Task object.
output_file_path (pathlib.Path) – Output file name.
- _parse_job_output_legacy(task: wfcommons.common.task.Task, output_file_path: pathlib.Path) None
Parse the kickstart job output file in XML format (e.g., .out.000).
- Parameters
task (Task) – Task object.
output_file_path (pathlib.Path) – Output file name.
- _parse_meta_file(task_name)
Parse the Pegasus meta file (generated by pegasus-integrity).
- Parameters
task_name (str) – Task file name.
- _parse_workflow()
Parse the Workflow file.
- build_workflow(workflow_name: Optional[str] = None) wfcommons.common.workflow.Workflow
Create workflow instance based on the workflow execution logs.
- Parameters
workflow_name (Optional[str]) – The workflow name.
- Returns
A workflow instance object.
- Return type
Workflow
- description: Optional[str]
- logger: Optional[logging.Logger]
- wms_name: str
- wms_url: Optional[str]