Source code for stelar.client.task_spec

"""Basic support for tasks in the client."""

from __future__ import annotations

from typing import TYPE_CHECKING, Generator
from uuid import UUID

from .dataset import Dataset
from .group import Organization
from .resource import Resource
from .utils import convert_proxy_id_to_str

if TYPE_CHECKING:
    # Type definitions for
    from .tool import Tool

    ToolSpec = Tool | UUID | str | None
    ImageSpec = str | None
    ExistingDataset = Dataset | UUID | str


[docs] class TaskSpec: """A task spec is used to create tasks. Contrary to other entities, Task creation is more complicated than simply providing values for some scalar attributes. Tasks, once created are not really editable; therefore, all task fields must be specified at task creation time. This class helps by collecting arguments needed to initialize a new Task. """ def __init__( self, tool: ToolSpec = None, *, image: ImageSpec = None, name: str = None ): from .tool import Tool self.tool = convert_proxy_id_to_str(tool, Tool) if isinstance(image, str | None): self.image = image else: raise TypeError("Expected a string as image") if not isinstance(name, str | None): raise TypeError("Expected a string as name") self.name = name self.datasets = {} self.inputs = {} self.outputs = {} self.parameters = {}
[docs] @classmethod def from_json(cls, json_spec: dict) -> TaskSpec: """Create a TaskSpec from a JSON specification. Args: json_spec (dict): the JSON specification for the task spec. """ tool = json_spec.get("tool") image = json_spec.get("image") name = json_spec.get("name", "task") ts = cls(tool=tool, image=image, name=name) ts.datasets = json_spec.get("datasets", {}) ts.inputs = json_spec.get("inputs", {}) ts.outputs = json_spec.get("outputs", {}) ts.parameters = json_spec.get("parameters", {}) return ts
[docs] def is_external(self) -> bool: """Check if this task spec is local (i.e., not using a remote tool).""" return self.tool is None and self.image is None
[docs] def d( self, alias: str, dset: Dataset | UUID | str | None = None, **dspec ) -> TaskSpec: """Add a dataset alias in the task spec. An alias stands for either an existing dataset or for the spec of a future dataset. Dataset aliases can be useful in specifying input and output arguments to a task. Args: alias (str): the dataset alias name dset (Dataset|UUID|str|None): specify an existing dataset for this alias dspec (dict): the spec for a future dictionary. """ if dset is not None: if dspec: raise ValueError("Cannot provide both a dataset and a new dataset spec") self.datasets[alias] = convert_proxy_id_to_str(dset, Dataset) else: if "name" not in dspec: raise ValueError("A dataset spec must provide a name") if "owner_org" not in dspec: dspec["owner_org"] = "stelar-klms" else: dspec["owner_org"] = convert_proxy_id_to_str( dspec["owner_org"], Organization, nullable=False ) self.datasets[alias] = dspec return self
[docs] def process_inspec_entry( self, inspec: str | UUID | Resource | list[str | UUID | Resource] ) -> Generator[str]: if isinstance(inspec, (Resource, UUID, str)): yield convert_proxy_id_to_str(inspec, Resource) elif isinstance(inspec, list | tuple): for i in inspec: yield from self.process_inspec_entry(i) else: raise TypeError("Unknown type for input spec")
[docs] def process_inspec(self, inspec) -> list[str]: return list(self.process_inspec_entry(inspec))
[docs] def i(self, **input_specs): for name, inspec in input_specs.items(): self.inputs[name] = self.process_inspec(inspec) return self
[docs] def process_outspec(self, outspec: str | dict): # Output is either an S3 URI (str) # or a dict. No processing here (yet!) if isinstance(outspec, str): return {"url": outspec} elif isinstance(outspec, dict): match outspec: case { "url": url, "resource": dict(), "dataset": str(), }: return outspec case {"url": url, "resource": res}: return { "url": url, "resource": convert_proxy_id_to_str(res, Resource), } case {"url": url}: return outspec case _: raise ValueError("Bad format for output spec", outspec) else: raise TypeError( f"Bad type {type(outspec)} for output spec, expected string or dict" )
[docs] def o(self, **ospec) -> TaskSpec: for name, osp in ospec.items(): self.outputs[name] = self.process_outspec(osp) return self
[docs] def p(self, **params) -> TaskSpec: self.parameters |= params return self
[docs] def spec(self) -> dict: """Return the task spec as a dictionary. Returns: dict: the task spec as a dictionary. """ s = {} if self.tool is not None: s["tool"] = self.tool if self.image is not None: s["image"] = self.image s["datasets"] = self.datasets.copy() s["inputs"] = self.inputs.copy() s["outputs"] = self.outputs.copy() s["parameters"] = self.parameters.copy() s["name"] = self.name if self.name is not None else "unnamed" return s