Module openrelik_worker_common.file_utils
Functions
def build_file_tree(output_path: str,
files: list[OutputFile]) ‑> tempfile.TemporaryDirectory | None-
Expand source code
def build_file_tree(
    output_path: str, files: list[OutputFile]
) -> tempfile.TemporaryDirectory | None:
    """Creates the original file tree structure from a list of OutputFiles.

    Every file is hard-linked into a new temporary directory created inside
    output_path, at the location given by its original_path with the root
    (anchor) stripped.

    Args:
        output_path: Path to the OpenRelik output directory.
        files: A list of OutputFile instances.

    Returns:
        The root of the file tree as a TemporaryDirectory, or None when files
        is empty or contains anything that is not an OutputFile.

    Raises:
        PermissionError: If a file's original_path resolves to a folder outside
            of the temporary tree root. The temporary tree is removed before
            the error is raised.
    """
    if not files or not all(isinstance(file, OutputFile) for file in files):
        return None

    tree_root = tempfile.TemporaryDirectory(dir=output_path)
    # Absolute form of the root so that the containment check below compares
    # like with like.
    root_dir = os.path.abspath(tree_root.name)

    for file in files:
        normalized_path = os.path.normpath(file.original_path)
        original_filename = Path(normalized_path).name
        original_folder = Path(normalized_path).parent
        relative_original_folder = get_relative_path(original_folder)

        # abspath() collapses any remaining ".." components (a relative
        # original_path such as "../x" keeps them after normpath), so the
        # check below sees the folder that would really be created.
        tmp_full_path = os.path.abspath(
            os.path.join(root_dir, relative_original_folder)
        )

        # Ensure that the constructed path is within the temporary tree root,
        # preventing attempts to write files outside of it. A component-wise
        # comparison is used because a plain substring test is satisfied by
        # paths like "<root>/../elsewhere".
        if os.path.commonpath([root_dir, tmp_full_path]) != root_dir:
            tree_root.cleanup()
            raise PermissionError(
                f"Folder {tmp_full_path} not in OpenRelik output_path: {output_path}"
            )

        # Create complete folder structure; several files may share a folder.
        os.makedirs(tmp_full_path, exist_ok=True)

        # Create hardlink to file.
        os.link(file.path, os.path.join(tmp_full_path, original_filename))

    return tree_root
Creates the original file tree structure from a list of OutputFiles.
Args
output_path
- Path to the OpenRelik output directory.
files
- A list of OutputFile instances.
Returns
The root path of the file tree as a TemporaryDirectory or None.
def count_file_lines(file_path: str) ‑> int
-
Expand source code
def count_file_lines(file_path: str) -> int:
    """Count the number of lines in a file.

    The count is delegated to the external ``wc -l`` utility, so it reflects
    the number of newline characters in the file.

    Args:
        file_path: The path to the file.

    Returns:
        The number of lines in the file.

    Raises:
        subprocess.CalledProcessError: If ``wc`` exits with a non-zero status
            (for example when the file does not exist).
    """
    # "--" ends option parsing so a path that starts with "-" is treated as a
    # file operand instead of being interpreted as a wc option.
    wc = subprocess.check_output(["wc", "-l", "--", file_path])
    # Output looks like "<count> <path>"; the first field is the line count.
    return int(wc.decode("utf-8").split()[0])
Count the number of lines in a file.
Args
file_path
- The path to the file.
Returns
The number of lines in the file.
def create_output_file(output_base_path: str,
display_name: str | None = None,
extension: str | None = None,
data_type: str | None = None,
original_path: str | None = None,
source_file_id: OutputFile | None = None) ‑> OutputFile-
Expand source code
def create_output_file(
    output_base_path: str,
    display_name: Optional[str] = None,
    extension: Optional[str] = None,
    data_type: Optional[str] = None,
    original_path: Optional[str] = None,
    source_file_id: Optional[OutputFile] = None,
) -> OutputFile:
    """Build an OutputFile whose on-disk name is a freshly generated UUID.

    Args:
        output_base_path: The path to the output directory.
        display_name: The name of the output file (optional). Falls back to
            the generated UUID when missing or empty.
        extension: File extension (optional). Leading dots are stripped before
            it is appended to the display name.
        data_type: The data type of the output file (optional).
        original_path: The original path of the file (optional).
        source_file_id: The OutputFile this file belongs to (optional).

    Returns:
        An OutputFile object. Its extension is whatever os.path.splitext
        extracts from the final display name (including the leading dot, or
        an empty string when there is none).
    """
    # The hex UUID doubles as the on-disk filename stem.
    file_uuid = uuid4().hex

    # No display name given: show the UUID instead.
    shown_name = display_name or file_uuid

    # An explicit extension is appended to the display name.
    if extension:
        shown_name = f"{shown_name}.{extension.lstrip('.')}"

    # The on-disk file carries the same suffix as the display name.
    suffix = os.path.splitext(shown_name)[1]
    on_disk_path = os.path.join(output_base_path, f"{file_uuid}{suffix}")

    return OutputFile(
        uuid=file_uuid,
        output_path=on_disk_path,
        display_name=shown_name,
        extension=suffix,
        data_type=data_type,
        original_path=original_path,
        source_file_id=source_file_id,
    )
Creates and returns an OutputFile object.
Args
output_base_path
- The path to the output directory.
display_name
- The name of the output file (optional).
extension
- File extension (optional).
data_type
- The data type of the output file (optional).
original_path
- The original path of the file (optional).
source_file_id
- The OutputFile this file belongs to (optional).
Returns
An OutputFile object.
def delete_file_tree(root_path: tempfile.TemporaryDirectory) ‑> None
-
Expand source code
def delete_file_tree(root_path: tempfile.TemporaryDirectory) -> None:
    """Remove a temporary file tree from disk.

    Args:
        root_path: TemporaryDirectory object that is the root of the file
            tree structure.

    Returns:
        None

    Raises:
        TypeError: If root_path is not a TemporaryDirectory object.
    """
    if isinstance(root_path, tempfile.TemporaryDirectory):
        root_path.cleanup()
        return

    # Anything else (e.g. a plain path string) is rejected outright.
    raise TypeError("Root path is not a TemporaryDirectory object!")
Delete a temporary file tree folder structure.
Args
root_path
- TemporaryDirectory root object of file tree structure.
Returns: None. Raises: TypeError if root_path is not a TemporaryDirectory object.
def get_relative_path(path: str) ‑> str
-
Expand source code
def get_relative_path(path: str) -> str:
    """Strip the root (anchor) from a full path.

    Args:
        path: A full path.

    Returns:
        The path expressed relative to its own anchor, as a string.
    """
    pure_path = PurePath(path)
    # The anchor is the drive and root combined, e.g. "/" on POSIX paths.
    anchor = pure_path.anchor
    without_root = pure_path.relative_to(anchor)
    return str(without_root)
Converts a full path to relative path without the root.
Args
path
- A full path.
Returns
A relative path without the root.
def is_disk_image(inputfile: dict) ‑> bool
-
Expand source code
def is_disk_image(inputfile: dict) -> bool:
    """Tell whether an input file looks like a disk image.

    The decision is based solely on the extension of the file's display name,
    compared case-insensitively against a fixed list of extensions.

    Args:
        inputfile: InputFile structure.

    Returns:
        True if the display name ends in a known disk image extension,
        False otherwise.

    Raises:
        RuntimeError: If inputfile has no "display_name" key.
    """
    if "display_name" not in inputfile:
        raise RuntimeError("inputfile parameter malformed, no display_name found")

    known_extensions = (".img", ".raw", ".dd", ".qcow3", ".qcow2", ".qcow")
    name = str(inputfile.get("display_name"))
    suffix = os.path.splitext(name)[1]
    return suffix.lower() in known_extensions
Check if inputfile is a disk image.
Args
inputfile
- InputFile structure.
Returns: bool (True if the display name has a known disk image extension). Raises: RuntimeError if inputfile has no display_name key.
Classes
class OutputFile (uuid: str,
output_path: str,
display_name: str,
extension: str | None = None,
data_type: str | None = None,
original_path: str | None = None,
source_file_id: int | None = None)-
Expand source code
class OutputFile:
    """Describes a single output file.

    Attributes:
        uuid: Unique identifier for the file.
        display_name: Display name for the file.
        extension: Extension of the file.
        data_type: Data type of the file.
        path: The full path to the file (taken from the output_path argument).
        original_path: The full original path to the file.
        source_file_id: The OutputFile this file belongs to.
    """

    def __init__(
        self,
        uuid: str,
        output_path: str,
        display_name: str,
        extension: Optional[str] = None,
        data_type: Optional[str] = None,
        original_path: Optional[str] = None,
        source_file_id: Optional[int] = None,
    ):
        """Store the given values on a new OutputFile.

        Args:
            uuid: Unique identifier (uuid4) for the file.
            output_path: Output path for the file; stored as the path
                attribute.
            display_name: The name of the output file.
            extension: File extension (optional).
            data_type: The data type of the output file (optional).
            original_path: The original path of the file (optional).
            source_file_id: The OutputFile this file belongs to (optional).
        """
        self.uuid = uuid
        self.display_name = display_name
        self.extension = extension
        self.data_type = data_type
        # Note the rename: callers pass output_path, readers use .path.
        self.path = output_path
        self.original_path = original_path
        self.source_file_id = source_file_id

    def to_dict(self) -> dict:
        """Return the OutputFile as a plain dictionary.

        This is what the mediator server gets and uses to create a File in
        the database.

        Returns:
            A dictionary containing the attributes of the OutputFile object.
        """
        return dict(
            uuid=self.uuid,
            display_name=self.display_name,
            extension=self.extension,
            data_type=self.data_type,
            path=self.path,
            original_path=self.original_path,
            source_file_id=self.source_file_id,
        )
Represents an output file.
Attributes
uuid
- Unique identifier for the file.
display_name
- Display name for the file.
extension
- Extension of the file.
data_type
- Data type of the file.
path
- The full path to the file.
original_path
- The full original path to the file.
source_file_id
- The OutputFile this file belongs to.
Initialize an OutputFile object.
Args
uuid
- Unique identifier (uuid4) for the file.
output_path
- The full output path of the file (stored as the path attribute).
display_name
- The name of the output file.
extension
- File extension (optional).
data_type
- The data type of the output file (optional).
original_path
- The original path of the file (optional).
source_file_id
- The OutputFile this file belongs to (optional).
Methods
def to_dict(self) ‑> dict
-
Expand source code
def to_dict(self) -> dict:
    """Return the OutputFile as a plain dictionary.

    This is what the mediator server gets and uses to create a File in the
    database.

    Returns:
        A dictionary containing the attributes of the OutputFile object.
    """
    # Each dictionary key has the same name as the attribute it is read from.
    field_names = (
        "uuid",
        "display_name",
        "extension",
        "data_type",
        "path",
        "original_path",
        "source_file_id",
    )
    return {field: getattr(self, field) for field in field_names}
Return a dictionary representation of the OutputFile object. This is what the mediator server gets and uses to create a File in the database.
Returns
A dictionary containing the attributes of the OutputFile object.