"""Declare utilities for package-derived types of entities.
These will be:
- datasets
- workflows
- workflow processes
- tools
"""
from __future__ import annotations
from functools import cached_property
from typing import Any, Optional, TypeVar
from uuid import UUID
from .api_call import api_call
from .generic import GenericCursor
from .named import NamedProxy
from .proxy import (
BoolField,
DateField,
Property,
ProxyVec,
Reference,
RefList,
StrField,
TaggableProxy,
UUIDField,
derived_property,
)
from .relationship import Rel, Relationship, RelProxy
from .resource import Resource
from .utils import client_for, tag_split
from .vocab import Tag
[docs]
class PackageProxy(NamedProxy, TaggableProxy, entity=False):
# Auto-maintained fields
metadata_created = Property(validator=DateField)
metadata_modified = Property(validator=DateField)
creator = Property(validator=UUIDField, entity_name="creator_user_id")
private = Property(
validator=BoolField(nullable=False, default=False), updatable=True
)
organization = Reference(
"Organization",
entity_name="owner_org",
create_default="default_organization",
updatable=True,
trigger_sync=True,
)
groups = RefList("Group", trigger_sync=False)
# User-maintained fields
notes = Property(validator=StrField(nullable=True), updatable=True)
author = Property(validator=StrField(nullable=True), updatable=False)
author_email = Property(validator=StrField(nullable=True), updatable=False)
maintainer = Property(validator=StrField(nullable=True), updatable=True)
maintainer_email = Property(validator=StrField(nullable=True), updatable=True)
# Resources are NOT dataset-specific
resources = RefList(Resource, trigger_sync=True)
[docs]
def add_resource(self, **properties):
"""Add a new resource with the given properties.
Example: new_rsrc = d.add_resource(name="Profile", url="s3://datasets/a.json",
format="json", mimetype="application/json")
Args:
**properties: The arguments to pass. See 'Resource' for details.
"""
return client_for(self).resources.create(dataset=self, **properties)
@derived_property
def relationships(self, entity) -> RelProxy:
"""Return a Relationships for this package.
This is a set of relationships that this package has with other packages.
"""
return RelProxy(self)
[docs]
def add_relationship(
self, rel: Rel | str, obj: PackageProxy | UUID, comment: Optional[str] = None
):
"""Add a relationship with this package as subject.
Args:
rel: The relationship type to add.
obj: The object to relate to, either a PackageProxy or UUID.
comment: An optional comment for the relationship.
Returns:
"""
if isinstance(obj, PackageProxy):
obj = str(obj.id)
elif isinstance(obj, UUID):
obj = str(obj)
if isinstance(rel, Rel):
rel = rel.value
ac = api_call(self)
reldata = ac.relationship_create(str(self.id), rel, obj, comment)
return Relationship(ac.client, reldata)
PackageProxyType = TypeVar("PackageProxy", bound=PackageProxy)
[docs]
class PackageCursor(GenericCursor[PackageProxyType]):
"""A cursor for package-based entities (datasets, tools, workflows etc).
This cursor provides package-specific methods for searching and filtering
the entities, based on CKAN's Solr backend.
"""
def __init__(self, client, proxy_type):
super().__init__(client, proxy_type)
@cached_property
def default_organization(self):
"""Return the default organization.
This is a cached property, used in the initialization
of the `organization` field at `PackageProxy` creation.
"""
return client_for(self).organizations["stelar-klms"]
[docs]
def search(
self,
*,
q: str | None = None,
bbox: list[float] | None = None,
fq: list[str] = [],
fl: list[str] = None,
sort: str = None,
limit: int | None = None,
offset: int | None = None,
facet: dict[str, Any] | None = None,
):
"""
Search for package-based entities (datasets, tools, workflows etc).
This is the main function for searching the data catalog for packages.
Many other functions are implemented on top of this one.
Arguments:
q: The query string.
bbox: A list of four floats representing the bounding box of a spatial query.
fq: A list of filter queries. These are not used to obtain the score of the results,
but instead just to filter the results. They are quite efficient.
fl: A list of fields to return. If None, a proper entity is returned.
Note that some fields are special (not actually attribute fields), e.g.,
the score field.
sort: The fields to sort by. Sorting can be ascending or descending.
limit: The maximum number of results to return.
offset: The offset to start from.
facet: A dictionary of facet field spec (facet attributes and limits).
Returns:
An answer which contains the following fields:
- count: The number of results found (not the number of results returned).
- results: A list of results.
- facets: A dictionary of facets.
"""
search = self.client.api.get_call(self.proxy_type, "search")
args = dict(
q=q,
bbox=bbox,
fq=fq,
fl=fl,
sort=sort,
limit=limit,
offset=offset,
facet=facet,
)
query = {k: v for k, v in args.items() if v is not None}
return search(query)
[docs]
def with_tag(
self, tagarg: Tag | str, *, limit: int | None = None, offset: int | None = None
):
"""Return a list of entities ids which have the given tag
Arguments:
tagarg: A Tag (proxy) or tagspec (a string)
limit: The maximum number of results to return.
offset: The offset to start from.
Returns:
A proxy list of entity ids
"""
# Need to obtain the tag ID, in order to call tag_show
if isinstance(tagarg, Tag):
return self.with_tag(tagarg.tagspec, limit=limit, offset=offset)
# Assume that tagarg is a string
voc, tag = tag_split(tagarg)
if voc is None:
filter = f'tags:"{tag}"'
else:
filter = f'vocab_{voc}:"{tag}"'
rids = self.search(fq=[filter], fl=["id"], limit=limit, offset=offset)
ids = [UUID(r["id"]) for r in rids["results"]]
return ProxyVec(self.client, self.proxy_type, ids)