Skip to content

Instantly share code, notes, and snippets.

@amotl
Last active March 7, 2023 20:35
Show Gist options
  • Save amotl/8ee866bf204c5c5ee4bad463ec3660a0 to your computer and use it in GitHub Desktop.
Save amotl/8ee866bf204c5c5ee4bad463ec3660a0 to your computer and use it in GitHub Desktop.
Difference between test cases for CrateDB using "asyncpg" vs. "psycopg3"
--- tests/client_tests/python/asyncpg/test_asyncpg.py 2023-03-07 21:33:30.000000000 +0100
+++ tests/client_tests/python/psycopg3/test_psycopg3.py 2023-03-07 21:33:30.000000000 +0100
@@ -1,14 +1,15 @@
"""
About
=====
-Test cases for CrateDB using `asyncpg`.
+
+Test cases for CrateDB using `psycopg3` [1,2].
Usage
=====
Normally, this will be executed through the main Python test suite
triggered through the toplevel `Jenkinsfile`.
-However, to run the `asyncpg` tests only, for example on a CrateDB
+However, to run the `psycopg3` tests only, for example on a CrateDB
instance already provided through Docker, there's an alternative
option which goes along like this::
@@ -17,61 +18,73 @@
# Run test suite.
export CRATEDB_URI=postgres://crate@localhost:5432/doc
- python -m unittest discover -vvvf -s tests/client_tests -k test_asyncpg
+ python -m unittest discover -vvvf -s tests/client_tests -k test_psycopg3
+References
+==========
+[1] https://www.psycopg.org/psycopg3/docs/
+[2] https://github.com/psycopg/psycopg
"""
import os
import asyncio
-import asyncpg
+
+import psycopg
+import psycopg.rows
import unittest
+
+import psycopg_pool
+
from crate.qa.tests import NodeProvider
-async def basic_queries(test, conn):
+async def basic_queries(test, conn: psycopg.AsyncConnection):
await conn.execute("DROP TABLE IF EXISTS t1")
result = await conn.execute(
"CREATE TABLE t1 (x int primary key, y int)")
- test.assertEqual(result, 'CREATE 1')
+ test.assertResultCommandEqual(result, 'CREATE 1')
- result = await conn.execute('INSERT INTO t1 (x) VALUES (?)', 1)
- test.assertEqual(result, 'INSERT 0 1')
+ result = await conn.execute('INSERT INTO t1 (x) VALUES (%s)', [1])
+ test.assertResultCommandEqual(result, 'INSERT 0 1')
result = await conn.execute('REFRESH TABLE t1')
- test.assertEqual(result, 'REFRESH 1')
+ test.assertResultCommandEqual(result, 'REFRESH 1')
- result = await conn.execute('UPDATE t1 SET y = ?', 2)
- test.assertEqual(result, 'UPDATE 1')
+ result = await conn.execute('UPDATE t1 SET y = %s', [2])
+ test.assertResultCommandEqual(result, 'UPDATE 1')
result = await conn.execute('REFRESH TABLE t1')
- test.assertEqual(result, 'REFRESH 1')
+ test.assertResultCommandEqual(result, 'REFRESH 1')
- result = await conn.execute('DELETE FROM t1 WHERE y = ?', 2)
- test.assertEqual(result, 'DELETE 1')
+ result = await conn.execute('DELETE FROM t1 WHERE y = %s', [2])
+ test.assertResultCommandEqual(result, 'DELETE 1')
result = await conn.execute('''
INSERT INTO t1 (x) (
- SELECT unnest FROM unnest([1, 2]) WHERE unnest = ?
+ SELECT unnest FROM unnest([1, 2]) WHERE unnest = %s
)
- ''', 1)
- test.assertEqual(result, 'INSERT 0 1')
+ ''', [1])
+ test.assertResultCommandEqual(result, 'INSERT 0 1')
result = await conn.execute('''
INSERT INTO t1 (x) (
- SELECT unnest FROM unnest([1, 2]) WHERE unnest = ?)
- ON CONFLICT (x) DO UPDATE SET y = ?
- ''', 1, 2)
- test.assertEqual(result, 'INSERT 0 1')
+ SELECT unnest FROM unnest([1, 2]) WHERE unnest = %s)
+ ON CONFLICT (x) DO UPDATE SET y = %s
+ ''', [1, 2])
+ test.assertResultCommandEqual(result, 'INSERT 0 1')
async def record_type_can_be_read_using_binary_streaming(test, conn):
+ return unittest.skip("Not sure how to implement with psycopg3")
result = await conn.fetch('SELECT pg_catalog.pg_get_keywords()')
keyword = sorted([row[0] for row in result], key=lambda x: x[0])[0]
test.assertEqual(keyword, ('absolute', 'U', 'unreserved'))
async def bitstring_can_be_inserted_and_selected_using_binary_encoding(test, conn):
+ return unittest.skip("Not supported by psycopg3, see https://github.com/psycopg/psycopg/tree/3.1.8/psycopg/psycopg/types")
+ """
xs = asyncpg.BitString('0101')
await conn.execute('drop table if exists tbl_bit')
await conn.execute('create table tbl_bit (xs bit(4))')
@@ -79,15 +92,51 @@
await conn.execute('refresh table tbl_bit')
result = await conn.fetch('select xs from tbl_bit')
test.assertEqual(result[0][0], xs)
+ """
+
+
+async def fetch_summits_execute(test, uri):
+ """
+ Use the `cursor.execute` method to acquire results.
+
+ https://www.psycopg.org/psycopg3/docs/advanced/async.html
+ """
+ conn = await psycopg.AsyncConnection.connect(uri)
+ async with conn.transaction():
+ async with conn.cursor(row_factory=psycopg.rows.dict_row) as cursor:
+ cur = await cursor.execute(
+ 'select mountain from sys.summits order by height desc')
+ first, second = await cur.fetchmany(size=2)
+ third, fourth = await cur.fetchmany(size=2)
+ await conn.close()
+ test.assertEqual(first['mountain'], 'Mont Blanc')
+ test.assertEqual(second['mountain'], 'Monte Rosa')
+ test.assertEqual(third['mountain'], 'Dom')
+ test.assertEqual(fourth['mountain'], 'Liskamm')
-async def fetch_summits(test, uri):
- conn = await asyncpg.connect(uri)
+async def fetch_summits_stream(test, uri):
+ """
+ Use the `cursor.stream` method to acquire results.
+
+ https://www.psycopg.org/psycopg3/docs/advanced/async.html
+ https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.AsyncCursor.stream
+ """
+ records = []
+ conn = await psycopg.AsyncConnection.connect(uri)
async with conn.transaction():
- cur = await conn.cursor(
- 'select mountain from sys.summits order by height desc')
- first, second = await cur.fetch(2)
- third, fourth = await cur.fetch(2)
+ async with conn.cursor(row_factory=psycopg.rows.dict_row) as cursor:
+ gen = cursor.stream(
+ 'select mountain from sys.summits order by height desc')
+
+ # NOTE: Must exhaust the generator completely.
+ # When using `await anext(cur)`, the program stalls.
+ async for record in gen:
+ records.append(record)
+
+ await conn.close()
+ first, second = records[0:2]
+ third, fourth = records[2:4]
test.assertEqual(first['mountain'], 'Mont Blanc')
test.assertEqual(second['mountain'], 'Monte Rosa')
test.assertEqual(third['mountain'], 'Dom')
@@ -95,14 +144,15 @@
async def exec_queries_pooled(test, uri):
- pool = await asyncpg.create_pool(uri)
- async with pool.acquire() as conn:
+ pool = psycopg_pool.AsyncConnectionPool(uri)
+ async with pool.connection() as conn:
await basic_queries(test, conn)
await record_type_can_be_read_using_binary_streaming(test, conn)
await bitstring_can_be_inserted_and_selected_using_binary_encoding(test, conn)
+ await pool.close()
-class AsyncpgTestCase(NodeProvider, unittest.TestCase):
+class Psycopg3AsyncTestCase(NodeProvider, unittest.TestCase):
def ensure_cratedb(self):
if "CRATEDB_URI" in os.environ:
@@ -117,11 +167,22 @@
def test_basic_statements(self):
crate_psql_url = self.ensure_cratedb()
- loop = asyncio.get_event_loop()
- loop.run_until_complete(exec_queries_pooled(self, crate_psql_url))
+ asyncio.run(exec_queries_pooled(self, crate_psql_url))
+
+ def test_result_execute_using_fetch_size(self):
+ crate_psql_url = self.ensure_cratedb()
+ asyncio.run(fetch_summits_execute(self, crate_psql_url))
- def test_result_streaming_using_fetch_size(self):
+ def test_result_streaming(self):
crate_psql_url = self.ensure_cratedb()
- loop = asyncio.get_event_loop()
- loop.run_until_complete(
- fetch_summits(self, crate_psql_url))
+ asyncio.run(fetch_summits_stream(self, crate_psql_url))
+
+ def assertResultCommandEqual(self, result: psycopg.Cursor, command: str, msg=None):
+
+ # Would be correct, but also would be a little strict, and mask the error message.
+ # self.assertEqual(result.pgresult.status, psycopg.pq.ExecStatus.COMMAND_OK)
+
+ # Satisfy mypy.
+ assert result.pgresult
+
+ self.assertEqual(result.pgresult.command_status, command.encode(), msg=msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment