diff options
Diffstat (limited to 'docs/advanced')
-rw-r--r-- | docs/advanced/adapt.rst | 269 | ||||
-rw-r--r-- | docs/advanced/async.rst | 360 | ||||
-rw-r--r-- | docs/advanced/cursors.rst | 192 | ||||
-rw-r--r-- | docs/advanced/index.rst | 21 | ||||
-rw-r--r-- | docs/advanced/pipeline.rst | 324 | ||||
-rw-r--r-- | docs/advanced/pool.rst | 332 | ||||
-rw-r--r-- | docs/advanced/prepare.rst | 57 | ||||
-rw-r--r-- | docs/advanced/rows.rst | 116 | ||||
-rw-r--r-- | docs/advanced/typing.rst | 180 |
9 files changed, 1851 insertions, 0 deletions
diff --git a/docs/advanced/adapt.rst b/docs/advanced/adapt.rst new file mode 100644 index 0000000..4323b07 --- /dev/null +++ b/docs/advanced/adapt.rst @@ -0,0 +1,269 @@ +.. currentmodule:: psycopg.adapt + +.. _adaptation: + +Data adaptation configuration +============================= + +The adaptation system is at the core of Psycopg and allows to customise the +way Python objects are converted to PostgreSQL when a query is performed and +how PostgreSQL values are converted to Python objects when query results are +returned. + +.. note:: + For a high-level view of the conversion of types between Python and + PostgreSQL please look at :ref:`query-parameters`. Using the objects + described in this page is useful if you intend to *customise* the + adaptation rules. + +- Adaptation configuration is performed by changing the + `~psycopg.abc.AdaptContext.adapters` object of objects implementing the + `~psycopg.abc.AdaptContext` protocol, for instance `~psycopg.Connection` + or `~psycopg.Cursor`. + +- Every context object derived from another context inherits its adapters + mapping: cursors created from a connection inherit the connection's + configuration. + + By default, connections obtain an adapters map from the global map + exposed as `psycopg.adapters`: changing the content of this object will + affect every connection created afterwards. You may specify a different + template adapters map using the `!context` parameter on + `~psycopg.Connection.connect()`. + + .. image:: ../pictures/adapt.svg + :align: center + +- The `!adapters` attributes are `AdaptersMap` instances, and contain the + mapping from Python types and `~psycopg.abc.Dumper` classes, and from + PostgreSQL OIDs to `~psycopg.abc.Loader` classes. Changing this mapping + (e.g. writing and registering your own adapters, or using a different + configuration of builtin adapters) affects how types are converted between + Python and PostgreSQL. + + - Dumpers (objects implementing the `~psycopg.abc.Dumper` protocol) are + the objects used to perform the conversion from a Python object to a bytes + sequence in a format understood by PostgreSQL. The string returned + *shouldn't be quoted*: the value will be passed to the database using + functions such as :pq:`PQexecParams()` so quoting and quotes escaping is + not necessary. The dumper usually also suggests to the server what type to + use, via its `~psycopg.abc.Dumper.oid` attribute. + + - Loaders (objects implementing the `~psycopg.abc.Loader` protocol) are + the objects used to perform the opposite operation: reading a bytes + sequence from PostgreSQL and creating a Python object out of it. + + - Dumpers and loaders are instantiated on demand by a `~Transformer` object + when a query is executed. + +.. note:: + Changing adapters in a context only affects that context and its children + objects created *afterwards*; the objects already created are not + affected. For instance, changing the global context will only change newly + created connections, not the ones already existing. + + +.. _adapt-example-xml: + +Writing a custom adapter: XML +----------------------------- + +Psycopg doesn't provide adapters for the XML data type, because there are just +too many ways of handling XML in Python. Creating a loader to parse the +`PostgreSQL xml type`__ to `~xml.etree.ElementTree` is very simple, using the +`psycopg.adapt.Loader` base class and implementing the +`~psycopg.abc.Loader.load()` method: + +.. __: https://www.postgresql.org/docs/current/datatype-xml.html + +.. code:: python + + >>> import xml.etree.ElementTree as ET + >>> from psycopg.adapt import Loader + + >>> # Create a class implementing the `load()` method. + >>> class XmlLoader(Loader): + ... def load(self, data): + ... return ET.fromstring(data) + + >>> # Register the loader on the adapters of a context. + >>> conn.adapters.register_loader("xml", XmlLoader) + + >>> # Now just query the database returning XML data. + >>> cur = conn.execute( + ... """select XMLPARSE (DOCUMENT '<?xml version="1.0"?> + ... <book><title>Manual</title><chapter>...</chapter></book>') + ... """) + + >>> elem = cur.fetchone()[0] + >>> elem + <Element 'book' at 0x7ffb55142ef0> + +The opposite operation, converting Python objects to PostgreSQL, is performed +by dumpers. The `psycopg.adapt.Dumper` base class makes it easy to implement one: +you only need to implement the `~psycopg.abc.Dumper.dump()` method:: + + >>> from psycopg.adapt import Dumper + + >>> class XmlDumper(Dumper): + ... # Setting an OID is not necessary but can be helpful + ... oid = psycopg.adapters.types["xml"].oid + ... + ... def dump(self, elem): + ... return ET.tostring(elem) + + >>> # Register the dumper on the adapters of a context + >>> conn.adapters.register_dumper(ET.Element, XmlDumper) + + >>> # Now, in that context, it is possible to use ET.Element objects as parameters + >>> conn.execute("SELECT xpath('//title/text()', %s)", [elem]).fetchone()[0] + ['Manual'] + +Note that it is possible to use a `~psycopg.types.TypesRegistry`, exposed by +any `~psycopg.abc.AdaptContext`, to obtain information on builtin types, or +extension types if they have been registered on that context using the +`~psycopg.types.TypeInfo`\.\ `~psycopg.types.TypeInfo.register()` method. + + +.. _adapt-example-float: + +Example: PostgreSQL numeric to Python float +------------------------------------------- + +Normally PostgreSQL :sql:`numeric` values are converted to Python +`~decimal.Decimal` instances, because both the types allow fixed-precision +arithmetic and are not subject to rounding. + +Sometimes, however, you may want to perform floating-point math on +:sql:`numeric` values, and `!Decimal` may get in the way (maybe because it is +slower, or maybe because mixing `!float` and `!Decimal` values causes Python +errors). + +If you are fine with the potential loss of precision and you simply want to +receive :sql:`numeric` values as Python `!float`, you can register on +:sql:`numeric` the same `Loader` class used to load +:sql:`float4`\/:sql:`float8` values. Because the PostgreSQL textual +representation of both floats and decimal is the same, the two loaders are +compatible. + +.. code:: python + + conn = psycopg.connect() + + conn.execute("SELECT 123.45").fetchone()[0] + # Decimal('123.45') + + conn.adapters.register_loader("numeric", psycopg.types.numeric.FloatLoader) + + conn.execute("SELECT 123.45").fetchone()[0] + # 123.45 + +In this example the customised adaptation takes effect only on the connection +`!conn` and on any cursor created from it, not on other connections. + + +.. _adapt-example-inf-date: + +Example: handling infinity date +------------------------------- + +Suppose you want to work with the "infinity" date which is available in +PostgreSQL but not handled by Python: + +.. code:: python + + >>> conn.execute("SELECT 'infinity'::date").fetchone() + Traceback (most recent call last): + ... + DataError: date too large (after year 10K): 'infinity' + +One possibility would be to store Python's `datetime.date.max` as PostgreSQL +infinity. For this, let's create a subclass for the dumper and the loader and +register them in the working scope (globally or just on a connection or +cursor): + +.. code:: python + + from datetime import date + + # Subclass existing adapters so that the base case is handled normally. + from psycopg.types.datetime import DateLoader, DateDumper + + class InfDateDumper(DateDumper): + def dump(self, obj): + if obj == date.max: + return b"infinity" + elif obj == date.min: + return b"-infinity" + else: + return super().dump(obj) + + class InfDateLoader(DateLoader): + def load(self, data): + if data == b"infinity": + return date.max + elif data == b"-infinity": + return date.min + else: + return super().load(data) + + # The new classes can be registered globally, on a connection, on a cursor + cur.adapters.register_dumper(date, InfDateDumper) + cur.adapters.register_loader("date", InfDateLoader) + + cur.execute("SELECT %s::text, %s::text", [date(2020, 12, 31), date.max]).fetchone() + # ('2020-12-31', 'infinity') + cur.execute("SELECT '2020-12-31'::date, 'infinity'::date").fetchone() + # (datetime.date(2020, 12, 31), datetime.date(9999, 12, 31)) + + +Dumpers and loaders life cycle +------------------------------ + +Registering dumpers and loaders will instruct Psycopg to use them +in the queries to follow, in the context where they have been registered. + +When a query is performed on a `~psycopg.Cursor`, a +`~psycopg.adapt.Transformer` object is created as a local context to manage +adaptation during the query, instantiating the required dumpers and loaders +and dispatching the values to perform the wanted conversions from Python to +Postgres and back. + +- The `!Transformer` copies the adapters configuration from the `!Cursor`, + thus inheriting all the changes made to the global `psycopg.adapters` + configuration, the current `!Connection`, the `!Cursor`. + +- For every Python type passed as query argument, the `!Transformer` will + instantiate a `!Dumper`. Usually all the objects of the same type will be + converted by the same dumper instance. + + - According to the placeholder used (``%s``, ``%b``, ``%t``), Psycopg may + pick a binary or a text dumper. When using the ``%s`` "`~PyFormat.AUTO`" + format, if the same type has both a text and a binary dumper registered, + the last one registered by `~AdaptersMap.register_dumper()` will be used. + + - Sometimes, just looking at the Python type is not enough to decide the + best PostgreSQL type to use (for instance the PostgreSQL type of a Python + list depends on the objects it contains, whether to use an :sql:`integer` + or :sql:`bigint` depends on the number size...) In these cases the + mechanism provided by `~psycopg.abc.Dumper.get_key()` and + `~psycopg.abc.Dumper.upgrade()` is used to create more specific dumpers. + +- The query is executed. Upon successful request, the result is received as a + `~psycopg.pq.PGresult`. + +- For every OID returned by the query, the `!Transformer` will instantiate a + `!Loader`. All the values with the same OID will be converted by the same + loader instance. + +- Recursive types (e.g. Python lists, PostgreSQL arrays and composite types) + will use the same adaptation rules. + +As a consequence it is possible to perform certain choices only once per query +(e.g. looking up the connection encoding) and then call a fast-path operation +for each value to convert. + +Querying will fail if a Python object for which there isn't a `!Dumper` +registered (for the right `~psycopg.pq.Format`) is used as query parameter. +If the query returns a data type whose OID doesn't have a `!Loader`, the +value will be returned as a string (or bytes string for binary types). diff --git a/docs/advanced/async.rst b/docs/advanced/async.rst new file mode 100644 index 0000000..3620ab6 --- /dev/null +++ b/docs/advanced/async.rst @@ -0,0 +1,360 @@ +.. currentmodule:: psycopg + +.. index:: asyncio + +.. _async: + +Asynchronous operations +======================= + +Psycopg `~Connection` and `~Cursor` have counterparts `~AsyncConnection` and +`~AsyncCursor` supporting an `asyncio` interface. + +The design of the asynchronous objects is pretty much the same of the sync +ones: in order to use them you will only have to scatter the `!await` keyword +here and there. + +.. code:: python + + async with await psycopg.AsyncConnection.connect( + "dbname=test user=postgres") as aconn: + async with aconn.cursor() as acur: + await acur.execute( + "INSERT INTO test (num, data) VALUES (%s, %s)", + (100, "abc'def")) + await acur.execute("SELECT * FROM test") + await acur.fetchone() + # will return (1, 100, "abc'def") + async for record in acur: + print(record) + +.. versionchanged:: 3.1 + + `AsyncConnection.connect()` performs DNS name resolution in a non-blocking + way. + + .. warning:: + + Before version 3.1, `AsyncConnection.connect()` may still block on DNS + name resolution. To avoid that you should `set the hostaddr connection + parameter`__, or use the `~psycopg._dns.resolve_hostaddr_async()` to + do it automatically. + + .. __: https://www.postgresql.org/docs/current/libpq-connect.html + #LIBPQ-PARAMKEYWORDS + +.. warning:: + + On Windows, Psycopg is not compatible with the default + `~asyncio.ProactorEventLoop`. Please use a different loop, for instance + the `~asyncio.SelectorEventLoop`. + + For instance, you can use, early in your program: + + .. parsed-literal:: + + `asyncio.set_event_loop_policy`\ ( + `asyncio.WindowsSelectorEventLoopPolicy`\ () + ) + + + +.. index:: with + +.. _async-with: + +`!with` async connections +------------------------- + +As seen in :ref:`the basic usage <usage>`, connections and cursors can act as +context managers, so you can run: + +.. code:: python + + with psycopg.connect("dbname=test user=postgres") as conn: + with conn.cursor() as cur: + cur.execute(...) + # the cursor is closed upon leaving the context + # the transaction is committed, the connection closed + +For asynchronous connections it's *almost* what you'd expect, but +not quite. Please note that `~Connection.connect()` and `~Connection.cursor()` +*don't return a context*: they are both factory methods which return *an +object which can be used as a context*. That's because there are several use +cases where it's useful to handle the objects manually and only `!close()` them +when required. + +As a consequence you cannot use `!async with connect()`: you have to do it in +two steps instead, as in + +.. code:: python + + aconn = await psycopg.AsyncConnection.connect() + async with aconn: + async with aconn.cursor() as cur: + await cur.execute(...) + +which can be condensed into `!async with await`: + +.. code:: python + + async with await psycopg.AsyncConnection.connect() as aconn: + async with aconn.cursor() as cur: + await cur.execute(...) + +...but no less than that: you still need to do the double async thing. + +Note that the `AsyncConnection.cursor()` function is not an `!async` function +(it never performs I/O), so you don't need an `!await` on it; as a consequence +you can use the normal `async with` context manager. + + +.. index:: Ctrl-C + +.. _async-ctrl-c: + +Interrupting async operations using Ctrl-C +------------------------------------------ + +If a long running operation is interrupted by a Ctrl-C on a normal connection +running in the main thread, the operation will be cancelled and the connection +will be put in error state, from which can be recovered with a normal +`~Connection.rollback()`. + +If the query is running in an async connection, a Ctrl-C will be likely +intercepted by the async loop and interrupt the whole program. In order to +emulate what normally happens with blocking connections, you can use +`asyncio's add_signal_handler()`__, to call `Connection.cancel()`: + +.. code:: python + + import asyncio + import signal + + async with await psycopg.AsyncConnection.connect() as conn: + loop.add_signal_handler(signal.SIGINT, conn.cancel) + ... + + +.. __: https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.add_signal_handler + + +.. index:: + pair: Asynchronous; Notifications + pair: LISTEN; SQL command + pair: NOTIFY; SQL command + +.. _async-messages: + +Server messages +--------------- + +PostgreSQL can send, together with the query results, `informative messages`__ +about the operation just performed, such as warnings or debug information. +Notices may be raised even if the operations are successful and don't indicate +an error. You are probably familiar with some of them, because they are +reported by :program:`psql`:: + + $ psql + =# ROLLBACK; + WARNING: there is no transaction in progress + ROLLBACK + +.. __: https://www.postgresql.org/docs/current/runtime-config-logging.html + #RUNTIME-CONFIG-SEVERITY-LEVELS + +Messages can be also sent by the `PL/pgSQL 'RAISE' statement`__ (at a level +lower than EXCEPTION, otherwise the appropriate `DatabaseError` will be +raised). The level of the messages received can be controlled using the +client_min_messages__ setting. + +.. __: https://www.postgresql.org/docs/current/plpgsql-errors-and-messages.html +.. __: https://www.postgresql.org/docs/current/runtime-config-client.html + #GUC-CLIENT-MIN-MESSAGES + + +By default, the messages received are ignored. If you want to process them on +the client you can use the `Connection.add_notice_handler()` function to +register a function that will be invoked whenever a message is received. The +message is passed to the callback as a `~errors.Diagnostic` instance, +containing all the information passed by the server, such as the message text +and the severity. The object is the same found on the `~psycopg.Error.diag` +attribute of the errors raised by the server: + +.. code:: python + + >>> import psycopg + + >>> def log_notice(diag): + ... print(f"The server says: {diag.severity} - {diag.message_primary}") + + >>> conn = psycopg.connect(autocommit=True) + >>> conn.add_notice_handler(log_notice) + + >>> cur = conn.execute("ROLLBACK") + The server says: WARNING - there is no transaction in progress + >>> print(cur.statusmessage) + ROLLBACK + +.. warning:: + + The `!Diagnostic` object received by the callback should not be used after + the callback function terminates, because its data is deallocated after + the callbacks have been processed. If you need to use the information + later please extract the attributes requested and forward them instead of + forwarding the whole `!Diagnostic` object. + + +.. index:: + pair: Asynchronous; Notifications + pair: LISTEN; SQL command + pair: NOTIFY; SQL command + +.. _async-notify: + +Asynchronous notifications +-------------------------- + +Psycopg allows asynchronous interaction with other database sessions using the +facilities offered by PostgreSQL commands |LISTEN|_ and |NOTIFY|_. Please +refer to the PostgreSQL documentation for examples about how to use this form +of communication. + +.. |LISTEN| replace:: :sql:`LISTEN` +.. _LISTEN: https://www.postgresql.org/docs/current/sql-listen.html +.. |NOTIFY| replace:: :sql:`NOTIFY` +.. _NOTIFY: https://www.postgresql.org/docs/current/sql-notify.html + +Because of the way sessions interact with notifications (see |NOTIFY|_ +documentation), you should keep the connection in `~Connection.autocommit` +mode if you wish to receive or send notifications in a timely manner. + +Notifications are received as instances of `Notify`. If you are reserving a +connection only to receive notifications, the simplest way is to consume the +`Connection.notifies` generator. The generator can be stopped using +`!close()`. + +.. note:: + + You don't need an `AsyncConnection` to handle notifications: a normal + blocking `Connection` is perfectly valid. + +The following example will print notifications and stop when one containing +the ``"stop"`` message is received. + +.. code:: python + + import psycopg + conn = psycopg.connect("", autocommit=True) + conn.execute("LISTEN mychan") + gen = conn.notifies() + for notify in gen: + print(notify) + if notify.payload == "stop": + gen.close() + print("there, I stopped") + +If you run some :sql:`NOTIFY` in a :program:`psql` session: + +.. code:: psql + + =# NOTIFY mychan, 'hello'; + NOTIFY + =# NOTIFY mychan, 'hey'; + NOTIFY + =# NOTIFY mychan, 'stop'; + NOTIFY + +You may get output from the Python process such as:: + + Notify(channel='mychan', payload='hello', pid=961823) + Notify(channel='mychan', payload='hey', pid=961823) + Notify(channel='mychan', payload='stop', pid=961823) + there, I stopped + +Alternatively, you can use `~Connection.add_notify_handler()` to register a +callback function, which will be invoked whenever a notification is received, +during the normal query processing; you will be then able to use the +connection normally. Please note that in this case notifications will not be +received immediately, but only during a connection operation, such as a query. + +.. code:: python + + conn.add_notify_handler(lambda n: print(f"got this: {n}")) + + # meanwhile in psql... + # =# NOTIFY mychan, 'hey'; + # NOTIFY + + print(conn.execute("SELECT 1").fetchone()) + # got this: Notify(channel='mychan', payload='hey', pid=961823) + # (1,) + + +.. index:: disconnections + +.. _disconnections: + +Detecting disconnections +------------------------ + +Sometimes it is useful to detect immediately when the connection with the +database is lost. One brutal way to do so is to poll a connection in a loop +running an endless stream of :sql:`SELECT 1`... *Don't* do so: polling is *so* +out of fashion. Besides, it is inefficient (unless what you really want is a +client-server generator of ones), it generates useless traffic and will only +detect a disconnection with an average delay of half the polling time. + +A more efficient and timely way to detect a server disconnection is to create +an additional connection and wait for a notification from the OS that this +connection has something to say: only then you can run some checks. You +can dedicate a thread (or an asyncio task) to wait on this connection: such +thread will perform no activity until awaken by the OS. + +In a normal (non asyncio) program you can use the `selectors` module. Because +the `!Connection` implements a `~Connection.fileno()` method you can just +register it as a file-like object. You can run such code in a dedicated thread +(and using a dedicated connection) if the rest of the program happens to have +something else to do too. + +.. code:: python + + import selectors + + sel = selectors.DefaultSelector() + sel.register(conn, selectors.EVENT_READ) + while True: + if not sel.select(timeout=60.0): + continue # No FD activity detected in one minute + + # Activity detected. Is the connection still ok? + try: + conn.execute("SELECT 1") + except psycopg.OperationalError: + # You were disconnected: do something useful such as panicking + logger.error("we lost our database!") + sys.exit(1) + +In an `asyncio` program you can dedicate a `~asyncio.Task` instead and do +something similar using `~asyncio.loop.add_reader`: + +.. code:: python + + import asyncio + + ev = asyncio.Event() + loop = asyncio.get_event_loop() + loop.add_reader(conn.fileno(), ev.set) + + while True: + try: + await asyncio.wait_for(ev.wait(), 60.0) + except asyncio.TimeoutError: + continue # No FD activity detected in one minute + + # Activity detected. Is the connection still ok? + try: + await conn.execute("SELECT 1") + except psycopg.OperationalError: + # Guess what happened + ... diff --git a/docs/advanced/cursors.rst b/docs/advanced/cursors.rst new file mode 100644 index 0000000..954d665 --- /dev/null +++ b/docs/advanced/cursors.rst @@ -0,0 +1,192 @@ +.. currentmodule:: psycopg + +.. index:: + single: Cursor + +.. _cursor-types: + +Cursor types +============ + +Psycopg can manage kinds of "cursors" which differ in where the state of a +query being processed is stored: :ref:`client-side-cursors` and +:ref:`server-side-cursors`. + +.. index:: + double: Cursor; Client-side + +.. _client-side-cursors: + +Client-side cursors +------------------- + +Client-side cursors are what Psycopg uses in its normal querying process. +They are implemented by the `Cursor` and `AsyncCursor` classes. In such +querying pattern, after a cursor sends a query to the server (usually calling +`~Cursor.execute()`), the server replies transferring to the client the whole +set of results requested, which is stored in the state of the same cursor and +from where it can be read from Python code (using methods such as +`~Cursor.fetchone()` and siblings). + +This querying process is very scalable because, after a query result has been +transmitted to the client, the server doesn't keep any state. Because the +results are already in the client memory, iterating its rows is very quick. + +The downside of this querying method is that the entire result has to be +transmitted completely to the client (with a time proportional to its size) +and the client needs enough memory to hold it, so it is only suitable for +reasonably small result sets. + + +.. index:: + double: Cursor; Client-binding + +.. _client-side-binding-cursors: + +Client-side-binding cursors +--------------------------- + +.. versionadded:: 3.1 + +The previously described :ref:`client-side cursors <client-side-cursors>` send +the query and the parameters separately to the server. This is the most +efficient way to process parametrised queries and allows to build several +features and optimizations. However, not all types of queries can be bound +server-side; in particular no Data Definition Language query can. See +:ref:`server-side-binding` for the description of these problems. + +The `ClientCursor` (and its `AsyncClientCursor` async counterpart) merge the +query on the client and send the query and the parameters merged together to +the server. This allows to parametrize any type of PostgreSQL statement, not +only queries (:sql:`SELECT`) and Data Manipulation statements (:sql:`INSERT`, +:sql:`UPDATE`, :sql:`DELETE`). + +Using `!ClientCursor`, Psycopg 3 behaviour will be more similar to `psycopg2` +(which only implements client-side binding) and could be useful to port +Psycopg 2 programs more easily to Psycopg 3. The objects in the `sql` module +allow for greater flexibility (for instance to parametrize a table name too, +not only values); however, for simple cases, a `!ClientCursor` could be the +right object. + +In order to obtain `!ClientCursor` from a connection, you can set its +`~Connection.cursor_factory` (at init time or changing its attribute +afterwards): + +.. code:: python + + from psycopg import connect, ClientCursor + + conn = psycopg.connect(DSN, cursor_factory=ClientCursor) + cur = conn.cursor() + # <psycopg.ClientCursor [no result] [IDLE] (database=piro) at 0x7fd977ae2880> + +If you need to create a one-off client-side-binding cursor out of a normal +connection, you can just use the `~ClientCursor` class passing the connection +as argument. + +.. code:: python + + conn = psycopg.connect(DSN) + cur = psycopg.ClientCursor(conn) + +.. warning:: + + Client-side cursors don't support :ref:`binary parameters and return + values <binary-data>` and don't support :ref:`prepared statements + <prepared-statements>`. + +.. tip:: + + The best use for client-side binding cursors is probably to port large + Psycopg 2 code to Psycopg 3, especially for programs making wide use of + Data Definition Language statements. + + The `psycopg.sql` module allows for more generic client-side query + composition, to mix client- and server-side parameters binding, and allows + to parametrize tables and fields names too, or entirely generic SQL + snippets. + +.. index:: + double: Cursor; Server-side + single: Portal + double: Cursor; Named + +.. _server-side-cursors: + +Server-side cursors +------------------- + +PostgreSQL has its own concept of *cursor* too (sometimes also called +*portal*). When a database cursor is created, the query is not necessarily +completely processed: the server might be able to produce results only as they +are needed. Only the results requested are transmitted to the client: if the +query result is very large but the client only needs the first few records it +is possible to transmit only them. + +The downside is that the server needs to keep track of the partially +processed results, so it uses more memory and resources on the server. + +Psycopg allows the use of server-side cursors using the classes `ServerCursor` +and `AsyncServerCursor`. They are usually created by passing the `!name` +parameter to the `~Connection.cursor()` method (reason for which, in +`!psycopg2`, they are usually called *named cursors*). The use of these classes +is similar to their client-side counterparts: their interface is the same, but +behind the scene they send commands to control the state of the cursor on the +server (for instance when fetching new records or when moving using +`~Cursor.scroll()`). + +Using a server-side cursor it is possible to process datasets larger than what +would fit in the client's memory. However for small queries they are less +efficient because it takes more commands to receive their result, so you +should use them only if you need to process huge results or if only a partial +result is needed. + +.. seealso:: + + Server-side cursors are created and managed by `ServerCursor` using SQL + commands such as DECLARE_, FETCH_, MOVE_. The PostgreSQL documentation + gives a good idea of what is possible to do with them. + + .. _DECLARE: https://www.postgresql.org/docs/current/sql-declare.html + .. _FETCH: https://www.postgresql.org/docs/current/sql-fetch.html + .. _MOVE: https://www.postgresql.org/docs/current/sql-move.html + + +.. _cursor-steal: + +"Stealing" an existing cursor +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A Psycopg `ServerCursor` can be also used to consume a cursor which was +created in other ways than the :sql:`DECLARE` that `ServerCursor.execute()` +runs behind the scene. + +For instance if you have a `PL/pgSQL function returning a cursor`__: + +.. __: https://www.postgresql.org/docs/current/plpgsql-cursors.html + +.. code:: postgres + + CREATE FUNCTION reffunc(refcursor) RETURNS refcursor AS $$ + BEGIN + OPEN $1 FOR SELECT col FROM test; + RETURN $1; + END; + $$ LANGUAGE plpgsql; + +you can run a one-off command in the same connection to call it (e.g. using +`Connection.execute()`) in order to create the cursor on the server: + +.. code:: python + + conn.execute("SELECT reffunc('curname')") + +after which you can create a server-side cursor declared by the same name, and +directly call the fetch methods, skipping the `~ServerCursor.execute()` call: + +.. code:: python + + cur = conn.cursor('curname') + # no cur.execute() + for record in cur: # or cur.fetchone(), cur.fetchmany()... + # do something with record diff --git a/docs/advanced/index.rst b/docs/advanced/index.rst new file mode 100644 index 0000000..6920bd7 --- /dev/null +++ b/docs/advanced/index.rst @@ -0,0 +1,21 @@ +.. _advanced: + +More advanced topics +==================== + +Once you have familiarised yourself with the :ref:`Psycopg basic operations +<basic>`, you can take a look at the chapter of this section for more advanced +usages. + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + async + typing + rows + pool + cursors + adapt + prepare + pipeline diff --git a/docs/advanced/pipeline.rst b/docs/advanced/pipeline.rst new file mode 100644 index 0000000..980fea7 --- /dev/null +++ b/docs/advanced/pipeline.rst @@ -0,0 +1,324 @@ +.. currentmodule:: psycopg + +.. _pipeline-mode: + +Pipeline mode support +===================== + +.. versionadded:: 3.1 + +The *pipeline mode* allows PostgreSQL client applications to send a query +without having to read the result of the previously sent query. Taking +advantage of the pipeline mode, a client will wait less for the server, since +multiple queries/results can be sent/received in a single network roundtrip. +Pipeline mode can provide a significant performance boost to the application. + +Pipeline mode is most useful when the server is distant, i.e., network latency +(“ping time”) is high, and also when many small operations are being performed +in rapid succession. There is usually less benefit in using pipelined commands +when each query takes many multiples of the client/server round-trip time to +execute. A 100-statement operation run on a server 300 ms round-trip-time away +would take 30 seconds in network latency alone without pipelining; with +pipelining it may spend as little as 0.3 s waiting for results from the +server. + +The server executes statements, and returns results, in the order the client +sends them. The server will begin executing the commands in the pipeline +immediately, not waiting for the end of the pipeline. Note that results are +buffered on the server side; the server flushes that buffer when a +:ref:`synchronization point <pipeline-sync>` is established. + +.. seealso:: + + The PostgreSQL documentation about: + + - `pipeline mode`__ + - `extended query message flow`__ + + contains many details around when it is most useful to use the pipeline + mode and about errors management and interaction with transactions. + + .. __: https://www.postgresql.org/docs/current/libpq-pipeline-mode.html + .. __: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY + + +Client-server messages flow +--------------------------- + +In order to understand better how the pipeline mode works, we should take a +closer look at the `PostgreSQL client-server message flow`__. + +During normal querying, each statement is transmitted by the client to the +server as a stream of request messages, terminating with a **Sync** message to +tell it that it should process the messages sent so far. The server will +execute the statement and describe the results back as a stream of messages, +terminating with a **ReadyForQuery**, telling the client that it may now send a +new query. + +For example, the statement (returning no result): + +.. code:: python + + conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"]) + +results in the following two groups of messages: + +.. table:: + :align: left + + +---------------+-----------------------------------------------------------+ + | Direction | Message | + +===============+===========================================================+ + | Python | - Parse ``INSERT INTO ... (VALUE $1)`` (skipped if | + | | :ref:`the statement is prepared <prepared-statements>`) | + | |>| | - Bind ``'hello'`` | + | | - Describe | + | PostgreSQL | - Execute | + | | - Sync | + +---------------+-----------------------------------------------------------+ + | PostgreSQL | - ParseComplete | + | | - BindComplete | + | |<| | - NoData | + | | - CommandComplete ``INSERT 0 1`` | + | Python | - ReadyForQuery | + +---------------+-----------------------------------------------------------+ + +and the query: + +.. code:: python + + conn.execute("SELECT data FROM mytable WHERE id = %s", [1]) + +results in the two groups of messages: + +.. table:: + :align: left + + +---------------+-----------------------------------------------------------+ + | Direction | Message | + +===============+===========================================================+ + | Python | - Parse ``SELECT data FROM mytable WHERE id = $1`` | + | | - Bind ``1`` | + | |>| | - Describe | + | | - Execute | + | PostgreSQL | - Sync | + +---------------+-----------------------------------------------------------+ + | PostgreSQL | - ParseComplete | + | | - BindComplete | + | |<| | - RowDescription ``data`` | + | | - DataRow ``hello`` | + | Python | - CommandComplete ``SELECT 1`` | + | | - ReadyForQuery | + +---------------+-----------------------------------------------------------+ + +The two statements, sent consecutively, pay the communication overhead four +times, once per leg. + +The pipeline mode allows the client to combine several operations in longer +streams of messages to the server, then to receive more than one response in a +single batch. If we execute the two operations above in a pipeline: + +.. code:: python + + with conn.pipeline(): + conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["hello"]) + conn.execute("SELECT data FROM mytable WHERE id = %s", [1]) + +they will result in a single roundtrip between the client and the server: + +.. table:: + :align: left + + +---------------+-----------------------------------------------------------+ + | Direction | Message | + +===============+===========================================================+ + | Python | - Parse ``INSERT INTO ... (VALUE $1)`` | + | | - Bind ``'hello'`` | + | |>| | - Describe | + | | - Execute | + | PostgreSQL | - Parse ``SELECT data FROM mytable WHERE id = $1`` | + | | - Bind ``1`` | + | | - Describe | + | | - Execute | + | | - Sync (sent only once) | + +---------------+-----------------------------------------------------------+ + | PostgreSQL | - ParseComplete | + | | - BindComplete | + | |<| | - NoData | + | | - CommandComplete ``INSERT 0 1`` | + | Python | - ParseComplete | + | | - BindComplete | + | | - RowDescription ``data`` | + | | - DataRow ``hello`` | + | | - CommandComplete ``SELECT 1`` | + | | - ReadyForQuery (sent only once) | + +---------------+-----------------------------------------------------------+ + +.. |<| unicode:: U+25C0 +.. |>| unicode:: U+25B6 + +.. __: https://www.postgresql.org/docs/current/protocol-flow.html + + +.. _pipeline-usage: + +Pipeline mode usage +------------------- + +Psycopg supports the pipeline mode via the `Connection.pipeline()` method. The +method is a context manager: entering the ``with`` block yields a `Pipeline` +object. At the end of block, the connection resumes the normal operation mode. + +Within the pipeline block, you can use normally one or more cursors to execute +several operations, using `Connection.execute()`, `Cursor.execute()` and +`~Cursor.executemany()`. + +.. code:: python + + >>> with conn.pipeline(): + ... conn.execute("INSERT INTO mytable VALUES (%s)", ["hello"]) + ... with conn.cursor() as cur: + ... cur.execute("INSERT INTO othertable VALUES (%s)", ["world"]) + ... cur.executemany( + ... "INSERT INTO elsewhere VALUES (%s)", + ... [("one",), ("two",), ("four",)]) + +Unlike in normal mode, Psycopg will not wait for the server to receive the +result of each query; the client will receive results in batches when the +server flushes it output buffer. + +When a flush (or a sync) is performed, all pending results are sent back to +the cursors which executed them. If a cursor had run more than one query, it +will receive more than one result; results after the first will be available, +in their execution order, using `~Cursor.nextset()`: + +.. code:: python + + >>> with conn.pipeline(): + ... with conn.cursor() as cur: + ... cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["hello"]) + ... cur.execute("INSERT INTO mytable (data) VALUES (%s) RETURNING *", ["world"]) + ... while True: + ... print(cur.fetchall()) + ... if not cur.nextset(): + ... break + + [(1, 'hello')] + [(2, 'world')] + +If any statement encounters an error, the server aborts the current +transaction and will not execute any subsequent command in the queue until the +next :ref:`synchronization point <pipeline-sync>`; a `~errors.PipelineAborted` +exception is raised for each such command. Query processing resumes after the +synchronization point. + +.. warning:: + + Certain features are not available in pipeline mode, including: + + - COPY is not supported in pipeline mode by PostgreSQL. + - `Cursor.stream()` doesn't make sense in pipeline mode (its job is the + opposite of batching!) + - `ServerCursor` are currently not implemented in pipeline mode. + +.. note:: + + Starting from Psycopg 3.1, `~Cursor.executemany()` makes use internally of + the pipeline mode; as a consequence there is no need to handle a pipeline + block just to call `!executemany()` once. + + +.. _pipeline-sync: + +Synchronization points +---------------------- + +Flushing query results to the client can happen either when a synchronization +point is established by Psycopg: + +- using the `Pipeline.sync()` method; +- on `Connection.commit()` or `~Connection.rollback()`; +- at the end of a `!Pipeline` block; +- possibly when opening a nested `!Pipeline` block; +- using a fetch method such as `Cursor.fetchone()` (which only flushes the + query but doesn't issue a Sync and doesn't reset a pipeline state error). + +The server might perform a flush on its own initiative, for instance when the +output buffer is full. + +Note that, even in :ref:`autocommit <autocommit>`, the server wraps the +statements sent in pipeline mode in an implicit transaction, which will be +only committed when the Sync is received. As such, a failure in a group of +statements will probably invalidate the effect of statements executed after +the previous Sync, and will propagate to the following Sync. + +For example, in the following block: + +.. code:: python + + >>> with psycopg.connect(autocommit=True) as conn: + ... with conn.pipeline() as p, conn.cursor() as cur: + ... try: + ... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["one"]) + ... cur.execute("INSERT INTO no_such_table (data) VALUES (%s)", ["two"]) + ... conn.execute("INSERT INTO mytable (data) VALUES (%s)", ["three"]) + ... p.sync() + ... except psycopg.errors.UndefinedTable: + ... pass + ... cur.execute("INSERT INTO mytable (data) VALUES (%s)", ["four"]) + +there will be an error in the block, ``relation "no_such_table" does not +exist`` caused by the insert ``two``, but probably raised by the `!sync()` +call. At at the end of the block, the table will contain: + +.. code:: text + + =# SELECT * FROM mytable; + +----+------+ + | id | data | + +----+------+ + | 2 | four | + +----+------+ + (1 row) + +because: + +- the value 1 of the sequence is consumed by the statement ``one``, but + the record discarded because of the error in the same implicit transaction; +- the statement ``three`` is not executed because the pipeline is aborted (so + it doesn't consume a sequence item); +- the statement ``four`` is executed with + success after the Sync has terminated the failed transaction. + +.. warning:: + + The exact Python statement where an exception caused by a server error is + raised is somewhat arbitrary: it depends on when the server flushes its + buffered result. + + If you want to make sure that a group of statements is applied atomically + by the server, do make use of transaction methods such as + `~Connection.commit()` or `~Connection.transaction()`: these methods will + also sync the pipeline and raise an exception if there was any error in + the commands executed so far. + + +The fine prints +--------------- + +.. warning:: + + The Pipeline mode is an experimental feature. + + Its behaviour, especially around error conditions and concurrency, hasn't + been explored as much as the normal request-response messages pattern, and + its async nature makes it inherently more complex. + + As we gain more experience and feedback (which is welcome), we might find + bugs and shortcomings forcing us to change the current interface or + behaviour. + +The pipeline mode is available on any currently supported PostgreSQL version, +but, in order to make use of it, the client must use a libpq from PostgreSQL +14 or higher. You can use `Pipeline.is_supported()` to make sure your client +has the right library. diff --git a/docs/advanced/pool.rst b/docs/advanced/pool.rst new file mode 100644 index 0000000..adea0a7 --- /dev/null +++ b/docs/advanced/pool.rst @@ -0,0 +1,332 @@ +.. currentmodule:: psycopg_pool + +.. _connection-pools: + +Connection pools +================ + +A `connection pool`__ is an object managing a set of connections and allowing +their use in functions needing one. Because the time to establish a new +connection can be relatively long, keeping connections open can reduce latency. + +.. __: https://en.wikipedia.org/wiki/Connection_pool + +This page explains a few basic concepts of Psycopg connection pool's +behaviour. Please refer to the `ConnectionPool` object API for details about +the pool operations. + +.. note:: The connection pool objects are distributed in a package separate + from the main `psycopg` package: use ``pip install "psycopg[pool]"`` or ``pip + install psycopg_pool`` to make the `psycopg_pool` package available. See + :ref:`pool-installation`. + + +Pool life cycle +--------------- + +A simple way to use the pool is to create a single instance of it, as a +global object, and to use this object in the rest of the program, allowing +other functions, modules, threads to use it:: + + # module db.py in your program + from psycopg_pool import ConnectionPool + + pool = ConnectionPool(conninfo, **kwargs) + # the pool starts connecting immediately. + + # in another module + from .db import pool + + def my_function(): + with pool.connection() as conn: + conn.execute(...) + +Ideally you may want to call `~ConnectionPool.close()` when the use of the +pool is finished. Failing to call `!close()` at the end of the program is not +terribly bad: probably it will just result in some warnings printed on stderr. +However, if you think that it's sloppy, you could use the `atexit` module to +have `!close()` called at the end of the program. + +If you want to avoid starting to connect to the database at import time, and +want to wait for the application to be ready, you can create the pool using +`!open=False`, and call the `~ConnectionPool.open()` and +`~ConnectionPool.close()` methods when the conditions are right. Certain +frameworks provide callbacks triggered when the program is started and stopped +(for instance `FastAPI startup/shutdown events`__): they are perfect to +initiate and terminate the pool operations:: + + pool = ConnectionPool(conninfo, open=False, **kwargs) + + @app.on_event("startup") + def open_pool(): + pool.open() + + @app.on_event("shutdown") + def close_pool(): + pool.close() + +.. __: https://fastapi.tiangolo.com/advanced/events/#events-startup-shutdown + +Creating a single pool as a global variable is not the mandatory use: your +program can create more than one pool, which might be useful to connect to +more than one database, or to provide different types of connections, for +instance to provide separate read/write and read-only connections. The pool +also acts as a context manager and is open and closed, if necessary, on +entering and exiting the context block:: + + from psycopg_pool import ConnectionPool + + with ConnectionPool(conninfo, **kwargs) as pool: + run_app(pool) + + # the pool is now closed + +When the pool is open, the pool's background workers start creating the +requested `!min_size` connections, while the constructor (or the `!open()` +method) returns immediately. This allows the program some leeway to start +before the target database is up and running. However, if your application is +misconfigured, or the network is down, it means that the program will be able +to start, but the threads requesting a connection will fail with a +`PoolTimeout` only after the timeout on `~ConnectionPool.connection()` is +expired. If this behaviour is not desirable (and you prefer your program to +crash hard and fast, if the surrounding conditions are not right, because +something else will respawn it) you should call the `~ConnectionPool.wait()` +method after creating the pool, or call `!open(wait=True)`: these methods will +block until the pool is full, or will raise a `PoolTimeout` exception if the +pool isn't ready within the allocated time. + + +Connections life cycle +---------------------- + +The pool background workers create connections according to the parameters +`!conninfo`, `!kwargs`, and `!connection_class` passed to `ConnectionPool` +constructor, invoking something like :samp:`{connection_class}({conninfo}, +**{kwargs})`. Once a connection is created it is also passed to the +`!configure()` callback, if provided, after which it is put in the pool (or +passed to a client requesting it, if someone is already knocking at the door). + +If a connection expires (it passes `!max_lifetime`), or is returned to the pool +in broken state, or is found closed by `~ConnectionPool.check()`), then the +pool will dispose of it and will start a new connection attempt in the +background. + + +Using connections from the pool +------------------------------- + +The pool can be used to request connections from multiple threads or +concurrent tasks - it is hardly useful otherwise! If more connections than the +ones available in the pool are requested, the requesting threads are queued +and are served a connection as soon as one is available, either because +another client has finished using it or because the pool is allowed to grow +(when `!max_size` > `!min_size`) and a new connection is ready. + +The main way to use the pool is to obtain a connection using the +`~ConnectionPool.connection()` context, which returns a `~psycopg.Connection` +or subclass:: + + with my_pool.connection() as conn: + conn.execute("what you want") + +The `!connection()` context behaves like the `~psycopg.Connection` object +context: at the end of the block, if there is a transaction open, it will be +committed, or rolled back if the context is exited with as exception. + +At the end of the block the connection is returned to the pool and shouldn't +be used anymore by the code which obtained it. If a `!reset()` function is +specified in the pool constructor, it is called on the connection before +returning it to the pool. Note that the `!reset()` function is called in a +worker thread, so that the thread which used the connection can keep its +execution without being slowed down by it. + + +Pool connection and sizing +-------------------------- + +A pool can have a fixed size (specifying no `!max_size` or `!max_size` = +`!min_size`) or a dynamic size (when `!max_size` > `!min_size`). In both +cases, as soon as the pool is created, it will try to acquire `!min_size` +connections in the background. + +If an attempt to create a connection fails, a new attempt will be made soon +after, using an exponential backoff to increase the time between attempts, +until a maximum of `!reconnect_timeout` is reached. When that happens, the pool +will call the `!reconnect_failed()` function, if provided to the pool, and just +start a new connection attempt. You can use this function either to send +alerts or to interrupt the program and allow the rest of your infrastructure +to restart it. + +If more than `!min_size` connections are requested concurrently, new ones are +created, up to `!max_size`. Note that the connections are always created by the +background workers, not by the thread asking for the connection: if a client +requests a new connection, and a previous client terminates its job before the +new connection is ready, the waiting client will be served the existing +connection. This is especially useful in scenarios where the time to establish +a connection dominates the time for which the connection is used (see `this +analysis`__, for instance). + +.. __: https://github.com/brettwooldridge/HikariCP/blob/dev/documents/ + Welcome-To-The-Jungle.md + +If a pool grows above `!min_size`, but its usage decreases afterwards, a number +of connections are eventually closed: one every time a connection is unused +after the `!max_idle` time specified in the pool constructor. + + +What's the right size for the pool? +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Big question. Who knows. However, probably not as large as you imagine. Please +take a look at `this analysis`__ for some ideas. + +.. __: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing + +Something useful you can do is probably to use the +`~ConnectionPool.get_stats()` method and monitor the behaviour of your program +to tune the configuration parameters. The size of the pool can also be changed +at runtime using the `~ConnectionPool.resize()` method. + + +.. _null-pool: + +Null connection pools +--------------------- + +.. versionadded:: 3.1 + +Sometimes you may want leave the choice of using or not using a connection +pool as a configuration parameter of your application. For instance, you might +want to use a pool if you are deploying a "large instance" of your application +and can dedicate it a handful of connections; conversely you might not want to +use it if you deploy the application in several instances, behind a load +balancer, and/or using an external connection pool process such as PgBouncer. + +Switching between using or not using a pool requires some code change, because +the `ConnectionPool` API is different from the normal `~psycopg.connect()` +function and because the pool can perform additional connection configuration +(in the `!configure` parameter) that, if the pool is removed, should be +performed in some different code path of your application. + +The `!psycopg_pool` 3.1 package introduces the `NullConnectionPool` class. +This class has the same interface, and largely the same behaviour, of the +`!ConnectionPool`, but doesn't create any connection beforehand. When a +connection is returned, unless there are other clients already waiting, it +is closed immediately and not kept in the pool state. + +A null pool is not only a configuration convenience, but can also be used to +regulate the access to the server by a client program. If `!max_size` is set to +a value greater than 0, the pool will make sure that no more than `!max_size` +connections are created at any given time. If more clients ask for further +connections, they will be queued and served a connection as soon as a previous +client has finished using it, like for the basic pool. Other mechanisms to +throttle client requests (such as `!timeout` or `!max_waiting`) are respected +too. + +.. note:: + + Queued clients will be handed an already established connection, as soon + as a previous client has finished using it (and after the pool has + returned it to idle state and called `!reset()` on it, if necessary). + +Because normally (i.e. unless queued) every client will be served a new +connection, the time to obtain the connection is paid by the waiting client; +background workers are not normally involved in obtaining new connections. + + +Connection quality +------------------ + +The state of the connection is verified when a connection is returned to the +pool: if a connection is broken during its usage it will be discarded on +return and a new connection will be created. + +.. warning:: + + The health of the connection is not checked when the pool gives it to a + client. + +Why not? Because doing so would require an extra network roundtrip: we want to +save you from its latency. Before getting too angry about it, just think that +the connection can be lost any moment while your program is using it. As your +program should already be able to cope with a loss of a connection during its +process, it should be able to tolerate to be served a broken connection: +unpleasant but not the end of the world. + +.. warning:: + + The health of the connection is not checked when the connection is in the + pool. + +Does the pool keep a watchful eye on the quality of the connections inside it? +No, it doesn't. Why not? Because you will do it for us! Your program is only +a big ruse to make sure the connections are still alive... + +Not (entirely) trolling: if you are using a connection pool, we assume that +you are using and returning connections at a good pace. If the pool had to +check for the quality of a broken connection before your program notices it, +it should be polling each connection even faster than your program uses them. +Your database server wouldn't be amused... + +Can you do something better than that? Of course you can, there is always a +better way than polling. You can use the same recipe of :ref:`disconnections`, +reserving a connection and using a thread to monitor for any activity +happening on it. If any activity is detected, you can call the pool +`~ConnectionPool.check()` method, which will run a quick check on each +connection in the pool, removing the ones found in broken state, and using the +background workers to replace them with fresh ones. + +If you set up a similar check in your program, in case the database connection +is temporarily lost, we cannot do anything for the threads which had taken +already a connection from the pool, but no other thread should be served a +broken connection, because `!check()` would empty the pool and refill it with +working connections, as soon as they are available. + +Faster than you can say poll. Or pool. + + +.. _pool-stats: + +Pool stats +---------- + +The pool can return information about its usage using the methods +`~ConnectionPool.get_stats()` or `~ConnectionPool.pop_stats()`. Both methods +return the same values, but the latter reset the counters after its use. The +values can be sent to a monitoring system such as Graphite_ or Prometheus_. + +.. _Graphite: https://graphiteapp.org/ +.. _Prometheus: https://prometheus.io/ + +The following values should be provided, but please don't consider them as a +rigid interface: it is possible that they might change in the future. Keys +whose value is 0 may not be returned. + + +======================= ===================================================== +Metric Meaning +======================= ===================================================== + ``pool_min`` Current value for `~ConnectionPool.min_size` + ``pool_max`` Current value for `~ConnectionPool.max_size` + ``pool_size`` Number of connections currently managed by the pool + (in the pool, given to clients, being prepared) + ``pool_available`` Number of connections currently idle in the pool + ``requests_waiting`` Number of requests currently waiting in a queue to + receive a connection + ``usage_ms`` Total usage time of the connections outside the pool + ``requests_num`` Number of connections requested to the pool + ``requests_queued`` Number of requests queued because a connection wasn't + immediately available in the pool + ``requests_wait_ms`` Total time in the queue for the clients waiting + ``requests_errors`` Number of connection requests resulting in an error + (timeouts, queue full...) + ``returns_bad`` Number of connections returned to the pool in a bad + state + ``connections_num`` Number of connection attempts made by the pool to the + server + ``connections_ms`` Total time spent to establish connections with the + server + ``connections_errors`` Number of failed connection attempts + ``connections_lost`` Number of connections lost identified by + `~ConnectionPool.check()` +======================= ===================================================== diff --git a/docs/advanced/prepare.rst b/docs/advanced/prepare.rst new file mode 100644 index 0000000..e41bcae --- /dev/null +++ b/docs/advanced/prepare.rst @@ -0,0 +1,57 @@ +.. currentmodule:: psycopg + +.. index:: + single: Prepared statements + +.. _prepared-statements: + +Prepared statements +=================== + +Psycopg uses an automatic system to manage *prepared statements*. When a +query is prepared, its parsing and planning is stored in the server session, +so that further executions of the same query on the same connection (even with +different parameters) are optimised. + +A query is prepared automatically after it is executed more than +`~Connection.prepare_threshold` times on a connection. `!psycopg` will make +sure that no more than `~Connection.prepared_max` statements are planned: if +further queries are executed, the least recently used ones are deallocated and +the associated resources freed. + +Statement preparation can be controlled in several ways: + +- You can decide to prepare a query immediately by passing `!prepare=True` to + `Connection.execute()` or `Cursor.execute()`. The query is prepared, if it + wasn't already, and executed as prepared from its first use. + +- Conversely, passing `!prepare=False` to `!execute()` will avoid to prepare + the query, regardless of the number of times it is executed. The default for + the parameter is `!None`, meaning that the query is prepared if the + conditions described above are met. + +- You can disable the use of prepared statements on a connection by setting + its `~Connection.prepare_threshold` attribute to `!None`. + +.. versionchanged:: 3.1 + You can set `!prepare_threshold` as a `~Connection.connect()` keyword + parameter too. + +.. seealso:: + + The `PREPARE`__ PostgreSQL documentation contains plenty of details about + prepared statements in PostgreSQL. + + Note however that Psycopg doesn't use SQL statements such as + :sql:`PREPARE` and :sql:`EXECUTE`, but protocol level commands such as the + ones exposed by :pq:`PQsendPrepare`, :pq:`PQsendQueryPrepared`. + + .. __: https://www.postgresql.org/docs/current/sql-prepare.html + +.. warning:: + + Using external connection poolers, such as PgBouncer, is not compatible + with prepared statements, because the same client connection may change + the server session it refers to. If such middleware is used you should + disable prepared statements, by setting the `Connection.prepare_threshold` + attribute to `!None`. diff --git a/docs/advanced/rows.rst b/docs/advanced/rows.rst new file mode 100644 index 0000000..c23efe5 --- /dev/null +++ b/docs/advanced/rows.rst @@ -0,0 +1,116 @@ +.. currentmodule:: psycopg + +.. index:: row factories + +.. _row-factories: + +Row factories +============= + +Cursor's `fetch*` methods, by default, return the records received from the +database as tuples. This can be changed to better suit the needs of the +programmer by using custom *row factories*. + +The module `psycopg.rows` exposes several row factories ready to be used. For +instance, if you want to return your records as dictionaries, you can use +`~psycopg.rows.dict_row`:: + + >>> from psycopg.rows import dict_row + + >>> conn = psycopg.connect(DSN, row_factory=dict_row) + + >>> conn.execute("select 'John Doe' as name, 33 as age").fetchone() + {'name': 'John Doe', 'age': 33} + +The `!row_factory` parameter is supported by the `~Connection.connect()` +method and the `~Connection.cursor()` method. Later usage of `!row_factory` +overrides a previous one. It is also possible to change the +`Connection.row_factory` or `Cursor.row_factory` attributes to change what +they return:: + + >>> cur = conn.cursor(row_factory=dict_row) + >>> cur.execute("select 'John Doe' as name, 33 as age").fetchone() + {'name': 'John Doe', 'age': 33} + + >>> from psycopg.rows import namedtuple_row + >>> cur.row_factory = namedtuple_row + >>> cur.execute("select 'John Doe' as name, 33 as age").fetchone() + Row(name='John Doe', age=33) + +If you want to return objects of your choice you can use a row factory +*generator*, for instance `~psycopg.rows.class_row` or +`~psycopg.rows.args_row`, or you can :ref:`write your own row factory +<row-factory-create>`:: + + >>> from dataclasses import dataclass + + >>> @dataclass + ... class Person: + ... name: str + ... age: int + ... weight: Optional[int] = None + + >>> from psycopg.rows import class_row + >>> cur = conn.cursor(row_factory=class_row(Person)) + >>> cur.execute("select 'John Doe' as name, 33 as age").fetchone() + Person(name='John Doe', age=33, weight=None) + + +.. index:: + single: Row Maker + single: Row Factory + +.. _row-factory-create: + +Creating new row factories +-------------------------- + +A *row factory* is a callable that accepts a `Cursor` object and returns +another callable, a *row maker*, which takes raw data (as a sequence of +values) and returns the desired object. + +The role of the row factory is to inspect a query result (it is called after a +query is executed and properties such as `~Cursor.description` and +`~Cursor.pgresult` are available on the cursor) and to prepare a callable +which is efficient to call repeatedly (because, for instance, the names of the +columns are extracted, sanitised, and stored in local variables). + +Formally, these objects are represented by the `~psycopg.rows.RowFactory` and +`~psycopg.rows.RowMaker` protocols. + +`~RowFactory` objects can be implemented as a class, for instance: + +.. code:: python + + from typing import Any, Sequence + from psycopg import Cursor + + class DictRowFactory: + def __init__(self, cursor: Cursor[Any]): + self.fields = [c.name for c in cursor.description] + + def __call__(self, values: Sequence[Any]) -> dict[str, Any]: + return dict(zip(self.fields, values)) + +or as a plain function: + +.. code:: python + + def dict_row_factory(cursor: Cursor[Any]) -> RowMaker[dict[str, Any]]: + fields = [c.name for c in cursor.description] + + def make_row(values: Sequence[Any]) -> dict[str, Any]: + return dict(zip(fields, values)) + + return make_row + +These can then be used by specifying a `row_factory` argument in +`Connection.connect()`, `Connection.cursor()`, or by setting the +`Connection.row_factory` attribute. + +.. code:: python + + conn = psycopg.connect(row_factory=DictRowFactory) + cur = conn.execute("SELECT first_name, last_name, age FROM persons") + person = cur.fetchone() + print(f"{person['first_name']} {person['last_name']}") diff --git a/docs/advanced/typing.rst b/docs/advanced/typing.rst new file mode 100644 index 0000000..71b4e41 --- /dev/null +++ b/docs/advanced/typing.rst @@ -0,0 +1,180 @@ +.. currentmodule:: psycopg + +.. _static-typing: + +Static Typing +============= + +Psycopg source code is annotated according to :pep:`0484` type hints and is +checked using the current version of Mypy_ in ``--strict`` mode. + +If your application is checked using Mypy too you can make use of Psycopg +types to validate the correct use of Psycopg objects and of the data returned +by the database. + +.. _Mypy: http://mypy-lang.org/ + + +Generic types +------------- + +Psycopg `Connection` and `Cursor` objects are `~typing.Generic` objects and +support a `!Row` parameter which is the type of the records returned. + +By default methods such as `Cursor.fetchall()` return normal tuples of unknown +size and content. As such, the `connect()` function returns an object of type +`!psycopg.Connection[Tuple[Any, ...]]` and `Connection.cursor()` returns an +object of type `!psycopg.Cursor[Tuple[Any, ...]]`. If you are writing generic +plumbing code it might be practical to use annotations such as +`!Connection[Any]` and `!Cursor[Any]`. + +.. code:: python + + conn = psycopg.connect() # type is psycopg.Connection[Tuple[Any, ...]] + + cur = conn.cursor() # type is psycopg.Cursor[Tuple[Any, ...]] + + rec = cur.fetchone() # type is Optional[Tuple[Any, ...]] + + recs = cur.fetchall() # type is List[Tuple[Any, ...]] + + +.. _row-factory-static: + +Type of rows returned +--------------------- + +If you want to use connections and cursors returning your data as different +types, for instance as dictionaries, you can use the `!row_factory` argument +of the `~Connection.connect()` and the `~Connection.cursor()` method, which +will control what type of record is returned by the fetch methods of the +cursors and annotate the returned objects accordingly. See +:ref:`row-factories` for more details. + +.. code:: python + + dconn = psycopg.connect(row_factory=dict_row) + # dconn type is psycopg.Connection[Dict[str, Any]] + + dcur = conn.cursor(row_factory=dict_row) + dcur = dconn.cursor() + # dcur type is psycopg.Cursor[Dict[str, Any]] in both cases + + drec = dcur.fetchone() + # drec type is Optional[Dict[str, Any]] + + +.. _example-pydantic: + +Example: returning records as Pydantic models +--------------------------------------------- + +Using Pydantic_ it is possible to enforce static typing at runtime. Using a +Pydantic model factory the code can be checked statically using Mypy and +querying the database will raise an exception if the rows returned is not +compatible with the model. + +.. _Pydantic: https://pydantic-docs.helpmanual.io/ + +The following example can be checked with ``mypy --strict`` without reporting +any issue. Pydantic will also raise a runtime error in case the +`!Person` is used with a query that returns incompatible data. + +.. code:: python + + from datetime import date + from typing import Optional + + import psycopg + from psycopg.rows import class_row + from pydantic import BaseModel + + class Person(BaseModel): + id: int + first_name: str + last_name: str + dob: Optional[date] + + def fetch_person(id: int) -> Person: + with psycopg.connect() as conn: + with conn.cursor(row_factory=class_row(Person)) as cur: + cur.execute( + """ + SELECT id, first_name, last_name, dob + FROM (VALUES + (1, 'John', 'Doe', '2000-01-01'::date), + (2, 'Jane', 'White', NULL) + ) AS data (id, first_name, last_name, dob) + WHERE id = %(id)s; + """, + {"id": id}, + ) + obj = cur.fetchone() + + # reveal_type(obj) would return 'Optional[Person]' here + + if not obj: + raise KeyError(f"person {id} not found") + + # reveal_type(obj) would return 'Person' here + + return obj + + for id in [1, 2]: + p = fetch_person(id) + if p.dob: + print(f"{p.first_name} was born in {p.dob.year}") + else: + print(f"Who knows when {p.first_name} was born") + + +.. _literal-string: + +Checking literal strings in queries +----------------------------------- + +The `~Cursor.execute()` method and similar should only receive a literal +string as input, according to :pep:`675`. This means that the query should +come from a literal string in your code, not from an arbitrary string +expression. + +For instance, passing an argument to the query should be done via the second +argument to `!execute()`, not by string composition: + +.. code:: python + + def get_record(conn: psycopg.Connection[Any], id: int) -> Any: + cur = conn.execute("SELECT * FROM my_table WHERE id = %s" % id) # BAD! + return cur.fetchone() + + # the function should be implemented as: + + def get_record(conn: psycopg.Connection[Any], id: int) -> Any: + cur = conn.execute("select * FROM my_table WHERE id = %s", (id,)) + return cur.fetchone() + +If you are composing a query dynamically you should use the `sql.SQL` object +and similar to escape safely table and field names. The parameter of the +`!SQL()` object should be a literal string: + +.. code:: python + + def count_records(conn: psycopg.Connection[Any], table: str) -> int: + query = "SELECT count(*) FROM %s" % table # BAD! + return conn.execute(query).fetchone()[0] + + # the function should be implemented as: + + def count_records(conn: psycopg.Connection[Any], table: str) -> int: + query = sql.SQL("SELECT count(*) FROM {}").format(sql.Identifier(table)) + return conn.execute(query).fetchone()[0] + +At the time of writing, no Python static analyzer implements this check (`mypy +doesn't implement it`__, Pyre_ does, but `doesn't work with psycopg yet`__). +Once the type checkers support will be complete, the above bad statements +should be reported as errors. + +.. __: https://github.com/python/mypy/issues/12554 +.. __: https://github.com/facebook/pyre-check/issues/636 + +.. _Pyre: https://pyre-check.org/ |