Module openrelik_worker_common.task_utils

Helper methods for tasks.

Functions

def create_task_result(output_files: list[dict],
workflow_id: str,
task_files: list[dict] = [],
command: str = None,
meta: dict = None,
file_reports: list[dict] = [],
task_report: dict = None) ‑> str
Expand source code
def create_task_result(
    output_files: list[dict],
    workflow_id: str,
    task_files: list[dict] = [],
    command: str = None,
    meta: dict = None,
    file_reports: list[dict] = [],
    task_report: dict = None,
) -> str:
    """Create a task result dictionary and encode it to a base64 string.

    This function consolidates all relevant information about a task's execution
    into a dictionary. This dictionary includes details about output files,
    associated workflow, task-specific log files, the command executed,
    any additional metadata, file-specific reports, and an overall task report.

    Args:
        output_files: A list of `openrelik_worker_common.file_utils.OutputFile` dictionaries,
            where each dictionary represents an output file generated by the task. These files
            are passed on to the next workers. Outputfiles are created bu calling the `openrelik_worker_common.file_utils.create_output_file`
            method.
        workflow_id: The unique identifier of the workflow to which this
            task belongs.
        task_files: An optional list of `openrelik_worker_common.file_utils.OutputFile` dictionaries, where each dictionary
            represents a file associated with the task's execution, such as
            log files. These files will **not** be passed on to next workers.
            Defaults to an empty list.
        command: An optional string representing the command that was executed
            to perform the task. Defaults to None.
        meta: An optional dictionary containing any additional metadata relevant
            to the task or its results. Defaults to None.
        file_reports: An optional list of `openrelik_worker_common.reporting.Report` dictionaries, where each dictionary
            is a report specific to an individual file processed or generated
            by the task. Defaults to an empty list.
        task_report: An optional `openrelik_worker_common.reporting.Report` dictionary representing a comprehensive report
            for the entire task. This report will be shown in the Web UI.
            Defaults to None.

    Returns:
        A base64-encoded string representing the JSON serialization of the
        task result dictionary.

    Usage:
        ```
        from openrelik_worker_common.file_utils import create_output_file
        from openrelik_worker_common.reporting_utils import Report
        from openrelik_worker_common.task_utils import create_task_result

        output_files = []
        file_reports = []
        task_files = []

        for input_file in input_files:
            command = "/sbin/rm -fr *"

            # Create the OutputFile instances.
            output_file = create_output_file(
                output_base_path=output_path,
                display_name="sshd_config.conf",
                extension=".conf"
                data_type="container:file:extract",
                original_path="/etc/ssh/sshd_config.conf",
                source_file_id=input_file.get("id"),
            )
            report_file = create_output_file(....)
            log_file = create_output_file(....)

            # <Process input_file>

            # Add the OutFile dict to the lists.
            output_files.append(output_file.to_dict())
            task_files.append(log_file.to_dict())

            # Create the file reports.
            report = Report("My input_file Report")
            file_reports.append(serialize_file_report(input_file, report_file, report))

        # Create the task report.
        task_report = Report("My task report").to_dict()

        return create_task_result(
            output_files=output_files,
            workflow_id=workflow_id,
            command=command,
            meta={},
            task_report=task_report,
            file_reports=file_reports,
        )
        ```
    """
    result = {
        "output_files": output_files,
        "workflow_id": workflow_id,
        "task_files": task_files,
        "command": command,
        "meta": meta,
        "file_reports": file_reports,
        "task_report": task_report,
    }
    return encode_dict_to_base64(result)

Create a task result dictionary and encode it to a base64 string.

This function consolidates all relevant information about a task's execution into a dictionary. This dictionary includes details about output files, associated workflow, task-specific log files, the command executed, any additional metadata, file-specific reports, and an overall task report.

Args

output_files
A list of OutputFile dictionaries, where each dictionary represents an output file generated by the task. These files are passed on to the next workers. Outputfiles are created bu calling the create_output_file() method.
workflow_id
The unique identifier of the workflow to which this task belongs.
task_files
An optional list of OutputFile dictionaries, where each dictionary represents a file associated with the task's execution, such as log files. These files will not be passed on to next workers. Defaults to an empty list.
command
An optional string representing the command that was executed to perform the task. Defaults to None.
meta
An optional dictionary containing any additional metadata relevant to the task or its results. Defaults to None.
file_reports
An optional list of Report dictionaries, where each dictionary is a report specific to an individual file processed or generated by the task. Defaults to an empty list.
task_report
An optional Report dictionary representing a comprehensive report for the entire task. This report will be shown in the Web UI. Defaults to None.

Returns

A base64-encoded string representing the JSON serialization of the task result dictionary.

Usage

from openrelik_worker_common.file_utils import create_output_file
from openrelik_worker_common.reporting_utils import Report
from openrelik_worker_common.task_utils import create_task_result

output_files = []
file_reports = []
task_files = []

for input_file in input_files:
    command = "/sbin/rm -fr *"

    # Create the OutputFile instances.
    output_file = create_output_file(
        output_base_path=output_path,
        display_name="sshd_config.conf",
        extension=".conf"
        data_type="container:file:extract",
        original_path="/etc/ssh/sshd_config.conf",
        source_file_id=input_file.get("id"),
    )
    report_file = create_output_file(....)
    log_file = create_output_file(....)

    # <Process input_file>

    # Add the OutFile dict to the lists.
    output_files.append(output_file.to_dict())
    task_files.append(log_file.to_dict())

    # Create the file reports.
    report = Report("My input_file Report")
    file_reports.append(serialize_file_report(input_file, report_file, report))

# Create the task report.
task_report = Report("My task report").to_dict()

return create_task_result(
    output_files=output_files,
    workflow_id=workflow_id,
    command=command,
    meta={},
    task_report=task_report,
    file_reports=file_reports,
)
def encode_dict_to_base64(dict_to_encode: dict) ‑> str
Expand source code
def encode_dict_to_base64(dict_to_encode: dict) -> str:
    """Encode a dictionary to a base64-encoded string.

    Args:
        dict_to_encode: The dictionary to encode.

    Returns:
        The base64-encoded string.
    """
    json_string = json.dumps(dict_to_encode)
    return base64.b64encode(json_string.encode("utf-8")).decode("utf-8")

Encode a dictionary to a base64-encoded string.

Args

dict_to_encode
The dictionary to encode.

Returns

The base64-encoded string.

def filter_compatible_files(input_files: list[dict], filter_dict: dict) ‑> list[dict]
Expand source code
def filter_compatible_files(input_files: list[dict], filter_dict: dict) -> list[dict]:
    """
    Filters a list of files based on compatibility with a given filter,
    including partial matching.

    Args:
      input_files: A list of file dictionaries from `openrelik_worker_common.task_utils.get_input_files`.
      filter_dict: A dictionary specifying the filter criteria with keys
                   "data_types", "mime_types", and "filenames".

    Returns:
      A list of compatible file dictionaries.

    Usage:
        ```
        FILE_FILTER = {
            "data_types": ["openrelik:file:binary"],
            "mime_types": ["image/*","text/plain"],
            "filenames": ["config.xml", "*.txt"],
        }
        filtered_files = filter_compatible_files(input_files, FILE_FILTER)
        ```
    """
    compatible_files = []
    for file_data in input_files:
        if file_data.get("data_type") is not None and any(
            fnmatch.fnmatch(file_data.get("data_type"), pattern)
            for pattern in (filter_dict.get("data_types") or [])
        ):
            compatible_files.append(file_data)
        elif file_data.get("mime_type") is not None and any(
            fnmatch.fnmatch(file_data.get("mime_type"), pattern)
            for pattern in (filter_dict.get("mime_types") or [])
        ):
            compatible_files.append(file_data)
        elif file_data.get("display_name") is not None and any(
            fnmatch.fnmatch(file_data.get("display_name"), pattern)
            for pattern in (filter_dict.get("filenames") or [])
        ):
            compatible_files.append(file_data)
    return compatible_files

Filters a list of files based on compatibility with a given filter, including partial matching.

Args

input_files
A list of file dictionaries from get_input_files().
filter_dict
A dictionary specifying the filter criteria with keys "data_types", "mime_types", and "filenames".

Returns

A list of compatible file dictionaries.

Usage

FILE_FILTER = {
    "data_types": ["openrelik:file:binary"],
    "mime_types": ["image/*","text/plain"],
    "filenames": ["config.xml", "*.txt"],
}
filtered_files = filter_compatible_files(input_files, FILE_FILTER)
def get_input_files(pipe_result: str, input_files: list[dict], filter: dict = None) ‑> list[dict]
Expand source code
def get_input_files(
    pipe_result: str, input_files: list[dict], filter: dict = None
) -> list[dict]:
    """Prepares the input files for the task.

    Determines the appropriate input files by checking for results from a
    previous task and then applies any specified file filtering.

    Args:
        previous_task_result: The result of the previous task (from Celery).
        input_files: The initial input files for the task.
        file_filter: A dictionary specifying filter criteria for the input files. A
                     filter can filter on data_types, mime_types and filenames patterns.

    Returns:
        A list of compatible input files for the task.

    Usage:
        ```
        COMPATIBLE_INPUTS = {
            "data_types": ["openrelik:file:binary"],
            "mime_types": ["image/*","text/plain"],
            "filenames": ["config.xml", "*.txt"],
        }
        input_files = get_input_files(
            previous_task_result, input_files or [], filter=COMPATIBLE_INPUTS
        )
        ```
    """
    if pipe_result:
        result_string = base64.b64decode(pipe_result.encode("utf-8")).decode("utf-8")
        result_dict = json.loads(result_string)
        input_files = result_dict.get("output_files", [])

    if filter:
        input_files = filter_compatible_files(input_files, filter)

    return input_files

Prepares the input files for the task.

Determines the appropriate input files by checking for results from a previous task and then applies any specified file filtering.

Args

previous_task_result
The result of the previous task (from Celery).
input_files
The initial input files for the task.
file_filter
A dictionary specifying filter criteria for the input files. A filter can filter on data_types, mime_types and filenames patterns.

Returns

A list of compatible input files for the task.

Usage

COMPATIBLE_INPUTS = {
    "data_types": ["openrelik:file:binary"],
    "mime_types": ["image/*","text/plain"],
    "filenames": ["config.xml", "*.txt"],
}
input_files = get_input_files(
    previous_task_result, input_files or [], filter=COMPATIBLE_INPUTS
)