Source code for thanosql.resources._table

from __future__ import annotations

import enum
import os
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional, Union

import pandas as pd
from numpy import nan
from pydantic import Field, TypeAdapter

from thanosql._error import ThanoSQLValueError
from thanosql._service import ThanoSQLService
from thanosql.resources._model import BaseModel
from thanosql.resources._record import Records

if TYPE_CHECKING:
    from thanosql._client import ThanoSQL


[docs] class BaseColumn(BaseModel): default: Optional[str] = None is_nullable: Optional[bool] = True type: str name: str
[docs] class Column(BaseModel): id: Optional[int] = None default: Optional[str] = None is_nullable: Optional[bool] = True type: str name: str
[docs] class Unique(BaseModel): name: Optional[str] = None columns: Optional[List[str]] = []
[docs] class PrimaryKey(BaseModel): name: Optional[str] = None columns: Optional[List[str]] = []
[docs] class ForeignKey(BaseModel): name: Optional[str] = None reference_schema: str = "public" reference_column: str reference_table: str column: str
[docs] class Constraints(BaseModel): unique: Optional[List[Unique]] = None primary_key: Optional[PrimaryKey] = None foreign_keys: Optional[List[ForeignKey]] = None
[docs] class BaseTable(BaseModel): name: Optional[str] = None table_schema: Optional[str] = Field(alias="schema", default=None) columns: Optional[List[BaseColumn]] = None constraints: Optional[Constraints] = None
[docs] class TableObject(BaseModel): columns: Optional[List[BaseColumn]] = None constraints: Optional[Constraints] = None
[docs] class IfExists(enum.Enum): FAIL = "fail" APPEND = "append" REPLACE = "replace"
[docs] class TableService(ThanoSQLService): """Service layer for table methods. Attributes ---------- client: ThanoSQL The ThanoSQL client used to make requests to the engine. template: TableTemplateService The table template service layer to access methods involving table templates. """ def __init__(self, client: ThanoSQL) -> None: super().__init__(client=client, tag="table") self.template: TableTemplateService = TableTemplateService(client) def _parse_table_response(self, raw_response: dict) -> Table: table_adapter = TypeAdapter(Table) parsed_response = table_adapter.validate_python(raw_response["table"]) parsed_response.service = self return parsed_response
[docs] def list( self, schema: Optional[str] = None, verbose: Optional[bool] = None, offset: Optional[int] = None, limit: Optional[int] = None, ) -> List[Table]: """Lists tables stored in the workspace. Parameters ---------- schema : str, optional The schema where the listed tables should reside in. If not set, all tables from all schemas will be included. verbose : bool, optional Whether to include the table columns and constraints in the results. By default, or if set to False, only retrieves the names and schemas of stored tables. offset : int, optional When set to n, skips the first n results and excludes them from the output list. Otherwise, starts the list from the first result stored. Must be greater than 0. limit : int, optional When set to n, limits the number of results listed to n. Otherwise, lists up to 100 results per call. Must range between 0 to 100. Returns ------- List[Table] A list of Table objects. Raises ------ ThanoSQLValueError If offset is less than 0 or if limit is not between 0 to 100 (inclusive). """ path = f"/{self.tag}/" query_params = self._create_input_dict( schema=schema, verbose=verbose, offset=offset, limit=limit ) raw_response = self.client._request( method="get", path=path, query_params=query_params ) tables_adapter = TypeAdapter(List[Table]) parsed_response = tables_adapter.validate_python(raw_response["tables"]) for table in parsed_response: table.service = self return parsed_response
[docs] def get(self, name: str, schema: Optional[str] = None) -> Table: """Shows the details of the specified table. Parameters ---------- name : str The name of the table to be retrieved. schema : str, optional The schema where the table to be retrieved is in. If not specified, this method will look for the table in "public". Returns ------- Table A Table object. """ path = f"/{self.tag}/{name}" query_params = self._create_input_dict(schema=schema) raw_response = self.client._request( method="get", path=path, query_params=query_params ) return self._parse_table_response(raw_response)
[docs] def update( self, name: str, schema: Optional[str] = None, table: Optional[BaseTable] = None ) -> Table: """Updates the specified table. Parameters ---------- name : str The name of the table to be updated. schema : str, optional The schema where the table to be updated is in. If not specified, this method will look for the table in "public". table : BaseTable, optional BaseTable object containing changed details of the table to be updated. Any attribute of BaseTable can be modified, and if left unset, the current value will be maintained after update. The attributes are as follows: - name: new name to rename the table to - schema: new schema to move the table to - columns: new columns of the updated table. All new columns \ must be in this object, including columns that already exist \ in the original table. If this attribute is set but some \ original columns are not included, they will be removed from \ the table. - constraints: new constraints of the updated table. All new \ constraints must be in this object, including constraints \ that already exist in the original table. If this attribute is \ set but some original constraints are not included, they will \ be removed from the table. Returns ------- Table Table object of the new table after update. Raises ------ ThanoSQLValueError If the table object contains invalid formatting. """ path = f"/{self.tag}/{name}" query_params = self._create_input_dict(schema=schema) payload = self._create_input_dict(table=table) raw_response = self.client._request( method="put", path=path, query_params=query_params, payload=payload ) return self._parse_table_response(raw_response)
[docs] def create( self, name: str, table: TableObject, schema: Optional[str] = None, if_not_exists: bool = False, ) -> Table: """Creates a new table. Parameters ---------- name : str The name of the table to be created. table : TableObject TableObject containing the columns and constraints of the table to be created. In order to create an empty table, pass in an empty object (TableObject()). schema : str, optional The schema to save the created table in. If not specified, the table will be saved to "public". if_not_exists: bool, default False Whether to throw an error if a table of the same name already exists. When set to False (default), an error will be shown. When True, the table will only be created if it does not exist already. Otherwise, do nothing. Returns ------- Table Table object of the created table. Raises ------ ThanoSQLValueError If the table object contains invalid formatting. """ path = f"/{self.tag}/{name}" query_params = self._create_input_dict( schema=schema, if_not_exists=if_not_exists ) payload = self._create_input_dict(table=table) raw_response = self.client._request( method="post", path=path, query_params=query_params, payload=payload ) return self._parse_table_response(raw_response)
[docs] def upload( self, name: str, file: Optional[Union[str, os.PathLike]] = None, df: Optional[pd.DataFrame] = None, schema: Optional[str] = None, table: Optional[TableObject] = None, if_exists: str = "fail", ) -> Table: """Uploads the contents of a CSV or Excel-like file or Pandas DataFrame into the specified table. Either a CSV or Excel-like (.xls, .xlsx, .xlsm, .xlsb, .odf, .ods, .odt) file or DataFrame must be specified. However, both should not be used at the same time. Note: If you are working with a Strict Open XML Spreadsheet (*.xlsx) file, change it to Excel Workbook (*.xlsx), save it, and try using it again. Parameters ---------- name : str The name of the table created from the file or DataFrame. file : str or PathLike, optional CSV or Excel-like file containing tabulated data to be uploaded to the specified table. df : DataFrame, optional Pandas DataFrame containing data to be uploaded to the specified table. schema : str, optional The schema to save the created table in. If not specified, the table will be saved to "public". table : TableObject, optional TableObject containing the columns and constraints of the table to be created. If specified, the created table will follow the object format and no type inference is conducted. Otherwise, type inference will be performed and the table will be created to match the columns from source. if_exists : str, default "fail" What to do if table of the same name already exists. There are only three available values: - fail: fails (throws an error) if the same table exists - append: appends records into an existing table (columns must match \ in order to not make an error) - replace: deletes existing table and creates a new one with the \ given name Returns ------- Table Table object of the uploaded table. Raises ------ ThanoSQLValueError - If if_exists is not one of "fail", "append", or "replace". - If neither file nor df is used, or if both are used at the same time. - If file is used but it is neither CSV nor Excel-like. - If the file or df contains badly-formatted contents. - If a table body is specified but does not match the contents of the \ file or df. - If if_exists is set to "append" but the new contents does not match \ the format of the existing table. """ try: if_exists_enum = IfExists(if_exists) except Exception as e: raise ThanoSQLValueError(str(e)) if file and df is not None: raise ThanoSQLValueError( "Cannot use both file and DataFrame for upload at the same time." ) if file: path = f"/{self.tag}/{name}/upload/" file_extension = Path(file).suffix.lower() if file_extension == ".csv": path = path + "csv" elif file_extension in [ ".xls", ".xlsx", ".xlsm", ".xlsb", ".odf", ".ods", ".odt", ]: path = path + "excel" else: raise ThanoSQLValueError( "Invalid format: only CSV and Excel files possible." ) query_params = self._create_input_dict( schema=schema, if_exists=if_exists_enum.value ) payload = self._create_input_dict(table=table) raw_response = self.client._request( method="post", path=path, query_params=query_params, payload=payload, file=file, ) return self._parse_table_response(raw_response) elif df is not None: path = f"/{self.tag}/{name}/upload/json" df = df.replace({nan: None}) df_json = df.to_dict(orient="records") query_params = self._create_input_dict( schema=schema, if_exists=if_exists_enum.value ) payload = self._create_input_dict(table=table, data=df_json) raw_response = self.client._request( method="post", path=path, query_params=query_params, payload=payload, file=file, ) return self._parse_table_response(raw_response) else: raise ThanoSQLValueError("No file or DataFrame provided for upload")
[docs] def delete(self, name: str, schema: Optional[str] = None) -> dict: """Deletes the specified table. Parameters ---------- name : str The name of the table to be deleted. schema : str, optional The schema where the table to be deleted is in. If not specified, this method will look for the table in "public". Returns ------- dict A dictionary containing a success message, table name, and schema in the format of:: { "message": "string", "table_name": "string", "schema": "string" } """ path = f"/{self.tag}/{name}" query_params = self._create_input_dict(schema=schema) return self.client._request( method="delete", path=path, query_params=query_params )
[docs] class OnConflict(enum.Enum): FAIL = "fail" SKIP = "skip"
[docs] class Table(BaseTable): """Extends the BaseTable class, which has name, schema, columns, and constraints as attributes, with a table service layer to allow connection to the ThanoSQL engine. """ service: Optional[TableService] = None """The table service layer to access the ThanoSQL client."""
[docs] def get_records( self, offset: Optional[int] = None, limit: Optional[int] = None, ) -> Records: """Lists the records of the table. Parameters ---------- offset : int, optional When set to n, skips the first n results and excludes them from the output list. Otherwise, starts the list from the first result stored. Must be greater than 0. limit : int, optional When set to n, limits the number of results listed to n. Otherwise, lists up to 100 results per call. Must range between 0 to 100. Returns ------- Records A Records object. Raises ------ ThanoSQLValueError If offset is less than 0 or if limit is not between 0 to 100 (inclusive). """ path = f"/{self.service.tag}/{self.name}/records" query_params = self.service._create_input_dict( schema=self.table_schema, offset=offset, limit=limit, ) res = self.service.client._request( method="get", path=path, query_params=query_params, ) return Records(data=res["records"], total=res["total"])
[docs] def get_records_as_csv( self, timezone_offset: Optional[int] = None, ) -> None: """Downloads the records of the table as a CSV file. Parameters ---------- timezone_offset : int, optional Timezone offset from Coordinated Universal Time (UTC). If not set, this value is 9, following the timezone in Seoul. This value is used to determine the time used in the file name. """ path = f"/{self.service.tag}/{self.name}/records/csv" query_params = self.service._create_input_dict( schema=self.table_schema, timezone_offset=timezone_offset, ) self.service.client._request( method="get", path=path, query_params=query_params, stream=True )
[docs] def insert( self, records: List[dict], on_conflict: str = "fail", ) -> Table: """Inserts records to the specified table. Parameters ---------- records : list of dict The records to be inserted in the format of a list of column-value pairs. on_conflict : str, default "fail" What to do when conflict(s) due to unique constraint violation happen(s). There are only two available values: - fail: fails (throws an error) if any of the record violates the target \ table's unique constraint(s) - skip: skips record(s) that violate(s) unique constraint(s) and insert \ the rest Returns ------- Table A Table object. Raises ------ ThanoSQLValueError - If on_conflict is not one of "fail" or "skip". - If the records are in an invalid format or contain invalid contents. """ try: on_conflict_enum = OnConflict(on_conflict) except Exception as e: raise ThanoSQLValueError(str(e)) path = f"/{self.service.tag}/{self.name}/records" query_params = self.service._create_input_dict( schema=self.table_schema, on_conflict=on_conflict_enum.value ) raw_response = self.service.client._request( method="post", path=path, query_params=query_params, payload=records ) return self.service._parse_table_response(raw_response)
[docs] class TableTemplate(BaseModel): name: str table_template: TableObject version: Optional[str] compatibility: Optional[str] created_at: Optional[datetime]
[docs] class TableTemplateService(ThanoSQLService): """Service layer for table template methods. Attributes ---------- client: ThanoSQL The ThanoSQL client used to make requests to the engine. """ def __init__(self, client: ThanoSQL) -> None: super().__init__(client=client, tag="table_template")
[docs] def list( self, search: Optional[str] = None, order_by: Optional[str] = None, latest: Optional[bool] = None, ) -> List[TableTemplate]: """Lists table templates in the workspace. Parameters ---------- search : str, optional Search keywords that the table template names in the results must contain. If not set, all table templates are returned by default. order_by : str, optional How to order the results. There are only three possible values: - recent: based on the date of creation, from most recent to oldest - name_asc: based on the name of the template, from A to Z - name_desc: based on the name of the template, from Z to A latest : bool, optional Whether to return only the latest version of each table template. By default, or if set to False, all versions of table templates are included in the results. Returns ------- List[TableTemplate] A list of TableTemplate objects. Raises ------ ThanoSQLValueError If order_by is not one of "recent", "name_asc", or "name_desc". """ path = f"/{self.tag}/" query_params = self._create_input_dict( search=search, order_by=order_by, latest=latest, ) raw_response = self.client._request( method="get", path=path, query_params=query_params ) table_templates_adapter = TypeAdapter(List[TableTemplate]) parsed_response = table_templates_adapter.validate_python( raw_response["table_templates"] ) return parsed_response
[docs] def get(self, name: str, version: Optional[str] = None) -> dict: """Shows the details of the specified table template. Parameters ---------- name : str The name of the table template to be retrieved. version : str, optional The version of the table template to be retrieved. The value can either be a specific version such as "1.0", or "latest". If "latest" is specified, only the latest version of the table template will be shown. If version is not set, all versions will be shown. Returns ------- dict A dictionary of matching table template(s) in the format of:: { "table_templates": ["TableTemplate"], "versions": ["string"] } """ path = f"/{self.tag}/{name}" query_params = self._create_input_dict(version=version) raw_response = self.client._request( method="get", path=path, query_params=query_params ) table_templates_adapter = TypeAdapter(List[TableTemplate]) parsed_response = {} parsed_response["table_templates"] = table_templates_adapter.validate_python( raw_response["table_templates"] ) parsed_response["versions"] = raw_response["versions"] return parsed_response
[docs] def create( self, name: str, table_template: TableObject, version: Optional[str] = None, compatibility: Optional[str] = None, ) -> TableTemplate: """Creates a new table template. Parameters ---------- name : str The name of the table template to be created. table_template : TableObject TableObject containing the columns and constraints of the table template to be created. In order to create an empty table template, pass in an empty object (TableObject()). version : str, optional The version of the table template to be created. It must be in the format of "[1-9].[0-9]". If not set, it will default to "1.0". compatibility : str, optional The compatibility setting of the table template to be created. If not set, it will default to "ignore" (no compatibility checks). Returns ------- TableTemplate TableTemplate object of the created table template. Raises ------ ThanoSQLValueError - If the template name contains invalid characters or is too long. - If version is specified but is not in the right format. - If the table template contains formatting errors. """ path = f"/{self.tag}/{name}" payload = self._create_input_dict( table_template=vars(table_template), version=version, compatibility=compatibility, ) raw_response = self.client._request(method="post", path=path, payload=payload) table_template_adapter = TypeAdapter(TableTemplate) parsed_response = table_template_adapter.validate_python( raw_response["table_template"] ) return parsed_response
[docs] def delete(self, name: str, version: Optional[str] = None) -> dict: """Deletes the specified table template. Parameters ---------- name : str The name of the table template to be removed. version : str, optional The version of the table template to be removed. If not specified, all versions of the table template will be removed. Returns ------- dict A dictionary containing a success message and the name of the table template in the format of:: { "message": "string", "table_template_name": "string" } """ path = f"/{self.tag}/{name}" query_params = self._create_input_dict(version=version) return self.client._request( method="delete", path=path, query_params=query_params )