From 956eb0479ae8376ffb13b36659a9a841f124dc5c Mon Sep 17 00:00:00 2001 From: Arthur Gautier Date: Fri, 24 Oct 2025 23:13:01 -0700 Subject: [PATCH] nixos/test-driver: add utils to manipulate efi variables ```python machine.create_efi_vars() machine.write_efi_vars([ EfiVariable( uuid.UUID("65a72bd9-f42b-4039-8084-66cd1702cb24"), "AbrNextBoot", b"a", flags=EfiVariable.Flags.NON_VOLATILE ) ]) machine.start() ``` --- nixos/lib/test-driver/default.nix | 2 + nixos/lib/test-driver/src/test_driver/efi.py | 233 ++++++++++++++++++ .../src/test_driver/machine/__init__.py | 43 +++- nixos/tests/all-tests.nix | 1 + nixos/tests/nixos-test-driver/efivars.nix | 110 +++++++++ .../python-modules/ovmfvartool/default.nix | 12 +- 6 files changed, 397 insertions(+), 4 deletions(-) create mode 100644 nixos/lib/test-driver/src/test_driver/efi.py create mode 100644 nixos/tests/nixos-test-driver/efivars.nix diff --git a/nixos/lib/test-driver/default.nix b/nixos/lib/test-driver/default.nix index 77638397d11c..f92e7981c55d 100644 --- a/nixos/lib/test-driver/default.nix +++ b/nixos/lib/test-driver/default.nix @@ -11,6 +11,7 @@ ptpython, pydantic, python, + ovmfvartool, remote-pdb, ruff, ty, @@ -49,6 +50,7 @@ buildPythonApplication { junit-xml ptpython pydantic + ovmfvartool remote-pdb ] ++ extraPythonPackages python.pkgs; diff --git a/nixos/lib/test-driver/src/test_driver/efi.py b/nixos/lib/test-driver/src/test_driver/efi.py new file mode 100644 index 000000000000..fb547ded5dd9 --- /dev/null +++ b/nixos/lib/test-driver/src/test_driver/efi.py @@ -0,0 +1,233 @@ +import binascii +import io +import os.path +import uuid +from collections.abc import Generator +from contextlib import contextmanager +from pathlib import Path +from typing import IO, Any, TypeVar + +from ovmfvartool import ( + AuthenticatedVariable, + FirmwareVolumeHeader, + UEFITime, + VariableStoreHeader, + resolveUUID, +) + +import test_driver.machine +from test_driver.errors import RequestedAssertionFailed + +EfiVariableT = TypeVar("EfiVariableT", bound="EfiVariable") + +# See edk2.git/OvmfPkg/Bhyve/VarStore.fdf.inc +_NV_FTW_WORKING_OFFSET = 0x41000 +_NV_FTW_WORKING_VALUE = binascii.unhexlify( + b"2b29589e687c7d49a0ce6500fd9f1b952caf2c64feffffffe00f000000000000" +) +_NV_FTW_SIZE = 0x42000 +_NV_FTW_MAIN_PLUS_SPARE_SIZE = _NV_FTW_SIZE * 2 + + +class EfiVariable(AuthenticatedVariable): + class Flags: + NON_VOLATILE = 0x1 + BOOTSERVICE_ACCESS = 0x2 + RUNTIME_ACCESS = 0x4 + TIME_BASED_AUTHENTICATED_WRITE_ACCESS = 0x20 + + class State: + VAR_HEADER_VALID_ONLY = 0x7F + VAR_ADDED = 0x3F + + volatile = False + boot_access = False + runtime_access = False + hardware_error_record = False + authenticated_write_access = False + time_based_authenticated_write_access = False + append_write = False + + def __init__( + self, + vendor_uuid: uuid.UUID | None = None, + name: str | None = None, + data: bytes | None = None, + state: int | None = None, + flags: int | None = None, + ) -> None: + self.magic = 0x55AA + self.reserved1 = 0 + self.monotonicCount = 0 + self.timestamp = UEFITime() + self.pubKeyIdx = 0 + self.state = 0 + self.flags = 0 + + if vendor_uuid is not None: + self.vendorUUID = uuid.UUID(bytes=vendor_uuid.bytes_le) + + if state is not None: + self.state = state ^ 0xFF + else: + self.state = (0x40 | 0x80) ^ 0xFF + + if flags is not None: + self.flags = flags + + if name is not None: + self.name = name + self.nameLen = len(name) * 2 + 2 + + if data is not None: + self.data = data + self.dataLen = len(data) + + def _read_flags(self) -> None: + if not (self.flags & 0x1): + self.volatile = True + if self.flags & 0x2: + self.boot_access = True + if self.flags & 0x4: + self.runtime_access = True + if self.flags & 0x8: + self.hardware_error_record = True + if self.flags & 0x10: + self.authenticated_write_access = True + if self.flags & 0x20: + self.time_based_authenticated_write_access = True + if self.flags & 0x40: + self.append_write = True + + self.flags &= ~(0x1 | 0x2 | 0x4 | 0x8 | 0x10 | 0x20 | 0x40) + + @classmethod + def deserialize(cls: type[EfiVariableT], f: Any) -> EfiVariableT | None: + # pylint: disable=no-member + # false positive https://github.com/PyCQA/pylint/issues/981 + ret = super().deserialize(f) + if ret: + ret._read_flags() + return ret + + @classmethod + def deserializeFromDocument( # noqa: N802 + cls: type[EfiVariableT], + vendorID: str, # noqa: N803 + name: str, + doc: dict[str, Any], + ) -> EfiVariableT: + # pylint: disable=no-member + # false positive https://github.com/PyCQA/pylint/issues/981 + ret = super(cls, cls).deserializeFromDocument(vendorID, name, doc) + if ret: + ret._read_flags() + return ret + + +class EfiVars: + """A container around the ovmf variables""" + + state_path: Path + machine: "test_driver.machine.QemuMachine" + + def __init__(self, state_path: Path, machine: Any): + self.state_path = state_path + self.machine = machine + + def _assert_stopped(self) -> None: + if self.machine.booted: + raise RequestedAssertionFailed( + "System is currently running and concurrent reads / writes to the OVMF variables is unsupported" + ) + + def read_content(self) -> dict[str, dict[str, EfiVariable]] | None: + self._assert_stopped() + try: + with open(self.state_path, "rb") as f: + fvh = FirmwareVolumeHeader.deserialize(f) + vsh = VariableStoreHeader.deserialize(f) + _ = fvh + _ = vsh + variables: dict[str, dict[str, EfiVariable]] = {} + + while True: + v = EfiVariable.deserialize(f) + if not v: + break + if v.isDeleted: + continue + + k = resolveUUID(v.vendorUUID) + variables.setdefault(k, {}) + variables[k][v.name] = v + + return variables + + except FileNotFoundError: + return None + + @contextmanager + def _write_store(self, *args, **kwargs) -> Generator[IO[bytes]]: + with open(self.state_path, "wb") as fo: + fm = io.BytesIO(b"\xff" * _NV_FTW_MAIN_PLUS_SPARE_SIZE) + fm.write(FirmwareVolumeHeader.create().serialize()) + fm.write(VariableStoreHeader.create().serialize()) + + try: + yield fm + finally: + fm.seek(_NV_FTW_WORKING_OFFSET) + fm.write(_NV_FTW_WORKING_VALUE) + fm.seek(0) + fo.write(fm.read()) + + def create_empty(self) -> None: + self._assert_stopped() + + if os.path.exists(self.state_path): + raise RequestedAssertionFailed("OVMF variables store exists") + + with self._write_store(): + pass + + def write(self, add: list[EfiVariable]) -> None: + self._assert_stopped() + + variables = self.read_content() + if not variables: + variables = {} + + for var in add: + k = resolveUUID(var.vendorUUID) + variables.setdefault(k, {}) + variables[k][var.name] = var + + with self._write_store() as fm: + for _, vendor in variables.items(): + for _, v in vendor.items(): + fm.write(v.serialize()) + if fm.tell() % 4: + fm.write(b"\xff" * (4 - (fm.tell() % 4))) + assert (fm.tell() % 4) == 0 + + +class EfiGuid: + from ovmfvartool import ( + gEdkiiVarErrorFlagGuid, + gEfiAuthenticatedVariableGuid, + gEfiCertDbGuid, + gEfiCustomModeEnableGuid, + gEfiGlobalVariableGuid, + gEfiImageSecurityDatabaseGuid, + gEfiIp4Config2ProtocolGuid, + gEfiIScsiInitiatorNameProtocolGuid, + gEfiMemoryTypeInformationGuid, + gEfiSecureBootEnableDisableGuid, + gEfiSystemNvDataFvGuid, + gEfiVendorKeysNvGuid, + gIScsiConfigGuid, + gMicrosoftVendorGuid, + gMtcVendorGuid, + mBmHardDriveBootVariableGuid, + ) diff --git a/nixos/lib/test-driver/src/test_driver/machine/__init__.py b/nixos/lib/test-driver/src/test_driver/machine/__init__.py index 790da705cf4f..f2ec18b00a56 100644 --- a/nixos/lib/test-driver/src/test_driver/machine/__init__.py +++ b/nixos/lib/test-driver/src/test_driver/machine/__init__.py @@ -22,6 +22,7 @@ from pathlib import Path from queue import Queue from typing import Any +from test_driver.efi import EfiVariable, EfiVars from test_driver.errors import MachineError, RequestedAssertionFailed from test_driver.logger import AbstractLogger from test_driver.machine.ocr import ( @@ -192,6 +193,7 @@ class QemuStartCommand: def build_environment( state_dir: Path, shared_dir: Path, + efi_vars_path: Path | None = None, ) -> dict: # We make a copy to not update the current environment env = dict(os.environ) @@ -202,6 +204,13 @@ class QemuStartCommand: "USE_TMPDIR": "1", } ) + if efi_vars_path is not None: + env.update( + { + "NIX_EFI_VARS": str(efi_vars_path), + } + ) + return env def run( @@ -212,6 +221,7 @@ class QemuStartCommand: qmp_socket_path: Path, shell_socket_path: Path, allow_reboot: bool, + efi_vars_path: Path | None = None, vsock_guest: Path | None = None, ) -> subprocess.Popen: return subprocess.Popen( @@ -227,7 +237,9 @@ class QemuStartCommand: stderr=subprocess.STDOUT, shell=True, cwd=state_dir, - env=self.build_environment(state_dir, shared_dir), + env=self.build_environment( + state_dir, shared_dir, efi_vars_path=efi_vars_path + ), ) @@ -750,6 +762,9 @@ class QemuMachine(BaseMachine): # Store all console output for full log retrieval full_console_log: list[str] + efi_vars_path: Path + efi_vars: EfiVars + def __init__( self, out_dir: Path, @@ -791,6 +806,9 @@ class QemuMachine(BaseMachine): self.booted = False self.connected = False + self.efi_vars_path = self.state_dir / f"{self.name}-efi-vars.fd" + self.efi_vars = EfiVars(self.efi_vars_path, self) + def ssh_backdoor_command(self) -> str: assert self.vsock_host is not None return f"ssh -o User=root vsock-mux/{self.vsock_host}" @@ -1247,6 +1265,7 @@ class QemuMachine(BaseMachine): self.qmp_path, self.shell_path, allow_reboot, + self.efi_vars_path, self.vsock_guest, ) @@ -1431,6 +1450,28 @@ class QemuMachine(BaseMachine): self.connected = False self.connect() + def dump_efi_vars(self) -> None: + for var in self.read_efi_vars(): + var.print() + + def read_efi_vars(self) -> list[EfiVariable]: + config = self.efi_vars.read_content() + if not config: + return [] + + out = [] + for vendor, variables in config.items(): + for name, v in variables.items(): + out.append(v) + + return out + + def create_efi_vars(self) -> None: + self.efi_vars.create_empty() + + def write_efi_vars(self, add: list[EfiVariable]) -> None: + self.efi_vars.write(add) + class NspawnMachine(BaseMachine): """ diff --git a/nixos/tests/all-tests.nix b/nixos/tests/all-tests.nix index b669f33158ee..51f4cb0f5a2e 100644 --- a/nixos/tests/all-tests.nix +++ b/nixos/tests/all-tests.nix @@ -166,6 +166,7 @@ in [[ 143 = $(cat $failed/testBuildFailure.exit) ]] touch $out ''; + efivars = runTestOn [ "x86_64-linux" ] ./nixos-test-driver/efivars.nix; }; # NixOS vm tests and non-vm unit tests diff --git a/nixos/tests/nixos-test-driver/efivars.nix b/nixos/tests/nixos-test-driver/efivars.nix new file mode 100644 index 000000000000..953c69a0257e --- /dev/null +++ b/nixos/tests/nixos-test-driver/efivars.nix @@ -0,0 +1,110 @@ +{ pkgs, lib, ... }: +let + vendorUuid = "a19f72f8-b554-4bd7-a0c2-2762bd854691"; + varName = "Demo"; + readWriteVar = pkgs.writers.writePython3 "read-increment-efi-var" { } '' + import os + import sys + import fcntl + import array + + if len(sys.argv) != 3: + print(f"Usage: {sys.argv[0]} NAME UUID", file=sys.stderr) + sys.exit(1) + + name = sys.argv[1] + uuid = sys.argv[2] + path = f"/sys/firmware/efi/efivars/{name}-{uuid}" + + FS_IMMUTABLE_FL = 0x00000010 + FS_IOC_GETFLAGS = 0x80086601 + FS_IOC_SETFLAGS = 0x40086602 + + if not os.path.exists(path): + print(f"{path}: does not exist", file=sys.stderr) + sys.exit(1) + + with open(path, "rb") as f: + data = f.read() + + # The first 4 bytes are attributes, the rest is the data + if data[4:] != b"\x2a": + print(f"0x2a value expected, got {data[4:]!r}", file=sys.stderr) + sys.exit(1) + + fd = os.open(path, os.O_RDONLY) + arg = array.array("L", [0]) + fcntl.ioctl(fd, FS_IOC_GETFLAGS, arg) + if arg[0] & FS_IMMUTABLE_FL: + arg[0] &= ~FS_IMMUTABLE_FL + fcntl.ioctl(fd, FS_IOC_SETFLAGS, arg) + os.close(fd) + + with open(path, "wb") as f: + # Write 0x2b + data = bytes(list(data[:4]) + [0x2b]) + f.write(data) + ''; +in +{ + name = "efivars"; + + nodes.machine = { + boot.loader.efi.canTouchEfiVariables = true; + virtualisation.useEFIBoot = true; + }; + + testScript = '' + import uuid + import unittest + + from test_driver.efi import EfiVariable + from test_driver.errors import RequestedAssertionFailed + + + class TestConcurrentRead(unittest.TestCase): + def __init__(self, machine): + super().__init__() + self.machine = machine + + def test_concurrent_read(self): + with self.assertRaises(RequestedAssertionFailed): + self.machine.read_efi_vars() + + + vendor_uuid = uuid.UUID('${vendorUuid}') + machine.create_efi_vars() + machine.write_efi_vars([ + EfiVariable( + vendor_uuid=vendor_uuid, + name="${varName}", + data=bytes([0x2a]), + flags=EfiVariable.Flags.NON_VOLATILE | EfiVariable.Flags.BOOTSERVICE_ACCESS | EfiVariable.Flags.RUNTIME_ACCESS, + ) + ]) + + machine.start() + machine.wait_for_unit("multi-user.target") + + print(machine.succeed('${readWriteVar} "${varName}" "${vendorUuid}"')) + + TestConcurrentRead(machine).test_concurrent_read() + machine.crash() + + machine.dump_efi_vars() + vars = machine.read_efi_vars() + + guid = uuid.UUID(bytes=vendor_uuid.bytes_le) + predicate = lambda v: v.name == "${varName}" and v.vendorUUID == guid + var = next((v for v in vars if predicate(v)), None) + + if var: + var.print() + if var.data == bytes([0x2b]): + print("Congrats!") + else: + raise ValueError("Value 0x2b expected") + else: + raise ValueError("Could not find ${varName} variable") + ''; +} diff --git a/pkgs/development/python-modules/ovmfvartool/default.nix b/pkgs/development/python-modules/ovmfvartool/default.nix index 9dea32157d9b..9518d6703921 100644 --- a/pkgs/development/python-modules/ovmfvartool/default.nix +++ b/pkgs/development/python-modules/ovmfvartool/default.nix @@ -3,6 +3,7 @@ buildPythonPackage, fetchFromGitHub, pyyaml, + nixosTests, }: buildPythonPackage { @@ -11,10 +12,11 @@ buildPythonPackage { format = "setuptools"; src = fetchFromGitHub { - owner = "hlandau"; + # https://github.com/hlandau/ovmfvartool/pull/4 + owner = "baloo"; repo = "ovmfvartool"; - rev = "45e6b1e53967ee6590faae454c076febce096931"; - hash = "sha256-XbvcE/MXNj5S5N7A7jxdwgEE5yMuB82Xg+PYBsFRIm0="; + rev = "6a17190131bf44699ea27815543a65efff880142"; + hash = "sha256-lIneg3kL21oxqjsraogGlOVsgmYnp38CPav1TwBg0p0="; }; propagatedBuildInputs = [ pyyaml ]; @@ -24,6 +26,10 @@ buildPythonPackage { pythonImportsCheck = [ "ovmfvartool" ]; + passthru.tests = { + inherit (nixosTests.nixos-test-driver) efivars; + }; + meta = { description = "Parse and generate OVMF_VARS.fd from Yaml"; mainProgram = "ovmfvartool";