mirror of
https://github.com/NixOS/nixpkgs.git
synced 2026-06-05 21:03:40 +00:00
nixos/test-driver: add utils to manipulate efi variables
```python
machine.create_efi_vars()
machine.write_efi_vars([
EfiVariable(
uuid.UUID("65a72bd9-f42b-4039-8084-66cd1702cb24"),
"AbrNextBoot",
b"a",
flags=EfiVariable.Flags.NON_VOLATILE
)
])
machine.start()
```
This commit is contained in:
@@ -11,6 +11,7 @@
|
||||
ptpython,
|
||||
pydantic,
|
||||
python,
|
||||
ovmfvartool,
|
||||
remote-pdb,
|
||||
ruff,
|
||||
ty,
|
||||
@@ -49,6 +50,7 @@ buildPythonApplication {
|
||||
junit-xml
|
||||
ptpython
|
||||
pydantic
|
||||
ovmfvartool
|
||||
remote-pdb
|
||||
]
|
||||
++ extraPythonPackages python.pkgs;
|
||||
|
||||
233
nixos/lib/test-driver/src/test_driver/efi.py
Normal file
233
nixos/lib/test-driver/src/test_driver/efi.py
Normal file
@@ -0,0 +1,233 @@
|
||||
import binascii
|
||||
import io
|
||||
import os.path
|
||||
import uuid
|
||||
from collections.abc import Generator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import IO, Any, TypeVar
|
||||
|
||||
from ovmfvartool import (
|
||||
AuthenticatedVariable,
|
||||
FirmwareVolumeHeader,
|
||||
UEFITime,
|
||||
VariableStoreHeader,
|
||||
resolveUUID,
|
||||
)
|
||||
|
||||
import test_driver.machine
|
||||
from test_driver.errors import RequestedAssertionFailed
|
||||
|
||||
EfiVariableT = TypeVar("EfiVariableT", bound="EfiVariable")
|
||||
|
||||
# See edk2.git/OvmfPkg/Bhyve/VarStore.fdf.inc
|
||||
_NV_FTW_WORKING_OFFSET = 0x41000
|
||||
_NV_FTW_WORKING_VALUE = binascii.unhexlify(
|
||||
b"2b29589e687c7d49a0ce6500fd9f1b952caf2c64feffffffe00f000000000000"
|
||||
)
|
||||
_NV_FTW_SIZE = 0x42000
|
||||
_NV_FTW_MAIN_PLUS_SPARE_SIZE = _NV_FTW_SIZE * 2
|
||||
|
||||
|
||||
class EfiVariable(AuthenticatedVariable):
|
||||
class Flags:
|
||||
NON_VOLATILE = 0x1
|
||||
BOOTSERVICE_ACCESS = 0x2
|
||||
RUNTIME_ACCESS = 0x4
|
||||
TIME_BASED_AUTHENTICATED_WRITE_ACCESS = 0x20
|
||||
|
||||
class State:
|
||||
VAR_HEADER_VALID_ONLY = 0x7F
|
||||
VAR_ADDED = 0x3F
|
||||
|
||||
volatile = False
|
||||
boot_access = False
|
||||
runtime_access = False
|
||||
hardware_error_record = False
|
||||
authenticated_write_access = False
|
||||
time_based_authenticated_write_access = False
|
||||
append_write = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
vendor_uuid: uuid.UUID | None = None,
|
||||
name: str | None = None,
|
||||
data: bytes | None = None,
|
||||
state: int | None = None,
|
||||
flags: int | None = None,
|
||||
) -> None:
|
||||
self.magic = 0x55AA
|
||||
self.reserved1 = 0
|
||||
self.monotonicCount = 0
|
||||
self.timestamp = UEFITime()
|
||||
self.pubKeyIdx = 0
|
||||
self.state = 0
|
||||
self.flags = 0
|
||||
|
||||
if vendor_uuid is not None:
|
||||
self.vendorUUID = uuid.UUID(bytes=vendor_uuid.bytes_le)
|
||||
|
||||
if state is not None:
|
||||
self.state = state ^ 0xFF
|
||||
else:
|
||||
self.state = (0x40 | 0x80) ^ 0xFF
|
||||
|
||||
if flags is not None:
|
||||
self.flags = flags
|
||||
|
||||
if name is not None:
|
||||
self.name = name
|
||||
self.nameLen = len(name) * 2 + 2
|
||||
|
||||
if data is not None:
|
||||
self.data = data
|
||||
self.dataLen = len(data)
|
||||
|
||||
def _read_flags(self) -> None:
|
||||
if not (self.flags & 0x1):
|
||||
self.volatile = True
|
||||
if self.flags & 0x2:
|
||||
self.boot_access = True
|
||||
if self.flags & 0x4:
|
||||
self.runtime_access = True
|
||||
if self.flags & 0x8:
|
||||
self.hardware_error_record = True
|
||||
if self.flags & 0x10:
|
||||
self.authenticated_write_access = True
|
||||
if self.flags & 0x20:
|
||||
self.time_based_authenticated_write_access = True
|
||||
if self.flags & 0x40:
|
||||
self.append_write = True
|
||||
|
||||
self.flags &= ~(0x1 | 0x2 | 0x4 | 0x8 | 0x10 | 0x20 | 0x40)
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls: type[EfiVariableT], f: Any) -> EfiVariableT | None:
|
||||
# pylint: disable=no-member
|
||||
# false positive https://github.com/PyCQA/pylint/issues/981
|
||||
ret = super().deserialize(f)
|
||||
if ret:
|
||||
ret._read_flags()
|
||||
return ret
|
||||
|
||||
@classmethod
|
||||
def deserializeFromDocument( # noqa: N802
|
||||
cls: type[EfiVariableT],
|
||||
vendorID: str, # noqa: N803
|
||||
name: str,
|
||||
doc: dict[str, Any],
|
||||
) -> EfiVariableT:
|
||||
# pylint: disable=no-member
|
||||
# false positive https://github.com/PyCQA/pylint/issues/981
|
||||
ret = super(cls, cls).deserializeFromDocument(vendorID, name, doc)
|
||||
if ret:
|
||||
ret._read_flags()
|
||||
return ret
|
||||
|
||||
|
||||
class EfiVars:
|
||||
"""A container around the ovmf variables"""
|
||||
|
||||
state_path: Path
|
||||
machine: "test_driver.machine.QemuMachine"
|
||||
|
||||
def __init__(self, state_path: Path, machine: Any):
|
||||
self.state_path = state_path
|
||||
self.machine = machine
|
||||
|
||||
def _assert_stopped(self) -> None:
|
||||
if self.machine.booted:
|
||||
raise RequestedAssertionFailed(
|
||||
"System is currently running and concurrent reads / writes to the OVMF variables is unsupported"
|
||||
)
|
||||
|
||||
def read_content(self) -> dict[str, dict[str, EfiVariable]] | None:
|
||||
self._assert_stopped()
|
||||
try:
|
||||
with open(self.state_path, "rb") as f:
|
||||
fvh = FirmwareVolumeHeader.deserialize(f)
|
||||
vsh = VariableStoreHeader.deserialize(f)
|
||||
_ = fvh
|
||||
_ = vsh
|
||||
variables: dict[str, dict[str, EfiVariable]] = {}
|
||||
|
||||
while True:
|
||||
v = EfiVariable.deserialize(f)
|
||||
if not v:
|
||||
break
|
||||
if v.isDeleted:
|
||||
continue
|
||||
|
||||
k = resolveUUID(v.vendorUUID)
|
||||
variables.setdefault(k, {})
|
||||
variables[k][v.name] = v
|
||||
|
||||
return variables
|
||||
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
|
||||
@contextmanager
|
||||
def _write_store(self, *args, **kwargs) -> Generator[IO[bytes]]:
|
||||
with open(self.state_path, "wb") as fo:
|
||||
fm = io.BytesIO(b"\xff" * _NV_FTW_MAIN_PLUS_SPARE_SIZE)
|
||||
fm.write(FirmwareVolumeHeader.create().serialize())
|
||||
fm.write(VariableStoreHeader.create().serialize())
|
||||
|
||||
try:
|
||||
yield fm
|
||||
finally:
|
||||
fm.seek(_NV_FTW_WORKING_OFFSET)
|
||||
fm.write(_NV_FTW_WORKING_VALUE)
|
||||
fm.seek(0)
|
||||
fo.write(fm.read())
|
||||
|
||||
def create_empty(self) -> None:
|
||||
self._assert_stopped()
|
||||
|
||||
if os.path.exists(self.state_path):
|
||||
raise RequestedAssertionFailed("OVMF variables store exists")
|
||||
|
||||
with self._write_store():
|
||||
pass
|
||||
|
||||
def write(self, add: list[EfiVariable]) -> None:
|
||||
self._assert_stopped()
|
||||
|
||||
variables = self.read_content()
|
||||
if not variables:
|
||||
variables = {}
|
||||
|
||||
for var in add:
|
||||
k = resolveUUID(var.vendorUUID)
|
||||
variables.setdefault(k, {})
|
||||
variables[k][var.name] = var
|
||||
|
||||
with self._write_store() as fm:
|
||||
for _, vendor in variables.items():
|
||||
for _, v in vendor.items():
|
||||
fm.write(v.serialize())
|
||||
if fm.tell() % 4:
|
||||
fm.write(b"\xff" * (4 - (fm.tell() % 4)))
|
||||
assert (fm.tell() % 4) == 0
|
||||
|
||||
|
||||
class EfiGuid:
|
||||
from ovmfvartool import (
|
||||
gEdkiiVarErrorFlagGuid,
|
||||
gEfiAuthenticatedVariableGuid,
|
||||
gEfiCertDbGuid,
|
||||
gEfiCustomModeEnableGuid,
|
||||
gEfiGlobalVariableGuid,
|
||||
gEfiImageSecurityDatabaseGuid,
|
||||
gEfiIp4Config2ProtocolGuid,
|
||||
gEfiIScsiInitiatorNameProtocolGuid,
|
||||
gEfiMemoryTypeInformationGuid,
|
||||
gEfiSecureBootEnableDisableGuid,
|
||||
gEfiSystemNvDataFvGuid,
|
||||
gEfiVendorKeysNvGuid,
|
||||
gIScsiConfigGuid,
|
||||
gMicrosoftVendorGuid,
|
||||
gMtcVendorGuid,
|
||||
mBmHardDriveBootVariableGuid,
|
||||
)
|
||||
@@ -22,6 +22,7 @@ from pathlib import Path
|
||||
from queue import Queue
|
||||
from typing import Any
|
||||
|
||||
from test_driver.efi import EfiVariable, EfiVars
|
||||
from test_driver.errors import MachineError, RequestedAssertionFailed
|
||||
from test_driver.logger import AbstractLogger
|
||||
from test_driver.machine.ocr import (
|
||||
@@ -192,6 +193,7 @@ class QemuStartCommand:
|
||||
def build_environment(
|
||||
state_dir: Path,
|
||||
shared_dir: Path,
|
||||
efi_vars_path: Path | None = None,
|
||||
) -> dict:
|
||||
# We make a copy to not update the current environment
|
||||
env = dict(os.environ)
|
||||
@@ -202,6 +204,13 @@ class QemuStartCommand:
|
||||
"USE_TMPDIR": "1",
|
||||
}
|
||||
)
|
||||
if efi_vars_path is not None:
|
||||
env.update(
|
||||
{
|
||||
"NIX_EFI_VARS": str(efi_vars_path),
|
||||
}
|
||||
)
|
||||
|
||||
return env
|
||||
|
||||
def run(
|
||||
@@ -212,6 +221,7 @@ class QemuStartCommand:
|
||||
qmp_socket_path: Path,
|
||||
shell_socket_path: Path,
|
||||
allow_reboot: bool,
|
||||
efi_vars_path: Path | None = None,
|
||||
vsock_guest: Path | None = None,
|
||||
) -> subprocess.Popen:
|
||||
return subprocess.Popen(
|
||||
@@ -227,7 +237,9 @@ class QemuStartCommand:
|
||||
stderr=subprocess.STDOUT,
|
||||
shell=True,
|
||||
cwd=state_dir,
|
||||
env=self.build_environment(state_dir, shared_dir),
|
||||
env=self.build_environment(
|
||||
state_dir, shared_dir, efi_vars_path=efi_vars_path
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@@ -750,6 +762,9 @@ class QemuMachine(BaseMachine):
|
||||
# Store all console output for full log retrieval
|
||||
full_console_log: list[str]
|
||||
|
||||
efi_vars_path: Path
|
||||
efi_vars: EfiVars
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
out_dir: Path,
|
||||
@@ -791,6 +806,9 @@ class QemuMachine(BaseMachine):
|
||||
self.booted = False
|
||||
self.connected = False
|
||||
|
||||
self.efi_vars_path = self.state_dir / f"{self.name}-efi-vars.fd"
|
||||
self.efi_vars = EfiVars(self.efi_vars_path, self)
|
||||
|
||||
def ssh_backdoor_command(self) -> str:
|
||||
assert self.vsock_host is not None
|
||||
return f"ssh -o User=root vsock-mux/{self.vsock_host}"
|
||||
@@ -1247,6 +1265,7 @@ class QemuMachine(BaseMachine):
|
||||
self.qmp_path,
|
||||
self.shell_path,
|
||||
allow_reboot,
|
||||
self.efi_vars_path,
|
||||
self.vsock_guest,
|
||||
)
|
||||
|
||||
@@ -1431,6 +1450,28 @@ class QemuMachine(BaseMachine):
|
||||
self.connected = False
|
||||
self.connect()
|
||||
|
||||
def dump_efi_vars(self) -> None:
|
||||
for var in self.read_efi_vars():
|
||||
var.print()
|
||||
|
||||
def read_efi_vars(self) -> list[EfiVariable]:
|
||||
config = self.efi_vars.read_content()
|
||||
if not config:
|
||||
return []
|
||||
|
||||
out = []
|
||||
for vendor, variables in config.items():
|
||||
for name, v in variables.items():
|
||||
out.append(v)
|
||||
|
||||
return out
|
||||
|
||||
def create_efi_vars(self) -> None:
|
||||
self.efi_vars.create_empty()
|
||||
|
||||
def write_efi_vars(self, add: list[EfiVariable]) -> None:
|
||||
self.efi_vars.write(add)
|
||||
|
||||
|
||||
class NspawnMachine(BaseMachine):
|
||||
"""
|
||||
|
||||
@@ -166,6 +166,7 @@ in
|
||||
[[ 143 = $(cat $failed/testBuildFailure.exit) ]]
|
||||
touch $out
|
||||
'';
|
||||
efivars = runTestOn [ "x86_64-linux" ] ./nixos-test-driver/efivars.nix;
|
||||
};
|
||||
|
||||
# NixOS vm tests and non-vm unit tests
|
||||
|
||||
110
nixos/tests/nixos-test-driver/efivars.nix
Normal file
110
nixos/tests/nixos-test-driver/efivars.nix
Normal file
@@ -0,0 +1,110 @@
|
||||
{ pkgs, lib, ... }:
|
||||
let
|
||||
vendorUuid = "a19f72f8-b554-4bd7-a0c2-2762bd854691";
|
||||
varName = "Demo";
|
||||
readWriteVar = pkgs.writers.writePython3 "read-increment-efi-var" { } ''
|
||||
import os
|
||||
import sys
|
||||
import fcntl
|
||||
import array
|
||||
|
||||
if len(sys.argv) != 3:
|
||||
print(f"Usage: {sys.argv[0]} NAME UUID", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
name = sys.argv[1]
|
||||
uuid = sys.argv[2]
|
||||
path = f"/sys/firmware/efi/efivars/{name}-{uuid}"
|
||||
|
||||
FS_IMMUTABLE_FL = 0x00000010
|
||||
FS_IOC_GETFLAGS = 0x80086601
|
||||
FS_IOC_SETFLAGS = 0x40086602
|
||||
|
||||
if not os.path.exists(path):
|
||||
print(f"{path}: does not exist", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
with open(path, "rb") as f:
|
||||
data = f.read()
|
||||
|
||||
# The first 4 bytes are attributes, the rest is the data
|
||||
if data[4:] != b"\x2a":
|
||||
print(f"0x2a value expected, got {data[4:]!r}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
fd = os.open(path, os.O_RDONLY)
|
||||
arg = array.array("L", [0])
|
||||
fcntl.ioctl(fd, FS_IOC_GETFLAGS, arg)
|
||||
if arg[0] & FS_IMMUTABLE_FL:
|
||||
arg[0] &= ~FS_IMMUTABLE_FL
|
||||
fcntl.ioctl(fd, FS_IOC_SETFLAGS, arg)
|
||||
os.close(fd)
|
||||
|
||||
with open(path, "wb") as f:
|
||||
# Write 0x2b
|
||||
data = bytes(list(data[:4]) + [0x2b])
|
||||
f.write(data)
|
||||
'';
|
||||
in
|
||||
{
|
||||
name = "efivars";
|
||||
|
||||
nodes.machine = {
|
||||
boot.loader.efi.canTouchEfiVariables = true;
|
||||
virtualisation.useEFIBoot = true;
|
||||
};
|
||||
|
||||
testScript = ''
|
||||
import uuid
|
||||
import unittest
|
||||
|
||||
from test_driver.efi import EfiVariable
|
||||
from test_driver.errors import RequestedAssertionFailed
|
||||
|
||||
|
||||
class TestConcurrentRead(unittest.TestCase):
|
||||
def __init__(self, machine):
|
||||
super().__init__()
|
||||
self.machine = machine
|
||||
|
||||
def test_concurrent_read(self):
|
||||
with self.assertRaises(RequestedAssertionFailed):
|
||||
self.machine.read_efi_vars()
|
||||
|
||||
|
||||
vendor_uuid = uuid.UUID('${vendorUuid}')
|
||||
machine.create_efi_vars()
|
||||
machine.write_efi_vars([
|
||||
EfiVariable(
|
||||
vendor_uuid=vendor_uuid,
|
||||
name="${varName}",
|
||||
data=bytes([0x2a]),
|
||||
flags=EfiVariable.Flags.NON_VOLATILE | EfiVariable.Flags.BOOTSERVICE_ACCESS | EfiVariable.Flags.RUNTIME_ACCESS,
|
||||
)
|
||||
])
|
||||
|
||||
machine.start()
|
||||
machine.wait_for_unit("multi-user.target")
|
||||
|
||||
print(machine.succeed('${readWriteVar} "${varName}" "${vendorUuid}"'))
|
||||
|
||||
TestConcurrentRead(machine).test_concurrent_read()
|
||||
machine.crash()
|
||||
|
||||
machine.dump_efi_vars()
|
||||
vars = machine.read_efi_vars()
|
||||
|
||||
guid = uuid.UUID(bytes=vendor_uuid.bytes_le)
|
||||
predicate = lambda v: v.name == "${varName}" and v.vendorUUID == guid
|
||||
var = next((v for v in vars if predicate(v)), None)
|
||||
|
||||
if var:
|
||||
var.print()
|
||||
if var.data == bytes([0x2b]):
|
||||
print("Congrats!")
|
||||
else:
|
||||
raise ValueError("Value 0x2b expected")
|
||||
else:
|
||||
raise ValueError("Could not find ${varName} variable")
|
||||
'';
|
||||
}
|
||||
@@ -3,6 +3,7 @@
|
||||
buildPythonPackage,
|
||||
fetchFromGitHub,
|
||||
pyyaml,
|
||||
nixosTests,
|
||||
}:
|
||||
|
||||
buildPythonPackage {
|
||||
@@ -11,10 +12,11 @@ buildPythonPackage {
|
||||
format = "setuptools";
|
||||
|
||||
src = fetchFromGitHub {
|
||||
owner = "hlandau";
|
||||
# https://github.com/hlandau/ovmfvartool/pull/4
|
||||
owner = "baloo";
|
||||
repo = "ovmfvartool";
|
||||
rev = "45e6b1e53967ee6590faae454c076febce096931";
|
||||
hash = "sha256-XbvcE/MXNj5S5N7A7jxdwgEE5yMuB82Xg+PYBsFRIm0=";
|
||||
rev = "6a17190131bf44699ea27815543a65efff880142";
|
||||
hash = "sha256-lIneg3kL21oxqjsraogGlOVsgmYnp38CPav1TwBg0p0=";
|
||||
};
|
||||
|
||||
propagatedBuildInputs = [ pyyaml ];
|
||||
@@ -24,6 +26,10 @@ buildPythonPackage {
|
||||
|
||||
pythonImportsCheck = [ "ovmfvartool" ];
|
||||
|
||||
passthru.tests = {
|
||||
inherit (nixosTests.nixos-test-driver) efivars;
|
||||
};
|
||||
|
||||
meta = {
|
||||
description = "Parse and generate OVMF_VARS.fd from Yaml";
|
||||
mainProgram = "ovmfvartool";
|
||||
|
||||
Reference in New Issue
Block a user