Make the rest of the 'list' and 'diff' logic use PackageSet.

This commit is contained in:
Bryan Gardiner 2024-08-22 20:46:28 -07:00
parent 682570f054
commit e7d0ce0c89
No known key found for this signature in database
GPG key ID: 53EFBCA063E6183C

144
src/nvd
View file

@ -170,7 +170,7 @@ class Version:
if text is None:
text = ""
self._text = text
self._text: str = text
self._chunks: List[VersionChunk] = []
while text != "":
@ -204,6 +204,10 @@ class Version:
return NotImplemented
return self._chunks < other._chunks
def text(self) -> str:
"""Returns the version string, empty (not None) if there is no version."""
return self._text
class Package:
def __init__(self, *, pname: str, version: Version, store_path: StorePath):
assert isinstance(pname, str), f"Not a string: {pname!r}"
@ -223,20 +227,30 @@ class Package:
return self._store_path
class PackageSet:
def __init__(self, packages: List[Package]):
def __init__(self, packages: List[Package], store_paths: List[str]):
assert isinstance(packages, List)
assert all(isinstance(package, Package) for package in packages)
self._packages = packages
self._store_paths = store_paths
# Unordered map from pnames to available package versions, in ascending
# version order.
self._packages_by_pname: Dict[str, List[Package]] = {}
for entry in packages:
self._packages_by_pname.setdefault(entry.pname(), []).append(entry)
for pkgs in self._packages_by_pname.values():
pkgs.sort(key=lambda p: p.version())
@staticmethod
def from_direct_dependencies(path: Path) -> "PackageSet":
return PackageSet.from_nix_query(["--references", str(path)])
@staticmethod
def from_closure(path: Path) -> "PackageSet":
return PackageSet.from_nix_query(["--requisites", str(path)])
@staticmethod
def from_nix_query(nix_query_args: List[str]) -> "PackageSet":
result_paths_str: str = subprocess.run(
@ -258,7 +272,7 @@ class PackageSet:
store_path=StorePath(result_path),
))
return PackageSet(packages)
return PackageSet(packages, result_paths)
def contains_pname(self, pname: str) -> bool:
return pname in self._packages_by_pname
@ -266,6 +280,13 @@ class PackageSet:
def all_pnames(self):
return self._packages_by_pname.keys()
def all_store_paths(self):
return iter(self._store_paths)
def get_pname_versions(self, pname: str) -> List[Version]:
pkgs = self._packages_by_pname.get(pname, [])
return [pkg.version() for pkg in pkgs]
class PackageSetPair:
def __init__(
self,
@ -333,23 +354,8 @@ def parse_pname_version(path: str) -> Tuple[str, Optional[str]]:
return pname, version
def closure_paths_to_map(paths: List[str]) -> Dict[str, List[Optional[str]]]:
result: Dict[str, List[Optional[str]]] = {}
for path in paths:
name, version = parse_pname_version(path)
if name not in result:
result[name] = [version]
else:
result[name].append(version)
for version_list in result.values():
version_list.sort(key=lambda ver: ver or "")
return result
def render_versions(
version_list: List[Optional[str]],
version_list: List[Version],
*,
colour: bool = False,
highlight_from_pos: Optional[int] = None,
@ -367,23 +373,24 @@ def render_versions(
version_list.pop()
count += 1
if version is None:
version = "<none>"
text = version.text()
if text == "":
text = "<none>"
elif colour: # (Don't apply colour to <none>.)
if highlight_from_pos is None:
version = sgr(SGR_FG + SGR_YELLOW) + version + sgr(SGR_RESET)
text = sgr(SGR_FG + SGR_YELLOW) + text + sgr(SGR_RESET)
else:
version = \
text = \
sgr(SGR_FG + SGR_YELLOW) \
+ version[0:highlight_from_pos] \
+ text[0:highlight_from_pos] \
+ sgr(SGR_BOLD) \
+ version[highlight_from_pos:] \
+ text[highlight_from_pos:] \
+ sgr(SGR_RESET)
if count == 1:
items.append(version)
items.append(text)
else:
items.append(version + f" x{count}")
items.append(text + f" x{count}")
items.reverse()
return ", ".join(items)
@ -392,14 +399,14 @@ def print_package_list(
*,
pnames: List[str],
selected_sets: PackageSetPair,
left_versions_map: Dict[str, List[str]],
right_versions_map: Optional[Dict[str, List[str]]] = None,
left_package_set: PackageSet,
right_package_set: Optional[PackageSet] = None,
fixed_install_state: Optional[str] = None,
no_version_suffix_highlight: bool = False,
):
assert (right_versions_map is None) != (fixed_install_state is None), \
"Expect either one versions map, or a fixed install state."
have_single_version = right_versions_map is None
assert (right_package_set is None) != (fixed_install_state is None), \
"Expect either one package set, or a fixed install state."
have_single_version = right_package_set is None
if not pnames:
print("No packages to display.")
@ -412,18 +419,18 @@ def print_package_list(
pname_format_str = "{:" + str(pname_width) + "}"
for pname in pnames:
left_versions = left_versions_map[pname]
left_versions = left_package_set.get_pname_versions(pname)
assert left_versions, f"No left versions available for pname: {pname!r}"
if have_single_version:
install_state_str = fixed_install_state
else:
right_versions = right_versions_map[pname] if right_versions_map is not None else None
# TODO Terrible - make this efficient.
lv_parsed = [Version(v) for v in left_versions]
rv_parsed = [Version(v) for v in right_versions]
if all(lv < rv for lv in lv_parsed for rv in rv_parsed):
right_versions = right_package_set.get_pname_versions(pname)
assert right_versions, f"No right versions available for pname: {pname!r}"
if left_versions[-1] < right_versions[0]:
install_state_str = INST_UPGRADED
elif all(lv > rv for lv in lv_parsed for rv in rv_parsed):
elif left_versions[0] > right_versions[-1]:
install_state_str = INST_DOWNGRADED
else:
install_state_str = INST_CHANGED
@ -447,8 +454,8 @@ def print_package_list(
versions_common_prefix_len = None
else:
versions_common_prefix_len = find_common_version_prefix_lists(
[x for x in left_versions if x is not None],
[x for x in right_versions if x is not None],
[x.text() for x in left_versions if x.text() != ""],
[x.text() for x in right_versions if x.text() != ""],
)
status_str = "[{}{}]".format(
@ -620,19 +627,12 @@ def run_list(*, root, only_selected, name_patterns):
else:
selected_set = PackageSet.from_direct_dependencies(path / "sw")
closure_paths: List[str] = subprocess.run(
[make_nix_bin_path("nix-store"), "-qR", str(path)],
stdout=PIPE,
text=True,
check=True,
).stdout.rstrip("\n").split("\n")
closure_map: Dict[str, List[str]] = closure_paths_to_map(closure_paths)
closure_set = PackageSet.from_closure(path)
if only_selected:
target_pnames = list(selected_set.all_pnames())
else:
target_pnames = list(closure_map.keys())
target_pnames = list(closure_set.all_pnames())
target_pnames = [
pname
@ -645,8 +645,8 @@ def run_list(*, root, only_selected, name_patterns):
print_package_list(
pnames=target_pnames,
selected_sets=PackageSetPair(selected_set, selected_set),
left_versions_map=closure_map,
right_versions_map=None,
left_package_set=closure_set,
right_package_set=None,
fixed_install_state=INST_INSTALLED,
)
@ -685,25 +685,11 @@ def run_diff(
selected_sets = PackageSetPair(left_selected_set, right_selected_set)
left_closure_paths: List[str] = subprocess.run(
[make_nix_bin_path("nix-store"), "-qR", str(left_resolved)],
stdout=PIPE,
text=True,
check=True,
).stdout.rstrip("\n").split("\n")
right_closure_paths: List[str] = subprocess.run(
[make_nix_bin_path("nix-store"), "-qR", (right_resolved)],
stdout=PIPE,
text=True,
check=True,
).stdout.rstrip("\n").split("\n")
left_closure_set = PackageSet.from_closure(left_resolved)
right_closure_set = PackageSet.from_closure(right_resolved)
# Maps from pname to lists of versions.
left_closure_map: Dict[str, List[Optional[str]]] = closure_paths_to_map(left_closure_paths)
right_closure_map: Dict[str, List[Optional[str]]] = closure_paths_to_map(right_closure_paths)
left_package_names = set(left_closure_map.keys())
right_package_names = set(right_closure_map.keys())
left_package_names = set(left_closure_set.all_pnames())
right_package_names = set(right_closure_set.all_pnames())
common_package_names = sorted(left_package_names & right_package_names)
left_only_package_names = sorted(left_package_names - right_package_names)
@ -713,7 +699,7 @@ def run_diff(
package_names_with_changed_versions = []
package_names_with_changed_selection_states = []
for pname in common_package_names:
if left_closure_map[pname] != right_closure_map[pname]:
if left_closure_set.get_pname_versions(pname) != right_closure_set.get_pname_versions(pname):
package_names_with_changed_versions.append(pname)
elif selected_sets.is_selection_state_changed(pname):
package_names_with_changed_selection_states.append(pname)
@ -727,8 +713,8 @@ def run_diff(
print_package_list(
pnames=package_names_with_changed_versions,
selected_sets=selected_sets,
left_versions_map=left_closure_map,
right_versions_map=right_closure_map,
left_package_set=left_closure_set,
right_package_set=right_closure_set,
no_version_suffix_highlight=no_version_suffix_highlight,
)
@ -739,7 +725,7 @@ def run_diff(
print_package_list(
pnames=package_names_with_changed_selection_states,
selected_sets=selected_sets,
left_versions_map=left_closure_map,
left_package_set=left_closure_set,
fixed_install_state=INST_CHANGED,
)
@ -750,7 +736,7 @@ def run_diff(
print_package_list(
pnames=right_only_package_names,
selected_sets=selected_sets,
left_versions_map=right_closure_map, # Yes, this is correct.
left_package_set=right_closure_set, # Yes, this is correct.
fixed_install_state=INST_ADDED,
)
@ -761,15 +747,15 @@ def run_diff(
print_package_list(
pnames=left_only_package_names,
selected_sets=selected_sets,
left_versions_map=left_closure_map,
left_package_set=left_closure_set,
fixed_install_state=INST_REMOVED,
)
if not any_changes_displayed:
print("No version or selection state changes.")
left_closure_paths_set = set(left_closure_paths)
right_closure_paths_set = set(right_closure_paths)
left_closure_paths_set = set(left_closure_set.all_store_paths())
right_closure_paths_set = set(right_closure_set.all_store_paths())
left_only_paths_count = len(left_closure_paths_set - right_closure_paths_set)
right_only_paths_count = len(right_closure_paths_set - left_closure_paths_set)
@ -785,7 +771,7 @@ def run_diff(
render_bytes(right_closure_disk_usage_bytes - left_closure_disk_usage_bytes)
print(
f"Closure size: {len(left_closure_paths)} -> {len(right_closure_paths)} "
f"Closure size: {len(left_closure_paths_set)} -> {len(right_closure_paths_set)} "
f"({right_only_paths_count} paths added, {left_only_paths_count} paths removed, "
f"delta {right_only_paths_count - left_only_paths_count:+d}"
f"{diff_closure_disk_usage_str})."