nixos/test-driver: add utils to manipulate efi variables (#456099)

This commit is contained in:
Arthur Gautier
2026-06-05 16:16:08 +00:00
committed by GitHub
6 changed files with 397 additions and 4 deletions

View File

@@ -11,6 +11,7 @@
ptpython,
pydantic,
python,
ovmfvartool,
remote-pdb,
ruff,
ty,
@@ -49,6 +50,7 @@ buildPythonApplication {
junit-xml
ptpython
pydantic
ovmfvartool
remote-pdb
]
++ extraPythonPackages python.pkgs;

View File

@@ -0,0 +1,233 @@
import binascii
import io
import os.path
import uuid
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from typing import IO, Any, TypeVar
from ovmfvartool import (
AuthenticatedVariable,
FirmwareVolumeHeader,
UEFITime,
VariableStoreHeader,
resolveUUID,
)
import test_driver.machine
from test_driver.errors import RequestedAssertionFailed
EfiVariableT = TypeVar("EfiVariableT", bound="EfiVariable")
# See edk2.git/OvmfPkg/Bhyve/VarStore.fdf.inc
_NV_FTW_WORKING_OFFSET = 0x41000
_NV_FTW_WORKING_VALUE = binascii.unhexlify(
b"2b29589e687c7d49a0ce6500fd9f1b952caf2c64feffffffe00f000000000000"
)
_NV_FTW_SIZE = 0x42000
_NV_FTW_MAIN_PLUS_SPARE_SIZE = _NV_FTW_SIZE * 2
class EfiVariable(AuthenticatedVariable):
class Flags:
NON_VOLATILE = 0x1
BOOTSERVICE_ACCESS = 0x2
RUNTIME_ACCESS = 0x4
TIME_BASED_AUTHENTICATED_WRITE_ACCESS = 0x20
class State:
VAR_HEADER_VALID_ONLY = 0x7F
VAR_ADDED = 0x3F
volatile = False
boot_access = False
runtime_access = False
hardware_error_record = False
authenticated_write_access = False
time_based_authenticated_write_access = False
append_write = False
def __init__(
self,
vendor_uuid: uuid.UUID | None = None,
name: str | None = None,
data: bytes | None = None,
state: int | None = None,
flags: int | None = None,
) -> None:
self.magic = 0x55AA
self.reserved1 = 0
self.monotonicCount = 0
self.timestamp = UEFITime()
self.pubKeyIdx = 0
self.state = 0
self.flags = 0
if vendor_uuid is not None:
self.vendorUUID = uuid.UUID(bytes=vendor_uuid.bytes_le)
if state is not None:
self.state = state ^ 0xFF
else:
self.state = (0x40 | 0x80) ^ 0xFF
if flags is not None:
self.flags = flags
if name is not None:
self.name = name
self.nameLen = len(name) * 2 + 2
if data is not None:
self.data = data
self.dataLen = len(data)
def _read_flags(self) -> None:
if not (self.flags & 0x1):
self.volatile = True
if self.flags & 0x2:
self.boot_access = True
if self.flags & 0x4:
self.runtime_access = True
if self.flags & 0x8:
self.hardware_error_record = True
if self.flags & 0x10:
self.authenticated_write_access = True
if self.flags & 0x20:
self.time_based_authenticated_write_access = True
if self.flags & 0x40:
self.append_write = True
self.flags &= ~(0x1 | 0x2 | 0x4 | 0x8 | 0x10 | 0x20 | 0x40)
@classmethod
def deserialize(cls: type[EfiVariableT], f: Any) -> EfiVariableT | None:
# pylint: disable=no-member
# false positive https://github.com/PyCQA/pylint/issues/981
ret = super().deserialize(f)
if ret:
ret._read_flags()
return ret
@classmethod
def deserializeFromDocument( # noqa: N802
cls: type[EfiVariableT],
vendorID: str, # noqa: N803
name: str,
doc: dict[str, Any],
) -> EfiVariableT:
# pylint: disable=no-member
# false positive https://github.com/PyCQA/pylint/issues/981
ret = super(cls, cls).deserializeFromDocument(vendorID, name, doc)
if ret:
ret._read_flags()
return ret
class EfiVars:
"""A container around the ovmf variables"""
state_path: Path
machine: "test_driver.machine.QemuMachine"
def __init__(self, state_path: Path, machine: Any):
self.state_path = state_path
self.machine = machine
def _assert_stopped(self) -> None:
if self.machine.booted:
raise RequestedAssertionFailed(
"System is currently running and concurrent reads / writes to the OVMF variables is unsupported"
)
def read_content(self) -> dict[str, dict[str, EfiVariable]] | None:
self._assert_stopped()
try:
with open(self.state_path, "rb") as f:
fvh = FirmwareVolumeHeader.deserialize(f)
vsh = VariableStoreHeader.deserialize(f)
_ = fvh
_ = vsh
variables: dict[str, dict[str, EfiVariable]] = {}
while True:
v = EfiVariable.deserialize(f)
if not v:
break
if v.isDeleted:
continue
k = resolveUUID(v.vendorUUID)
variables.setdefault(k, {})
variables[k][v.name] = v
return variables
except FileNotFoundError:
return None
@contextmanager
def _write_store(self, *args, **kwargs) -> Generator[IO[bytes]]:
with open(self.state_path, "wb") as fo:
fm = io.BytesIO(b"\xff" * _NV_FTW_MAIN_PLUS_SPARE_SIZE)
fm.write(FirmwareVolumeHeader.create().serialize())
fm.write(VariableStoreHeader.create().serialize())
try:
yield fm
finally:
fm.seek(_NV_FTW_WORKING_OFFSET)
fm.write(_NV_FTW_WORKING_VALUE)
fm.seek(0)
fo.write(fm.read())
def create_empty(self) -> None:
self._assert_stopped()
if os.path.exists(self.state_path):
raise RequestedAssertionFailed("OVMF variables store exists")
with self._write_store():
pass
def write(self, add: list[EfiVariable]) -> None:
self._assert_stopped()
variables = self.read_content()
if not variables:
variables = {}
for var in add:
k = resolveUUID(var.vendorUUID)
variables.setdefault(k, {})
variables[k][var.name] = var
with self._write_store() as fm:
for _, vendor in variables.items():
for _, v in vendor.items():
fm.write(v.serialize())
if fm.tell() % 4:
fm.write(b"\xff" * (4 - (fm.tell() % 4)))
assert (fm.tell() % 4) == 0
class EfiGuid:
from ovmfvartool import (
gEdkiiVarErrorFlagGuid,
gEfiAuthenticatedVariableGuid,
gEfiCertDbGuid,
gEfiCustomModeEnableGuid,
gEfiGlobalVariableGuid,
gEfiImageSecurityDatabaseGuid,
gEfiIp4Config2ProtocolGuid,
gEfiIScsiInitiatorNameProtocolGuid,
gEfiMemoryTypeInformationGuid,
gEfiSecureBootEnableDisableGuid,
gEfiSystemNvDataFvGuid,
gEfiVendorKeysNvGuid,
gIScsiConfigGuid,
gMicrosoftVendorGuid,
gMtcVendorGuid,
mBmHardDriveBootVariableGuid,
)

View File

@@ -22,6 +22,7 @@ from pathlib import Path
from queue import Queue
from typing import Any
from test_driver.efi import EfiVariable, EfiVars
from test_driver.errors import MachineError, RequestedAssertionFailed
from test_driver.logger import AbstractLogger
from test_driver.machine.ocr import (
@@ -192,6 +193,7 @@ class QemuStartCommand:
def build_environment(
state_dir: Path,
shared_dir: Path,
efi_vars_path: Path | None = None,
) -> dict:
# We make a copy to not update the current environment
env = dict(os.environ)
@@ -202,6 +204,13 @@ class QemuStartCommand:
"USE_TMPDIR": "1",
}
)
if efi_vars_path is not None:
env.update(
{
"NIX_EFI_VARS": str(efi_vars_path),
}
)
return env
def run(
@@ -212,6 +221,7 @@ class QemuStartCommand:
qmp_socket_path: Path,
shell_socket_path: Path,
allow_reboot: bool,
efi_vars_path: Path | None = None,
vsock_guest: Path | None = None,
) -> subprocess.Popen:
return subprocess.Popen(
@@ -227,7 +237,9 @@ class QemuStartCommand:
stderr=subprocess.STDOUT,
shell=True,
cwd=state_dir,
env=self.build_environment(state_dir, shared_dir),
env=self.build_environment(
state_dir, shared_dir, efi_vars_path=efi_vars_path
),
)
@@ -750,6 +762,9 @@ class QemuMachine(BaseMachine):
# Store all console output for full log retrieval
full_console_log: list[str]
efi_vars_path: Path
efi_vars: EfiVars
def __init__(
self,
out_dir: Path,
@@ -791,6 +806,9 @@ class QemuMachine(BaseMachine):
self.booted = False
self.connected = False
self.efi_vars_path = self.state_dir / f"{self.name}-efi-vars.fd"
self.efi_vars = EfiVars(self.efi_vars_path, self)
def ssh_backdoor_command(self) -> str:
assert self.vsock_host is not None
return f"ssh -o User=root vsock-mux/{self.vsock_host}"
@@ -1249,6 +1267,7 @@ class QemuMachine(BaseMachine):
self.qmp_path,
self.shell_path,
allow_reboot,
self.efi_vars_path,
self.vsock_guest,
)
@@ -1433,6 +1452,28 @@ class QemuMachine(BaseMachine):
self.connected = False
self.connect()
def dump_efi_vars(self) -> None:
for var in self.read_efi_vars():
var.print()
def read_efi_vars(self) -> list[EfiVariable]:
config = self.efi_vars.read_content()
if not config:
return []
out = []
for vendor, variables in config.items():
for name, v in variables.items():
out.append(v)
return out
def create_efi_vars(self) -> None:
self.efi_vars.create_empty()
def write_efi_vars(self, add: list[EfiVariable]) -> None:
self.efi_vars.write(add)
class NspawnMachine(BaseMachine):
"""

View File

@@ -168,6 +168,7 @@ in
[[ 143 = $(cat $failed/testBuildFailure.exit) ]]
touch $out
'';
efivars = runTestOn [ "x86_64-linux" ] ./nixos-test-driver/efivars.nix;
};
# NixOS vm tests and non-vm unit tests

View File

@@ -0,0 +1,110 @@
{ pkgs, lib, ... }:
let
vendorUuid = "a19f72f8-b554-4bd7-a0c2-2762bd854691";
varName = "Demo";
readWriteVar = pkgs.writers.writePython3 "read-increment-efi-var" { } ''
import os
import sys
import fcntl
import array
if len(sys.argv) != 3:
print(f"Usage: {sys.argv[0]} NAME UUID", file=sys.stderr)
sys.exit(1)
name = sys.argv[1]
uuid = sys.argv[2]
path = f"/sys/firmware/efi/efivars/{name}-{uuid}"
FS_IMMUTABLE_FL = 0x00000010
FS_IOC_GETFLAGS = 0x80086601
FS_IOC_SETFLAGS = 0x40086602
if not os.path.exists(path):
print(f"{path}: does not exist", file=sys.stderr)
sys.exit(1)
with open(path, "rb") as f:
data = f.read()
# The first 4 bytes are attributes, the rest is the data
if data[4:] != b"\x2a":
print(f"0x2a value expected, got {data[4:]!r}", file=sys.stderr)
sys.exit(1)
fd = os.open(path, os.O_RDONLY)
arg = array.array("L", [0])
fcntl.ioctl(fd, FS_IOC_GETFLAGS, arg)
if arg[0] & FS_IMMUTABLE_FL:
arg[0] &= ~FS_IMMUTABLE_FL
fcntl.ioctl(fd, FS_IOC_SETFLAGS, arg)
os.close(fd)
with open(path, "wb") as f:
# Write 0x2b
data = bytes(list(data[:4]) + [0x2b])
f.write(data)
'';
in
{
name = "efivars";
nodes.machine = {
boot.loader.efi.canTouchEfiVariables = true;
virtualisation.useEFIBoot = true;
};
testScript = ''
import uuid
import unittest
from test_driver.efi import EfiVariable
from test_driver.errors import RequestedAssertionFailed
class TestConcurrentRead(unittest.TestCase):
def __init__(self, machine):
super().__init__()
self.machine = machine
def test_concurrent_read(self):
with self.assertRaises(RequestedAssertionFailed):
self.machine.read_efi_vars()
vendor_uuid = uuid.UUID('${vendorUuid}')
machine.create_efi_vars()
machine.write_efi_vars([
EfiVariable(
vendor_uuid=vendor_uuid,
name="${varName}",
data=bytes([0x2a]),
flags=EfiVariable.Flags.NON_VOLATILE | EfiVariable.Flags.BOOTSERVICE_ACCESS | EfiVariable.Flags.RUNTIME_ACCESS,
)
])
machine.start()
machine.wait_for_unit("multi-user.target")
print(machine.succeed('${readWriteVar} "${varName}" "${vendorUuid}"'))
TestConcurrentRead(machine).test_concurrent_read()
machine.crash()
machine.dump_efi_vars()
vars = machine.read_efi_vars()
guid = uuid.UUID(bytes=vendor_uuid.bytes_le)
predicate = lambda v: v.name == "${varName}" and v.vendorUUID == guid
var = next((v for v in vars if predicate(v)), None)
if var:
var.print()
if var.data == bytes([0x2b]):
print("Congrats!")
else:
raise ValueError("Value 0x2b expected")
else:
raise ValueError("Could not find ${varName} variable")
'';
}

View File

@@ -3,6 +3,7 @@
buildPythonPackage,
fetchFromGitHub,
pyyaml,
nixosTests,
}:
buildPythonPackage {
@@ -11,10 +12,11 @@ buildPythonPackage {
format = "setuptools";
src = fetchFromGitHub {
owner = "hlandau";
# https://github.com/hlandau/ovmfvartool/pull/4
owner = "baloo";
repo = "ovmfvartool";
rev = "45e6b1e53967ee6590faae454c076febce096931";
hash = "sha256-XbvcE/MXNj5S5N7A7jxdwgEE5yMuB82Xg+PYBsFRIm0=";
rev = "6a17190131bf44699ea27815543a65efff880142";
hash = "sha256-lIneg3kL21oxqjsraogGlOVsgmYnp38CPav1TwBg0p0=";
};
propagatedBuildInputs = [ pyyaml ];
@@ -24,6 +26,10 @@ buildPythonPackage {
pythonImportsCheck = [ "ovmfvartool" ];
passthru.tests = {
inherit (nixosTests.nixos-test-driver) efivars;
};
meta = {
description = "Parse and generate OVMF_VARS.fd from Yaml";
mainProgram = "ovmfvartool";