diff --git a/components/polylith/check/__init__.py b/components/polylith/check/__init__.py index c2c6e1fb..5ce57b76 100644 --- a/components/polylith/check/__init__.py +++ b/components/polylith/check/__init__.py @@ -1,3 +1,3 @@ -from polylith.check import collect, grouping, report +from polylith.check import collect, report -__all__ = ["collect", "grouping", "report"] +__all__ = ["collect", "report"] diff --git a/components/polylith/check/collect.py b/components/polylith/check/collect.py index 38e974de..a7da8887 100644 --- a/components/polylith/check/collect.py +++ b/components/polylith/check/collect.py @@ -1,13 +1,13 @@ from pathlib import Path from typing import Set -from polylith import check, imports, workspace +from polylith import imports, workspace def extract_bricks(paths: Set[Path], ns: str) -> dict: all_imports = imports.fetch_all_imports(paths) - return check.grouping.extract_brick_imports(all_imports, ns) + return imports.extract_brick_imports(all_imports, ns) def with_unknown_components(root: Path, ns: str, brick_imports: dict) -> dict: diff --git a/components/polylith/check/report.py b/components/polylith/check/report.py index 1b9440f4..a07e76b2 100644 --- a/components/polylith/check/report.py +++ b/components/polylith/check/report.py @@ -2,7 +2,7 @@ from typing import Set from polylith import imports, libs, workspace -from polylith.check import collect, grouping +from polylith.check import collect from polylith.reporting import theme from rich.console import Console @@ -78,8 +78,8 @@ def extract_collected_imports( ns: str, imports_in_bases: dict, imports_in_components: dict ) -> dict: brick_imports = { - "bases": grouping.extract_brick_imports(imports_in_bases, ns), - "components": grouping.extract_brick_imports(imports_in_components, ns), + "bases": imports.grouping.extract_brick_imports(imports_in_bases, ns), + "components": imports.grouping.extract_brick_imports(imports_in_components, ns), } third_party_imports = { diff --git a/components/polylith/commands/deps.py b/components/polylith/commands/deps.py index 6c529815..67636289 100644 --- a/components/polylith/commands/deps.py +++ b/components/polylith/commands/deps.py @@ -1,7 +1,7 @@ from pathlib import Path from typing import List, Set -from polylith import bricks, deps, info +from polylith import bricks, deps, info, interface def get_imports(root: Path, ns: str, bricks: dict) -> dict: @@ -30,6 +30,17 @@ def get_components(root: Path, ns: str, project_data: dict) -> Set[str]: return pick_name(bricks.get_components_data(root, ns)) +def used_by_as_bricks(bricks: dict, brick_deps: dict) -> dict: + bases = bricks["bases"] + components = bricks["components"] + + used_by = brick_deps["used_by"] + return { + "bases": {b for b in used_by if b in bases}, + "components": {b for b in used_by if b in components}, + } + + def run(root: Path, ns: str, options: dict): directory = options.get("directory") brick = options.get("brick") @@ -53,6 +64,8 @@ def run(root: Path, ns: str, options: dict): if brick and imports.get(brick): brick_deps = bricks_deps[brick] + used_bricks = used_by_as_bricks(bricks, brick_deps) + circular_deps = circular_bricks.get(brick) deps.print_brick_deps(brick, bricks, brick_deps, options) @@ -60,6 +73,8 @@ def run(root: Path, ns: str, options: dict): if circular_deps: deps.print_brick_with_circular_deps(brick, circular_deps, bricks) + interface.report.print_brick_interface_usage(root, ns, brick, used_bricks) + return deps.print_deps(bricks, imports, options) diff --git a/components/polylith/imports/__init__.py b/components/polylith/imports/__init__.py index 94143ff5..20863904 100644 --- a/components/polylith/imports/__init__.py +++ b/components/polylith/imports/__init__.py @@ -1,13 +1,23 @@ +from polylith.imports.grouping import ( + extract_brick_imports, + extract_brick_imports_with_namespaces, +) from polylith.imports.parser import ( extract_top_ns, fetch_all_imports, + fetch_api, + fetch_brick_import_usages, fetch_excluded_imports, list_imports, ) __all__ = [ + "extract_brick_imports", + "extract_brick_imports_with_namespaces", "extract_top_ns", "fetch_all_imports", + "fetch_api", + "fetch_brick_import_usages", "fetch_excluded_imports", "list_imports", ] diff --git a/components/polylith/check/grouping.py b/components/polylith/imports/grouping.py similarity index 85% rename from components/polylith/check/grouping.py rename to components/polylith/imports/grouping.py index 2e39a520..5dd304e7 100644 --- a/components/polylith/check/grouping.py +++ b/components/polylith/imports/grouping.py @@ -34,3 +34,9 @@ def extract_brick_imports(all_imports: dict, top_ns) -> dict: with_only_brick_names = only_brick_names(with_only_bricks) return exclude_empty(with_only_brick_names) + + +def extract_brick_imports_with_namespaces(all_imports: dict, top_ns) -> dict: + with_only_bricks = only_bricks(all_imports, top_ns) + + return exclude_empty(with_only_bricks) diff --git a/components/polylith/imports/parser.py b/components/polylith/imports/parser.py index 1a70a764..07ddf42b 100644 --- a/components/polylith/imports/parser.py +++ b/components/polylith/imports/parser.py @@ -2,11 +2,15 @@ from collections.abc import Iterable from functools import lru_cache from pathlib import Path -from typing import List, Set, Union +from typing import FrozenSet, List, Optional, Set, Tuple, Union typing_ns = "typing" type_checking = "TYPE_CHECKING" +WRAPPER_NODES = (ast.Await, ast.Expr, ast.NamedExpr, ast.Starred, ast.Subscript) +FN_NODES = (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda) +SYMBOLS = (*FN_NODES, ast.ClassDef) + def parse_import(node: ast.Import) -> List[str]: return [name.name for name in node.names] @@ -68,7 +72,110 @@ def parse_node(node: ast.AST) -> Union[dict, None]: return None -def parse_module(path: Path) -> ast.AST: +def extract_api_part(path: str) -> str: + return path.rsplit(".", 1)[-1] + + +def find_import_root_and_path( + expr: ast.expr, parts: Tuple[str, ...] = () +) -> Tuple[ast.expr, str]: + """Builds a namespace when the expression is an Attribute or Name, otherwise empty.""" + if isinstance(expr, ast.Attribute): + return find_import_root_and_path(expr.value, (*parts, expr.attr)) + + namespace_parts = (*parts, expr.id) if isinstance(expr, ast.Name) else parts + + namespace = str.join(".", reversed(namespace_parts)) + + return expr, namespace + + +def with_ns(usage: str, ns: str) -> str: + return usage if str.startswith(usage, ns + ".") else f"{ns}.{usage}" + + +def find_matching_usage(expr: ast.expr, options: dict) -> Optional[str]: + ns = options["ns"] + api_map = options["api_map"] + allowed_prefixes = options["allowed_prefixes"] + shadowed = options["shadowed"] + + root, usage = find_import_root_and_path(expr) + + if not isinstance(root, ast.Name): + return None + + if root.id in shadowed: + return None + + if root.id in api_map: + found = api_map[root.id] if usage == root.id else usage + + return with_ns(found, ns) + + if any(usage.startswith(p + ".") for p in allowed_prefixes): + return with_ns(usage, ns) + + return None + + +def parse_import_usage(node: ast.AST, options: dict) -> Union[str, None]: + usage = None + child = None + + if isinstance(node, ast.Attribute): + usage = find_matching_usage(node, options) + child = node.value + elif isinstance(node, WRAPPER_NODES): + child = node.value + elif isinstance(node, ast.Call): + usage = find_matching_usage(node.func, options) + child = node.func + elif isinstance(node, ast.UnaryOp): + child = node.operand + + if usage: + return usage + + return parse_import_usage(child, options) if child is not None else None + + +def collect_arg_names( + fn: Union[ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda], +) -> Set[str]: + args = fn.args + + names = {a.arg for a in args.posonlyargs + args.args + args.kwonlyargs} + + if args.vararg: + names.add(args.vararg.arg) + + if args.kwarg: + names.add(args.kwarg.arg) + + return names + + +def walk_usages(node: ast.AST, options: dict) -> Set[str]: + if isinstance(node, FN_NODES): + options = { + **options, + "shadowed": options["shadowed"] | frozenset(collect_arg_names(node)), + } + + out = set() + hit = parse_import_usage(node, options) + + if hit: + out.add(hit) + + for child in ast.iter_child_nodes(node): + out |= walk_usages(child, options) + + return out + + +def parse_module(path: Path) -> ast.Module: with open(path.as_posix(), "r", encoding="utf-8", errors="ignore") as f: tree = ast.parse(f.read(), path.name) @@ -88,7 +195,71 @@ def extract_imports(path: Path) -> List[str]: return [i for i in includes if i not in excludes] -def extract_and_flatten(py_modules: Iterable) -> Set[str]: +@lru_cache(maxsize=None) +def extract_symbols(path: Path) -> Set[str]: + tree = parse_module(path) + + return { + s.name + for s in tree.body + if isinstance(s, SYMBOLS) and not s.name.startswith("_") + } + + +def target_names(t: ast.AST) -> Set[str]: + if isinstance(t, ast.Name): + return {t.id} + + if isinstance(t, (ast.Tuple, ast.List)): + return {n for e in t.elts for n in target_names(e)} + + return set() + + +def extract_variables(statement: ast.stmt) -> Set[str]: + if isinstance(statement, ast.Assign): + return {n for t in statement.targets for n in target_names(t)} + + if isinstance(statement, (ast.AnnAssign, ast.AugAssign)): + return target_names(statement.target) + + if hasattr(ast, "TypeAlias") and isinstance(statement, ast.TypeAlias): + return {statement.name.id} + + return set() + + +def extract_public_variables(path: Path) -> Set[str]: + tree = parse_module(path) + + return {v for s in tree.body for v in extract_variables(s) if not v.startswith("_")} + + +def is_all_statement(target: ast.expr) -> bool: + return isinstance(target, ast.Name) and target.id == "__all__" + + +def is_string_constant(expression: ast.AST) -> bool: + return isinstance(expression, ast.Constant) and isinstance(expression.value, str) + + +def extract_all(statement: ast.stmt) -> Optional[Set[str]]: + if not isinstance(statement, ast.Assign): + return None + + if not any(is_all_statement(t) for t in statement.targets): + return None + + if not isinstance(statement.value, (ast.List, ast.Tuple)): + return None + + if not all(is_string_constant(e) for e in statement.value.elts): + return None + + return {e.value for e in statement.value.elts if isinstance(e, ast.Constant)} + + +def extract_imports_and_flatten(py_modules: Iterable) -> Set[str]: return {i for m in py_modules for i in extract_imports(m)} @@ -104,7 +275,7 @@ def find_files(path: Path) -> Iterable: def list_imports(path: Path) -> Set[str]: py_modules = find_files(path) - return extract_and_flatten(py_modules) + return extract_imports_and_flatten(py_modules) def fetch_all_imports(paths: Set[Path]) -> dict: @@ -113,6 +284,45 @@ def fetch_all_imports(paths: Set[Path]) -> dict: return {k: v for row in rows for k, v in row.items()} +def fetch_import_usages_in_module(path: Path, ns: str, imported: Set[str]) -> Set[str]: + tree = parse_module(path) + api_map = {extract_api_part(p): p for p in imported} + + options = { + "ns": ns, + "api_map": api_map, + "allowed_prefixes": frozenset(api_map.values()), + "shadowed": frozenset(), + } + return walk_usages(tree, options) + + +@lru_cache(maxsize=None) +def fetch_brick_import_usages( + path: Path, ns: str, imported: FrozenSet[str] +) -> Set[str]: + py_modules = find_files(path) + + found = {m: set(extract_imports(m)).intersection(imported) for m in py_modules} + filtered = {k: v for k, v in found.items() if v} + + fetched = (fetch_import_usages_in_module(k, ns, v) for k, v in filtered.items()) + + return {i for f in fetched if f for i in f} + + +def extract_api(paths: Set[str]) -> Set[str]: + return {extract_api_part(p) for p in paths} + + +def fetch_api(paths: Set[Path]) -> dict: + interfaces = [Path(p / "__init__.py") for p in paths] + + rows = [{i.parent.name: extract_api(list_imports(i))} for i in interfaces] + + return {k: v for row in rows for k, v in row.items()} + + def should_exclude(path: Path, excludes: Set[str]): return any(path.match(pattern) for pattern in excludes) @@ -122,7 +332,7 @@ def list_excluded_imports(path: Path, excludes: Set[str]) -> Set[str]: filtered = [p for p in py_modules if should_exclude(p, excludes)] - return extract_and_flatten(filtered) + return extract_imports_and_flatten(filtered) def fetch_excluded_imports(paths: Set[Path], excludes: Set[str]) -> dict: diff --git a/components/polylith/interface/__init__.py b/components/polylith/interface/__init__.py index 3774173f..3a6799ef 100644 --- a/components/polylith/interface/__init__.py +++ b/components/polylith/interface/__init__.py @@ -1,3 +1,4 @@ +from polylith.interface import report from polylith.interface.interfaces import create_interface -__all__ = ["create_interface"] +__all__ = ["create_interface", "report"] diff --git a/components/polylith/interface/report.py b/components/polylith/interface/report.py new file mode 100644 index 00000000..54aaadf1 --- /dev/null +++ b/components/polylith/interface/report.py @@ -0,0 +1,143 @@ +from pathlib import Path +from typing import Dict, FrozenSet, Set, Tuple + +from polylith import imports +from polylith.reporting import theme +from polylith.workspace.paths import collect_bases_paths, collect_components_paths +from rich.console import Console +from rich.padding import Padding +from rich.table import Table + + +def get_brick_interface(root: Path, ns: str, brick: str, bricks: dict) -> set: + bases = bricks["bases"] + paths = {brick} + + fn = collect_bases_paths if brick in bases else collect_components_paths + + brick_paths = fn(root, ns, paths) + bricks_api = imports.fetch_api(brick_paths) + brick_api = bricks_api.get(brick) or set() + brick_ns = f"{ns}.{brick}" + + return {f"{brick_ns}.{endpoint}" for endpoint in brick_api} + + +def get_brick_imports(root: Path, ns: str, bases: set, components: set) -> dict: + bases_paths = collect_bases_paths(root, ns, bases) + components_paths = collect_components_paths(root, ns, components) + + in_bases = imports.fetch_all_imports(bases_paths) + in_comps = imports.fetch_all_imports(components_paths) + + extracted_bases = imports.extract_brick_imports_with_namespaces(in_bases, ns) + extracted_components = imports.extract_brick_imports_with_namespaces(in_comps, ns) + + return {**extracted_bases, **extracted_components} + + +def to_imported_api(brick_imports: Set[str]) -> Set[str]: + return {imports.parser.extract_api_part(b) for b in brick_imports} + + +def filter_by_brick(brick_imports: Set[str], brick: str, ns: str) -> Set[str]: + brick_with_ns = f"{ns}.{brick}" + + return {b for b in brick_imports if str.startswith(b, brick_with_ns)} + + +def is_within_namespace(current: str, namespaces: Set[str]) -> bool: + return any(current.startswith(i) for i in namespaces) + + +def starts_with(usages: Set[str], current: str) -> bool: + return any(usage.startswith(current + ".") for usage in usages) + + +def check_usage(usings: Set[str], brick_interface: Set[str]) -> dict: + return {u: is_within_namespace(u, brick_interface) for u in usings} + + +def frozen(data: Dict[str, Set[str]], key: str) -> FrozenSet[str]: + return frozenset(data.get(key) or set()) + + +def check_brick_interface_usage( + root: Path, ns: str, brick: str, bricks: dict +) -> Tuple[dict, set]: + brick_interface = get_brick_interface(root, ns, brick, bricks) + bases = bricks["bases"] + components = bricks["components"] + + brick_imports = get_brick_imports(root, ns, bases, components) + by_brick = {k: filter_by_brick(v, brick, ns) for k, v in brick_imports.items()} + + bases_paths = collect_bases_paths(root, ns, bases) + comp_paths = collect_components_paths(root, ns, components) + paths = bases_paths.union(comp_paths) + + usage = { + p.name: imports.fetch_brick_import_usages(p, ns, frozen(by_brick, p.name)) + for p in paths + } + + checked = {k: check_usage(v, brick_interface) for k, v in usage.items()} + + return checked, brick_interface + + +def print_brick_interface(brick: str, brick_interface: set, bricks: dict) -> None: + console = Console(theme=theme.poly_theme) + + tag = "base" if brick in bricks["bases"] else "comp" + + table = Table(box=None) + + message = f"[{tag}]{brick}[/] exposes:" + table.add_column(Padding(message, (1, 0, 0, 0))) + + for endpoint in sorted(brick_interface): + *_ns, exposes = str.split(endpoint, ".") + table.add_row(f"[data]{exposes}[/]") + + console.print(table, overflow="ellipsis") + + +def unified_usages(usages: dict) -> Set[str]: + filtered = {k for k, v in usages.items() if not v} + + return {f for f in filtered if starts_with(filtered, f)} + + +def print_brick_interface_usage(root: Path, ns: str, brick: str, bricks: dict) -> None: + res, brick_interface = check_brick_interface_usage(root, ns, brick, bricks) + + invalid_usage = { + brick: unified_usages(usages) + for brick, usages in res.items() + if not all(usages.values()) + } + + if not invalid_usage: + return + + console = Console(theme=theme.poly_theme) + + table = Table(box=None) + tag = "base" if brick in bricks["bases"] else "comp" + + for using_brick, usages in invalid_usage.items(): + using_tag = "base" if using_brick in bricks["bases"] else "comp" + + for using in usages: + used = str.replace(using, f"{ns}.{brick}.", "") + prefix = f"Found in [{using_tag}]{using_brick}[/]" + middle = f"[data]{used}[/] is not part of the public interface of [{tag}]{brick}[/]" + + message = f":information: {prefix}: {middle}." + + table.add_row(f"{message}") + + console.print(table, overflow="ellipsis") + + print_brick_interface(brick, brick_interface, bricks) diff --git a/components/polylith/test/core.py b/components/polylith/test/core.py index 3115b326..8e464461 100644 --- a/components/polylith/test/core.py +++ b/components/polylith/test/core.py @@ -1,7 +1,7 @@ from pathlib import Path from typing import List, Union -from polylith import check, diff, imports +from polylith import diff, imports def is_test(root: Path, ns: str, path: Path, theme: str) -> bool: @@ -34,4 +34,4 @@ def get_brick_imports_in_tests( all_imports = {k: v for k, v in enumerate(listed_imports)} - return check.grouping.extract_brick_imports(all_imports, ns) + return imports.extract_brick_imports(all_imports, ns) diff --git a/test/components/polylith/imports/test_parser.py b/test/components/polylith/imports/test_parser.py new file mode 100644 index 00000000..c5d56532 --- /dev/null +++ b/test/components/polylith/imports/test_parser.py @@ -0,0 +1,132 @@ +import ast +import io +from functools import partial +from pathlib import Path + +from polylith.imports import parser + +fake_path = Path.cwd() + + +top_ns = "top_namespace" +brick = "something" +imported = {f"{top_ns}.{brick}"} + + +ns_brick_import = f""" +from {top_ns} import {brick} + + +def first() -> str: + return {brick}.one() + + +def second() -> str: + return {brick}.two() +""" + +ns_brick_fn_import = f""" +from {top_ns}.{brick} import one, two + + +def first() -> str: + return one() + + +def second() -> str: + return two() +""" + + +ns_brick_import_with_shadowed = f""" +from {top_ns} import {brick} + + +def second({brick}: dict) -> str: + return {brick}.get("key") +""" + +ns_import = f""" +import {top_ns} + + +def first() -> str: + return {top_ns}.x.one() + + +def second() -> str: + return {top_ns}.{brick}.one() + +""" + +ns_import_star = f""" +from {top_ns} import * + + +def first() -> str: + return x.one() + + +def second() -> str: + return {brick}.one() + +""" + + +def fake_parse_module(contents: str, *args, **kwargs) -> ast.AST: + f = io.StringIO(contents) + + return ast.parse(f.read(), "unit_test") + + +def test_fetch_import_usages_in_module_ns_brick(monkeypatch) -> None: + fn = partial(fake_parse_module, ns_brick_import) + monkeypatch.setattr(parser, "parse_module", fn) + + expected = {f"{top_ns}.{brick}.one", f"{top_ns}.{brick}.two"} + + res = parser.fetch_import_usages_in_module(fake_path, top_ns, imported) + + assert res == expected + + +def test_fetch_import_usages_in_module_ns_brick_fn(monkeypatch) -> None: + fn = partial(fake_parse_module, ns_brick_import) + monkeypatch.setattr(parser, "parse_module", fn) + + expected = {f"{top_ns}.{brick}.one", f"{top_ns}.{brick}.two"} + + res = parser.fetch_import_usages_in_module(fake_path, top_ns, imported) + + assert res == expected + + +def test_fetch_import_usages_in_module_ns_brick_with_shadowed(monkeypatch) -> None: + fn = partial(fake_parse_module, ns_brick_import_with_shadowed) + monkeypatch.setattr(parser, "parse_module", fn) + + res = parser.fetch_import_usages_in_module(fake_path, top_ns, imported) + + assert res == set() + + +def test_fetch_import_usages_in_module_ns(monkeypatch) -> None: + fn = partial(fake_parse_module, ns_import) + monkeypatch.setattr(parser, "parse_module", fn) + + expected = {f"{top_ns}.{brick}.one"} + + res = parser.fetch_import_usages_in_module(fake_path, top_ns, imported) + + assert res == expected + + +def test_fetch_import_usages_in_module_ns_star(monkeypatch) -> None: + fn = partial(fake_parse_module, ns_import_star) + monkeypatch.setattr(parser, "parse_module", fn) + + expected = {f"{top_ns}.{brick}.one"} + + res = parser.fetch_import_usages_in_module(fake_path, top_ns, imported) + + assert res == expected