mirror of
https://github.com/nix-community/home-manager.git
synced 2026-06-05 21:02:51 +00:00
Help resolve issue reported a while ago about override inputs not being respected. Allows testing against a different nixpkgs revision easier.
348 lines
13 KiB
Python
Executable File
348 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
from collections.abc import Sequence
|
||
from pathlib import Path
|
||
from textwrap import dedent
|
||
|
||
SUCCESS_EMOJI = "✅"
|
||
FAILURE_EMOJI = "❌"
|
||
INFO_EMOJI = "ℹ️"
|
||
|
||
class TestRunnerError(Exception):
|
||
"""Custom exception for TestRunner errors."""
|
||
pass
|
||
|
||
def _run_command(
|
||
cmd: Sequence[str],
|
||
*,
|
||
cwd: Path | None = None,
|
||
text_input: str | None = None,
|
||
check: bool = True,
|
||
) -> subprocess.CompletedProcess:
|
||
"""A wrapper for subprocess.run with consistent error handling."""
|
||
try:
|
||
return subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
input=text_input,
|
||
check=check,
|
||
cwd=cwd,
|
||
)
|
||
except FileNotFoundError as e:
|
||
print(f"{FAILURE_EMOJI} Error: Command '{e.filename}' not found. Is it in your PATH?", file=sys.stderr)
|
||
raise TestRunnerError(f"Command not found: {e.filename}") from e
|
||
except subprocess.CalledProcessError as e:
|
||
print(f"{FAILURE_EMOJI} Error executing command: {' '.join(cmd)}", file=sys.stderr)
|
||
if e.stderr:
|
||
print(f"Nix Error Output:\n{e.stderr.strip()}", file=sys.stderr)
|
||
raise TestRunnerError("Subprocess command failed.") from e
|
||
|
||
def _read_flake_override_paths(overrides_path: str | None) -> dict[str, str]:
|
||
"""Read flake input override paths from the test wrapper."""
|
||
if not overrides_path:
|
||
return {}
|
||
|
||
try:
|
||
with open(overrides_path, encoding="utf-8") as overrides_file:
|
||
overrides = json.load(overrides_file)
|
||
except OSError as e:
|
||
raise TestRunnerError(f"Failed to read input overrides: {overrides_path}") from e
|
||
except json.JSONDecodeError as e:
|
||
raise TestRunnerError(f"Invalid input overrides: {overrides_path}") from e
|
||
|
||
if not isinstance(overrides, dict):
|
||
raise TestRunnerError(f"Invalid input overrides structure: {overrides_path}")
|
||
|
||
for name, path in overrides.items():
|
||
if not isinstance(path, str):
|
||
raise TestRunnerError(f"Invalid input override path for '{name}'")
|
||
|
||
return overrides
|
||
|
||
def _format_flake_override_args(overrides: dict[str, str]) -> list[str]:
|
||
"""Format flake input overrides as Nix CLI arguments."""
|
||
nix_args = []
|
||
for name, path in sorted(overrides.items()):
|
||
nix_args.extend(["--override-input", name, path])
|
||
return nix_args
|
||
|
||
class TestRunner:
|
||
"""Manages the discovery and execution of Nix-based tests."""
|
||
|
||
def __init__(self, repo_root: Path | None = None):
|
||
self.repo_root = repo_root or Path.cwd()
|
||
self._flake_overrides = _format_flake_override_args(
|
||
_read_flake_override_paths(
|
||
os.environ.get("HOME_MANAGER_TEST_INPUT_OVERRIDES")
|
||
)
|
||
)
|
||
|
||
def get_current_system(self) -> str:
|
||
"""Get the current system architecture using Nix."""
|
||
cmd = ["nix", "eval", "--raw", "--impure", "--expr", "builtins.currentSystem"]
|
||
result = _run_command(cmd)
|
||
return result.stdout.strip()
|
||
|
||
def discover_tests(
|
||
self,
|
||
integration: bool = False,
|
||
*,
|
||
big_only: bool = False,
|
||
) -> list[str]:
|
||
"""Discover available tests using 'nix eval'."""
|
||
system = self.get_current_system()
|
||
test_prefix = "integration-test-" if integration else "test-"
|
||
nix_apply_expr = (
|
||
'pkgs: builtins.concatStringsSep "\\n" '
|
||
f'(builtins.filter (name: builtins.match "{test_prefix}.*" name != null) '
|
||
'(builtins.attrNames pkgs))'
|
||
)
|
||
|
||
cmd = [
|
||
"nix", "eval", "--raw", *self._flake_overrides,
|
||
f".#legacyPackages.{system}", "--apply", nix_apply_expr
|
||
]
|
||
result = _run_command(cmd, cwd=self.repo_root)
|
||
discovered = result.stdout.splitlines()
|
||
|
||
if integration:
|
||
if not big_only:
|
||
return discovered
|
||
|
||
print(
|
||
"⚠️ --big-only has no effect for integration tests; returning all discovered integration tests.",
|
||
file=sys.stderr,
|
||
)
|
||
return discovered
|
||
|
||
if not big_only:
|
||
return discovered
|
||
|
||
discovered_suffixes = {
|
||
test.removeprefix(test_prefix)
|
||
for test in discovered
|
||
}
|
||
modules_root = self.repo_root / "tests" / "modules"
|
||
big_test_suffixes = set()
|
||
|
||
for module_path in modules_root.rglob("*.nix"):
|
||
try:
|
||
content = module_path.read_text(encoding="utf-8")
|
||
except OSError:
|
||
continue
|
||
|
||
if "config.test.enableBig" not in content:
|
||
continue
|
||
|
||
name_parts = list(module_path.relative_to(modules_root).with_suffix("").parts)
|
||
if name_parts and name_parts[-1] == "default":
|
||
name_parts = name_parts[:-1]
|
||
if not name_parts:
|
||
continue
|
||
|
||
for i in range(len(name_parts)):
|
||
candidate = "-".join(name_parts[i:])
|
||
if candidate in discovered_suffixes:
|
||
big_test_suffixes.add(candidate)
|
||
break
|
||
|
||
return [
|
||
test for test in discovered
|
||
if test.removeprefix(test_prefix) in big_test_suffixes
|
||
]
|
||
|
||
def filter_tests(self, tests: list[str], filters: list[str]) -> list[str]:
|
||
"""Filter tests based on a list of substrings."""
|
||
if not filters:
|
||
return tests
|
||
return [test for test in tests if any(f in test for f in filters)]
|
||
|
||
def interactive_select(self, tests: list[str]) -> list[str]:
|
||
"""Allow interactive test selection using fzf."""
|
||
if not tests:
|
||
return []
|
||
|
||
fzf_input = "\n".join(tests)
|
||
cmd = ["fzf", "--multi", "--header=Select tests (TAB to select, ENTER to confirm)"]
|
||
|
||
try:
|
||
result = _run_command(cmd, text_input=fzf_input)
|
||
return result.stdout.splitlines()
|
||
except TestRunnerError:
|
||
# Can happen if fzf is not found or the user cancels (non-zero exit)
|
||
return []
|
||
|
||
def _get_store_path(self, test: str, nix_args: list[str]) -> str | None:
|
||
"""Retrieve the store path of a test."""
|
||
try:
|
||
store_cmd = [
|
||
"nix", "build", "--no-link", "--json", "--reference-lock-file", "flake.lock",
|
||
*self._flake_overrides, f"./tests#{test}", *nix_args
|
||
]
|
||
result = _run_command(store_cmd, cwd=self.repo_root, check=False)
|
||
if result.returncode == 0:
|
||
build_info = json.loads(result.stdout)
|
||
if build_info:
|
||
return build_info[0]["outputs"]["out"]
|
||
except Exception:
|
||
pass
|
||
return None
|
||
|
||
def run_tests(self, tests_to_run: list[str], nix_args: list[str]) -> bool:
|
||
"""Run the selected tests and report the outcome."""
|
||
if not tests_to_run:
|
||
print(f"{INFO_EMOJI} No tests selected to run.", file=sys.stderr)
|
||
return True
|
||
|
||
count = len(tests_to_run)
|
||
print(f"{INFO_EMOJI} Running {count} test(s)...")
|
||
failed_tests = []
|
||
|
||
for i, test in enumerate(tests_to_run, 1):
|
||
print(f"\n--- Running test {i}/{count}: {test} ---")
|
||
cmd = [
|
||
"nix", "build", "-L", "--keep-failed", "--reference-lock-file", "flake.lock",
|
||
*self._flake_overrides, f"./tests#{test}", *nix_args
|
||
]
|
||
try:
|
||
subprocess.run(cmd, check=True, cwd=self.repo_root, capture_output=True)
|
||
print(f"{SUCCESS_EMOJI} Test passed: {test}")
|
||
|
||
store_path = self._get_store_path(test, nix_args)
|
||
if store_path:
|
||
print(f"{INFO_EMOJI} Test directory available at: {store_path}/tested/", file=sys.stderr)
|
||
|
||
except subprocess.CalledProcessError as e:
|
||
failed_tests.append(test)
|
||
print(f"{FAILURE_EMOJI} Test failed: {test}", file=sys.stderr)
|
||
|
||
stderr_text = None
|
||
if e.stderr:
|
||
stderr_text = e.stderr.decode(errors="replace")
|
||
print(stderr_text, file=sys.stderr)
|
||
|
||
import re
|
||
if stderr_text:
|
||
build_dir_match = re.search(r"keeping build directory '([^']+)'", stderr_text)
|
||
if build_dir_match:
|
||
build_dir = build_dir_match.group(1)
|
||
try:
|
||
import glob
|
||
attr_files = glob.glob(f"{build_dir}/.attr-*")
|
||
for attr_file in attr_files:
|
||
with open(attr_file, 'r') as f:
|
||
content = f.read()
|
||
tested_match = re.search(r'TESTED="([^"]+)"', content)
|
||
if tested_match:
|
||
tested_path = tested_match.group(1)
|
||
print(f"{INFO_EMOJI} Generated test directory at: {tested_path}/", file=sys.stderr)
|
||
break
|
||
except Exception:
|
||
print(f"{INFO_EMOJI} Build directory available at: {build_dir}", file=sys.stderr)
|
||
|
||
store_path = self._get_store_path(test, nix_args)
|
||
if store_path:
|
||
print(f"{INFO_EMOJI} Test directory available at: {store_path}/tested/", file=sys.stderr)
|
||
|
||
print("\n--- Summary ---")
|
||
if not failed_tests:
|
||
print(f"{SUCCESS_EMOJI} All {count} tests passed!")
|
||
return True
|
||
else:
|
||
print(f"{FAILURE_EMOJI} {len(failed_tests)} of {count} test(s) failed:")
|
||
for test in failed_tests:
|
||
print(f" - {test}")
|
||
return False
|
||
|
||
def main() -> None:
|
||
"""Main entry point for the test runner script."""
|
||
parser = argparse.ArgumentParser(
|
||
description="A modern test runner for Home Manager.",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog=dedent("""\
|
||
Examples:
|
||
%(prog)s
|
||
Run tests interactively.
|
||
%(prog)s -l
|
||
List all available tests.
|
||
%(prog)s -l alacritty
|
||
List tests matching 'alacritty'.
|
||
%(prog)s alacritty
|
||
Run all tests matching 'alacritty'.
|
||
%(prog)s -i firefox git
|
||
Interactively select from tests matching 'firefox' or 'git'.
|
||
%(prog)s --big-only
|
||
Run only tests enabled by `test.enableBig`.
|
||
%(prog)s --big-only -l
|
||
List only tests enabled by `test.enableBig`.
|
||
%(prog)s -t
|
||
Run integration tests interactively.
|
||
%(prog)s -- --show-trace
|
||
Pass '--show-trace' to all 'nix build' commands.
|
||
""")
|
||
)
|
||
parser.add_argument(
|
||
'-l', '--list', action='store_true', help='List available tests instead of running them.'
|
||
)
|
||
parser.add_argument(
|
||
'-i', '--interactive', action='store_true', help='Force interactive test selection using fzf.'
|
||
)
|
||
parser.add_argument(
|
||
'-t', '--integration', action='store_true', help='Discover and run integration tests.'
|
||
)
|
||
parser.add_argument(
|
||
'--big-only', action='store_true', help='Only run tests enabled by `test.enableBig`.'
|
||
)
|
||
parser.add_argument(
|
||
'filters', nargs='*', help='Filter tests by name (partial matches work).'
|
||
)
|
||
parser.add_argument(
|
||
'nix_args', nargs=argparse.REMAINDER,
|
||
help="Arguments to pass to 'nix build', must be after '--'."
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
# Strip the '--' if it exists
|
||
nix_args = [arg for arg in args.nix_args if arg != '--']
|
||
|
||
runner = TestRunner()
|
||
try:
|
||
print(f"{INFO_EMOJI} Discovering tests...", file=sys.stderr)
|
||
all_tests = runner.discover_tests(integration=args.integration, big_only=args.big_only)
|
||
if not all_tests:
|
||
print("No tests found for the current configuration.", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
tests_to_consider = runner.filter_tests(all_tests, args.filters)
|
||
if not tests_to_consider:
|
||
print("No tests match the provided filters.", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
if args.list:
|
||
print("\n".join(tests_to_consider))
|
||
print(f"\n{INFO_EMOJI} Found {len(tests_to_consider)} matching tests.", file=sys.stderr)
|
||
return
|
||
|
||
# Determine which tests to run
|
||
should_be_interactive = args.interactive or not args.filters
|
||
if should_be_interactive:
|
||
tests_to_run = runner.interactive_select(tests_to_consider)
|
||
else:
|
||
tests_to_run = tests_to_consider
|
||
|
||
if not runner.run_tests(tests_to_run, nix_args):
|
||
sys.exit(1)
|
||
|
||
except TestRunnerError:
|
||
# Error messages are printed by the functions that raise the exception
|
||
sys.exit(1)
|
||
|
||
if __name__ == "__main__":
|
||
main()
|