From 3e619dafccddd1ec3b44a3fa8815ea49a1188244 Mon Sep 17 00:00:00 2001 From: Bryan Gardiner Date: Fri, 9 Apr 2021 19:21:39 -0700 Subject: [PATCH] Initial commit. --- nvd | 538 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ nvd.1 | 64 +++++++ 2 files changed, 602 insertions(+) create mode 100755 nvd create mode 100644 nvd.1 diff --git a/nvd b/nvd new file mode 100755 index 0000000..64f72cd --- /dev/null +++ b/nvd @@ -0,0 +1,538 @@ +#!/usr/bin/env python3 + +# TODO Closure disk size change. + +# Package install state (by pname): +# A: Added to closure +# R: Removed from closure +# U: Upgrade +# D: Downgrade +# C: Changed versions or selection state + +# Package selection state (by pname): +# +: Package is newly selected in environment.systemPackages. +# -: Package is newly unselected. +# *: Package is selected; state unchanged. +# .: Not in environment.systemPackages; dependency. +# l: Package manifest only available on the left; not selected there. +# L: Package manifest only available on the left; selected there. +# r: Package manifest only available on the right; not selected there. +# R: Package manifest only available on the right; selected there. + +import argparse +import os +import os.path +import re +import subprocess +import sys +from collections import namedtuple +from pathlib import Path +from subprocess import PIPE +from typing import Dict, List, Optional, Tuple, Union + +parser = argparse.ArgumentParser( + description="Nix/NixOS package version diff tool", + epilog="See the nvd(1) manual page for more information.", +) +parser.add_argument( + "--color", + default="auto", + help="Controls use of colour; one of 'auto', 'never', 'always'.") +parser.add_argument( + "root1", + help="The first Nix store ptah to compare.") +parser.add_argument( + "root2", + help="The second Nix store path to compare.") + +ARGS = parser.parse_args() + +USE_COLOUR = ARGS.color == "always" or (ARGS.color == "auto" and sys.stdout.isatty()) + +SGR_RESET = 0 +SGR_BOLD = 1 +SGR_FG = 30 +SGR_BG = 40 +SGR_BRIGHT = 60 # Add to SGR_FG or SGR_BG. +SGR_BLACK = 0 +SGR_RED = 1 +SGR_GREEN = 2 +SGR_YELLOW = 3 +SGR_BLUE = 4 +SGR_MAGENTA = 5 +SGR_CYAN = 6 + +def sgr(*args): + assert all(isinstance(arg, int) for arg in args), \ + "sgr() args must all be integers: {args!r}" + if not USE_COLOUR: + return "" + return "\x1b[" + ";".join(str(arg) for arg in args) + "m" + +INST_ADDED = sgr(SGR_FG + SGR_BRIGHT + SGR_GREEN) + "A" +INST_REMOVED = sgr(SGR_FG + SGR_BRIGHT + SGR_RED) + "R" +INST_UPGRADED = sgr(SGR_FG + SGR_BRIGHT + SGR_CYAN) + "U" +INST_DOWNGRADED = sgr(SGR_FG + SGR_BRIGHT + SGR_YELLOW) + "D" +INST_CHANGED = sgr(SGR_FG + SGR_BRIGHT + SGR_MAGENTA) + "C" + +SEL_SELECTED = "*" +SEL_UNSELECTED = "." +SEL_NEWLY_SELECTED = "+" +SEL_NEWLY_UNSELECTED = "-" +SEL_LEFT_ONLY_SELECTED = "L" +SEL_LEFT_ONLY_UNSELECTED = "l" +SEL_RIGHT_ONLY_SELECTED = "R" +SEL_RIGHT_ONLY_UNSELECTED = "r" +SEL_NO_MANIFESTS = "" + +NIX_STORE_PATH_REGEX = re.compile(r"^/nix/store/[a-z0-9]+-(.+?)(-([0-9].*?))?(\.drv)?$") + +def raise_arg(e): + raise e + +class StorePath: + def __init__(self, path: Union[str, Path]): + assert str(path).startswith("/nix/store/"), \ + f"Doesn't start with /nix/store/: {str(resolved)!r}" + self._path = Path(path) + + def __str__(self) -> str: + return f"" + + def __hash__(self): + return hash(self._path) + + def __eq__(self, other): + return type(other) is StorePath and self._path == other._path + + def path(self) -> Path: + return self._path + + def to_base_path(self) -> "StorePath": + return StorePath(os.path.join(*self._path.parts[0:4])) + +# For the version comparison algorithm, see: +# https://nixos.org/manual/nix/stable/#ssec-version-comparisons + +class VersionChunk: + def __init__(self, chunk_value: Union[int, str]): + assert isinstance(chunk_value, int) or isinstance(chunk_value, str) + self._chunk_value = chunk_value + + def __str__(self): + return repr(self._chunk_value) + + def __lt__(self, other): + if not isinstance(other, VersionChunk): + return NotImplemented + + x = self._chunk_value + y = other._chunk_value + x_int = isinstance(x, int) + y_int = isinstance(y, int) + if x_int and y_int: + return x < y + elif (x == "" and y_int) or x == "pre": + return True + elif (x_int and y == "") or y == "pre": + return False + elif x_int: # y is a string + return False + elif y_int: # x is a string + return True + else: # both are strings + return x < y + + def __eq__(self, other): + if not isinstance(other, VersionChunk): + return NotImplemented + + return self._chunk_value == other._chunk_value + +class Version: + def __init__(self, text: Optional[str]): + if text is None: + text = "" + + self._text = text + + self._chunks: List[VersionChunk] = [] + while text != "": + first_char = text[0] + if first_char.isdigit(): + last = 0 + while last + 1 < len(text) and text[last + 1].isdigit(): + last += 1 + self._chunks.append(VersionChunk(int(text[0 : last + 1]))) + text = text[last + 1:] + elif first_char.isalpha(): + last = 0 + while last + 1 < len(text) and text[last + 1].isalpha(): + last += 1 + self._chunks.append(VersionChunk(text[0 : last + 1])) + text = text[last + 1:] + else: + last = 0 + while last + 1 < len(text) and not text[last + 1].isdigit() and not text[last + 1].isalpha(): + last += 1 + # No chunk append here, only care about alnum runs. + text = text[last + 1:] + + def __eq__(self, other): + if not isinstance(other, Version): + return NotImplemented + return self._chunks == other._chunks + + def __lt__(self, other): + if not isinstance(other, Version): + return NotImplemented + return self._chunks < other._chunks + +class Package: + def __init__(self, *, pname: str, version: Version, store_path: StorePath): + assert isinstance(pname, str), f"Not a string: {pname!r}" + assert isinstance(version, Version), f"Not a Version: {version!r}" + assert isinstance(store_path, StorePath), f"Not a StorePath: {store_path!r}" + self._pname = pname + self._version = version + self._store_path = store_path + + def pname(self) -> str: + return self._pname + + def version(self) -> Version: + return self._version + + def store_path(self) -> StorePath: + return self._store_path + +class PackageManifest: + def __init__(self, packages: List[Package]): + assert isinstance(packages, List) + assert all(isinstance(package, Package) for package in packages) + + self._packages = packages + + self._packages_by_pname: Dict[str, List[Package]] = {} + for entry in packages: + self._packages_by_pname.setdefault(entry.pname(), []).append(entry) + + @staticmethod + def parse_tree(root: Path) -> "PackageManifest": + assert isinstance(root, Path), f"Not a Path: {root!r}" + + store_paths = set() + for curdir, dirs, files in os.walk(root, onerror=raise_arg): + for filename in files: + path_str = os.path.join(curdir, filename) + if os.path.islink(path_str): + store_paths.add(StorePath(os.readlink(path_str)).to_base_path()) + + packages = [] + for store_path in store_paths: + pname, version = parse_pname_version(str(store_path.path())) + packages.append(Package( + pname=pname, + version=Version(version), + store_path=store_path, + )) + + return PackageManifest(packages) + + def contains_pname(self, pname: str) -> bool: + return pname in self._packages_by_pname + +class PackageManifestPair: + def __init__( + self, + left_manifest: Optional[PackageManifest], + right_manifest: Optional[PackageManifest] + ): + assert left_manifest is None or isinstance(left_manifest, PackageManifest) + assert right_manifest is None or isinstance(right_manifest, PackageManifest) + self._left_manifest = left_manifest + self._right_manifest = right_manifest + + def get_left_manifest(self) -> Optional[PackageManifest]: + return self._left_manifest + + def get_right_manifest(self) -> Optional[PackageManifest]: + return self._right_manifest + + def get_selection_state(self, pname: str) -> str: + left_manifest = self._left_manifest + right_manifest = self._right_manifest + + if left_manifest is not None and right_manifest is not None: + in_left_manifest = left_manifest.contains_pname(pname) + in_right_manifest = right_manifest.contains_pname(pname) + selection_state_str = [ + SEL_UNSELECTED, + SEL_NEWLY_UNSELECTED, + SEL_NEWLY_SELECTED, + SEL_SELECTED, + ][int(in_left_manifest) + 2 * int(in_right_manifest)] + elif left_manifest is not None: + in_left_manifest = left_manifest.contains_pname(pname) + selection_state_str = [ + SEL_LEFT_ONLY_UNSELECTED, + SEL_LEFT_ONLY_SELECTED, + ][int(in_left_manifest)] + elif right_manifest is not None: + in_right_manifest = right_manifest.contains_pname(pname) + selection_state_str = [ + SEL_RIGHT_ONLY_UNSELECTED, + SEL_RIGHT_ONLY_SELECTED, + ][int(in_right_manifest)] + else: + selection_state_str = SEL_NO_MANIFESTS + + return selection_state_str + + def is_selection_state_changed(self, pname: str) -> str: + in_left_manifest = \ + self._left_manifest is not None and \ + self._left_manifest.contains_pname(pname) + in_right_manifest = \ + self._right_manifest is not None and \ + self._right_manifest.contains_pname(pname) + return in_left_manifest != in_right_manifest + +def parse_pname_version(path: str) -> Tuple[str, str]: + base_path = str(StorePath(path).to_base_path().path()) + + match = NIX_STORE_PATH_REGEX.search(base_path) + assert match is not None, f"Couldn't parse path: {path}" + + pname = match.group(1) + version = match.group(3) + + return pname, version + +def closure_paths_to_map(paths: List[str]) -> Dict[str, List[str]]: + result = {} + + for path in paths: + name, version = parse_pname_version(path) + if name not in result: + result[name] = [version] + else: + result[name].append(version) + + for version_list in result.values(): + version_list.sort(key=lambda ver: ver or "") + + return result + +def render_versions(version_list: List[str], *, colour: bool = False) -> str: + if version_list == []: + return "" + + items = [] + + version_list = list(version_list) # Create a copy we can modify. + while version_list != []: + version = version_list.pop() + count = 1 + while version_list != [] and version_list[-1] == version: + version_list.pop() + count += 1 + + if version == None: + version = "" + elif colour: # (Don't apply colour to .) + version = sgr(SGR_FG + SGR_YELLOW) + version + sgr(SGR_RESET) + + if count == 1: + items.append(version) + else: + items.append(version + f" x{count}") + + items.reverse() + return ", ".join(items) + +def print_package_list( + *, + pnames: List[str], + manifest_pair: PackageManifestPair, + left_versions_map: Dict[str, List[str]], + right_versions_map: Optional[Dict[str, List[str]]] = None, + fixed_install_state: Optional[str] = None, +): + assert (right_versions_map is None) != (fixed_install_state is None), \ + "Expect either one versions map, or a fixed install state." + have_single_version = right_versions_map is None + + count = 1 + count_width = len(str(len(pnames))) + count_format_str = "#{:0" + str(count_width) + "d}" + pname_width = max(len(pname) for pname in pnames) + pname_format_str = "{:" + str(pname_width) + "}" + + for pname in pnames: + left_versions = left_versions_map[pname] + + if have_single_version: + install_state_str = fixed_install_state + else: + right_versions = right_versions_map[pname] if right_versions_map is not None else None + # TODO Terrible - make this efficient. + lv_parsed = [Version(v) for v in left_versions] + rv_parsed = [Version(v) for v in right_versions] + if all(lv < rv for lv in lv_parsed for rv in rv_parsed): + install_state_str = INST_UPGRADED + elif all(lv > rv for lv in lv_parsed for rv in rv_parsed): + install_state_str = INST_DOWNGRADED + else: + install_state_str = INST_CHANGED + + selected_at_all = ( + manifest_pair.get_left_manifest() is not None + and manifest_pair.get_left_manifest().contains_pname(pname) + ) or ( + manifest_pair.get_right_manifest() is not None + and manifest_pair.get_right_manifest().contains_pname(pname) + ) + if selected_at_all: + bold_if_selected_sgr = sgr(SGR_BOLD) + pname_sgr = bold_if_selected_sgr + sgr(SGR_FG + SGR_GREEN + SGR_BRIGHT) + else: + bold_if_selected_sgr = "" + pname_sgr = bold_if_selected_sgr + sgr(SGR_FG + SGR_GREEN) + + status_str = "[{}{}]".format( + bold_if_selected_sgr + install_state_str + sgr(SGR_RESET), + pname_sgr + manifest_pair.get_selection_state(pname) + sgr(SGR_RESET), + ) + count_str = count_format_str.format(count) + pname_str = pname_format_str.format(pname) + print("{} {} {} {}{}".format( + status_str, + count_str, + pname_sgr + pname_str + sgr(SGR_RESET), + render_versions(left_versions, colour=True), + "" if have_single_version else (" -> " + render_versions(right_versions, colour=True)), + )) + count += 1 + +def main(): + left_path = Path(ARGS.root1) + right_path = Path(ARGS.root2) + + if not left_path.exists(): + sys.stderr.write(f"Path does not exist: {left_path}\n") + sys.exit(1) + + if not right_path.exists(): + sys.stderr.write(f"Path does not exist: {right_path}\n") + sys.exit(1) + + left_resolved = left_path.resolve() + right_resolved = right_path.resolve() + + print(f"<<< {left_path}") + print(f">>> {right_path}") + + left_manifest: Optional[PackageManifest] = None + right_manifest: Optional[PackageManifest] = None + + if (left_resolved / "sw").is_dir() and (right_resolved / "sw").is_dir(): + left_manifest = PackageManifest.parse_tree(left_resolved / "sw") + right_manifest = PackageManifest.parse_tree(right_resolved / "sw") + else: + left_manifest = PackageManifest.parse_tree(left_resolved) + right_manifest = PackageManifest.parse_tree(right_resolved) + + manifest_pair = PackageManifestPair(left_manifest, right_manifest) + + left_closure_paths: List[str] = \ + subprocess.run(["nix-store", "-qR", str(left_resolved)], stdout=PIPE, text=True) \ + .stdout.rstrip("\n").split("\n") + right_closure_paths: List[str] = \ + subprocess.run(["nix-store", "-qR", (right_resolved)], stdout=PIPE, text=True) \ + .stdout.rstrip("\n").split("\n") + + # Maps from pname to lists of versions. + left_closure_map: Dict[str, List[str]] = closure_paths_to_map(left_closure_paths) + right_closure_map: Dict[str, List[str]] = closure_paths_to_map(right_closure_paths) + + left_package_names = set(left_closure_map.keys()) + right_package_names = set(right_closure_map.keys()) + + common_package_names = sorted(left_package_names & right_package_names) + left_only_package_names = sorted(left_package_names - right_package_names) + right_only_package_names = sorted(right_package_names - left_package_names) + + # Announce version changes. + package_names_with_changed_versions = [] + package_names_with_changed_selection_states = [] + for pname in common_package_names: + if left_closure_map[pname] != right_closure_map[pname]: + package_names_with_changed_versions.append(pname) + elif manifest_pair.is_selection_state_changed(pname): + package_names_with_changed_selection_states.append(pname) + + any_changes_displayed = False + + # Announce version changes. + if package_names_with_changed_versions != []: + any_changes_displayed = True + print("Version changes:") + print_package_list( + pnames=package_names_with_changed_versions, + manifest_pair=manifest_pair, + left_versions_map=left_closure_map, + right_versions_map=right_closure_map, + ) + + # Announce specific changes for packages whose versions haven't changed. + if package_names_with_changed_selection_states != []: + any_changes_displayed = True + print("Selection state changes:") + print_package_list( + pnames=package_names_with_changed_selection_states, + manifest_pair=manifest_pair, + left_versions_map=left_closure_map, + fixed_install_state=INST_CHANGED, + ) + + # Announce added packages. + if right_only_package_names != []: + any_changes_displayed = True + print("Added packages:") + print_package_list( + pnames=right_only_package_names, + manifest_pair=manifest_pair, + left_versions_map=right_closure_map, # Yes, this is correct. + fixed_install_state=INST_ADDED, + ) + + # Announce removed packages. + if left_only_package_names != []: + any_changes_displayed = True + print("Removed packages:") + print_package_list( + pnames=left_only_package_names, + manifest_pair=manifest_pair, + left_versions_map=left_closure_map, + fixed_install_state=INST_REMOVED, + ) + + if not any_changes_displayed: + print("No version or selection state changes.") + + left_closure_paths_set = set(left_closure_paths) + right_closure_paths_set = set(right_closure_paths) + left_only_paths_count = len(left_closure_paths_set - right_closure_paths_set) + right_only_paths_count = len(right_closure_paths_set - left_closure_paths_set) + + print( + f"Closure size: {len(left_closure_paths)} -> {len(right_closure_paths)} " + f"({right_only_paths_count} paths added, {left_only_paths_count} paths removed, " + f"delta {right_only_paths_count - left_only_paths_count:+d})." + ) + +# Importing this module is kinda broken because of the constant initializations +# at the top. +if __name__ == "__main__": + main() diff --git a/nvd.1 b/nvd.1 new file mode 100644 index 0000000..584b21e --- /dev/null +++ b/nvd.1 @@ -0,0 +1,64 @@ +.TH nvd 1 2021-04-06 nvd "User Commands" +.SH NAME +nvd \- Nix/NixOS package version diff tool +.SH SYNOPSIS +.P +.B nvd [ -h | --help ] +.P +.B nvd [ --color=(auto|always|never) ] root1 root2 +.SH DESCRIPTION +.P +.B nvd +is a tool for diffing the versions of all store paths in the closures of two Nix +store paths, neatly summarizing the differences. This is mainly intended for +comparing two system configurations and is inspired by the output of +.B emerge -pv +from Gentoo's Portage package manager. +.B nvd +could also be likened to the output of Debian's +.B apt upgrade -V, +or any equivalent from other distributions' package managers. +.B nvd +isn't limited to comparing system configurations though, and can work with any +two store paths. +.P +When given two system configurations, +.B nvd +distinguishes between packages that are explicitly included in +.B environment.systemPackages, +versus the rest of the packages that make up the system. The former packages +are dubbed +.I selected packages. +.B nvd +marks and bolds any selected packages it lists. Additionally, +.B nvd +reports on any changes to packages' selection states between the two +configurations. This is done by looking at all packages that are referenced +directly from the configurations' system paths, within each +.I /sw +subdirectory. When either of the arguments isn't a system configuration, +i.e. when either of the arguments is missing a +.I sw +subdirectory, then +.I selected packages +refer to direct dependencies of each of the arguments instead, and the +comparison is still performed. +.B nvd +doesn't actually know if a Nix package represents a system configuration; it +just uses the existence of a +.I sw +directory as a heuristic. +.SH OPTIONS +.P +.TP +-h, --help +Displays a brief help message. +.TP +--color=(auto|always|never) +When to display colour. The default +.B auto +detects whether stdout is a terminal and only uses colour if it is. Passing +.B always +forces colour to always be used, and passing +.B never +forces colour to never be used.