"""SQL composition utility module """ # psycopg/sql.py - SQL composition utility module # # Copyright (C) 2016-2019 Daniele Varrazzo # Copyright (C) 2020-2021 The Psycopg Team # # psycopg2 is free software: you can redistribute it and/or modify it # under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # In addition, as a special exception, the copyright holders give # permission to link this program with the OpenSSL library (or with # modified versions of OpenSSL that use the same license as OpenSSL), # and distribute linked combinations including the two. # # You must obey the GNU Lesser General Public License in all respects for # all of the code used other than OpenSSL. # # psycopg2 is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. import string from psycopg2 import extensions as ext _formatter = string.Formatter() class Composable: """ Abstract base class for objects that can be used to compose an SQL string. `!Composable` objects can be passed directly to `~cursor.execute()`, `~cursor.executemany()`, `~cursor.copy_expert()` in place of the query string. `!Composable` objects can be joined using the ``+`` operator: the result will be a `Composed` instance containing the objects joined. The operator ``*`` is also supported with an integer argument: the result is a `!Composed` instance containing the left argument repeated as many times as requested. """ def __init__(self, wrapped): self._wrapped = wrapped def __repr__(self): return f"{self.__class__.__name__}({self._wrapped!r})" def as_string(self, context): """ Return the string value of the object. :param context: the context to evaluate the string into. :type context: `connection` or `cursor` The method is automatically invoked by `~cursor.execute()`, `~cursor.executemany()`, `~cursor.copy_expert()` if a `!Composable` is passed instead of the query string. """ raise NotImplementedError def __add__(self, other): if isinstance(other, Composed): return Composed([self]) + other if isinstance(other, Composable): return Composed([self]) + Composed([other]) else: return NotImplemented def __mul__(self, n): return Composed([self] * n) def __eq__(self, other): return type(self) is type(other) and self._wrapped == other._wrapped def __ne__(self, other): return not self.__eq__(other) class Composed(Composable): """ A `Composable` object made of a sequence of `!Composable`. The object is usually created using `!Composable` operators and methods. However it is possible to create a `!Composed` directly specifying a sequence of `!Composable` as arguments. Example:: >>> comp = sql.Composed( ... [sql.SQL("insert into "), sql.Identifier("table")]) >>> print(comp.as_string(conn)) insert into "table" `!Composed` objects are iterable (so they can be used in `SQL.join` for instance). """ def __init__(self, seq): wrapped = [] for i in seq: if not isinstance(i, Composable): raise TypeError( f"Composed elements must be Composable, got {i!r} instead") wrapped.append(i) super().__init__(wrapped) @property def seq(self): """The list of the content of the `!Composed`.""" return list(self._wrapped) def as_string(self, context): rv = [] for i in self._wrapped: rv.append(i.as_string(context)) return ''.join(rv) def __iter__(self): return iter(self._wrapped) def __add__(self, other): if isinstance(other, Composed): return Composed(self._wrapped + other._wrapped) if isinstance(other, Composable): return Composed(self._wrapped + [other]) else: return NotImplemented def join(self, joiner): """ Return a new `!Composed` interposing the *joiner* with the `!Composed` items. The *joiner* must be a `SQL` or a string which will be interpreted as an `SQL`. Example:: >>> fields = sql.Identifier('foo') + sql.Identifier('bar') # a Composed >>> print(fields.join(', ').as_string(conn)) "foo", "bar" """ if isinstance(joiner, str): joiner = SQL(joiner) elif not isinstance(joiner, SQL): raise TypeError( "Composed.join() argument must be a string or an SQL") return joiner.join(self) class SQL(Composable): """ A `Composable` representing a snippet of SQL statement. `!SQL` exposes `join()` and `format()` methods useful to create a template where to merge variable parts of a query (for instance field or table names). The *string* doesn't undergo any form of escaping, so it is not suitable to represent variable identifiers or values: you should only use it to pass constant strings representing templates or snippets of SQL statements; use other objects such as `Identifier` or `Literal` to represent variable parts. Example:: >>> query = sql.SQL("select {0} from {1}").format( ... sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]), ... sql.Identifier('table')) >>> print(query.as_string(conn)) select "foo", "bar" from "table" """ def __init__(self, string): if not isinstance(string, str): raise TypeError("SQL values must be strings") super().__init__(string) @property def string(self): """The string wrapped by the `!SQL` object.""" return self._wrapped def as_string(self, context): return self._wrapped def format(self, *args, **kwargs): """ Merge `Composable` objects into a template. :param `Composable` args: parameters to replace to numbered (``{0}``, ``{1}``) or auto-numbered (``{}``) placeholders :param `Composable` kwargs: parameters to replace to named (``{name}``) placeholders :return: the union of the `!SQL` string with placeholders replaced :rtype: `Composed` The method is similar to the Python `str.format()` method: the string template supports auto-numbered (``{}``), numbered (``{0}``, ``{1}``...), and named placeholders (``{name}``), with positional arguments replacing the numbered placeholders and keywords replacing the named ones. However placeholder modifiers (``{0!r}``, ``{0:<10}``) are not supported. Only `!Composable` objects can be passed to the template. Example:: >>> print(sql.SQL("select * from {} where {} = %s") ... .format(sql.Identifier('people'), sql.Identifier('id')) ... .as_string(conn)) select * from "people" where "id" = %s >>> print(sql.SQL("select * from {tbl} where {pkey} = %s") ... .format(tbl=sql.Identifier('people'), pkey=sql.Identifier('id')) ... .as_string(conn)) select * from "people" where "id" = %s """ rv = [] autonum = 0 for pre, name, spec, conv in _formatter.parse(self._wrapped): if spec: raise ValueError("no format specification supported by SQL") if conv: raise ValueError("no format conversion supported by SQL") if pre: rv.append(SQL(pre)) if name is None: continue if name.isdigit(): if autonum: raise ValueError( "cannot switch from automatic field numbering to manual") rv.append(args[int(name)]) autonum = None elif not name: if autonum is None: raise ValueError( "cannot switch from manual field numbering to automatic") rv.append(args[autonum]) autonum += 1 else: rv.append(kwargs[name]) return Composed(rv) def join(self, seq): """ Join a sequence of `Composable`. :param seq: the elements to join. :type seq: iterable of `!Composable` Use the `!SQL` object's *string* to separate the elements in *seq*. Note that `Composed` objects are iterable too, so they can be used as argument for this method. Example:: >>> snip = sql.SQL(', ').join( ... sql.Identifier(n) for n in ['foo', 'bar', 'baz']) >>> print(snip.as_string(conn)) "foo", "bar", "baz" """ rv = [] it = iter(seq) try: rv.append(next(it)) except StopIteration: pass else: for i in it: rv.append(self) rv.append(i) return Composed(rv) class Identifier(Composable): """ A `Composable` representing an SQL identifier or a dot-separated sequence. Identifiers usually represent names of database objects, such as tables or fields. PostgreSQL identifiers follow `different rules`__ than SQL string literals for escaping (e.g. they use double quotes instead of single). .. __: https://www.postgresql.org/docs/current/static/sql-syntax-lexical.html# \ SQL-SYNTAX-IDENTIFIERS Example:: >>> t1 = sql.Identifier("foo") >>> t2 = sql.Identifier("ba'r") >>> t3 = sql.Identifier('ba"z') >>> print(sql.SQL(', ').join([t1, t2, t3]).as_string(conn)) "foo", "ba'r", "ba""z" Multiple strings can be passed to the object to represent a qualified name, i.e. a dot-separated sequence of identifiers. Example:: >>> query = sql.SQL("select {} from {}").format( ... sql.Identifier("table", "field"), ... sql.Identifier("schema", "table")) >>> print(query.as_string(conn)) select "table"."field" from "schema"."table" """ def __init__(self, *strings): if not strings: raise TypeError("Identifier cannot be empty") for s in strings: if not isinstance(s, str): raise TypeError("SQL identifier parts must be strings") super().__init__(strings) @property def strings(self): """A tuple with the strings wrapped by the `Identifier`.""" return self._wrapped @property def string(self): """The string wrapped by the `Identifier`. """ if len(self._wrapped) == 1: return self._wrapped[0] else: raise AttributeError( "the Identifier wraps more than one than one string") def __repr__(self): return f"{self.__class__.__name__}({', '.join(map(repr, self._wrapped))})" def as_string(self, context): return '.'.join(ext.quote_ident(s, context) for s in self._wrapped) class Literal(Composable): """ A `Composable` representing an SQL value to include in a query. Usually you will want to include placeholders in the query and pass values as `~cursor.execute()` arguments. If however you really really need to include a literal value in the query you can use this object. The string returned by `!as_string()` follows the normal :ref:`adaptation rules ` for Python objects. Example:: >>> s1 = sql.Literal("foo") >>> s2 = sql.Literal("ba'r") >>> s3 = sql.Literal(42) >>> print(sql.SQL(', ').join([s1, s2, s3]).as_string(conn)) 'foo', 'ba''r', 42 """ @property def wrapped(self): """The object wrapped by the `!Literal`.""" return self._wrapped def as_string(self, context): # is it a connection or cursor? if isinstance(context, ext.connection): conn = context elif isinstance(context, ext.cursor): conn = context.connection else: raise TypeError("context must be a connection or a cursor") a = ext.adapt(self._wrapped) if hasattr(a, 'prepare'): a.prepare(conn) rv = a.getquoted() if isinstance(rv, bytes): rv = rv.decode(ext.encodings[conn.encoding]) return rv class Placeholder(Composable): """A `Composable` representing a placeholder for query parameters. If the name is specified, generate a named placeholder (e.g. ``%(name)s``), otherwise generate a positional placeholder (e.g. ``%s``). The object is useful to generate SQL queries with a variable number of arguments. Examples:: >>> names = ['foo', 'bar', 'baz'] >>> q1 = sql.SQL("insert into table ({}) values ({})").format( ... sql.SQL(', ').join(map(sql.Identifier, names)), ... sql.SQL(', ').join(sql.Placeholder() * len(names))) >>> print(q1.as_string(conn)) insert into table ("foo", "bar", "baz") values (%s, %s, %s) >>> q2 = sql.SQL("insert into table ({}) values ({})").format( ... sql.SQL(', ').join(map(sql.Identifier, names)), ... sql.SQL(', ').join(map(sql.Placeholder, names))) >>> print(q2.as_string(conn)) insert into table ("foo", "bar", "baz") values (%(foo)s, %(bar)s, %(baz)s) """ def __init__(self, name=None): if isinstance(name, str): if ')' in name: raise ValueError(f"invalid name: {name!r}") elif name is not None: raise TypeError(f"expected string or None as name, got {name!r}") super().__init__(name) @property def name(self): """The name of the `!Placeholder`.""" return self._wrapped def __repr__(self): if self._wrapped is None: return f"{self.__class__.__name__}()" else: return f"{self.__class__.__name__}({self._wrapped!r})" def as_string(self, context): if self._wrapped is not None: return f"%({self._wrapped})s" else: return "%s" # Literals NULL = SQL("NULL") DEFAULT = SQL("DEFAULT")