From e7d0ce0c89b6f35177a8ae9d3de3c95806c4748b Mon Sep 17 00:00:00 2001 From: Bryan Gardiner Date: Thu, 22 Aug 2024 20:46:28 -0700 Subject: [PATCH] Make the rest of the 'list' and 'diff' logic use PackageSet. --- src/nvd | 144 +++++++++++++++++++++++++------------------------------- 1 file changed, 65 insertions(+), 79 deletions(-) diff --git a/src/nvd b/src/nvd index c588130..625b63c 100755 --- a/src/nvd +++ b/src/nvd @@ -170,7 +170,7 @@ class Version: if text is None: text = "" - self._text = text + self._text: str = text self._chunks: List[VersionChunk] = [] while text != "": @@ -204,6 +204,10 @@ class Version: return NotImplemented return self._chunks < other._chunks + def text(self) -> str: + """Returns the version string, empty (not None) if there is no version.""" + return self._text + class Package: def __init__(self, *, pname: str, version: Version, store_path: StorePath): assert isinstance(pname, str), f"Not a string: {pname!r}" @@ -223,20 +227,30 @@ class Package: return self._store_path class PackageSet: - def __init__(self, packages: List[Package]): + def __init__(self, packages: List[Package], store_paths: List[str]): assert isinstance(packages, List) assert all(isinstance(package, Package) for package in packages) self._packages = packages + self._store_paths = store_paths + # Unordered map from pnames to available package versions, in ascending + # version order. self._packages_by_pname: Dict[str, List[Package]] = {} for entry in packages: self._packages_by_pname.setdefault(entry.pname(), []).append(entry) + for pkgs in self._packages_by_pname.values(): + pkgs.sort(key=lambda p: p.version()) + @staticmethod def from_direct_dependencies(path: Path) -> "PackageSet": return PackageSet.from_nix_query(["--references", str(path)]) + @staticmethod + def from_closure(path: Path) -> "PackageSet": + return PackageSet.from_nix_query(["--requisites", str(path)]) + @staticmethod def from_nix_query(nix_query_args: List[str]) -> "PackageSet": result_paths_str: str = subprocess.run( @@ -258,7 +272,7 @@ class PackageSet: store_path=StorePath(result_path), )) - return PackageSet(packages) + return PackageSet(packages, result_paths) def contains_pname(self, pname: str) -> bool: return pname in self._packages_by_pname @@ -266,6 +280,13 @@ class PackageSet: def all_pnames(self): return self._packages_by_pname.keys() + def all_store_paths(self): + return iter(self._store_paths) + + def get_pname_versions(self, pname: str) -> List[Version]: + pkgs = self._packages_by_pname.get(pname, []) + return [pkg.version() for pkg in pkgs] + class PackageSetPair: def __init__( self, @@ -333,23 +354,8 @@ def parse_pname_version(path: str) -> Tuple[str, Optional[str]]: return pname, version -def closure_paths_to_map(paths: List[str]) -> Dict[str, List[Optional[str]]]: - result: Dict[str, List[Optional[str]]] = {} - - for path in paths: - name, version = parse_pname_version(path) - if name not in result: - result[name] = [version] - else: - result[name].append(version) - - for version_list in result.values(): - version_list.sort(key=lambda ver: ver or "") - - return result - def render_versions( - version_list: List[Optional[str]], + version_list: List[Version], *, colour: bool = False, highlight_from_pos: Optional[int] = None, @@ -367,23 +373,24 @@ def render_versions( version_list.pop() count += 1 - if version is None: - version = "" + text = version.text() + if text == "": + text = "" elif colour: # (Don't apply colour to .) if highlight_from_pos is None: - version = sgr(SGR_FG + SGR_YELLOW) + version + sgr(SGR_RESET) + text = sgr(SGR_FG + SGR_YELLOW) + text + sgr(SGR_RESET) else: - version = \ + text = \ sgr(SGR_FG + SGR_YELLOW) \ - + version[0:highlight_from_pos] \ + + text[0:highlight_from_pos] \ + sgr(SGR_BOLD) \ - + version[highlight_from_pos:] \ + + text[highlight_from_pos:] \ + sgr(SGR_RESET) if count == 1: - items.append(version) + items.append(text) else: - items.append(version + f" x{count}") + items.append(text + f" x{count}") items.reverse() return ", ".join(items) @@ -392,14 +399,14 @@ def print_package_list( *, pnames: List[str], selected_sets: PackageSetPair, - left_versions_map: Dict[str, List[str]], - right_versions_map: Optional[Dict[str, List[str]]] = None, + left_package_set: PackageSet, + right_package_set: Optional[PackageSet] = None, fixed_install_state: Optional[str] = None, no_version_suffix_highlight: bool = False, ): - assert (right_versions_map is None) != (fixed_install_state is None), \ - "Expect either one versions map, or a fixed install state." - have_single_version = right_versions_map is None + assert (right_package_set is None) != (fixed_install_state is None), \ + "Expect either one package set, or a fixed install state." + have_single_version = right_package_set is None if not pnames: print("No packages to display.") @@ -412,18 +419,18 @@ def print_package_list( pname_format_str = "{:" + str(pname_width) + "}" for pname in pnames: - left_versions = left_versions_map[pname] + left_versions = left_package_set.get_pname_versions(pname) + assert left_versions, f"No left versions available for pname: {pname!r}" if have_single_version: install_state_str = fixed_install_state else: - right_versions = right_versions_map[pname] if right_versions_map is not None else None - # TODO Terrible - make this efficient. - lv_parsed = [Version(v) for v in left_versions] - rv_parsed = [Version(v) for v in right_versions] - if all(lv < rv for lv in lv_parsed for rv in rv_parsed): + right_versions = right_package_set.get_pname_versions(pname) + assert right_versions, f"No right versions available for pname: {pname!r}" + + if left_versions[-1] < right_versions[0]: install_state_str = INST_UPGRADED - elif all(lv > rv for lv in lv_parsed for rv in rv_parsed): + elif left_versions[0] > right_versions[-1]: install_state_str = INST_DOWNGRADED else: install_state_str = INST_CHANGED @@ -447,8 +454,8 @@ def print_package_list( versions_common_prefix_len = None else: versions_common_prefix_len = find_common_version_prefix_lists( - [x for x in left_versions if x is not None], - [x for x in right_versions if x is not None], + [x.text() for x in left_versions if x.text() != ""], + [x.text() for x in right_versions if x.text() != ""], ) status_str = "[{}{}]".format( @@ -620,19 +627,12 @@ def run_list(*, root, only_selected, name_patterns): else: selected_set = PackageSet.from_direct_dependencies(path / "sw") - closure_paths: List[str] = subprocess.run( - [make_nix_bin_path("nix-store"), "-qR", str(path)], - stdout=PIPE, - text=True, - check=True, - ).stdout.rstrip("\n").split("\n") - - closure_map: Dict[str, List[str]] = closure_paths_to_map(closure_paths) + closure_set = PackageSet.from_closure(path) if only_selected: target_pnames = list(selected_set.all_pnames()) else: - target_pnames = list(closure_map.keys()) + target_pnames = list(closure_set.all_pnames()) target_pnames = [ pname @@ -645,8 +645,8 @@ def run_list(*, root, only_selected, name_patterns): print_package_list( pnames=target_pnames, selected_sets=PackageSetPair(selected_set, selected_set), - left_versions_map=closure_map, - right_versions_map=None, + left_package_set=closure_set, + right_package_set=None, fixed_install_state=INST_INSTALLED, ) @@ -685,25 +685,11 @@ def run_diff( selected_sets = PackageSetPair(left_selected_set, right_selected_set) - left_closure_paths: List[str] = subprocess.run( - [make_nix_bin_path("nix-store"), "-qR", str(left_resolved)], - stdout=PIPE, - text=True, - check=True, - ).stdout.rstrip("\n").split("\n") - right_closure_paths: List[str] = subprocess.run( - [make_nix_bin_path("nix-store"), "-qR", (right_resolved)], - stdout=PIPE, - text=True, - check=True, - ).stdout.rstrip("\n").split("\n") + left_closure_set = PackageSet.from_closure(left_resolved) + right_closure_set = PackageSet.from_closure(right_resolved) - # Maps from pname to lists of versions. - left_closure_map: Dict[str, List[Optional[str]]] = closure_paths_to_map(left_closure_paths) - right_closure_map: Dict[str, List[Optional[str]]] = closure_paths_to_map(right_closure_paths) - - left_package_names = set(left_closure_map.keys()) - right_package_names = set(right_closure_map.keys()) + left_package_names = set(left_closure_set.all_pnames()) + right_package_names = set(right_closure_set.all_pnames()) common_package_names = sorted(left_package_names & right_package_names) left_only_package_names = sorted(left_package_names - right_package_names) @@ -713,7 +699,7 @@ def run_diff( package_names_with_changed_versions = [] package_names_with_changed_selection_states = [] for pname in common_package_names: - if left_closure_map[pname] != right_closure_map[pname]: + if left_closure_set.get_pname_versions(pname) != right_closure_set.get_pname_versions(pname): package_names_with_changed_versions.append(pname) elif selected_sets.is_selection_state_changed(pname): package_names_with_changed_selection_states.append(pname) @@ -727,8 +713,8 @@ def run_diff( print_package_list( pnames=package_names_with_changed_versions, selected_sets=selected_sets, - left_versions_map=left_closure_map, - right_versions_map=right_closure_map, + left_package_set=left_closure_set, + right_package_set=right_closure_set, no_version_suffix_highlight=no_version_suffix_highlight, ) @@ -739,7 +725,7 @@ def run_diff( print_package_list( pnames=package_names_with_changed_selection_states, selected_sets=selected_sets, - left_versions_map=left_closure_map, + left_package_set=left_closure_set, fixed_install_state=INST_CHANGED, ) @@ -750,7 +736,7 @@ def run_diff( print_package_list( pnames=right_only_package_names, selected_sets=selected_sets, - left_versions_map=right_closure_map, # Yes, this is correct. + left_package_set=right_closure_set, # Yes, this is correct. fixed_install_state=INST_ADDED, ) @@ -761,15 +747,15 @@ def run_diff( print_package_list( pnames=left_only_package_names, selected_sets=selected_sets, - left_versions_map=left_closure_map, + left_package_set=left_closure_set, fixed_install_state=INST_REMOVED, ) if not any_changes_displayed: print("No version or selection state changes.") - left_closure_paths_set = set(left_closure_paths) - right_closure_paths_set = set(right_closure_paths) + left_closure_paths_set = set(left_closure_set.all_store_paths()) + right_closure_paths_set = set(right_closure_set.all_store_paths()) left_only_paths_count = len(left_closure_paths_set - right_closure_paths_set) right_only_paths_count = len(right_closure_paths_set - left_closure_paths_set) @@ -785,7 +771,7 @@ def run_diff( render_bytes(right_closure_disk_usage_bytes - left_closure_disk_usage_bytes) print( - f"Closure size: {len(left_closure_paths)} -> {len(right_closure_paths)} " + f"Closure size: {len(left_closure_paths_set)} -> {len(right_closure_paths_set)} " f"({right_only_paths_count} paths added, {left_only_paths_count} paths removed, " f"delta {right_only_paths_count - left_only_paths_count:+d}" f"{diff_closure_disk_usage_str})."