Module openrelik_worker_common.file_utils
Functions
def build_file_tree(output_path: str,
files: list[OutputFile]) ‑> tempfile.TemporaryDirectory | None-
Expand source code
def build_file_tree(
    output_path: str, files: list[OutputFile]
) -> tempfile.TemporaryDirectory | None:
    """Creates the original file tree structure from a list of OutputFiles.

    A temporary directory is created inside ``output_path``. For every file,
    the folder part of its ``original_path`` (with the root/anchor removed) is
    recreated below that temporary directory and the file at ``file.path`` is
    hard-linked into it under its original filename.

    Args:
        output_path: Path to the OpenRelik output directory.
        files: A list of OutputFile instances.

    Returns:
        The root path of the file tree as a TemporaryDirectory, or None when
        ``files`` is empty or contains anything that is not an OutputFile.

    Raises:
        PermissionError: If a file's original path resolves to a folder
            outside of the temporary tree root (e.g. through ``..``
            components in a relative path).
    """
    if not files or not all(isinstance(file, OutputFile) for file in files):
        return None

    tree_root = tempfile.TemporaryDirectory(dir=output_path)
    # Absolute form of the root so the containment check below compares like
    # with like, even if output_path was given as a relative path.
    root_dir = os.path.abspath(tree_root.name)

    try:
        for file in files:
            normalized_path = os.path.normpath(file.original_path)
            original_filename = Path(normalized_path).name
            original_folder = Path(normalized_path).parent
            relative_original_folder = get_relative_path(original_folder)

            # abspath() collapses any remaining ".." components, so the
            # comparison is made on the folder that would really be created.
            tmp_full_path = os.path.abspath(
                os.path.join(root_dir, relative_original_folder)
            )

            # Ensure that the constructed path is within the temporary tree
            # root. A plain substring test is not enough: "<root>/../../etc"
            # contains the root as a substring but points outside of it.
            if os.path.commonpath([root_dir, tmp_full_path]) != root_dir:
                raise PermissionError(
                    f"Folder {tmp_full_path} not in OpenRelik output_path: {output_path}"
                )

            # Create complete folder structure; several files may share one.
            os.makedirs(tmp_full_path, exist_ok=True)

            # Create hardlink to file.
            os.link(file.path, os.path.join(tmp_full_path, original_filename))
    except Exception:
        # Do not leave a half-built tree behind when the caller never
        # receives the TemporaryDirectory object to clean it up.
        tree_root.cleanup()
        raise

    return tree_root
Args
output_path- Path to the OpenRelik output directory.
files- A list of OutputFile instances.
Returns
The root path of the file tree as a TemporaryDirectory or None.
def count_file_lines(file_path: str) ‑> int-
Expand source code
def count_file_lines(file_path: str) -> int:
    """Count the number of lines in a file.

    The counting is delegated to the external ``wc -l`` command and the first
    field of its output is parsed as the result.

    Args:
        file_path: The path to the file.

    Returns:
        The number of lines in the file as reported by ``wc -l``.

    Raises:
        subprocess.CalledProcessError: If ``wc`` exits with a non-zero status
            (for example when the file does not exist).
    """
    # "--" marks the end of options so that a path starting with "-" is
    # treated as a file name instead of being parsed as wc options.
    wc = subprocess.check_output(["wc", "-l", "--", file_path])
    return int(wc.decode("utf-8").split()[0])
Args
file_path- The path to the file.
Returns
The number of lines in the file.
def create_output_file(output_base_path: str,
display_name: str | None = None,
extension: str | None = None,
data_type: str | None = None,
original_path: str | None = None,
source_file_id: OutputFile | None = None,
register_in_db: bool = True) ‑> OutputFile-
Expand source code
def create_output_file(
    output_base_path: str,
    display_name: Optional[str] = None,
    extension: Optional[str] = None,
    data_type: Optional[str] = None,
    original_path: Optional[str] = None,
    source_file_id: Optional[OutputFile] = None,
    register_in_db: bool = True,
) -> OutputFile:
    """Build an OutputFile whose on-disk name is a freshly generated UUID.

    Args:
        output_base_path: The path to the output directory.
        display_name: The name of the output file (optional). Falls back to
            the generated UUID when missing or empty.
        extension: File extension (optional). Leading dots are stripped and
            the result is appended to the display name.
        data_type: The data type of the output file (optional).
        original_path: The original path of the file (optional).
        source_file_id: The OutputFile this file belongs to (optional).
        register_in_db: Whether the mediator should create a DB row for this
            file. Defaults to True. Set to False for intermediate artifacts
            not meant for UI/DB tracking.

    Returns:
        An OutputFile object.
    """
    # The hex UUID doubles as the file name on disk.
    file_uuid = uuid4().hex

    # Without a usable display name the UUID is shown instead.
    shown_name = display_name or file_uuid

    # An explicit extension is always appended to the display name.
    if extension:
        shown_name = f"{shown_name}.{extension.lstrip('.')}"

    # Whatever suffix the final display name carries (dot included) is reused
    # for the on-disk file name.
    suffix = os.path.splitext(shown_name)[1]
    disk_path = os.path.join(output_base_path, f"{file_uuid}{suffix}")

    return OutputFile(
        uuid=file_uuid,
        output_path=disk_path,
        display_name=shown_name,
        extension=suffix,
        data_type=data_type,
        original_path=original_path,
        source_file_id=source_file_id,
        register_in_db=register_in_db,
    )
Args
output_base_path- The path to the output directory.
display_name- The name of the output file (optional).
extension- File extension (optional).
data_type- The data type of the output file (optional).
original_path- The original path of the file (optional).
source_file_id- The OutputFile this file belongs to (optional).
register_in_db- Whether the mediator should create a DB row for this file. Defaults to True. Set to False for intermediate artifacts not meant for UI/DB tracking.
Returns
An OutputFile object.
def delete_file_tree(root_path: tempfile.TemporaryDirectory) ‑> None-
Expand source code
def delete_file_tree(root_path: tempfile.TemporaryDirectory) -> None:
    """Remove a temporary file tree from disk.

    Args:
        root_path: TemporaryDirectory root object of the file tree structure.

    Returns:
        None

    Raises:
        TypeError: If root_path is not a TemporaryDirectory instance.
    """
    if isinstance(root_path, tempfile.TemporaryDirectory):
        # cleanup() removes the directory together with everything below it.
        root_path.cleanup()
        return

    raise TypeError("Root path is not a TemporaryDirectory object!")
Args
root_path- TemporaryDirectory root object of file tree structure.
Returns: None. Raises: TypeError if root_path is not a TemporaryDirectory object.
def get_relative_path(path: str) ‑> str-
Expand source code
def get_relative_path(path: str) -> str:
    """Strip the root (anchor) from a path and return the remainder.

    Args:
        path: A full path.

    Returns:
        A relative path without the root.
    """
    pure_path = PurePath(path)
    # The anchor is the drive and root combined; it is empty for paths that
    # are already relative.
    anchor = pure_path.anchor
    without_anchor = pure_path.relative_to(anchor)
    return str(without_anchor)
Args
path- A full path.
Returns
A relative path without the root.
def is_disk_image(inputfile: dict) ‑> bool-
Expand source code
def is_disk_image(inputfile: dict) -> bool:
    """Tell whether an input file looks like a disk image by its extension.

    Args:
        inputfile: InputFile structure; must contain a "display_name" key.

    Returns:
        True when the extension of the display name (compared
        case-insensitively) is one of the known disk image extensions,
        False otherwise.

    Raises:
        RuntimeError: If inputfile has no "display_name" key.
    """
    if "display_name" not in inputfile:
        raise RuntimeError("inputfile parameter malformed, no display_name found")

    known_extensions = (".img", ".raw", ".dd", ".qcow3", ".qcow2", ".qcow")

    # str() keeps non-string display names (e.g. None) from breaking splitext.
    name = str(inputfile.get("display_name"))
    extension = os.path.splitext(name)[1]
    return extension.lower() in known_extensions
Args
inputfile- InputFile structure.
Returns: bool. Raises: RuntimeError if inputfile has no display_name key.
Classes
class OutputFile (uuid: str,
output_path: str,
display_name: str,
extension: str | None = None,
data_type: str | None = None,
original_path: str | None = None,
source_file_id: int | None = None,
register_in_db: bool = True)-
Expand source code
class OutputFile:
    """Describes a single output file.

    Attributes:
        uuid: Unique identifier for the file.
        display_name: Display name for the file.
        extension: Extension of the file.
        data_type: Data type of the file.
        path: The full path to the file.
        original_path: The full original path to the file.
        source_file_id: The OutputFile this file belongs to.
        register_in_db: When False, the mediator skips DB registration,
            hashing, and file reports for this file. Use for intermediate
            artifacts that downstream tasks consume but that shouldn't appear
            in the UI or DB.
    """

    def __init__(
        self,
        uuid: str,
        output_path: str,
        display_name: str,
        extension: Optional[str] = None,
        data_type: Optional[str] = None,
        original_path: Optional[str] = None,
        source_file_id: Optional[int] = None,
        register_in_db: bool = True,
    ):
        """Store the given values on the new OutputFile.

        Args:
            uuid: Unique identifier (uuid4) for the file.
            output_path: The full path to the file; stored as ``path``.
            display_name: The name of the output file.
            extension: File extension (optional).
            data_type: The data type of the output file (optional).
            original_path: The original path of the file (optional).
            source_file_id: The OutputFile this file belongs to (optional).
            register_in_db: Whether the mediator should create a DB row for
                this file. Defaults to True.
        """
        self.uuid = uuid
        self.display_name = display_name
        self.extension = extension
        self.data_type = data_type
        # Note the rename: the constructor argument is output_path, the
        # attribute (and dictionary key) is path.
        self.path = output_path
        self.original_path = original_path
        self.source_file_id = source_file_id
        self.register_in_db = register_in_db

    def to_dict(self) -> dict:
        """Return the OutputFile's attributes as a plain dictionary.

        This is what the mediator server gets and uses to create a File in
        the database.

        Returns:
            A dictionary containing the attributes of the OutputFile object.
        """
        exported_attributes = (
            "uuid",
            "display_name",
            "extension",
            "data_type",
            "path",
            "original_path",
            "source_file_id",
            "register_in_db",
        )
        return {name: getattr(self, name) for name in exported_attributes}
Attributes
uuid- Unique identifier for the file.
display_name- Display name for the file.
extension- Extension of the file.
data_type- Data type of the file.
path- The full path to the file.
original_path- The full original path to the file.
source_file_id- The OutputFile this file belongs to.
register_in_db- When False, the mediator skips DB registration, hashing, and file reports for this file. Use for intermediate artifacts that downstream tasks consume but that shouldn't appear in the UI or DB.
Initialize an OutputFile object.
Args
uuid- Unique identifier (uuid4) for the file.
output_path- The full path to the output file (stored as the path attribute).
display_name- The name of the output file.
extension- File extension (optional).
data_type- The data type of the output file (optional).
original_path- The original path of the file (optional).
source_file_id- The OutputFile this file belongs to (optional).
register_in_db- Whether the mediator should create a DB row for this file. Defaults to True.
Methods
def to_dict(self) ‑> dict-
Expand source code
def to_dict(self) -> dict:
    """Return the OutputFile's attributes as a plain dictionary.

    This is what the mediator server gets and uses to create a File in the
    database.

    Returns:
        A dictionary containing the attributes of the OutputFile object.
    """
    as_dict = dict(
        uuid=self.uuid,
        display_name=self.display_name,
        extension=self.extension,
        data_type=self.data_type,
        path=self.path,
        original_path=self.original_path,
        source_file_id=self.source_file_id,
        register_in_db=self.register_in_db,
    )
    return as_dict
Returns
A dictionary containing the attributes of the OutputFile object.