Created
October 12, 2023 16:34
-
-
Save azat/da85ff0bde3f0da259144b0ba361cd64 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/clickhouse_driver/client.py b/clickhouse_driver/client.py | |
index 3c4c097..5dbed7f 100644 | |
--- a/clickhouse_driver/client.py | |
+++ b/clickhouse_driver/client.py | |
@@ -597,7 +597,7 @@ class Client(object): | |
if sample_block: | |
rv = self.send_data(sample_block, data, | |
types_check=types_check, columnar=columnar) | |
- self.receive_end_of_query() | |
+ self.receive_end_of_insert_query() | |
return rv | |
def receive_sample_block(self): | |
@@ -657,6 +657,8 @@ class Client(object): | |
# Empty block means end of data. | |
self.connection.send_data(block_cls()) | |
+ self.connection.receive_profile_events() | |
+ | |
return inserted_rows | |
def receive_end_of_query(self): | |
@@ -681,6 +683,25 @@ class Client(object): | |
elif packet.type == ServerPacketTypes.PROFILE_EVENTS: | |
pass | |
+ else: | |
+ message = self.connection.unexpected_packet_message( | |
+ 'Exception, EndOfStream, Progress, TableColumns, ProfileEvents or Log', packet.type | |
+ ) | |
+ raise errors.UnexpectedPacketFromServerError(message) | |
+ | |
+ def receive_end_of_insert_query(self): | |
+ while True: | |
+ packet = self.connection.receive_packet() | |
+ | |
+ if packet.type == ServerPacketTypes.END_OF_STREAM: | |
+ break | |
+ | |
+ elif packet.type == ServerPacketTypes.EXCEPTION: | |
+ raise packet.exception | |
+ | |
+ elif packet.type == ServerPacketTypes.LOG: | |
+ log_block(packet.block) | |
+ | |
else: | |
message = self.connection.unexpected_packet_message( | |
'Exception, EndOfStream or Log', packet.type | |
diff --git a/tests/test_insert.py b/tests/test_insert.py | |
index 703c28c..2d76ed8 100644 | |
--- a/tests/test_insert.py | |
+++ b/tests/test_insert.py | |
@@ -163,6 +163,13 @@ class InsertTestCase(BaseTestCase): | |
inserted = self.emit_cli(query) | |
self.assertEqual(inserted, '1\n') | |
+ def test_profile_events(self): | |
+ with self.create_table('x Int32'): | |
+ data = [{'x': 1}] | |
+ self.client.execute( | |
+ 'INSERT INTO test (x) VALUES', data | |
+ ) | |
+ | |
class InsertColumnarTestCase(BaseTestCase): | |
def test_insert_tuple_ok(self): | |
diff --git a/tests/test_long_insert.py b/tests/test_long_insert.py | |
deleted file mode 100644 | |
index 9a713d5..0000000 | |
--- a/tests/test_long_insert.py | |
+++ /dev/null | |
@@ -1,50 +0,0 @@ | |
-from time import sleep | |
-from unittest.mock import patch | |
- | |
-from tests.testcase import BaseTestCase | |
- | |
- | |
-class LongInsertTestCase(BaseTestCase): | |
- client_kwargs = { | |
- 'settings': { | |
- 'insert_block_size': 1, | |
- 'send_timeout': 1, | |
- 'receive_timeout': 1, | |
- }, | |
- } | |
- | |
- def test_long_insert(self): | |
- """ | |
- In this test we are trying to emulate the situation, where we have a | |
- lot of insert blocks. From specific clickhouse version server would | |
- send ProfileEvents packet in response to each insert. | |
- | |
- This insert should work normally for all clickhouse versions, | |
- even without response ProfileEvents on each insert. | |
- The 8_000 rows used to provide somewhat consistent experience of | |
- bug reproducibility without too long of a test duration. | |
- | |
- `send_timeout` & `receive_timeout` are set to 1, | |
- so we can emulate the real world situation on synthetic data. | |
- The server will send exception and timeout if the client will not | |
- receive the ProfileEvent during this time. | |
- | |
- We modify receive_end_of_query with sleep here to emulate long pause | |
- before this method calling under normal circumstances. | |
- """ | |
- original_receive_end_of_query = self.client.receive_end_of_query | |
- | |
- def mocked_receive_end_of_query(*args, **kwargs): | |
- sleep(2) | |
- return original_receive_end_of_query(*args, **kwargs) | |
- | |
- with self.create_table('x Int32'): | |
- with patch.object( | |
- self.client, | |
- 'receive_end_of_query', | |
- new=mocked_receive_end_of_query | |
- ): | |
- data = [{'x': 1}] * 8_000 | |
- self.client.execute( | |
- 'INSERT INTO test (x) VALUES', data | |
- ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment