Last active
March 7, 2023 20:35
-
-
Save amotl/8ee866bf204c5c5ee4bad463ec3660a0 to your computer and use it in GitHub Desktop.
Difference between test cases for CrateDB using "asyncpg" vs. "psycopg3"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- tests/client_tests/python/asyncpg/test_asyncpg.py 2023-03-07 21:33:30.000000000 +0100 | |
+++ tests/client_tests/python/psycopg3/test_psycopg3.py 2023-03-07 21:33:30.000000000 +0100 | |
@@ -1,14 +1,15 @@ | |
""" | |
About | |
===== | |
-Test cases for CrateDB using `asyncpg`. | |
+ | |
+Test cases for CrateDB using `psycopg3` [1,2]. | |
Usage | |
===== | |
Normally, this will be executed through the main Python test suite | |
triggered through the toplevel `Jenkinsfile`. | |
-However, to run the `asyncpg` tests only, for example on a CrateDB | |
+However, to run the `psycopg3` tests only, for example on a CrateDB | |
instance already provided through Docker, there's an alternative | |
option which goes along like this:: | |
@@ -17,61 +18,73 @@ | |
# Run test suite. | |
export CRATEDB_URI=postgres://crate@localhost:5432/doc | |
- python -m unittest discover -vvvf -s tests/client_tests -k test_asyncpg | |
+ python -m unittest discover -vvvf -s tests/client_tests -k test_psycopg3 | |
+References | |
+========== | |
+[1] https://www.psycopg.org/psycopg3/docs/ | |
+[2] https://github.com/psycopg/psycopg | |
""" | |
import os | |
import asyncio | |
-import asyncpg | |
+ | |
+import psycopg | |
+import psycopg.rows | |
import unittest | |
+ | |
+import psycopg_pool | |
+ | |
from crate.qa.tests import NodeProvider | |
-async def basic_queries(test, conn): | |
+async def basic_queries(test, conn: psycopg.AsyncConnection): | |
await conn.execute("DROP TABLE IF EXISTS t1") | |
result = await conn.execute( | |
"CREATE TABLE t1 (x int primary key, y int)") | |
- test.assertEqual(result, 'CREATE 1') | |
+ test.assertResultCommandEqual(result, 'CREATE 1') | |
- result = await conn.execute('INSERT INTO t1 (x) VALUES (?)', 1) | |
- test.assertEqual(result, 'INSERT 0 1') | |
+ result = await conn.execute('INSERT INTO t1 (x) VALUES (%s)', [1]) | |
+ test.assertResultCommandEqual(result, 'INSERT 0 1') | |
result = await conn.execute('REFRESH TABLE t1') | |
- test.assertEqual(result, 'REFRESH 1') | |
+ test.assertResultCommandEqual(result, 'REFRESH 1') | |
- result = await conn.execute('UPDATE t1 SET y = ?', 2) | |
- test.assertEqual(result, 'UPDATE 1') | |
+ result = await conn.execute('UPDATE t1 SET y = %s', [2]) | |
+ test.assertResultCommandEqual(result, 'UPDATE 1') | |
result = await conn.execute('REFRESH TABLE t1') | |
- test.assertEqual(result, 'REFRESH 1') | |
+ test.assertResultCommandEqual(result, 'REFRESH 1') | |
- result = await conn.execute('DELETE FROM t1 WHERE y = ?', 2) | |
- test.assertEqual(result, 'DELETE 1') | |
+ result = await conn.execute('DELETE FROM t1 WHERE y = %s', [2]) | |
+ test.assertResultCommandEqual(result, 'DELETE 1') | |
result = await conn.execute(''' | |
INSERT INTO t1 (x) ( | |
- SELECT unnest FROM unnest([1, 2]) WHERE unnest = ? | |
+ SELECT unnest FROM unnest([1, 2]) WHERE unnest = %s | |
) | |
- ''', 1) | |
- test.assertEqual(result, 'INSERT 0 1') | |
+ ''', [1]) | |
+ test.assertResultCommandEqual(result, 'INSERT 0 1') | |
result = await conn.execute(''' | |
INSERT INTO t1 (x) ( | |
- SELECT unnest FROM unnest([1, 2]) WHERE unnest = ?) | |
- ON CONFLICT (x) DO UPDATE SET y = ? | |
- ''', 1, 2) | |
- test.assertEqual(result, 'INSERT 0 1') | |
+ SELECT unnest FROM unnest([1, 2]) WHERE unnest = %s) | |
+ ON CONFLICT (x) DO UPDATE SET y = %s | |
+ ''', [1, 2]) | |
+ test.assertResultCommandEqual(result, 'INSERT 0 1') | |
async def record_type_can_be_read_using_binary_streaming(test, conn): | |
+ return unittest.skip("Not sure how to implement with psycopg3") | |
result = await conn.fetch('SELECT pg_catalog.pg_get_keywords()') | |
keyword = sorted([row[0] for row in result], key=lambda x: x[0])[0] | |
test.assertEqual(keyword, ('absolute', 'U', 'unreserved')) | |
async def bitstring_can_be_inserted_and_selected_using_binary_encoding(test, conn): | |
+ return unittest.skip("Not supported by psycopg3, see https://github.com/psycopg/psycopg/tree/3.1.8/psycopg/psycopg/types") | |
+ """ | |
xs = asyncpg.BitString('0101') | |
await conn.execute('drop table if exists tbl_bit') | |
await conn.execute('create table tbl_bit (xs bit(4))') | |
@@ -79,15 +92,51 @@ | |
await conn.execute('refresh table tbl_bit') | |
result = await conn.fetch('select xs from tbl_bit') | |
test.assertEqual(result[0][0], xs) | |
+ """ | |
+ | |
+ | |
+async def fetch_summits_execute(test, uri): | |
+ """ | |
+ Use the `cursor.execute` method to acquire results. | |
+ | |
+ https://www.psycopg.org/psycopg3/docs/advanced/async.html | |
+ """ | |
+ conn = await psycopg.AsyncConnection.connect(uri) | |
+ async with conn.transaction(): | |
+ async with conn.cursor(row_factory=psycopg.rows.dict_row) as cursor: | |
+ cur = await cursor.execute( | |
+ 'select mountain from sys.summits order by height desc') | |
+ first, second = await cur.fetchmany(size=2) | |
+ third, fourth = await cur.fetchmany(size=2) | |
+ await conn.close() | |
+ test.assertEqual(first['mountain'], 'Mont Blanc') | |
+ test.assertEqual(second['mountain'], 'Monte Rosa') | |
+ test.assertEqual(third['mountain'], 'Dom') | |
+ test.assertEqual(fourth['mountain'], 'Liskamm') | |
-async def fetch_summits(test, uri): | |
- conn = await asyncpg.connect(uri) | |
+async def fetch_summits_stream(test, uri): | |
+ """ | |
+ Use the `cursor.stream` method to acquire results. | |
+ | |
+ https://www.psycopg.org/psycopg3/docs/advanced/async.html | |
+ https://www.psycopg.org/psycopg3/docs/api/cursors.html#psycopg.AsyncCursor.stream | |
+ """ | |
+ records = [] | |
+ conn = await psycopg.AsyncConnection.connect(uri) | |
async with conn.transaction(): | |
- cur = await conn.cursor( | |
- 'select mountain from sys.summits order by height desc') | |
- first, second = await cur.fetch(2) | |
- third, fourth = await cur.fetch(2) | |
+ async with conn.cursor(row_factory=psycopg.rows.dict_row) as cursor: | |
+ gen = cursor.stream( | |
+ 'select mountain from sys.summits order by height desc') | |
+ | |
+ # NOTE: Must exhaust the generator completely. | |
+ # When using `await anext(gen)` instead, the program stalls. | |
+ async for record in gen: | |
+ records.append(record) | |
+ | |
+ await conn.close() | |
+ first, second = records[0:2] | |
+ third, fourth = records[2:4] | |
test.assertEqual(first['mountain'], 'Mont Blanc') | |
test.assertEqual(second['mountain'], 'Monte Rosa') | |
test.assertEqual(third['mountain'], 'Dom') | |
@@ -95,14 +144,15 @@ | |
async def exec_queries_pooled(test, uri): | |
- pool = await asyncpg.create_pool(uri) | |
- async with pool.acquire() as conn: | |
+ pool = psycopg_pool.AsyncConnectionPool(uri) | |
+ async with pool.connection() as conn: | |
await basic_queries(test, conn) | |
await record_type_can_be_read_using_binary_streaming(test, conn) | |
await bitstring_can_be_inserted_and_selected_using_binary_encoding(test, conn) | |
+ await pool.close() | |
-class AsyncpgTestCase(NodeProvider, unittest.TestCase): | |
+class Psycopg3AsyncTestCase(NodeProvider, unittest.TestCase): | |
def ensure_cratedb(self): | |
if "CRATEDB_URI" in os.environ: | |
@@ -117,11 +167,22 @@ | |
def test_basic_statements(self): | |
crate_psql_url = self.ensure_cratedb() | |
- loop = asyncio.get_event_loop() | |
- loop.run_until_complete(exec_queries_pooled(self, crate_psql_url)) | |
+ asyncio.run(exec_queries_pooled(self, crate_psql_url)) | |
+ | |
+ def test_result_execute_using_fetch_size(self): | |
+ crate_psql_url = self.ensure_cratedb() | |
+ asyncio.run(fetch_summits_execute(self, crate_psql_url)) | |
- def test_result_streaming_using_fetch_size(self): | |
+ def test_result_streaming(self): | |
crate_psql_url = self.ensure_cratedb() | |
- loop = asyncio.get_event_loop() | |
- loop.run_until_complete( | |
- fetch_summits(self, crate_psql_url)) | |
+ asyncio.run(fetch_summits_stream(self, crate_psql_url)) | |
+ | |
+ def assertResultCommandEqual(self, result: psycopg.Cursor, command: str, msg=None): | |
+ | |
+ # The following check would be correct, but it is a little strict and would mask the error message. | |
+ # self.assertEqual(result.pgresult.status, psycopg.pq.ExecStatus.COMMAND_OK) | |
+ | |
+ # Satisfy mypy. | |
+ assert result.pgresult | |
+ | |
+ self.assertEqual(result.pgresult.command_status, command.encode(), msg=msg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment