Module openrelik_worker_common.file_utils

Functions

def build_file_tree(output_path: str,
files: list[OutputFile]) ‑> tempfile.TemporaryDirectory | None
Expand source code
def build_file_tree(
    output_path: str, files: list[OutputFile]
) -> tempfile.TemporaryDirectory | None:
    """Creates the original file tree structure from a list of OutputFiles.

    A temporary directory is created inside ``output_path`` and every file is
    hard-linked into it at the location given by its ``original_path`` (with
    the root/anchor stripped).

    Args:
        output_path: Path to the OpenRelik output directory.
        files: A list of OutputFile instances.

    Returns:
        The root of the file tree as a TemporaryDirectory, or None when
        ``files`` is empty or contains anything that is not an OutputFile.

    Raises:
        PermissionError: If a file's original path would resolve to a folder
            outside of the temporary tree root (e.g. via leading ``..``).
        OSError: If a folder or hard link cannot be created.
    """
    if not files or not all(isinstance(file, OutputFile) for file in files):
        return None

    tree_root = tempfile.TemporaryDirectory(dir=output_path)
    # Resolve once; used as the reference for the containment check below.
    resolved_root = os.path.realpath(tree_root.name)

    try:
        for file in files:
            normalized_path = os.path.normpath(file.original_path)
            original_filename = Path(normalized_path).name
            original_folder = Path(normalized_path).parent
            relative_original_folder = get_relative_path(original_folder)
            tmp_full_path = os.path.join(tree_root.name, relative_original_folder)

            # Ensure that the constructed path is within the temporary tree
            # root, preventing attempts to write files outside of it. A plain
            # substring test is not enough here: "<root>/../../etc" still
            # contains "<root>", so compare fully resolved paths instead.
            resolved_folder = os.path.realpath(tmp_full_path)
            if os.path.commonpath([resolved_root, resolved_folder]) != resolved_root:
                raise PermissionError(
                    f"Folder {tmp_full_path} not in OpenRelik output_path: {output_path}"
                )

            # Create complete folder structure; several files may share a folder.
            os.makedirs(tmp_full_path, exist_ok=True)

            # Create hardlink to file.
            os.link(file.path, os.path.join(tmp_full_path, original_filename))
    except BaseException:
        # The caller never receives tree_root on failure, so remove the
        # partially built tree here before propagating the error.
        tree_root.cleanup()
        raise

    return tree_root

Creates the original file tree structure from a list of OutputFiles.

Args

output_path
Path to the OpenRelik output directory.
files
A list of OutputFile instances.

Returns

The root path of the file tree as a TemporaryDirectory or None.

def count_file_lines(file_path: str) ‑> int
Expand source code
def count_file_lines(file_path: str) -> int:
    """Count the number of lines in a file.

    The count is obtained from the external ``wc -l`` utility, so it reflects
    the number of newline characters in the file.

    Args:
        file_path: The path to the file.

    Returns:
        The number of lines in the file.

    Raises:
        subprocess.CalledProcessError: If ``wc`` exits with a non-zero status
            (for example when the file does not exist).
    """
    # "--" marks the end of options so that a path starting with "-" is
    # treated as a file operand instead of being parsed as a wc option.
    wc = subprocess.check_output(["wc", "-l", "--", file_path])
    # wc prints "<count> <path>"; the first whitespace-separated token is the count.
    return int(wc.decode("utf-8").split()[0])

Count the number of lines in a file.

Args

file_path
The path to the file.

Returns

The number of lines in the file.

def create_output_file(output_base_path: str,
display_name: str | None = None,
extension: str | None = None,
data_type: str | None = None,
original_path: str | None = None,
source_file_id: OutputFile | None = None,
register_in_db: bool = True) ‑> OutputFile
Expand source code
def create_output_file(
    output_base_path: str,
    display_name: Optional[str] = None,
    extension: Optional[str] = None,
    data_type: Optional[str] = None,
    original_path: Optional[str] = None,
    source_file_id: Optional[OutputFile] = None,
    register_in_db: bool = True,
) -> OutputFile:
    """Creates and returns an OutputFile object.

    The file name on disk is a freshly generated UUID (hex) followed by the
    extension taken from the final display name; nothing is written to disk
    by this function.

    Args:
        output_base_path: The path to the output directory.
        display_name: The name of the output file (optional). Defaults to the
            generated UUID.
        extension: File extension (optional), with or without leading dots.
            When given it is appended to the display name.
        data_type: The data type of the output file (optional).
        original_path: The original path of the file (optional).
        source_file_id: The OutputFile this file belongs to (optional).
            NOTE(review): OutputFile.__init__ annotates this value as an int;
            confirm which type callers actually pass.
        register_in_db: Whether the mediator should create a DB row for
            this file. Defaults to True. Set to False for intermediate
            artifacts not meant for UI/DB tracking.

    Returns:
        An OutputFile object whose ``extension`` is the suffix of the final
        display name as returned by ``os.path.splitext`` (including the dot).
    """
    # Create a new UUID for the file to use as filename on disk.
    file_uuid = uuid4().hex

    # If display_name is missing, set the file's UUID as display_name.
    display_name = display_name or file_uuid

    # Allow for an explicit extension to be set. An extension made up solely
    # of dots is empty once stripped and must not leave a trailing "." behind.
    if extension:
        extension = extension.lstrip(".")
        if extension:
            display_name = f"{display_name}.{extension}"

    # Extract extension from filename if present.
    _, extracted_extension = os.path.splitext(display_name)

    # Construct the full output path.
    output_filename = f"{file_uuid}{extracted_extension}"
    output_path = os.path.join(output_base_path, output_filename)

    return OutputFile(
        uuid=file_uuid,
        output_path=output_path,
        display_name=display_name,
        extension=extracted_extension,
        data_type=data_type,
        original_path=original_path,
        source_file_id=source_file_id,
        register_in_db=register_in_db,
    )

Creates and returns an OutputFile object.

Args

output_base_path
The path to the output directory.
display_name
The name of the output file (optional).
extension
File extension (optional).
data_type
The data type of the output file (optional).
original_path
The original path of the file (optional).
source_file_id
The OutputFile this file belongs to (optional).
register_in_db
Whether the mediator should create a DB row for this file. Defaults to True. Set to False for intermediate artifacts not meant for UI/DB tracking.

Returns

An OutputFile object.

def delete_file_tree(root_path: tempfile.TemporaryDirectory) ‑> None
Expand source code
def delete_file_tree(root_path: tempfile.TemporaryDirectory) -> None:
    """Remove a temporary file tree and everything below it.

    Args:
        root_path: The TemporaryDirectory object that is the root of the tree.

    Returns:
        None

    Raises:
        TypeError: If root_path is not a TemporaryDirectory object.
    """
    if isinstance(root_path, tempfile.TemporaryDirectory):
        # Delegate removal to the TemporaryDirectory object itself.
        root_path.cleanup()
        return

    raise TypeError("Root path is not a TemporaryDirectory object!")

Delete a temporary file tree folder structure.

Args

root_path
TemporaryDirectory root object of file tree structure.

Returns: None. Raises: TypeError if root_path is not a TemporaryDirectory object.

def get_relative_path(path: str) ‑> str
Expand source code
def get_relative_path(path: str) -> str:
    """Strip the anchor (root and/or drive) from a path.

    Args:
        path: A full path.

    Returns:
        The same path expressed relative to its own anchor, as a string.
    """
    pure = PurePath(path)
    without_anchor = pure.relative_to(pure.anchor)
    return str(without_anchor)

Converts a full path to relative path without the root.

Args

path
A full path.

Returns

A relative path without the root.

def is_disk_image(inputfile: dict) ‑> bool
Expand source code
def is_disk_image(inputfile: dict) -> bool:
    """Tell whether an input file looks like a disk image, judged by its name.

    Only the extension of the ``display_name`` entry is inspected, compared
    case-insensitively against a fixed list of disk image extensions.

    Args:
        inputfile: InputFile structure; must contain a ``display_name`` key.

    Returns:
        True if the display name ends in a known disk image extension,
        False otherwise.

    Raises:
        RuntimeError: If ``display_name`` is missing from inputfile.
    """
    if "display_name" not in inputfile:
        raise RuntimeError("inputfile parameter malformed, no display_name found")

    known_suffixes = (".img", ".raw", ".dd", ".qcow3", ".qcow2", ".qcow")
    name = str(inputfile.get("display_name"))
    suffix = os.path.splitext(name)[1]
    return suffix.lower() in known_suffixes

Check if inputfile is a disk image.

Args

inputfile
InputFile structure.

Returns: bool (True if the display name has a disk image extension). Raises: RuntimeError if display_name is missing.

Classes

class OutputFile (uuid: str,
output_path: str,
display_name: str,
extension: str | None = None,
data_type: str | None = None,
original_path: str | None = None,
source_file_id: int | None = None,
register_in_db: bool = True)
Expand source code
class OutputFile:
    """Describes a single file produced by a worker.

    Attributes:
        uuid: Unique identifier for the file.
        display_name: Display name for the file.
        extension: Extension of the file.
        data_type: Data type of the file.
        path: The full path to the file (the ``output_path`` constructor
            argument).
        original_path: The full original path to the file.
        source_file_id: The OutputFile this file belongs to.
        register_in_db: When False, the mediator skips DB registration, hashing,
            and file reports for this file. Use for intermediate artifacts that
            downstream tasks consume but that shouldn't appear in the UI or DB.
    """

    def __init__(
        self,
        uuid: str,
        output_path: str,
        display_name: str,
        extension: Optional[str] = None,
        data_type: Optional[str] = None,
        original_path: Optional[str] = None,
        source_file_id: Optional[int] = None,
        register_in_db: bool = True,
    ):
        """Store the given values on the new OutputFile.

        Args:
            uuid: Unique identifier (uuid4) for the file.
            output_path: The path of the output file; stored as ``path``.
            display_name: The name of the output file.
            extension: File extension (optional).
            data_type: The data type of the output file (optional).
            original_path: The original path of the file (optional).
            source_file_id: The OutputFile this file belongs to (optional).
            register_in_db: Whether the mediator should create a DB row for
                this file. Defaults to True.
        """
        # Identity and location.
        self.uuid = uuid
        self.path = output_path
        self.original_path = original_path
        # Presentation and typing.
        self.display_name = display_name
        self.extension = extension
        self.data_type = data_type
        # Relations and bookkeeping.
        self.source_file_id = source_file_id
        self.register_in_db = register_in_db

    def to_dict(self) -> dict:
        """
        Serialize the OutputFile into a plain dictionary.
        This is what the mediator server gets and uses to create a File in the database.

        Returns:
            A dictionary containing the attributes of the OutputFile object.
        """
        # Every key matches the attribute name it is read from; the order of
        # this tuple is the order of the keys in the resulting dictionary.
        exported = (
            "uuid",
            "display_name",
            "extension",
            "data_type",
            "path",
            "original_path",
            "source_file_id",
            "register_in_db",
        )
        return {key: getattr(self, key) for key in exported}

Represents an output file.

Attributes

uuid
Unique identifier for the file.
display_name
Display name for the file.
extension
Extension of the file.
data_type
Data type of the file.
path
The full path to the file.
original_path
The full original path to the file.
source_file_id
The OutputFile this file belongs to.
register_in_db
When False, the mediator skips DB registration, hashing, and file reports for this file. Use for intermediate artifacts that downstream tasks consume but that shouldn't appear in the UI or DB.

Initialize an OutputFile object.

Args

uuid
Unique identifier (uuid4) for the file.
output_path
The full path to the output file (stored as the path attribute).
display_name
The name of the output file.
extension
File extension (optional).
data_type
The data type of the output file (optional).
original_path
The original path of the file (optional).
source_file_id
The OutputFile this file belongs to (optional).
register_in_db
Whether the mediator should create a DB row for this file. Defaults to True.

Methods

def to_dict(self) ‑> dict
Expand source code
def to_dict(self) -> dict:
    """
    Serialize the OutputFile into a plain dictionary.
    This is what the mediator server gets and uses to create a File in the database.

    Returns:
        A dictionary containing the attributes of the OutputFile object.
    """
    # Every key matches the attribute name it is read from; the order of
    # this tuple is the order of the keys in the resulting dictionary.
    exported = (
        "uuid",
        "display_name",
        "extension",
        "data_type",
        "path",
        "original_path",
        "source_file_id",
        "register_in_db",
    )
    return {key: getattr(self, key) for key in exported}

Return a dictionary representation of the OutputFile object. This is what the mediator server gets and uses to create a File in the database.

Returns

A dictionary containing the attributes of the OutputFile object.