Source code for stelar.client.lineage

from __future__ import annotations

from dataclasses import KW_ONLY, dataclass, field
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID

if TYPE_CHECKING:
    from .client import Client
    from .proxy import Proxy
    from .resource import Resource


[docs] @dataclass class ENode: id: UUID type: str lineage: "Lineage" incoming: list[ENode] = field(default_factory=list, init=False) outgoing: list[ENode] = field(default_factory=list, init=False) _ = KW_ONLY @property def ent(self) -> Proxy: """Return the proxy for this ENode.""" return self.lineage.client.registry_for(self.type).fetch_proxy(self.id)
@dataclass
class EResource(ENode):
    """Lineage node that represents a resource.

    Attributes:
        package_id: Optional UUID; defaults to ``None``.
        url: Optional URL string; defaults to ``None``.
        label: Display label; defaults to the empty string.
    """

    # NOTE(review): this is a plain assignment, not the ``_: KW_ONLY``
    # annotation, so it only defines a class attribute ``_`` and the fields
    # below remain positional-or-keyword. Switching to the annotation would
    # make them keyword-only and change the constructor signature.
    _ = KW_ONLY
    package_id: UUID | None = None
    url: str | None = None
    label: str = ""

    def __post_init__(self) -> None:
        """Force ``type`` to ``"Resource"`` whatever value was passed in."""
        self.type = "Resource"
@dataclass
class ETask(ENode):
    """Lineage node that represents a task.

    Attributes:
        label: Display label; defaults to the empty string.
        state: Task state string; defaults to ``"unknown"``.
        image: Optional image string; defaults to ``None``.
        process_id: Optional UUID; defaults to ``None``.
        start_date: Optional start timestamp; defaults to ``None``.
        end_date: Optional end timestamp; defaults to ``None``.
    """

    # NOTE(review): this is a plain assignment, not the ``_: KW_ONLY``
    # annotation, so it only defines a class attribute ``_`` and the fields
    # below remain positional-or-keyword. Switching to the annotation would
    # make them keyword-only and change the constructor signature.
    _ = KW_ONLY
    label: str = ""
    state: str = "unknown"
    image: str | None = None
    process_id: UUID | None = None
    start_date: datetime | None = None
    end_date: datetime | None = None

    def __post_init__(self) -> None:
        """Force ``type`` to ``"Task"`` whatever value was passed in."""
        self.type = "Task"
class Lineage:
    """
    Represents a lineage graph between resources and tasks in the Stelar system.

    A lineage is a collection of tasks and resources that share a common
    ancestry or descendant.

    Attributes:
        client: The client used to resolve entities.
        focus_id: Identifier of the focus resource.
        data: The raw lineage payload the graph was built from.
        forward: The ``forward`` flag given at construction.
        nodes: All nodes, in payload order.
        node: Mapping from node id to node.
        focus_node: The last node whose payload entry has a truthy ``"final"``
            value, or ``None`` when no entry has one.
        resources: The subset of ``nodes`` that are ``EResource`` instances.
        tasks: The subset of ``nodes`` that are ``ETask`` instances.
    """

    def __init__(
        self, client: Client, focus_id: UUID, data: dict, forward: bool = False
    ):
        """Build the node and edge structure from ``data``.

        Args:
            client: The client used to resolve entities.
            focus_id: Identifier of the focus resource.
            data: Payload with a ``"nodes"`` list of node dicts and an
                ``"edges"`` list of ``(source_id, target_id)`` pairs.
            forward: Stored as given on the ``forward`` attribute.

        Raises:
            ValueError: If a node has a type other than ``"Resource"`` or
                ``"Task"``.
            KeyError: If an edge refers to an id that is not among the nodes.
        """
        self.client = client
        self.focus_id = focus_id
        self.data = data
        self.forward = forward

        self.nodes: list[ENode] = []
        self.node: dict[UUID, ENode] = {}
        self.focus_node: ENode | None = None

        for raw in self.data["nodes"]:
            enode = self._build_node(raw)
            self.nodes.append(enode)
            self.node[enode.id] = enode
            if raw.get("final"):
                self.focus_node = enode

        # Wire both directions so each node can be walked either way.
        for source_id, target_id in self.data["edges"]:
            source = self.node[UUID(source_id)]
            target = self.node[UUID(target_id)]
            source.outgoing.append(target)
            target.incoming.append(source)

        self.resources: list[EResource] = [
            n for n in self.nodes if isinstance(n, EResource)
        ]
        self.tasks: list[ETask] = [n for n in self.nodes if isinstance(n, ETask)]

    @staticmethod
    def _parse_timestamp(value: str | None) -> datetime | None:
        """Parse an ISO-8601 string, mapping a missing or empty value to ``None``."""
        return datetime.fromisoformat(value) if value else None

    def _build_node(self, raw: dict) -> ENode:
        """Create the node object matching one entry of the ``"nodes"`` payload."""
        kind = raw["type"]
        if kind == "Resource":
            return EResource(
                id=UUID(raw["id"]),
                type=kind,
                lineage=self,
                label=raw.get("label") or "",
            )
        if kind == "Task":
            return ETask(
                id=UUID(raw["id"]),
                type=kind,
                lineage=self,
                # A missing or null label becomes "", as for resources.
                label=raw.get("label") or "",
                # Fall back to the ETask defaults when the payload omits these.
                state=raw.get("state", "unknown"),
                image=raw.get("image"),
                # Either timestamp may be absent or null; both ETask fields
                # accept None.
                start_date=self._parse_timestamp(raw.get("start_date")),
                end_date=self._parse_timestamp(raw.get("end_date")),
            )
        raise ValueError(f"Unknown node type: {kind}")

    @property
    def focus(self) -> Resource:
        """
        Return the focus resource of this lineage.

        The focus is the resource that is at the center of the lineage graph.
        It is looked up through ``client.resources`` using ``focus_id``.
        """
        return self.client.resources.get(self.focus_id)

    def __repr__(self):
        return f"Lineage(focus={self.focus_id},forward={self.forward})"