import json
from typing import Iterable, List, Optional, Tuple, Union
from jinja2 import BaseLoader, Environment, StrictUndefined
from thanosql._error import ThanoSQLValueError
def _stringify_list(val: object) -> str:
# If we use str(lst) to show the string representation of a list directly,
# there can be unwanted extra quotes. e.g. ['a', 'b'] -> ["'a'", "'b'"]
# so we have to join list elements manually; this also covers nested lists
if isinstance(val, list):
return f"[{', '.join([_stringify_list(item) for item in val])}]"
return str(val)
def _to_postgresql_value_helper(val: object) -> Union[str, list]:
# Helper function to convert Python objects into their PSQL representation
# First, recursively apply the function to each element of lists
if isinstance(val, list):
return [_to_postgresql_value_helper(item) for item in val]
# Convert Python None values to PSQL NULL (without single quotes)
if val is None:
return "NULL"
# If the object is a dictionary, convert it to string since we are going
# to use its string representation in our PSQL query
if isinstance(val, dict):
val = json.dumps(val)
# If the object is a string (including the stringified dictionary above),
# replace quoted single quotes (') from \' to '' (C-style to PSQL-style)
# Double quotes (") need not be replaced as they are treated as a normal
# character in PSQL strings, and are also C-style escaped in JSONs
# Then, enclose string/varchar values in single quotes (this includes
# JSON/dictionary objects)
if isinstance(val, str):
quoted_val = val.replace("\\'", "'").replace("'", "''")
return f"'{quoted_val}'"
# For other types of objects, return their string representation
return str(val)
def _to_postgresql_value(val: object) -> str:
val = _to_postgresql_value_helper(val)
# There are two possible array representations in PSQL,
# '{{...}, {...}}' and ARRAY[[...], [...]]
# For empty arrays/lists, we will use the '{}' notation
# For non-empty arrays/lists, we will use the ARRAY[[...]] notation
if isinstance(val, list):
if len(val) == 0:
return "'{}'"
return f"ARRAY{_stringify_list(val)}"
# All other values do not need special treatment
return val
def _split_query(query: str) -> Tuple[str, str]:
# Make sure there is only one {{ values }} placeholder
split_val = query.split("{{ values }}")
if len(split_val) != 2:
raise ThanoSQLValueError(
"One and only one {{ values }} placeholder is required"
)
return split_val[0], split_val[1]
def _paginate(seq: Iterable, page_size: int):
"""Consume an iterable and return it in chunks.
Every chunk is at most `page_size`. Never return an empty chunk.
From https://github.com/psycopg/psycopg2/blob/master/lib/extras.py#L1175
"""
page = []
it = iter(seq)
while True:
try:
for _ in range(page_size):
page.append(next(it))
yield page
page = []
except StopIteration:
if page:
yield page
return
def _render(query: str, params: dict) -> str:
template = Environment(loader=BaseLoader, undefined=StrictUndefined).from_string(
query
)
return template.render(**params)
[docs]
def fill_query_placeholder(
query: str,
values: Union[List[tuple], List[dict]],
template: Optional[str] = None,
page_size: int = 100,
) -> str:
# The query statement consists of three parts:
# {{ 1. before values }}{{ 2. values placeholder }}{{ 3. after values }}
# Split the query with regards to {{ values }} into pre and post parts
pre, post = _split_query(query)
full_query_list = []
# Fill in the values to the query according to the defined page size
# We will repeat the statement (with values) according to the number of pages
# Each page contains at most page_size number of value sets
for page in _paginate(values, page_size):
val_query_list = []
for args in page:
if not template:
if not isinstance(args, tuple):
raise ThanoSQLValueError(
"If template is not provided, values must be given as a list of tuples"
)
# If there is no template, simply join values inside brackets separated by commas
# We cannot directly use str(args) as this may result in unwanted quotations
args_transformed = tuple(map(_to_postgresql_value, args))
args_str = ", ".join(args_transformed)
args_query = f"({args_str})"
else:
if not isinstance(args, dict):
raise ThanoSQLValueError(
"If template is provided, values must be given as a list of dictionaries"
)
# If there is a template, we substitute the arguments into the template
args_transformed = {k: _to_postgresql_value(v) for k, v in args.items()}
args_query = _render(template, args_transformed)
val_query_list.append(args_query)
# Assemble: pre - values - post in query statement
val_query = pre + ", ".join(val_query_list) + post
full_query_list.append(val_query)
return ";\n".join(full_query_list)