Module openrelik_worker_common.mount_utils
Classes
class BlockDevice (image_path: str,
min_partition_size: int = 104857600,
max_mountpath_size: int = 500)-
Expand source code
class BlockDevice: """BlockDevice provides functionality to map a disk image file to block devices and mount them. The default minimum partition size that gets mounted is 100MB. NOTE: If running in a container the container needs: * to be privileged (due to mounting) * needs access to /dev/loop* and /dev/nbd* devices * sudo, fdisk, qemu-utils and ntfs-3g packages installed (debian) Usage: ``` try: bd = BlockDevice('/folder/path_to_disk_image.dd', min_partition_size=1) bd.setup() mountpoints = bd.mount() # Do the things you need to do :) except: # Handle your errors here. finally: bd.umount() """ MIN_PARTITION_SIZE_BYTES = 100 * 1024 * 1024 #: Default 100 MB MAX_NBD_DEVICES = 10 #: Default 10 LOCK_TIMEOUT_SECONDS = 6 * 60 * 60 #: Default 6 hours MAX_MOUNTPATH_SIZE = 500 #: Default 500 def __init__( self, image_path: str, min_partition_size: int = MIN_PARTITION_SIZE_BYTES, max_mountpath_size: int = MAX_MOUNTPATH_SIZE, ): """Initialize BlockDevice class instance. Args: image_path (str): path to the image file to map and mount. min_partition_size (int): minimum partition size, default MIN_PARTITION_SIZE_BYTES max_mountpath_size (int): maximum root mount path length, default MAX_MOUNTPATH_SIZE """ self.image_path = image_path self.min_partition_size = min_partition_size self.blkdevice = None self.blkdeviceinfo = None self.partitions = [] self.mountpoints = [] self.mountroot = "/mnt" self.max_mountpath_size = max_mountpath_size self.supported_fstypes = ["dos", "xfs", "ext2", "ext3", "ext4", "ntfs", "vfat"] self.supported_qcowtypes = ["qcow3", "qcow2", "qcow"] self.REDIS_URL = os.getenv("REDIS_URL") or "redis://localhost:6379/0" self.redis_client = None self.redis_lock = None def setup(self): """Setup BlockDevice instance The setup function will check if the required tools are available, setup the relevant block device (loop or nbd) depending on image format and scan the paritions available. """ # Log minimum partitions size logger.info( f"Minimum partition size {self.min_partition_size} Bytes, partitions smaller will be ignored!" ) # Check if image_path exists image_path = pathlib.Path(self.image_path) if not pathlib.Path.exists(image_path): raise RuntimeError(f"image_path does not exist: {self.image_path}") # Check if required tools are available self._required_tools_available() # Setup the block device ext = image_path.suffix.strip(".") if ext.lower() in self.supported_qcowtypes: self.redis_client = redis.Redis.from_url(self.REDIS_URL) self.blkdevice = self._nbdsetup() else: self.blkdevice = self._losetup() # Parse block device info self.blkdeviceinfo = self._blkinfo() # Parse partition information self.partitions = self._parse_partitions() def _losetup(self) -> str: """Map image file to loopback device using losetup. Returns: str: block device created by losetup Raises: RuntimeError: if there was an error running losetup. """ losetup_command = [ "sudo", "losetup", "--find", "--partscan", "--show", "--read-only", self.image_path, ] process = subprocess.run( losetup_command, capture_output=True, check=False, text=True ) if process.returncode == 0: blkdevice = process.stdout.strip() logger.info(f"losetup: success creating {blkdevice} for {self.image_path}") else: logger.error( f"losetup: failed creating blockdevice for {self.image_path}: {process.stderr} {process.stdout}" ) raise RuntimeError(f"Error: {process.stderr} {process.stdout}") return blkdevice def _get_hostname(self): """Return hostname from environment variable NODENAME or OS hostname. Can be used to get the hostname of the node the container is running on or the OS hostname if empty. Returns: str: hostname of node or container. """ hostname = os.environ.get("NODENAME") if not hostname: hostname = socket.gethostname() return hostname def _get_free_nbd_device(self): """Find and lock free NBD device until unlocked or timeout. NOTE: if running this in a container (e.g. Docker or k8s) the NBD device assignment is done in kernel space. This means that the locks need to done on the kernel namespace level and not on the container level. To make sure this works you need to set the environment variable NODENAME on container startup to the name of the host the container runtime engine is running on. For k8s that is the Node and for Docker that is the actual host the docker engine runs on. Returns: str: NBD device name Raises: RuntimeError: if no free nbd device was found. """ hostname = self._get_hostname() for device_number in range(self.MAX_NBD_DEVICES + 1): devname = f"/dev/nbd{device_number}" lock = self.redis_client.lock( name=f"{hostname}-{devname}", timeout=self.LOCK_TIMEOUT_SECONDS, blocking=False, ) if lock.acquire(): self.redis_lock = lock logger.info( f"Redis lock succesfully set: {lock.name} for {hostname}-{devname}" ) return devname raise RuntimeError("Error getting free NBD device: All NBD devices locked!") def _nbdsetup(self): """Map QCOW image file to NBD device using qemu-nbd and probe partitions. Returns: str: block device created by qemu-nbd Raises: RuntimeError: if there was an error running qemu-nbd or fdisk. """ # Get and lock a free nbd device self.blkdevice = self._get_free_nbd_device() nbdsetup_command = [ "sudo", "qemu-nbd", "--read-only", "--connect", self.blkdevice, self.image_path, ] process = subprocess.run( nbdsetup_command, capture_output=True, check=False, text=True ) if process.returncode == 0: logger.info( f"qemu-nbd: success creating {self.blkdevice} for {self.image_path}" ) else: logger.error( f"qemu-nbd: failed creating {self.blkdevice} for {self.image_path}: {process.stderr} {process.stdout}" ) raise RuntimeError( f"Error running qemu-nbd: {process.stderr} {process.stdout}" ) # This sleep is needed for qemu-nbd to activate the nbd device time.sleep(0.2) # Probe partitions with fdisk fdisk_command = [ "sudo", "fdisk", "-l", self.blkdevice.strip(), ] process = subprocess.run( fdisk_command, capture_output=True, check=False, text=True ) if process.returncode == 0: logger.info( f"fdisk: success probing {self.blkdevice} for {self.image_path}" ) else: logger.error( f"fdisk: failed probing {self.blkdevice} for {self.image_path}: {process.stderr} {process.stdout}" ) raise RuntimeError( f"Error fdisk: failed probing: {process.stderr} {process.stdout}" ) return self.blkdevice def _required_tools_available(self) -> bool: """Check if required cli tools are available. Required tools can be installed on Debian by adding apt installing the following packages: * fdisk * qemu-utils * ntfs-3g * sudo Returns: tuple: tuple of return bool and error message """ tools = ["lsblk", "blkid", "mount", "qemu-nbd", "sudo", "fdisk", "ntfsinfo"] missing_tools = [tool for tool in tools if not shutil.which(tool)] if missing_tools: raise RuntimeError( f"Missing required tools: {' '.join(missing_tools)}. Make sure you have the fdisk, qemu-utils and ntfs-3g packages installed!" ) return True def _blkinfo(self) -> dict: """Extract device and partition information using blkinfo. Returns: dict: lsblk json serialized dict Raises: RuntimeError: if there was an error running lsblk or parsing the json output. """ lsblk_command = ["sudo", "lsblk", "-ba", "-J", self.blkdevice] process = subprocess.run( lsblk_command, capture_output=True, check=False, text=True ) if process.returncode == 0: try: lsblk_json_output = process.stdout.strip() blkdeviceinfo = json.loads(lsblk_json_output) except json.JSONDecodeError as e: logger.error(f"Error parsing lsblk json output: {lsblk_json_output}") raise RuntimeError( f"Error parsing lsblk json output: {lsblk_json_output}: {e}" ) else: logger.error( f"Error running lsblk on {self.blkdevice}: {process.stderr} {process.stdout}" ) raise RuntimeError(f"Error lsblk: {process.stderr} {process.stdout}") logger.info(f"Success parsing lsblk info: {blkdeviceinfo}") return blkdeviceinfo def _parse_partitions(self) -> list: """Parse partition information from block device details. Returns: list[str]: a list of partitions """ partitions = [] if "blockdevices" not in self.blkdeviceinfo: raise RuntimeError("_parse_partitions: self.blkdeviceinfo malformed") if len(self.blkdeviceinfo.get("blockdevices")) == 0: logger.warning("_parse_partitions: blkdeviceinfo.blockdevices had 0 length") return partitions bd = self.blkdeviceinfo.get("blockdevices")[0] if "children" not in bd: # No partitions on this disk. return partitions for children in bd.get("children"): partition = f"/dev/{children['name']}" if self._is_important_partition(children): partitions.append(partition) return partitions def _is_important_partition(self, partition: dict): """Decides if we will process a partition. We process the partition if: * > 100Mbyte in size * contains a filesystem type ext*, dos, vfat, xfs, ntfs Args: partition (dict): Partition details from lsblk. Returns: bool: True or False for importance of partition. """ if partition["size"] < self.min_partition_size: logger.info( f"Ignoring partion {partition['name']} as size < {self.min_partition_size}" ) return False fs_type = self._get_fstype(f"/dev/{partition['name']}") if fs_type == "": logger.warning( f"Ignoring partition {partition['name']} as fs type not available!" ) return False if fs_type not in self.supported_fstypes: logger.warning( f"Ignoring partition {partition['name']} as fs type {fs_type} not supported!" ) return False return True def _get_fstype(self, devname: str): """Analyses the file system type of a block device or partition. Args: devname (str): block device or partitions device name. Returns: str: The filesystem type. Raises: RuntimeError: If there was an error running blkid. """ blkid_command = ["sudo", "blkid", "-s", "TYPE", "-o", "value", f"{devname}"] process = subprocess.run( blkid_command, capture_output=True, check=False, text=True ) if process.returncode == 0: return process.stdout.strip() else: logger.error( f"Error running blkid on {devname}: {process.stderr} {process.stdout}" ) raise RuntimeError( f"Error running blkid on {devname}: {process.stderr} {process.stdout}" ) def _select_partitions_to_mount(self, partition_name: str = "") -> list: """Select partitions to mount. Args: partitions_name (str): Name of specific partition to mount. Returns: list: A list of partitions to mount. """ to_mount = [] if partition_name and partition_name not in self.partitions: logger.error( f"Error running mount: partition name {partition_name} not found" ) raise RuntimeError( f"Error running mount: partition name {partition_name} not found" ) if partition_name: # Mount the specific partition requested to_mount.append(partition_name) elif not self.partitions: # No partitions found, mount the whole block device to_mount.append(self.blkdevice) elif self.partitions: # Mount all detected partitions to_mount = self.partitions return to_mount def _get_mount_path(self) -> str: """Generates a mount path using max_mountpath_size. Returns: str: The generated mount path. Raises: RuntimeError: If the max_mountpath_size is too small. """ if ( self.max_mountpath_size <= len(self.mountroot) + 1 ): # as we add a "/" in between root and the uuid part raise RuntimeError( f"Error generating mount path: the max_mount_path size ({self.max_mountpath_size}) is too short, please choose a larger maximum mountpath size, minimum is self.mountroot + 1" ) max_uuid_size = ( self.max_mountpath_size - len(self.mountroot) - 1 ) # as we add a "/" in between root and the uuid part uuid_path_part = uuid4().hex[:max_uuid_size] return f"{self.mountroot}/{uuid_path_part}" def mount(self, partition_name: str = ""): """Mounts a disk or one or more partititions on a mountpoint. Args: partitions_name (str): Name of specific partition to mount. Returns: list: A list of paths the disk/partitions have been mounted on. Raises: RuntimeError: If there was an error running mount. """ to_mount = self._select_partitions_to_mount(partition_name) for mounttarget in to_mount: logger.info(f"Trying to mount {mounttarget}") mount_command = ["sudo", "mount"] fstype = self._get_fstype(mounttarget) if fstype == "xfs": mount_command.extend(["-o", "ro,norecovery"]) elif fstype in ["ext2", "ext3", "ext4"]: mount_command.extend(["-o", "ro,noload"]) else: mount_command.extend(["-o", "ro"]) mount_command.append(mounttarget) mount_folder = self._get_mount_path() os.makedirs(mount_folder) mount_command.append(mount_folder) process = subprocess.run( mount_command, capture_output=True, check=False, text=True ) if process.returncode == 0: logger.info(f"Mounted {mounttarget} to {mount_folder}") self.mountpoints.append(mount_folder) else: logger.error( f"Error running mount on {mounttarget}: {process.stderr} {process.stdout}" ) raise RuntimeError( f"Error running mount on {mounttarget}: {process.stderr} {process.stdout}" ) return self.mountpoints def _umount_all(self): """Umounts all registered mount_points. Returns: None Raises: RuntimeError: If there was an error running umount. """ removed = [] for mountpoint in self.mountpoints: umount_command = ["sudo", "umount", f"{mountpoint}"] process = subprocess.run( umount_command, capture_output=True, check=False, text=True ) if process.returncode == 0: logger.info(f"umount {mountpoint} success") os.rmdir(mountpoint) removed.append(mountpoint) else: logger.error( f"Error running umount on {mountpoint}: {process.stderr} {process.stdout}" ) raise RuntimeError( f"Error running umount on {mountpoint}: {process.stderr} {process.stdout}" ) for mountpoint in removed: self.mountpoints.remove(mountpoint) def _detach_device(self): """Cleanup block devices for BlockDevice instance. Returns: None Raises: RuntimeError: If there was an error running losetup or qemu-nbd. """ if "nbd" in self.blkdevice: command = ["sudo", "qemu-nbd", "--disconnect", self.blkdevice] else: command = ["sudo", "losetup", "--detach", self.blkdevice] process = subprocess.run(command, capture_output=True, check=False, text=True) if process.returncode == 0: logger.info(f"Detached {self.blkdevice} succes!") self.blkdevice = None else: logger.error(f"Detached {self.blkdevice} failed!") raise RuntimeError( f"Error detaching block device: {process.stderr} {process.stdout}" ) def umount(self): """Unmounts all mounted file systems and detaches the block device. This method first attempts to unmount all file systems that were previously mounted by the `mount()` method. After successfully unmounting, it detaches the underlying block device (loop or NBD). If a Redis lock was acquired for an NBD device, this lock is also released. Raises: RuntimeError: If unmounting any of the mount points fails, or if detaching the block device fails. """ self._umount_all() self._detach_device() if self.redis_lock: self.redis_lock.release() logger.info(f"Redis lock released: {self.redis_lock.name}")
BlockDevice provides functionality to map a disk image file to block devices and mount them. The default minimum partition size that gets mounted is 100MB. NOTE: If running in a container the container needs: * to be privileged (due to mounting) * needs access to /dev/loop and /dev/nbd devices * sudo, fdisk, qemu-utils and ntfs-3g packages installed (debian)
Usage
``` try: bd = BlockDevice('/folder/path_to_disk_image.dd', min_partition_size=1) bd.setup() mountpoints = bd.mount() # Do the things you need to do :) except: # Handle your errors here. finally: bd.umount()
Initialize BlockDevice class instance.
Args
image_path
:str
- path to the image file to map and mount.
min_partition_size
:int
- minimum partition size, default MIN_PARTITION_SIZE_BYTES
max_mountpath_size
:int
- maximum root mount path length, default MAX_MOUNTPATH_SIZE
Class variables
var LOCK_TIMEOUT_SECONDS
-
Default 6 hours
var MAX_MOUNTPATH_SIZE
-
Default 500
var MAX_NBD_DEVICES
-
Default 10
var MIN_PARTITION_SIZE_BYTES
-
Default 100 MB
Methods
def mount(self, partition_name: str = '')
-
Expand source code
def mount(self, partition_name: str = ""): """Mounts a disk or one or more partititions on a mountpoint. Args: partitions_name (str): Name of specific partition to mount. Returns: list: A list of paths the disk/partitions have been mounted on. Raises: RuntimeError: If there was an error running mount. """ to_mount = self._select_partitions_to_mount(partition_name) for mounttarget in to_mount: logger.info(f"Trying to mount {mounttarget}") mount_command = ["sudo", "mount"] fstype = self._get_fstype(mounttarget) if fstype == "xfs": mount_command.extend(["-o", "ro,norecovery"]) elif fstype in ["ext2", "ext3", "ext4"]: mount_command.extend(["-o", "ro,noload"]) else: mount_command.extend(["-o", "ro"]) mount_command.append(mounttarget) mount_folder = self._get_mount_path() os.makedirs(mount_folder) mount_command.append(mount_folder) process = subprocess.run( mount_command, capture_output=True, check=False, text=True ) if process.returncode == 0: logger.info(f"Mounted {mounttarget} to {mount_folder}") self.mountpoints.append(mount_folder) else: logger.error( f"Error running mount on {mounttarget}: {process.stderr} {process.stdout}" ) raise RuntimeError( f"Error running mount on {mounttarget}: {process.stderr} {process.stdout}" ) return self.mountpoints
Mounts a disk or one or more partititions on a mountpoint.
Args
partitions_name
:str
- Name of specific partition to mount.
Returns
list
- A list of paths the disk/partitions have been mounted on.
Raises
RuntimeError
- If there was an error running mount.
def setup(self)
-
Expand source code
def setup(self): """Setup BlockDevice instance The setup function will check if the required tools are available, setup the relevant block device (loop or nbd) depending on image format and scan the paritions available. """ # Log minimum partitions size logger.info( f"Minimum partition size {self.min_partition_size} Bytes, partitions smaller will be ignored!" ) # Check if image_path exists image_path = pathlib.Path(self.image_path) if not pathlib.Path.exists(image_path): raise RuntimeError(f"image_path does not exist: {self.image_path}") # Check if required tools are available self._required_tools_available() # Setup the block device ext = image_path.suffix.strip(".") if ext.lower() in self.supported_qcowtypes: self.redis_client = redis.Redis.from_url(self.REDIS_URL) self.blkdevice = self._nbdsetup() else: self.blkdevice = self._losetup() # Parse block device info self.blkdeviceinfo = self._blkinfo() # Parse partition information self.partitions = self._parse_partitions()
Setup BlockDevice instance
The setup function will check if the required tools are available, setup the relevant block device (loop or nbd) depending on image format and scan the paritions available.
def umount(self)
-
Expand source code
def umount(self): """Unmounts all mounted file systems and detaches the block device. This method first attempts to unmount all file systems that were previously mounted by the `mount()` method. After successfully unmounting, it detaches the underlying block device (loop or NBD). If a Redis lock was acquired for an NBD device, this lock is also released. Raises: RuntimeError: If unmounting any of the mount points fails, or if detaching the block device fails. """ self._umount_all() self._detach_device() if self.redis_lock: self.redis_lock.release() logger.info(f"Redis lock released: {self.redis_lock.name}")
Unmounts all mounted file systems and detaches the block device.
This method first attempts to unmount all file systems that were previously mounted by the
mount()
method. After successfully unmounting, it detaches the underlying block device (loop or NBD). If a Redis lock was acquired for an NBD device, this lock is also released.Raises
RuntimeError
- If unmounting any of the mount points fails, or if detaching the block device fails.