"""
Implement an accessor for CKAN API that bypasses the STELAR API.
This is useful for debugging and testing.
"""
from __future__ import annotations
from functools import wraps
from typing import TYPE_CHECKING
from urllib.parse import urljoin
import requests
from .api_call import api_context
from .proxy import EntityNotFound, Proxy, ProxyOperationError
if TYPE_CHECKING:
from .client import Client
[docs]
class CKAN:
"""A simple-minded back door to CKAN. It assumes that CKAN
is published by the ingress on path '/dc'.
Use as follows:
First, ensure that there is a context in ~/.stelar containing the field
'ckan_apitoken'. For example:
[apitest]
base_url=https://stelar.foo.com
username=joe
password=joesecret
ckan_apitoken: 123415343416151235jgf1gh3412jk1ljh5g1l5g....
Assuming the context is called 'apitest' (as above), do
dc = CKAN('apitesst')
dc.site_read() # This is like 'ping'
dc.help_show(name='site_read') # shows CKAN help
dc.package_list()
dc.package_show(id='shakespeare_novels')
dc.package_create(name='just_a_test', title='Just a test', owner_org='stelar-klms')
dc.package_delete(id='just_a_test')
... etc. Use
"""
def __init__(self, context="apitest", client=None):
from .client import Client
if client is None:
self.client = Client(context)
else:
self.client = client
self.ckanapi = urljoin(self.client._base_url, "/dc/api/3/action/")
self.headers = {
"Authorization": self.client._ckan_apitoken,
}
self.docs = {}
self.bad_names = set()
self.status = self.check()
def __bool__(self) -> bool:
return self.status
[docs]
def check(self) -> bool:
try:
url = urljoin(self.ckanapi, "site_read")
resp = requests.get(url, headers=self.headers)
result = resp.json()["result"]
success = resp.json()["success"]
return success and result
except Exception:
return False
def __getattr__(self, name):
if name in self.bad_names:
raise AttributeError(name)
# As a check that the API endpoint exists, try to fetch the
# documentation for it. Any error in that will be translated
# as a non-existent attribute
try:
doc = self.docs.get(name)
if doc is None:
urlh = urljoin(self.ckanapi, "help_show")
resp = requests.post(urlh, json={"name": name}, headers=self.headers)
doc = resp.json()["result"]
self.docs[name] = doc
except Exception as e:
self.bad_names.add(name)
raise AttributeError(name) from e
url = urljoin(self.ckanapi, name)
def ckan_call(json_obj={}, **kwargs):
json = json_obj | kwargs
resp = requests.post(url, json=json, headers=self.headers)
return resp.json()
ckan_call.__name__ = name
ckan_call.__doc__ = doc
setattr(self, name, ckan_call)
return ckan_call
def __repr__(self):
return f"<CKAN {self.ckanapi} {'active' if self.status else 'bad'}>"
[docs]
class api_call_DC(api_context):
"""Class that exposes the CKAN API for the Data Catalog.
`api_call(proxy).foo(...)`
returns the 'result' of the CKAN API response on success,
and raises a ProxyOperationError on failure.
`api_call(client).foo(...)`
does the same.
"""
def __init__(self, arg: Proxy | Client):
super().__init__(arg)
self.ckan = self.client.DC
def __getattr__(self, name):
func = getattr(self.ckan, name)
@wraps(func)
def wrapped_call(*args, **kwargs):
response = func(*args, **kwargs)
if not response["success"]:
err = response["error"]
if err["__type"] == "Not Found Error":
raise EntityNotFound(self.proxy_type, self.proxy_id, name)
else:
# Generic
raise ProxyOperationError(
self.proxy_type, self.proxy_id, name, response["error"]
)
return response["result"]
return wrapped_call
[docs]
def get_call(self, proxy_type, op):
_map_to_ckan = {
"Dataset": "package",
"Resource": "resource",
"Organization": "organization",
"Group": "group",
"Vocabulary": "vocabulary",
"Tag": "tag",
"User": "user",
}
ckan_type = _map_to_ckan[proxy_type.__name__]
if ckan_type == "package" and op == "purge":
call_name = "dataset_purge"
else:
call_name = f"{ckan_type}_{op}"
return getattr(self, call_name)