Module openrelik_worker_common.archive_utils

Helper methods for archives.

Functions

def extract_archive(input_file: dict,
output_folder: str,
log_file: str,
file_filter: list = [],
archive_password: str | None = None) ‑> tuple[str, str]
Expand source code
def extract_archive(
    input_file: dict, output_folder: str, log_file: str, file_filter: list = [], archive_password: str | None = None
) -> tuple[str, str]:
    """Unpacks an archive.

    Args:
      input_file(dict): Input file dict.
      output_folder(string): OpenRelik output_folder.
      log_file(string): Log file path.
      file_filter(list): List of file patterns to extract (optional).
      archive_password(str | None): Password of the input archives (optional). 

    Return:
      command(string): The executed command string.
      export_folder: Root folder path to the unpacked archive.
    """
    if "path" not in input_file or "display_name" not in input_file:
        raise RuntimeError("input_file parameter malformed")

    input_path = input_file.get("path")
    input_filename = input_file.get("display_name")

    if not shutil.which("7z"):
        raise RuntimeError("7z executable not found!")

    export_folder = os.path.join(output_folder, uuid4().hex)
    os.makedirs(export_folder)

    if input_filename.endswith((".tgz", ".tar.gz")):
        command = [
            "tar",
            "-vxzf",
            input_path,
            "-C",
            f"{export_folder}",
        ]
        if file_filter:
            command.extend(["--recursion", "--no-anchored"])
            for pattern in file_filter:
                command.extend(["--wildcards", pattern.strip()])
    else:
        command = [
            "7z",
            "x",
            input_path,
            f"-o{export_folder}",
        ]
        if archive_password is not None:
            command.append(f"-p{archive_password}")
        if file_filter:
            command.append("-r")
            for pattern in file_filter:
                command.append(pattern.strip())

    command_string = " ".join(command)
    with open(log_file, "wb") as out:
        ret = subprocess.call(command, stdout=out, stderr=out)
    if ret != 0:
        raise RuntimeError("7zip or tar execution error.")

    return (command_string, export_folder)

Unpacks an archive.

Args

- input_file (dict): Input file dict; must contain the keys "path" and "display_name".
- output_folder (string): OpenRelik output_folder.
- log_file (string): Log file path.
- file_filter (list): List of file patterns to extract (optional).
- archive_password (str | None): Password of the input archives (optional).

Return

- command (string): The executed command string.
- export_folder: Root folder path to the unpacked archive.