This gist tests serializable isolation in Iceberg and compares it with the same isolation level in Postgres.
Table of contents:
- Test Serializable Isolation in Postgres
- Conclusion: Serializable isolation in Postgres works as documented
- Test Serializable Isolation in Iceberg
- Conclusion: Serializable isolation in Iceberg doesn't work as documented
- Locally install Postgres and use the SQL client `psql` for this test. Sample instructions for setting up Postgres locally: https://www.sqlshack.com/setting-up-a-postgresql-database-on-mac/
- Use two iTerm2 panes (split vertically or horizontally) for testing concurrent merges.
The following section implements the example described in the PostgreSQL docs: https://www.postgresql.org/docs/current/transaction-iso.html#XACT-SERIALIZABLE
Start the first session using `psql -U postgres -h localhost`
-- The default isolation level is read committed, so change it to serializable before testing.
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;
DROP TABLE tbl_A;
CREATE TABLE tbl_A
(
id int,
class int,
value int
);
INSERT INTO tbl_A VALUES
(1001, 1, 10),
(1002, 1, 20),
(1003, 2, 100),
(1004, 2, 200);
-- Verify the session isolation level:
SHOW TRANSACTION ISOLATION LEVEL;
Start the second session using `psql -U postgres -h localhost`
-- The default isolation level is read committed, so change it to serializable before testing.
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;
-- Verify the session isolation level:
SHOW TRANSACTION ISOLATION LEVEL;
MERGE INTO tbl_A t USING (
SELECT 1005 as id, 2 as class, SUM(value) as value FROM tbl_A WHERE class = 1
) as u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
MERGE INTO tbl_A t USING (
SELECT 1006 as id, 1 as class, SUM(value) as value FROM tbl_A WHERE class = 2
) as u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
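These two merges form the classic write-skew shape: each one reads the class the other inserts into. As a plain-Python sanity check of the arithmetic (not SQL), the source subqueries compute the following sums against the initial data:

```python
# Initial rows as (id, class, value), mirroring the INSERT above.
rows = [(1001, 1, 10), (1002, 1, 20), (1003, 2, 100), (1004, 2, 200)]

def class_sum(rows, cls):
    """SUM(value) over rows of one class, as in the MERGE source subqueries."""
    return sum(value for _id, c, value in rows if c == cls)

# The first merge inserts (1005, 2, SUM of class 1); the second inserts (1006, 1, SUM of class 2).
print(class_sum(rows, 1))  # 30
print(class_sum(rows, 2))  # 300
```

If both merges run against this same initial snapshot, neither sees the other's inserted row, which is exactly the anomaly serializable isolation must prevent.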
Enable broadcast input across both iTerm2 panes (right-click and choose the option to send input to all panes). This allows running commands simultaneously across multiple iTerm2 windows.
The previously pasted commands should be executed in both windows.
select * from tbl_A;
| id | class | value |
|-------|-------|-------|
| 1001 | 1 | 10 |
| 1002 | 1 | 20 |
| 1003 | 2 | 100 |
| 1004 | 2 | 200 |
| 1005 | 2 | 30 |
The second window would see, as expected:
ERROR: could not serialize access due to read/write dependencies among transactions
DETAIL: Reason code: Canceled on identification as a pivot, during write.
HINT: The transaction might succeed if retried.
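The HINT is actionable: serializable transactions are expected to be retried by the application. A minimal retry loop might look like the sketch below. The `SerializationFailure` class and `merge_txn` callable are stand-ins invented for illustration; with psycopg2, the real exception for SQLSTATE 40001 is `psycopg2.errors.SerializationFailure`.

```python
class SerializationFailure(Exception):
    """Stand-in for the driver error carrying SQLSTATE 40001
    (e.g. psycopg2.errors.SerializationFailure)."""

def run_with_retry(txn, max_attempts=5):
    """Run a transaction callable, retrying on serialization failures,
    as the HINT in the error message suggests."""
    for attempt in range(1, max_attempts + 1):
        try:
            return txn()
        except SerializationFailure:
            if attempt == max_attempts:
                raise
            # Real code would ROLLBACK and optionally back off before retrying.

# Hypothetical transaction: fails once with a serialization error, then commits.
attempts = []
def merge_txn():
    attempts.append(1)
    if len(attempts) < 2:
        raise SerializationFailure("could not serialize access")
    return "committed"

result = run_with_retry(merge_txn)
print(result, "after", len(attempts), "attempts")  # committed after 2 attempts
```

The key point is that the whole transaction is re-run, so the retried MERGE re-reads the table and sees the other transaction's committed row.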
If retried, the `MERGE INTO` would result in:
| 1006 | 1 | 330 |
select * from tbl_A;
| id | class | value |
|-------|-------|-------|
| 1001 | 1 | 10 |
| 1002 | 1 | 20 |
| 1003 | 2 | 100 |
| 1004 | 2 | 200 |
| 1006 | 1 | 300 |
The first window would see, as expected:
ERROR: could not serialize access due to read/write dependencies among transactions
DETAIL: Reason code: Canceled on identification as a pivot, during write.
HINT: The transaction might succeed if retried.
If retried, the `MERGE INTO` would result in:
| 1005 | 2 | 330 |
- We will use the Docker setup here - https://iceberg.apache.org/spark-quickstart/ - to set up our environment, and use Spark SQL to run the tests. As of this gist's creation, the Docker setup uses Iceberg 1.3.1 and Spark 3.4.
- We will use two iTerm2 panes (split vertically or horizontally) for testing concurrent merges. Start both of them using the command
docker exec -it spark-iceberg spark-sql
- If needed, the Iceberg data can be accessed via the MinIO console started at http://127.0.0.1:9001/login using the creds `admin:password`.
We will use the same table and schema we used for testing Postgres. The Iceberg docs list serializable as the default isolation level - https://iceberg.apache.org/docs/1.3.1/configuration/.
We will err on the safe side and set the isolation level explicitly. No Iceberg docs were found on how to set the isolation level from Spark SQL, but Tabular.io has a cheat sheet for Iceberg Spark - https://tabular.io/downloads/tabular_iceberg-spark_cheat-sheet.pdf - that shows how to set the isolation-level property. We will use that.
Start the first session using docker exec -it spark-iceberg spark-sql
DROP TABLE demo.nyc.tbl_A;
CREATE TABLE demo.nyc.tbl_A
(
id int,
class int,
value int
);
INSERT INTO demo.nyc.tbl_A VALUES
(1001, 1, 10),
(1002, 1, 20),
(1003, 2, 100),
(1004, 2, 200);
-- Refer to https://tabular.io/downloads/tabular_iceberg-spark_cheat-sheet.pdf for changing the isolation level of merge writes to serializable. This is set at the table level.
ALTER TABLE demo.nyc.tbl_A SET TBLPROPERTIES('write.merge.isolation-level' = 'serializable');
-- Verify the table-level isolation level; it should show serializable.
SHOW TBLPROPERTIES demo.nyc.tbl_A;
Start the second session using docker exec -it spark-iceberg spark-sql
-- Refresh table
REFRESH TABLE demo.nyc.tbl_A;
-- Verify the table-level isolation level; it should show serializable.
SHOW TBLPROPERTIES demo.nyc.tbl_A;
MERGE INTO demo.nyc.tbl_A t USING (
SELECT 1005 as id, 2 as class, SUM(value) as value FROM demo.nyc.tbl_A WHERE class = 1
) u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
MERGE INTO demo.nyc.tbl_A t USING (
SELECT 1006 as id, 1 as class, SUM(value) as value FROM demo.nyc.tbl_A WHERE class = 2
) u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
Enable broadcast input across both iTerm2 panes (right-click and choose the option to send input to all panes). This allows running commands simultaneously across multiple iTerm2 windows.
The previously pasted commands should be executed in both windows.
Run in either iTerm2 window:
One of the two iTerm2 windows will succeed without issues, and on refresh you should see:
REFRESH TABLE demo.nyc.tbl_A;
SELECT * FROM demo.nyc.tbl_A;
1001 1 10
1002 1 20
1003 2 100
1004 2 200
1005 2 30
1006 1 300
The other iTerm2 window would show an error stack like:
23/10/17 21:13:38 WARN Tasks: Retrying task after failure: Commit failed: Requirement failed: branch main has changed: expected id 7472540111209715110 != 1241237555039253666
org.apache.iceberg.exceptions.CommitFailedException: Commit failed: Requirement failed: branch main has changed: expected id 7472540111209715110 != 1241237555039253666
at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:80)
at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:71)
...
at org.apache.iceberg.rest.RESTClient.post(RESTClient.java:112)
at org.apache.iceberg.rest.RESTTableOperations.commit(RESTTableOperations.java:144)
at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:390)
at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:364)
at org.apache.iceberg.BaseOverwriteFiles.commit(BaseOverwriteFiles.java:31)
at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:219)
at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:84)
at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commitWithSerializableIsolation(SparkWrite.java:443)
This is Iceberg retrying the failed commit; eventually it will succeed. On refreshing the table you would see the same values as in the other window.
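What the stack trace shows is Iceberg's optimistic concurrency at work: a commit is effectively a compare-and-swap on the branch's snapshot id, and on a mismatch the writer refreshes the table and retries. The toy model below (hypothetical names, not Iceberg's actual API) sketches that loop under those assumptions:

```python
class CommitFailedException(Exception):
    """Stand-in for org.apache.iceberg.exceptions.CommitFailedException."""

class Branch:
    """Toy catalog branch: a commit is a compare-and-swap on the snapshot id."""
    def __init__(self):
        self.snapshot_id = 0

    def commit(self, expected_id, new_id):
        if self.snapshot_id != expected_id:
            raise CommitFailedException(
                f"branch main has changed: expected id {expected_id} != {self.snapshot_id}")
        self.snapshot_id = new_id

def commit_with_retry(branch, new_id, max_attempts=4):
    """Refresh the branch tip and retry after each conflict, like the Tasks retry loop."""
    for _ in range(max_attempts):
        expected = branch.snapshot_id  # refresh table metadata
        try:
            branch.commit(expected, new_id)
            return True
        except CommitFailedException:
            continue  # another writer moved the branch; re-read and try again
    return False

main = Branch()
# Simulate a concurrent writer sneaking in exactly one commit between our read and our commit.
real_commit, raced = main.commit, []
def racy_commit(expected_id, new_id):
    if not raced:
        raced.append(True)
        main.snapshot_id += 1  # the concurrent writer wins the race once
    real_commit(expected_id, new_id)
main.commit = racy_commit

committed = commit_with_retry(main, 42)
print(committed, main.snapshot_id)  # True 42
```

Note that this retry only re-checks the branch tip; unless the commit path also re-validates what the merge originally read, a conflicting write can slip through, which is exactly the behavior observed next.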
However, after the above retries Iceberg lets the second commit pass. Now the row with id 1005 has value 30 while the row with id 1006 has value 300, a combination no serial execution could produce, which is INCORRECT per serializable isolation. The values should be either:
Case 1: MERGE INTO with id 1005 goes through first, followed by the other id 1006
1005 2 30 => sum(10,20)
1006 1 330 => sum(100,200,30)
Here the table would look like:
SELECT * FROM demo.nyc.tbl_A;
1001 1 10
1002 1 20
1003 2 100
1004 2 200
1005 2 30
1006 1 330
OR
Case 2: MERGE INTO with id 1006 goes through first, followed by the other id 1005
1006 1 300 => sum(100,200)
1005 2 330 => sum(10,20,300)
Here the table would look like:
SELECT * FROM demo.nyc.tbl_A;
1001 1 10
1002 1 20
1003 2 100
1004 2 200
1005 2 330
1006 1 300
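To make the anomaly concrete, the two serial orders above can be simulated in plain Python; the observed Iceberg result (1005 with 30, 1006 with 300) matches neither:

```python
# Initial table keyed by id -> (class, value).
initial = {1001: (1, 10), 1002: (1, 20), 1003: (2, 100), 1004: (2, 200)}

def apply_merge(rows, new_id, new_class, src_class):
    """Apply one MERGE: insert new_id with SUM(value) over rows of src_class."""
    rows = dict(rows)
    rows[new_id] = (new_class, sum(v for c, v in rows.values() if c == src_class))
    return rows

# Case 1: merge for 1005 first, then 1006.
case1 = apply_merge(apply_merge(initial, 1005, 2, 1), 1006, 1, 2)
# Case 2: merge for 1006 first, then 1005.
case2 = apply_merge(apply_merge(initial, 1006, 1, 2), 1005, 2, 1)

# What the concurrent Iceberg run actually produced.
observed = {**initial, 1005: (2, 30), 1006: (1, 300)}

print(case1[1005], case1[1006])    # (2, 30) (1, 330)
print(case2[1006], case2[1005])    # (1, 300) (2, 330)
print(observed in (case1, case2))  # False: matches neither serial outcome
```

In every serial order, whichever merge runs second must see the first merge's inserted row in its sum; the observed result shows that neither merge saw the other's write.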
While running this test repeatedly, if you re-run the commands from scratch, you can often hit the following exception:
Issue 1:
org.apache.iceberg.exceptions.ServiceFailureException: Server error: NotFoundException: Location does not exist: s3://warehouse/nyc/tbl_A/metadata/****.metadata.json
This happens when you accidentally kill the Docker containers, bring them back up, and run a command against an existing table. The data in the MinIO cluster has been deleted, but some cached data remains in the iceberg-rest container, which is the catalog. It is not possible to test the same table commands until this is fixed. To fix it, follow these steps:
1. docker compose down
2. rm -rf warehouse/ notebooks/
3. docker compose up
Retested this with the latest version of Iceberg and the Nessie catalog, and couldn't reproduce. Incidentally, I don't think the catalog used in this demo is recommended for production; its Docker Hub page describes it as intended for experimentation, so the serializable isolation feature may not be implemented in it at all. The GitHub page for this catalog also has barely any commits.