Module openrelik_worker_common.task_utils
Helper methods for tasks.
Functions
def create_task_result(output_files: list[dict],
workflow_id: str,
task_files: list[dict] = [],
command: str = None,
meta: dict = None,
file_reports: list[dict] = [],
task_report: dict = None) ‑> str
def create_task_result(
    output_files: list[dict],
    workflow_id: str,
    task_files: list[dict] = [],
    command: str = None,
    meta: dict = None,
    file_reports: list[dict] = [],
    task_report: dict = None,
) -> str:
    """Create a task result dictionary and encode it to a base64 string.

    This function consolidates all relevant information about a task's execution
    into a dictionary. This dictionary includes details about output files,
    associated workflow, task-specific log files, the command executed, any
    additional metadata, file-specific reports, and an overall task report.

    Args:
        output_files: A list of `openrelik_worker_common.file_utils.OutputFile`
            dictionaries, where each dictionary represents an output file
            generated by the task. These files are passed on to the next workers.
            Output files are created by calling the
            `openrelik_worker_common.file_utils.create_output_file` function.
        workflow_id: The unique identifier of the workflow to which this task
            belongs.
        task_files: An optional list of
            `openrelik_worker_common.file_utils.OutputFile` dictionaries, where
            each dictionary represents a file associated with the task's
            execution, such as log files. These files will **not** be passed on
            to the next workers. Defaults to an empty list.
        command: An optional string representing the command that was executed
            to perform the task. Defaults to None.
        meta: An optional dictionary containing any additional metadata relevant
            to the task or its results. Defaults to None.
        file_reports: An optional list of
            `openrelik_worker_common.reporting.Report` dictionaries, where each
            dictionary is a report specific to an individual file processed or
            generated by the task. Defaults to an empty list.
        task_report: An optional `openrelik_worker_common.reporting.Report`
            dictionary representing a comprehensive report for the entire task.
            This report will be shown in the Web UI. Defaults to None.

    Returns:
        A base64-encoded string representing the JSON serialization of the task
        result dictionary.

    Usage:
        ```
        from openrelik_worker_common.file_utils import create_output_file
        from openrelik_worker_common.reporting import Report, serialize_file_report
        from openrelik_worker_common.task_utils import create_task_result

        output_files = []
        file_reports = []
        task_files = []
        for input_file in input_files:
            command = "/sbin/rm -fr *"

            # Create the OutputFile instances.
            output_file = create_output_file(
                output_base_path=output_path,
                display_name="sshd_config.conf",
                extension=".conf",
                data_type="container:file:extract",
                original_path="/etc/ssh/sshd_config.conf",
                source_file_id=input_file.get("id"),
            )
            report_file = create_output_file(....)
            log_file = create_output_file(....)

            # <Process input_file>

            # Add the OutputFile dicts to the lists.
            output_files.append(output_file.to_dict())
            task_files.append(log_file.to_dict())

            # Create the file reports.
            report = Report("My input_file Report")
            file_reports.append(serialize_file_report(input_file, report_file, report))

        # Create the task report.
        task_report = Report("My task report").to_dict()

        return create_task_result(
            output_files=output_files,
            workflow_id=workflow_id,
            task_files=task_files,
            command=command,
            meta={},
            task_report=task_report,
            file_reports=file_reports,
        )
        ```
    """
    result = {
        "output_files": output_files,
        "workflow_id": workflow_id,
        "task_files": task_files,
        "command": command,
        "meta": meta,
        "file_reports": file_reports,
        "task_report": task_report,
    }
    return encode_dict_to_base64(result)
Create a task result dictionary and encode it to a base64 string.
This function consolidates all relevant information about a task's execution into a dictionary. This dictionary includes details about output files, associated workflow, task-specific log files, the command executed, any additional metadata, file-specific reports, and an overall task report.
Args
output_files
- A list of OutputFile dictionaries, where each dictionary represents an output file generated by the task. These files are passed on to the next workers. Output files are created by calling the create_output_file() function.
workflow_id
- The unique identifier of the workflow to which this task belongs.
task_files
- An optional list of OutputFile dictionaries, where each dictionary represents a file associated with the task's execution, such as log files. These files will not be passed on to the next workers. Defaults to an empty list.
command
- An optional string representing the command that was executed to perform the task. Defaults to None.
meta
- An optional dictionary containing any additional metadata relevant to the task or its results. Defaults to None.
file_reports
- An optional list of Report dictionaries, where each dictionary is a report specific to an individual file processed or generated by the task. Defaults to an empty list.
task_report
- An optional Report dictionary representing a comprehensive report for the entire task. This report will be shown in the Web UI. Defaults to None.
Returns
A base64-encoded string representing the JSON serialization of the task result dictionary.
Usage
from openrelik_worker_common.file_utils import create_output_file
from openrelik_worker_common.reporting import Report, serialize_file_report
from openrelik_worker_common.task_utils import create_task_result

output_files = []
file_reports = []
task_files = []
for input_file in input_files:
    command = "/sbin/rm -fr *"

    # Create the OutputFile instances.
    output_file = create_output_file(
        output_base_path=output_path,
        display_name="sshd_config.conf",
        extension=".conf",
        data_type="container:file:extract",
        original_path="/etc/ssh/sshd_config.conf",
        source_file_id=input_file.get("id"),
    )
    report_file = create_output_file(....)
    log_file = create_output_file(....)

    # <Process input_file>

    # Add the OutputFile dicts to the lists.
    output_files.append(output_file.to_dict())
    task_files.append(log_file.to_dict())

    # Create the file reports.
    report = Report("My input_file Report")
    file_reports.append(serialize_file_report(input_file, report_file, report))

# Create the task report.
task_report = Report("My task report").to_dict()

return create_task_result(
    output_files=output_files,
    workflow_id=workflow_id,
    task_files=task_files,
    command=command,
    meta={},
    task_report=task_report,
    file_reports=file_reports,
)
def encode_dict_to_base64(dict_to_encode: dict) ‑> str
def encode_dict_to_base64(dict_to_encode: dict) -> str:
    """Encode a dictionary to a base64-encoded string.

    Args:
        dict_to_encode: The dictionary to encode.

    Returns:
        The base64-encoded string.
    """
    json_string = json.dumps(dict_to_encode)
    return base64.b64encode(json_string.encode("utf-8")).decode("utf-8")
Encode a dictionary to a base64-encoded string.
Args
dict_to_encode
- The dictionary to encode.
Returns
The base64-encoded string.
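The source shown for this function serializes the dictionary with `json.dumps` and then Base64-encodes the UTF-8 bytes, so the inverse is a Base64 decode followed by `json.loads`. The following is a minimal round-trip sketch using only the standard library. `decode_base64_to_dict` is a hypothetical helper name introduced here for illustration, not part of this module, and `encode_dict_to_base64` is re-declared so the snippet runs on its own.

```python
import base64
import json


def encode_dict_to_base64(dict_to_encode: dict) -> str:
    # Same two steps as the module source: JSON first, then Base64.
    json_string = json.dumps(dict_to_encode)
    return base64.b64encode(json_string.encode("utf-8")).decode("utf-8")


def decode_base64_to_dict(encoded: str) -> dict:
    # Hypothetical inverse helper (an assumption, not part of task_utils).
    json_string = base64.b64decode(encoded.encode("utf-8")).decode("utf-8")
    return json.loads(json_string)


# Sample payload with made-up values.
result = {"workflow_id": "42", "output_files": [], "meta": None}
encoded = encode_dict_to_base64(result)
decoded = decode_base64_to_dict(encoded)
```

Because the encoding goes through JSON, every value in the dictionary has to be JSON-serializable; `json.dumps` raises `TypeError` otherwise.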
def filter_compatible_files(input_files: list[dict], filter_dict: dict) ‑> list[dict]
def filter_compatible_files(input_files: list[dict], filter_dict: dict) -> list[dict]:
    """
    Filters a list of files based on compatibility with a given filter,
    including partial matching.

    Args:
        input_files: A list of file dictionaries from
            `openrelik_worker_common.task_utils.get_input_files`.
        filter_dict: A dictionary specifying the filter criteria with keys
            "data_types", "mime_types", and "filenames".

    Returns:
        A list of compatible file dictionaries.

    Usage:
        ```
        FILE_FILTER = {
            "data_types": ["openrelik:file:binary"],
            "mime_types": ["image/*","text/plain"],
            "filenames": ["config.xml", "*.txt"],
        }
        filtered_files = filter_compatible_files(input_files, FILE_FILTER)
        ```
    """
    compatible_files = []
    for file_data in input_files:
        if file_data.get("data_type") is not None and any(
            fnmatch.fnmatch(file_data.get("data_type"), pattern)
            for pattern in (filter_dict.get("data_types") or [])
        ):
            compatible_files.append(file_data)
        elif file_data.get("mime_type") is not None and any(
            fnmatch.fnmatch(file_data.get("mime_type"), pattern)
            for pattern in (filter_dict.get("mime_types") or [])
        ):
            compatible_files.append(file_data)
        elif file_data.get("display_name") is not None and any(
            fnmatch.fnmatch(file_data.get("display_name"), pattern)
            for pattern in (filter_dict.get("filenames") or [])
        ):
            compatible_files.append(file_data)
    return compatible_files
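Filters a list of files based on compatibility with a given filter. Patterns are shell-style wildcards (matched with fnmatch), so partial matches such as "image/*" or "*.txt" are supported. A file is kept if its data type, MIME type, or display name matches at least one pattern in the corresponding filter list.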
Args
input_files
- A list of file dictionaries from get_input_files().
filter_dict
- A dictionary specifying the filter criteria with keys "data_types", "mime_types", and "filenames".
Returns
A list of compatible file dictionaries.
Usage
FILE_FILTER = {
    "data_types": ["openrelik:file:binary"],
    "mime_types": ["image/*", "text/plain"],
    "filenames": ["config.xml", "*.txt"],
}
filtered_files = filter_compatible_files(input_files, FILE_FILTER)
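To make the matching rule concrete, the sketch below applies the same three `fnmatch` checks as the source for this function to a handful of made-up file dictionaries. The `matches` helper and the sample files are assumptions for illustration only; a real file dictionary carries more keys than the three read here.

```python
import fnmatch

FILE_FILTER = {
    "data_types": ["openrelik:file:binary"],
    "mime_types": ["image/*", "text/plain"],
    "filenames": ["config.xml", "*.txt"],
}

# Hypothetical file dictionaries; only the keys the filter reads are set.
candidates = [
    {"display_name": "photo.png", "mime_type": "image/png"},
    {"display_name": "notes.txt", "mime_type": "application/octet-stream"},
    {"display_name": "dump.bin", "data_type": "openrelik:file:binary"},
    {"display_name": "archive.zip", "mime_type": "application/zip"},
]


def matches(file_data: dict, filter_dict: dict) -> bool:
    # Illustrative helper: a file is compatible if ANY one criterion matches.
    checks = (
        ("data_type", "data_types"),
        ("mime_type", "mime_types"),
        ("display_name", "filenames"),
    )
    for file_key, filter_key in checks:
        value = file_data.get(file_key)
        patterns = filter_dict.get(filter_key) or []
        if value is not None and any(fnmatch.fnmatch(value, p) for p in patterns):
            return True
    return False


compatible = [f["display_name"] for f in candidates if matches(f, FILE_FILTER)]
```

Here `photo.png` passes on its MIME type, `notes.txt` on its file name, and `dump.bin` on its data type, while `archive.zip` matches nothing and is dropped.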
def get_input_files(pipe_result: str, input_files: list[dict], filter: dict = None) ‑> list[dict]
def get_input_files(
    pipe_result: str, input_files: list[dict], filter: dict = None
) -> list[dict]:
    """Prepares the input files for the task.

    Determines the appropriate input files by checking for results from a
    previous task and then applies any specified file filtering.

    Args:
        pipe_result: The result of the previous task (from Celery).
        input_files: The initial input files for the task.
        filter: A dictionary specifying filter criteria for the input files.
            A filter can match on data_types, mime_types and filenames patterns.

    Returns:
        A list of compatible input files for the task.

    Usage:
        ```
        COMPATIBLE_INPUTS = {
            "data_types": ["openrelik:file:binary"],
            "mime_types": ["image/*","text/plain"],
            "filenames": ["config.xml", "*.txt"],
        }
        input_files = get_input_files(
            previous_task_result, input_files or [], filter=COMPATIBLE_INPUTS
        )
        ```
    """
    if pipe_result:
        result_string = base64.b64decode(pipe_result.encode("utf-8")).decode("utf-8")
        result_dict = json.loads(result_string)
        input_files = result_dict.get("output_files", [])

    if filter:
        input_files = filter_compatible_files(input_files, filter)

    return input_files
Prepares the input files for the task.
Determines the appropriate input files by checking for results from a previous task and then applies any specified file filtering.
Args
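pipe_result
- The base64-encoded result of the previous task (from Celery), if any. When set, its output_files replace input_files.
input_files
- The initial input files for the task, used when there is no previous task result.
filter
- An optional dictionary specifying filter criteria for the input files. The filter can match on data_types, mime_types, and filenames patterns. Defaults to None, which disables filtering.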
Returns
A list of compatible input files for the task.
Usage
COMPATIBLE_INPUTS = {
    "data_types": ["openrelik:file:binary"],
    "mime_types": ["image/*", "text/plain"],
    "filenames": ["config.xml", "*.txt"],
}
input_files = get_input_files(
    previous_task_result, input_files or [], filter=COMPATIBLE_INPUTS
)
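The selection step can be illustrated in isolation: when a previous task result is present, its decoded `output_files` take precedence over the initial input files. The sketch below mirrors that part of the source for this function with the standard library only and leaves filtering out. `pick_input_files` and the sample file dictionaries are hypothetical names and values used for illustration.

```python
import base64
import json


def pick_input_files(pipe_result, input_files):
    # Illustrative stand-in for the selection step (filtering omitted).
    if pipe_result:
        result_string = base64.b64decode(pipe_result.encode("utf-8")).decode("utf-8")
        input_files = json.loads(result_string).get("output_files", [])
    return input_files


# Simulate what a previous task would hand over: Base64-encoded JSON.
previous = {"output_files": [{"display_name": "a.txt"}], "workflow_id": "1"}
pipe_result = base64.b64encode(json.dumps(previous).encode("utf-8")).decode("utf-8")

initial = [{"display_name": "b.txt"}]

from_pipe = pick_input_files(pipe_result, initial)   # previous task wins
from_initial = pick_input_files(None, initial)       # falls back to initial files
```

With a non-empty `pipe_result` the initial list is ignored entirely, so the first task in a workflow is the only one that reads `input_files` directly.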