Skip to content

Instantly share code, notes, and snippets.

@azat
Created October 12, 2023 16:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save azat/da85ff0bde3f0da259144b0ba361cd64 to your computer and use it in GitHub Desktop.
Save azat/da85ff0bde3f0da259144b0ba361cd64 to your computer and use it in GitHub Desktop.
diff --git a/clickhouse_driver/client.py b/clickhouse_driver/client.py
index 3c4c097..5dbed7f 100644
--- a/clickhouse_driver/client.py
+++ b/clickhouse_driver/client.py
@@ -597,7 +597,7 @@ class Client(object):
if sample_block:
rv = self.send_data(sample_block, data,
types_check=types_check, columnar=columnar)
- self.receive_end_of_query()
+ self.receive_end_of_insert_query()
return rv
def receive_sample_block(self):
@@ -657,6 +657,8 @@ class Client(object):
# Empty block means end of data.
self.connection.send_data(block_cls())
+ self.connection.receive_profile_events()
+
return inserted_rows
def receive_end_of_query(self):
@@ -681,6 +683,25 @@ class Client(object):
elif packet.type == ServerPacketTypes.PROFILE_EVENTS:
pass
+ else:
+ message = self.connection.unexpected_packet_message(
+ 'Exception, EndOfStream, Progress, TableColumns, ProfileEvents or Log', packet.type
+ )
+ raise errors.UnexpectedPacketFromServerError(message)
+
+ def receive_end_of_insert_query(self):
+ while True:
+ packet = self.connection.receive_packet()
+
+ if packet.type == ServerPacketTypes.END_OF_STREAM:
+ break
+
+ elif packet.type == ServerPacketTypes.EXCEPTION:
+ raise packet.exception
+
+ elif packet.type == ServerPacketTypes.LOG:
+ log_block(packet.block)
+
else:
message = self.connection.unexpected_packet_message(
'Exception, EndOfStream or Log', packet.type
diff --git a/tests/test_insert.py b/tests/test_insert.py
index 703c28c..2d76ed8 100644
--- a/tests/test_insert.py
+++ b/tests/test_insert.py
@@ -163,6 +163,13 @@ class InsertTestCase(BaseTestCase):
inserted = self.emit_cli(query)
self.assertEqual(inserted, '1\n')
+ def test_profile_events(self):
+ with self.create_table('x Int32'):
+ data = [{'x': 1}]
+ self.client.execute(
+ 'INSERT INTO test (x) VALUES', data
+ )
+
class InsertColumnarTestCase(BaseTestCase):
def test_insert_tuple_ok(self):
diff --git a/tests/test_long_insert.py b/tests/test_long_insert.py
deleted file mode 100644
index 9a713d5..0000000
--- a/tests/test_long_insert.py
+++ /dev/null
@@ -1,50 +0,0 @@
-from time import sleep
-from unittest.mock import patch
-
-from tests.testcase import BaseTestCase
-
-
-class LongInsertTestCase(BaseTestCase):
- client_kwargs = {
- 'settings': {
- 'insert_block_size': 1,
- 'send_timeout': 1,
- 'receive_timeout': 1,
- },
- }
-
- def test_long_insert(self):
- """
- In this test we are trying to emulate the situation, where we have a
- lot of insert blocks. From specific clickhouse version server would
- send ProfileEvents packet in response to each insert.
-
- This insert should work normally for all clickhouse versions,
- even without response ProfileEvents on each insert.
- The 8_000 rows used to provide somewhat consistent experience of
- bug reproducibility without too long of a test duration.
-
- `send_timeout` & `receive_timeout` are set to 1,
- so we can emulate the real world situation on synthetic data.
- The server will send exception and timeout if the client will not
- receive the ProfileEvent during this time.
-
- We modify receive_end_of_query with sleep here to emulate long pause
- before this method calling under normal circumstances.
- """
- original_receive_end_of_query = self.client.receive_end_of_query
-
- def mocked_receive_end_of_query(*args, **kwargs):
- sleep(2)
- return original_receive_end_of_query(*args, **kwargs)
-
- with self.create_table('x Int32'):
- with patch.object(
- self.client,
- 'receive_end_of_query',
- new=mocked_receive_end_of_query
- ):
- data = [{'x': 1}] * 8_000
- self.client.execute(
- 'INSERT INTO test (x) VALUES', data
- )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment