Source code for stelar.client.dataset

import pathlib

import pandas as pd

from stelar.client import pdutils
from stelar.client.mutils import is_s3url, s3spec_to_pair
from stelar.client.pdutils import infer_format

from .license import LicensedProxy
from .package import PackageCursor, PackageProxy
from .proxy import Property, StrField
from .reprstyle import dataset_to_html
from .resource import Resource
from .spatial import GeoJSON
from .utils import client_for


[docs] class Dataset(PackageProxy, LicensedProxy): """ A proxy of a STELAR dataset. """ title = Property(validator=StrField, updatable=True) url = Property(validator=StrField(nullable=True), updatable=True) version = Property( validator=StrField(nullable=True, maximum_len=100), updatable=True ) # The spatial property spatial = Property(validator=GeoJSON(nullable=True), updatable=True) # N.B. This has been removed from the schema # license_id = Property(validator=StrField(nullable=True), updatable=True) # profile # relationships_as_object # relationships_as subject
[docs] def add_dataframe( self, df: pd.DataFrame, s3path: str, format: str = None, **kwargs ) -> Resource: """Add a DataFrame as a resource to the dataset. Args: df (pd.DataFrame): The DataFrame to add. s3path (str): The S3 path to save the DataFrame. format (str): The format of the file. If not specified, an attempt will be done to infer it. **kwargs: Additional keyword arguments to pass to the write_dataframe function """ # Infer the format of the file to save fmt = infer_format(s3path, format) if fmt is None: raise ValueError("Could not infer the format of the file to save.") # Collect properties of the new resource if not is_s3url(s3path): raise ValueError("The path must be an S3 URL.") bucket, path = s3spec_to_pair(s3path) stem = pathlib.PurePosixPath(path).stem rsrc = self.add_resource( name=stem, url=s3path, format=fmt, mimetype=f"application/{fmt}", description=df.describe().to_json(), columns=df.columns.tolist(), rows=len(df), size=df.memory_usage().sum(), relation="owned", ) try: pdutils.write_dataframe(client_for(self), df, s3path, format=fmt, **kwargs) except Exception as e: rsrc.delete() raise e return rsrc
[docs] def read_dataframe(self, format: str | None = None, **kwargs): """Read the dataset as a DataFrame. Note: the dataframe need not be stored in S3 Storage. Args: format (str): The format of the file to read. If not specified, the format will be inferred from the file extension. kwargs (dict): Additional keyword arguments to pass to the read. Returns: pd.DataFrame: The DataFrame read from the dataset. """ if not self.url: raise ValueError("The dataset URL is nil.") if (not format) and hasattr(self, "format"): format = self.format fmt = infer_format(self.url, format) if fmt is None: raise ValueError("Could not infer the format of the file to save.") return pdutils.read_dataframe(client_for(self), self.url, format=fmt, **kwargs)
[docs] def export_zenodo(self) -> dict: """Export the dataset to Zenodo. This method will create and return a new Zenodo record for the dataset. The method returns the Zenodo record as a dictionary. Returns: dict: The Zenodo record for the dataset. """ ac = client_for(self).api_call() return ac.dataset_export_zenodo(self.id)
def _repr_html_(self): return dataset_to_html(self) def __disabled_str__(self): dataset_info = f"""Title: {self.title} | Dataset ID: {self.id} | Name: {self.name} | Tags: {self.tags} | Modified Date: {self.modified_date}\nDataset Resources:\n""" if self.resources: for resource in self.resources: dataset_info += "\t" + str(resource) + "\n" else: dataset_info += "\tNo Resources Associated" return dataset_info
[docs] class DatasetCursor(PackageCursor[Dataset]): def __init__(self, client): super().__init__(client, Dataset)
[docs] def publish_file( self, s3file: str, format: str | None = None, *, resource: dict | None = {}, **dataset_properties, ) -> Dataset: """Publish a new dataset in the catalog for a single file in Storage. The main input to this call is a single file in Storage, identified by its S3 URL. By default, this call will create a new dataset in the catalog and a new resource, which will be initialized by provided properties and also by infering properties by analyzing the given S3 URL. If the :code:`resource` parameter is set explicitly to None, no resource will be created; instead, the dataset will be assigned the given S3 URL as its URL. Args: s3path (str): The S3 path to the file. format (str): The format of the file. If not specified, an attempt will be done to infer it. resource_properties (dict): Properties of the new resource. If this is set explicitly to None, no resource will be created; instead, the dataset **dataset_properties: Properties of the new dataset. """
[docs] def publish_dataframe( self, df: pd.DataFrame, s3path: str, format: str | None = None, *, write={}, **properties, ) -> Dataset: """Publish a DataFrame as a new dataset. The dataframe will be stored at the given path in the format specified by the 'format' argument. If the format is not specified, an attempt will be done to infer it from the file extension of the s3path. Additional arguments to the pandas write method (DataFrame.to_{format}) can be passed using the 'write' argument. Args: df (pd.DataFrame): The DataFrame to publish. s3path (str): The S3 path to save the DataFrame. format (str): The format of the file. If not specified, an attempt will be done to infer it. write (dict): Keyword arguments to pass to the write_dataframe function **properties: Properties of the new dataset. """ # Infer the format of the file to save fmt = infer_format(s3path, format) if fmt is None: raise ValueError("Could not infer the format of the file to save.") # Collect properties of the new resource if not is_s3url(s3path): raise ValueError("The path must be an S3 URL.") bucket, path = s3spec_to_pair(s3path) stem = pathlib.PurePosixPath(path).stem if "name" not in properties: properties["name"] = stem if "title" not in properties: properties["title"] = stem if "author" not in properties: properties["author"] = self.client.users.current_user.fullname properties["author_email"] = self.client.users.current_user.email if "maintainer" not in properties: properties["maintainer"] = self.client.users.current_user.fullname properties["maintainer_email"] = self.client.users.current_user.email properties["url"] = s3path properties["columns"] = df.columns.tolist() properties["rows"] = len(df) properties["size"] = df.memory_usage().sum() properties["format"] = fmt properties["mimetype"] = f"application/{fmt}" properties["description"] = df.describe().to_json() dataset = self.create(**properties) try: pdutils.write_dataframe(self.client, df, s3path, format=fmt, **write) except Exception as e: dataset.delete(purge=True) raise e return dataset