diff --git a/codexctl/__init__.py b/codexctl/__init__.py index 871c834..b68f8d0 100644 --- a/codexctl/__init__.py +++ b/codexctl/__init__.py @@ -261,7 +261,7 @@ def version_lookup(version: str | None) -> re.Match[str] | None: if update_file: try: from remarkable_update_image import UpdateImage - from remarkable_update_image.cpio import UpdateImage as CPIOUpdateImage + from remarkable_update_image.image import CPIOUpdateImage image = UpdateImage(update_file) if isinstance(image, CPIOUpdateImage): diff --git a/codexctl/device.py b/codexctl/device.py index 74464c7..97481e2 100644 --- a/codexctl/device.py +++ b/codexctl/device.py @@ -2,6 +2,7 @@ import logging import os import re +import shlex import socket import subprocess import tempfile @@ -291,11 +292,11 @@ def connect_to_device( return client - def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: + def _read_version_from_path(self, ftp=None, base_path: str = "") -> tuple[str, bool]: """Reads version from a given path (current partition or mounted backup) Args: - ftp: SFTP client connection + ftp: SFTP client connection (None for local file access) base_path: Base path prefix (empty for current partition, /tmp/mount_pX for backup) Returns: @@ -304,76 +305,133 @@ def _read_version_from_path(self, ftp, base_path: str = "") -> tuple[str, bool]: update_conf_path = f"{base_path}/usr/share/remarkable/update.conf" if base_path else "/usr/share/remarkable/update.conf" os_release_path = f"{base_path}/etc/os-release" if base_path else "/etc/os-release" - def file_exists(path: str) -> bool: - try: - ftp.stat(path) - return True - except FileNotFoundError: - return False + if ftp: + def file_exists(path: str) -> bool: + try: + ftp.stat(path) + return True + except FileNotFoundError: + return False + + def read_file(path: str) -> str: + with ftp.file(path) as file: + return file.read().decode("utf-8") + else: + file_exists = os.path.exists + + def read_file(path: str) -> str: + with open(path, encoding="utf-8") as file: + return file.read() if file_exists(update_conf_path): - with ftp.file(update_conf_path) as file: - contents = file.read().decode("utf-8").strip("\n") - match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) - if match: - return match.group(), True - raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") + contents = read_file(update_conf_path).strip("\n") + match = re.search("(?<=REMARKABLE_RELEASE_VERSION=).*", contents) + if match: + return match.group(), True + raise SystemError(f"REMARKABLE_RELEASE_VERSION not found in {update_conf_path}") if file_exists(os_release_path): - with ftp.file(os_release_path) as file: - contents = file.read().decode("utf-8") - match = re.search("(?<=IMG_VERSION=).*", contents) - if match: - return match.group().strip('"'), False - raise SystemError(f"IMG_VERSION not found in {os_release_path}") + contents = read_file(os_release_path) + match = re.search("(?<=IMG_VERSION=).*", contents) + if match: + return match.group().strip('"'), False + raise SystemError(f"IMG_VERSION not found in {os_release_path}") raise SystemError(f"Cannot read version from {base_path or 'current partition'}: no version file found") - def _get_backup_partition_version(self) -> str: - """Gets the version installed on the backup (inactive) partition + def _get_active_device(self) -> str: + """Gets the active root device path. Returns: - str: Version string + str: Active device path (e.g., /dev/mmcblk2p2) Raises: - SystemError: If backup partition version cannot be determined + SystemError: If command fails or returns no output """ - if not self.client: - raise SystemError("Cannot get backup partition version: no SSH client connection") - - ftp = self.client.open_sftp() - if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): - _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") - active_device = stdout.read().decode("utf-8").strip() - active_part = int(active_device.split('p')[-1]) - inactive_part = 3 if active_part == 2 else 2 - device_base = re.sub(r'p\d+$', '', active_device) + cmd = "swupdate -g" else: - _stdin, stdout, _stderr = self.client.exec_command("rootdev") - active_device = stdout.read().decode("utf-8").strip() - active_part = int(active_device.split('p')[-1]) - inactive_part = 3 if active_part == 2 else 2 - device_base = re.sub(r'p\d+$', '', active_device) + cmd = "rootdev" - mount_point = f"/tmp/mount_p{inactive_part}" + if self.client: + _stdin, stdout, stderr = self.client.exec_command(cmd) + output = stdout.read().decode("utf-8").strip() + exit_status = stdout.channel.recv_exit_status() + if exit_status != 0 or not output: + error = stderr.read().decode("utf-8", errors="ignore") + raise SystemError(f"Failed to get active device using '{cmd}': {error or 'no output'}") + return output + else: + result = subprocess.run(cmd.split(), capture_output=True, text=True) + if result.returncode != 0 or not result.stdout.strip(): + raise SystemError(f"Failed to get active device using '{cmd}': {result.stderr or 'no output'}") + return result.stdout.strip() - self.client.exec_command(f"mkdir -p {mount_point}") - _stdin, stdout, _stderr = self.client.exec_command( - f"mount -o ro {device_base}p{inactive_part} {mount_point}" - ) - exit_status = stdout.channel.recv_exit_status() + def _parse_partition_info(self, active_device: str) -> tuple[int, int, str]: + """Parse partition numbers from device path. + + Args: + active_device: Device path (e.g., /dev/mmcblk2p2) + + Returns: + tuple: (active_part, inactive_part, device_base) + """ + active_part = int(active_device.split('p')[-1]) + inactive_part = 3 if active_part == 2 else 2 + device_base = re.sub(r'p\d+$', '', active_device) + return active_part, inactive_part, device_base - if exit_status != 0: - error_msg = _stderr.read().decode('utf-8') - raise SystemError(f"Failed to mount backup partition: {error_msg}") + def _get_backup_partition_version(self) -> str: + """Gets the version installed on the backup (inactive) partition + + Returns: + str: Version string (empty string for RM1/RM2 on failure) + Raises: + SystemError: If backup partition version cannot be determined (Paper Pro only) + """ try: - version, _ = self._read_version_from_path(ftp, mount_point) - return version - finally: - self.client.exec_command(f"umount {mount_point}") - self.client.exec_command(f"rm -rf {mount_point}") + active_device = self._get_active_device() + _, inactive_part, device_base = self._parse_partition_info(active_device) + mount_point = f"/tmp/mount_p{inactive_part}" + + if self.client: + ftp = self.client.open_sftp() + self.client.exec_command(f"mkdir -p {mount_point}") + _stdin, stdout, _stderr = self.client.exec_command( + f"mount -o ro {device_base}p{inactive_part} {mount_point}" + ) + exit_status = stdout.channel.recv_exit_status() + + if exit_status != 0: + error_msg = _stderr.read().decode('utf-8') + raise SystemError(f"Failed to mount backup partition: {error_msg}") + + try: + version, _ = self._read_version_from_path(ftp, mount_point) + return version + finally: + self.client.exec_command(f"umount {mount_point}") + self.client.exec_command(f"rm -rf {mount_point}") + else: + os.makedirs(mount_point, exist_ok=True) + result = subprocess.run( + ["mount", "-o", "ro", f"{device_base}p{inactive_part}", mount_point], + capture_output=True, text=True + ) + if result.returncode != 0: + raise SystemError(f"Failed to mount backup partition: {result.stderr}") + + try: + version, _ = self._read_version_from_path(base_path=mount_point) + return version + finally: + subprocess.run(["umount", mount_point]) + subprocess.run(["rm", "-rf", mount_point]) + except SystemError: + if self.hardware in (HardwareType.RMPP, HardwareType.RMPPM): + raise + return "" def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, int]: """Gets partition information for Paper Pro devices @@ -384,13 +442,8 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, Returns: tuple: (current_partition, inactive_partition, next_boot_partition) """ - if not self.client: - raise SystemError("SSH client required for partition detection") - - _stdin, stdout, _stderr = self.client.exec_command("swupdate -g") - active_device = stdout.read().decode("utf-8").strip() - current_part = int(active_device.split('p')[-1]) - inactive_part = 3 if current_part == 2 else 2 + active_device = self._get_active_device() + current_part, inactive_part, _ = self._parse_partition_info(active_device) parts = current_version.split('.') if len(parts) >= 2 and parts[0].isdigit() and parts[1].isdigit(): @@ -400,20 +453,42 @@ def _get_paper_pro_partition_info(self, current_version: str) -> tuple[int, int, next_boot_part = current_part + if self.client: + ftp = self.client.open_sftp() + + def file_exists(path: str) -> bool: + try: + ftp.stat(path) + return True + except FileNotFoundError: + return False + + def read_file(path: str) -> str: + with ftp.file(path) as file: + return file.read().decode("utf-8") + else: + file_exists = os.path.exists + + def read_file(path: str) -> str: + with open(path, encoding="utf-8") as file: + return file.read() + if is_new_version: + boot_part_path = "/sys/bus/mmc/devices/mmc0:0001/boot_part" try: - ftp = self.client.open_sftp() - with ftp.file("/sys/bus/mmc/devices/mmc0:0001/boot_part") as file: - boot_part_value = file.read().decode("utf-8").strip() + if file_exists(boot_part_path): + boot_part_value = read_file(boot_part_path).strip() next_boot_part = 2 if boot_part_value == "1" else 3 + else: + is_new_version = False except (IOError, OSError): is_new_version = False if not is_new_version: + root_part_path = "/sys/devices/platform/lpgpr/root_part" try: - ftp = self.client.open_sftp() - with ftp.file("/sys/devices/platform/lpgpr/root_part") as file: - root_part_value = file.read().decode("utf-8").strip() + if file_exists(root_part_path): + root_part_value = read_file(root_part_path).strip() next_boot_part = 2 if root_part_value == "a" else 3 except (IOError, OSError) as e: self.logger.debug(f"Failed to read next boot partition: {e}") @@ -442,29 +517,16 @@ def get_device_status(self) -> tuple[str | None, str, str, str, str]: beta_contents = file.read().decode("utf-8") else: - if os.path.exists("/usr/share/remarkable/update.conf"): - with open("/usr/share/remarkable/update.conf", encoding="utf-8") as file: - xochitl_version = re.search( - "(?<=REMARKABLE_RELEASE_VERSION=).*", - file.read().strip("\n"), - ).group() - else: - with open("/etc/os-release", encoding="utf-8") as file: - xochitl_version = ( - re.search("(?<=IMG_VERSION=).*", file.read()) - .group() - .strip('"') - ) + xochitl_version, old_update_engine = self._read_version_from_path() - old_update_engine = False if os.path.exists("/etc/version"): - with open("/etc/version") as file: + with open("/etc/version", encoding="utf-8") as file: version_id = file.read().rstrip() else: version_id = "" if os.path.exists("/home/root/.config/remarkable/xochitl.conf"): - with open("/home/root/.config/remarkable/xochitl.conf") as file: + with open("/home/root/.config/remarkable/xochitl.conf", encoding="utf-8") as file: beta_contents = file.read().rstrip() else: beta_contents = "" @@ -691,7 +753,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes print("\nDone! Running swupdate (PLEASE BE PATIENT, ~5 MINUTES)") - command = f"/usr/sbin/swupdate-from-image-file {out_location}" + command = f"bash -c 'source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(out_location)}'" self.logger.debug(command) _stdin, stdout, _stderr = self.client.exec_command(command) @@ -737,7 +799,7 @@ def install_sw_update(self, version_file: str, bootloader_files: dict[str, bytes else: print("Running swupdate") - command = ["/usr/sbin/swupdate-from-image-file", version_file] + command = ["bash", "-c", f"source /usr/lib/swupdate/conf.d/09-swupdate-args && swupdate $SWUPDATE_ARGS -i {shlex.quote(version_file)}"] self.logger.debug(command) try: