Source code for stelar.client.tool

from __future__ import annotations

import re
from dataclasses import dataclass, fields
from datetime import datetime
from typing import Any

from frozendict import deepfreeze, frozendict

from .api_call import api_call
from .generic import GenericCursor, GenericProxy
from .license import LicensedProxy
from .package import PackageCursor, PackageProxy
from .proxy import (
    AnyField,
    DateFmtField,
    EnumeratedField,
    Id,
    Property,
    StrField,
    derived_property,
)
from .proxy.property import DictProperty
from .task_spec import TaskSpec


[docs] class ToolCategoryField(EnumeratedField): VALUES = [ "discovery", "interlinking", "annotation", "other", ]
# {'is_manifest_list': False, │(stelar) vsam@metallurgix:~/git/stelar/stelar_client$ git commit -a # 'last_modified': 'Sun, 01 Jun 2025 19:45:45 -0000', │[main e7923b9] Fixed the proxy resync code related to task creation and termination. # 'manifest_digest': 'sha256:6e4a55d6531c1aa8844164b552ba7b5f5ef9b30c5d9d94a145777│ 4 files changed, 49 insertions(+), 4 deletions(-) # c7d26b7c572', │(stelar) vsam@metallurgix:~/git/stelar/stelar_client$ # 'name': '0.2.0', │(stelar) vsam@metallurgix:~/git/stelar/stelar_client$ # 'reversion': False, │(stelar) vsam@metallurgix:~/git/stelar/stelar_client$ git pull # 'size': 56837148, │Already up to date. # 'start_ts': 1748807145 # }
[docs] @dataclass class Image: """A container image used by a tool. Images are just information objects, they are not proxies. """ is_manifest_list: bool last_modified: datetime manifest_digest: str tool: Tool # The tool this image belongs to name: str reversion: bool size: int start_ts: datetime def __post_init__(self): if isinstance(self.last_modified, str): self.last_modified = datetime.strptime( self.last_modified, "%a, %d %b %Y %H:%M:%S -0000" ) if isinstance(self.start_ts, int): self.start_ts = datetime.fromtimestamp(self.start_ts) @property def id(self): """Return a unique identifier for this image.""" return f"stelar/{self.tool.name}:{self.name}" def __repr__(self): return f"Image({self.tool.name}/{self.name})"
IMAGE_TAG_PATTERN = re.compile(r"[a-zA-Z0-9_][a-zA-Z0-9_.-]{0,127}$") KUBERNETES_NAME_PATTERN = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?") CPU_QUANTITY_PATTERN = re.compile(r"^\d+(\.\d+)?[m]?$") MEMORY_QUANTITY_PATTERN = re.compile(r"^\d+(\.\d+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?$") RESTART_POLICY_VALUES = ["Always", "OnFailure", "Never"] IMAGE_PULL_POLICY_VALUES = ["Always", "IfNotPresent", "Never"]
[docs] @dataclass(frozen=True) class ToolProfile: """An object specifying options for executing images of a tool.""" image: str | None = None """The image to use for this tool, if specified.""" description: str | None = None image_pull_policy: str | None = None image_pull_secrets: list[str] | None = None cpu_request: str | None = None cpu_limit: str | None = None memory_request: str | None = None memory_limit: str | None = None backoff_limit: int | None = None restart_policy: str | None = None ttl_seconds_after_finished: int | None = None def __post_init__(self): self.validate_image(self.image) self.validate_image_pull_policy(self.image_pull_policy) self.validate_image_pull_secrets(self.image_pull_secrets) self.validate_cpu_request(self.cpu_request) self.validate_cpu_limit(self.cpu_limit) self.validate_memory_request(self.memory_request) self.validate_memory_limit(self.memory_limit) self.validate_backoff_limit(self.backoff_limit) self.validate_restart_policy(self.restart_policy) self.validate_ttl_seconds_after_finished(self.ttl_seconds_after_finished)
[docs] @staticmethod def validate(attr_name: str, value: Any): """Validate a single attribute of the tool profile.""" validator = getattr(ToolProfile, f"validate_{attr_name}", None) if validator is None: raise ValueError(f"Invalid attribute: {attr_name}") return validator(value)
[docs] @staticmethod def validate_image(image) -> None: if image is not None and not IMAGE_TAG_PATTERN.fullmatch(image): raise ValueError( f"Invalid image name: {image}. Must match {IMAGE_TAG_PATTERN.pattern}" )
[docs] @staticmethod def validate_image_pull_policy(policy: str | None) -> None: if policy is not None and policy not in IMAGE_PULL_POLICY_VALUES: raise ValueError( f"Invalid image pull policy: {policy}. Must be one of {IMAGE_PULL_POLICY_VALUES}." )
[docs] @staticmethod def validate_image_pull_secrets(secrets: list[str] | None) -> None: if secrets is not None: if not isinstance(secrets, list | tuple): raise TypeError( f"Image pull secrets must be a list or tuple, got {type(secrets)}" ) if not all(KUBERNETES_NAME_PATTERN.fullmatch(secret) for secret in secrets): raise ValueError( f"Invalid image pull secret names: {secrets}. Must match {KUBERNETES_NAME_PATTERN.pattern}" )
[docs] @staticmethod def validate_cpu_request(cpu_request: str | None) -> None: if cpu_request is not None and not CPU_QUANTITY_PATTERN.fullmatch(cpu_request): raise ValueError( f"Invalid CPU request: {cpu_request}. Must match {CPU_QUANTITY_PATTERN.pattern}" )
[docs] @staticmethod def validate_cpu_limit(cpu_limit: str | None) -> None: if cpu_limit is not None and not CPU_QUANTITY_PATTERN.fullmatch(cpu_limit): raise ValueError( f"Invalid CPU limit: {cpu_limit}. Must match {CPU_QUANTITY_PATTERN.pattern}" )
[docs] @staticmethod def validate_memory_request(memory_request: str | None) -> None: if memory_request is not None and not MEMORY_QUANTITY_PATTERN.fullmatch( memory_request ): raise ValueError( f"Invalid memory request: {memory_request}. Must match {MEMORY_QUANTITY_PATTERN.pattern}" )
[docs] @staticmethod def validate_memory_limit(memory_limit: str | None) -> None: if memory_limit is not None and not MEMORY_QUANTITY_PATTERN.fullmatch( memory_limit ): raise ValueError( f"Invalid memory limit: {memory_limit}. Must match {MEMORY_QUANTITY_PATTERN.pattern}" )
[docs] @staticmethod def validate_backoff_limit(backoff_limit: int | None) -> None: if backoff_limit is not None and ( not isinstance(backoff_limit, int) or backoff_limit < 0 ): raise ValueError( f"Invalid backoff limit: {backoff_limit}. Must be a non-negative integer." )
[docs] @staticmethod def validate_restart_policy(restart_policy: str | None) -> None: if restart_policy is not None and restart_policy not in RESTART_POLICY_VALUES: raise ValueError( f"Invalid restart policy: {restart_policy}. Must be one of {RESTART_POLICY_VALUES}." )
[docs] @staticmethod def validate_ttl_seconds_after_finished(ttl_seconds: int | None) -> None: if ttl_seconds is not None and ( not isinstance(ttl_seconds, int) or ttl_seconds < 0 ): raise ValueError( f"Invalid TTL seconds after finished: {ttl_seconds}. Must be a non-negative integer." )
[docs] def to_dict(self) -> dict[str, Any]: """Convert the tool profile to a dictionary, omit None values.""" return { field.name: getattr(self, field.name) for field in fields(self) if getattr(self, field.name) is not None }
[docs] class ProfilesField(AnyField): """A field for storing tool profiles as a dictionary.""" def __init__(self, **kwargs): super().__init__(nullable=False, **kwargs) self.add_check(self.to_frozendict, 5)
[docs] def to_frozendict( self, value: dict[str, ToolProfile | dict], **kwargs ) -> tuple[frozendict, bool]: """Convert a dictionary of ToolProfile objects to a frozendict.""" if not isinstance(value, (dict, frozendict)): raise TypeError(f"Expected dict, got {type(value)}") fprof = {} for key, profile in value.items(): if not isinstance(profile, (ToolProfile | dict | frozendict)): raise TypeError( f"Expected profile dict or ToolProfile, got {type(profile)}" ) try: ToolProfile.validate_image(key) except Exception as e: raise ValueError(f"Invalid profile name '{key}': {e}") if isinstance(profile, dict | frozendict): try: ToolProfile(**profile) except Exception as e: raise ValueError(f"Invalid profile data for '{key}': {e}") fprof[key] = profile else: fprof[key] = profile.to_dict() return deepfreeze(fprof), False
[docs] def convert_to_proxy(self, value: Any, **kwargs) -> frozendict: return deepfreeze(value)
[docs] class Tool(PackageProxy, LicensedProxy): # weird ones # license_id = Property(validator=StrField(nullable=True), updatable=True) git_repository = Property(validator=StrField(nullable=True), updatable=True) programming_language = Property(validator=StrField(nullable=True), updatable=True) version = Property( validator=StrField(nullable=True, maximum_len=100), updatable=True ) inputs = DictProperty(str, str, updatable=True) outputs = DictProperty(str, str, updatable=True) parameters = DictProperty(str, str, updatable=True) category = Property(validator=ToolCategoryField(nullable=True), updatable=True) repository = Property(validator=StrField(nullable=False), updatable=False) profiles = Property(validator=ProfilesField, updatable=True)
[docs] def get_profile(self, name="default", default=None) -> ToolProfile | None: """Return the tool profile for the given name.""" if name not in self.profiles: return default return ToolProfile(**self.profiles[name])
[docs] def set_profile( self, name: str, profile: ToolProfile | dict | None = None, **pargs ) -> None: ps = self.profiles if profile is None: profile = ToolProfile(**pargs) elif isinstance(profile, dict): profile = ToolProfile(**(profile | pargs)) self.profiles = ps.set(name, profile)
[docs] def update_profile(self, name: str, **pargs) -> None: """Update the tool profile with the given name.""" ps = self.profiles prof = ps.get(name, {}) prof |= pargs return self.set_profile(name, prof)
[docs] def delete_profile(self, name: str) -> None: """Delete the tool profile with the given name.""" ps = dict(self.profiles) if name in ps: del ps[name] self.profiles = ps
@derived_property def images(self, entity): """Return a list of images associated with this tool.""" images = entity.get("images", []) return [Image(tool=self, **image) for image in images]
[docs] def get_image(self, name: str | None = None, default=None) -> Image | None: """Return the image of the given name, or the default image for this tool, if any. """ if name is not None: for image in self.images: if image.name == name: return image return default # No name, return the default image if it exists dprof = self.get_profile() if dprof and dprof.image: # If a default profile is set, return the image from that profile return self.get_image(dprof.image, default) img = self.images if len(img) == 1: # If there's only one image, return it return img[0] return None
[docs] def task_spec(self, profile_or_image=None): """Return a new TaskSpec initialized for this tool. Args profile_or_image: Either a profile name or an image name. If a profile name is given, the image from that profile will be used. If an image name is given, it will be used directly. """ # if image not in self.profiles return TaskSpec(tool=self, image=profile_or_image)
[docs] class ToolCursor(PackageCursor[Tool]): def __init__(self, client): super().__init__(client, Tool)
[docs] class ImageRegistryToken(GenericProxy): """A token representing an image used by a tool.""" DATEFORMAT = "%a, %d %b %Y %H:%M:%S %z" id = Id(entity_name="uuid") created = Property(validator=DateFmtField(DATEFORMAT), updatable=False) expiration = Property( validator=DateFmtField(DATEFORMAT, nullable=True), updatable=False ) last_accessed = Property( validator=DateFmtField(DATEFORMAT, nullable=True), updatable=False ) title = Property(validator=StrField(nullable=False), updatable=False) token = Property( validator=StrField(nullable=False), updatable=False, entity_name="token_code" )
[docs] class ImageRegistryTokenCursor(GenericCursor[ImageRegistryToken]): """A cursor for iterating over image registry tokens.""" def __init__(self, client): super().__init__(client, ImageRegistryToken)
[docs] def create( self, title: str, expiration: datetime | None = None ) -> ImageRegistryToken: """Create a new image registry token.""" ac = api_call(self) result = ac.image_registry_token_create(title=title, expiration=expiration) return self.fetch_proxy_for_entity(result)