Serializable Isolation on Postgres vs Iceberg

This gist tests serializable isolation in Iceberg and compares it with the same in Postgres.

Table of contents:

  • Test Serializable Isolation in Postgres
  • Test Serializable Isolation in Iceberg
  • Troubleshooting Iceberg docker setup

Test Serializable Isolation in Postgres

Setup

  1. Install Postgres locally and use the SQL client, psql, for this test. Sample instructions for setting up Postgres locally: https://www.sqlshack.com/setting-up-a-postgresql-database-on-mac/
  2. Use two iTerm2 panes (split vertically or horizontally) for testing concurrent merges.

Test

The following section implements the example described in the PostgreSQL docs: https://www.postgresql.org/docs/current/transaction-iso.html#XACT-SERIALIZABLE

First iTerm2 window

Start using psql -U postgres -h localhost

-- The default isolation level is read committed. So change to serializable before testing this
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;

DROP TABLE IF EXISTS tbl_A;

CREATE TABLE tbl_A
(
  id int,
  class int,
  value int
);

INSERT INTO tbl_A VALUES
(1001, 1, 10), 
(1002, 1, 20),
(1003, 2, 100),
(1004, 2, 200);

-- Verify session isolation level using
SHOW TRANSACTION ISOLATION LEVEL;

Second iTerm2 window

Start using psql -U postgres -h localhost

-- The default isolation level is read committed. So change to serializable before testing this
SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;

-- Verify session isolation level using
SHOW TRANSACTION ISOLATION LEVEL;
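
Alternatively, Postgres can set the isolation level per transaction rather than per session; a minimal sketch (this test relies on the session-level setting, since each pasted MERGE runs as its own implicit transaction):

BEGIN ISOLATION LEVEL SERIALIZABLE;
-- ... statements under test ...
COMMIT;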

Test concurrent transactions

1. Paste on first iTerm2 (DO NOT HIT ENTER YET)
MERGE INTO tbl_A t USING (
    SELECT 1005 as id, 2 as class,  SUM(value) as value FROM tbl_A WHERE class = 1
  ) as u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET value = u.value
  WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
2. Paste on second iTerm2 (DO NOT HIT ENTER YET)
MERGE INTO tbl_A t USING (
    SELECT 1006 as id, 1 as class, SUM(value) as value FROM tbl_A WHERE class = 2
  ) as u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET value = u.value
  WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
3. Enable Toggle Broadcasting Input on both iTerm2 windows

Right-click in each pane and choose that option. This allows you to run commands simultaneously across multiple iTerm2 windows.

4. Hit ENTER in either of the windows.

The previously pasted commands should execute in both windows.

Results

Possibility 1: If the MERGE INTO in the first window succeeded:
select * from tbl_A;
|   id  | class | value |
|-------|-------|-------|
|  1001 |     1 |    10 |
|  1002 |     1 |    20 |
|  1003 |     2 |   100 |
|  1004 |     2 |   200 |
|  1005 |     2 |    30 |

The second window would fail, as expected:

ERROR:  could not serialize access due to read/write dependencies among transactions
DETAIL:  Reason code: Canceled on identification as a pivot, during write.
HINT:  The transaction might succeed if retried. 

If retried, the MERGE INTO would result in:

| 1006 | 1 | 330 |
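
Concretely, the retry is just re-issuing the same MERGE in the session that was rolled back; it now reads the committed row for id 1005 (class 2, value 30), which is why class 2 sums to 330:

MERGE INTO tbl_A t USING (
    SELECT 1006 as id, 1 as class, SUM(value) as value FROM tbl_A WHERE class = 2
  ) as u
  ON t.id = u.id
  WHEN MATCHED THEN UPDATE SET value = u.value
  WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);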

Possibility 2: If the MERGE INTO in the second window succeeded:
select * from tbl_A;
|   id  | class | value |
|-------|-------|-------|
|  1001 |     1 |    10 |
|  1002 |     1 |    20 |
|  1003 |     2 |   100 |
|  1004 |     2 |   200 |
|  1006 |     1 |   300 |

The first window would fail, as expected:

ERROR:  could not serialize access due to read/write dependencies among transactions
DETAIL:  Reason code: Canceled on identification as a pivot, during write.
HINT:  The transaction might succeed if retried.

If retried, the MERGE INTO would result in:

| 1005 | 2 | 330 |

Conclusion: Serializable isolation in Postgres works as documented.

Test Serializable Isolation in Iceberg

Setup

  1. We will use the docker setup from https://iceberg.apache.org/spark-quickstart/ to set up our environment, and SparkSQL to run the tests. As of this gist's creation, the docker setup uses Iceberg 1.3.1 and Spark 3.4.
  2. We will use two iTerm2 panes (split vertically or horizontally) for testing concurrent merges. Start both of them with:
docker exec -it spark-iceberg spark-sql
  3. If needed, the Iceberg data can be accessed via the MinIO console at http://127.0.0.1:9001/login using the credentials admin:password.

Test

We will use the same table and schema we used for testing Postgres. The Iceberg docs state that serializable is the default isolation level: https://iceberg.apache.org/docs/1.3.1/configuration/.

To err on the safe side, we will set the isolation level explicitly. No Iceberg documentation was found on how to set the isolation level from SparkSQL, but Tabular.io has a cheat sheet for Iceberg on Spark, https://tabular.io/downloads/tabular_iceberg-spark_cheat-sheet.pdf, that shows how to set the isolation-level table property. We will use that.
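
For reference, Iceberg exposes one such property per row-level operation (per the Iceberg configuration docs); a sketch that pins all three to serializable, though only the merge property is exercised by this test:

ALTER TABLE demo.nyc.tbl_A SET TBLPROPERTIES (
  'write.delete.isolation-level' = 'serializable',
  'write.update.isolation-level' = 'serializable',
  'write.merge.isolation-level'  = 'serializable'
);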

First iTerm2 window

Start using docker exec -it spark-iceberg spark-sql

DROP TABLE IF EXISTS demo.nyc.tbl_A;
CREATE TABLE demo.nyc.tbl_A
(
  id int,
  class int,
  value int
);

INSERT INTO demo.nyc.tbl_A VALUES 
  (1001, 1, 10), 
  (1002, 1, 20),
  (1003, 2, 100),
  (1004, 2, 200);

-- Refer to https://tabular.io/downloads/tabular_iceberg-spark_cheat-sheet.pdf for changing the isolation level of merge writes to serializable. This happens at the table level.
ALTER TABLE demo.nyc.tbl_A SET TBLPROPERTIES ('write.merge.isolation-level' = 'serializable');

-- Verify the table-level isolation setting. It should show serializable.
SHOW TBLPROPERTIES demo.nyc.tbl_A;

Second iTerm2 window

Start using docker exec -it spark-iceberg spark-sql

-- Refresh table
REFRESH TABLE demo.nyc.tbl_A;

-- Verify the table-level isolation setting. It should show serializable.
SHOW TBLPROPERTIES demo.nyc.tbl_A;
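
Spark SQL can also return just the one property, which is handier than scanning the full list:

SHOW TBLPROPERTIES demo.nyc.tbl_A ('write.merge.isolation-level');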

Test concurrent transactions

1. Paste on first iTerm2 (DO NOT HIT ENTER YET)
MERGE INTO demo.nyc.tbl_A t USING (
  SELECT 1005 as id, 2 as class, SUM(value)  as value FROM demo.nyc.tbl_A WHERE class = 1
) u 
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
2. Paste on second iTerm2 (DO NOT HIT ENTER YET)
MERGE INTO demo.nyc.tbl_A t USING (
  SELECT 1006 as id, 1 as class, SUM(value)  as value FROM demo.nyc.tbl_A WHERE class = 2
) u 
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);
3. Enable Toggle Broadcasting Input on both iTerm2 windows

Right-click in each pane and choose that option. This allows you to run commands simultaneously across multiple iTerm2 windows.

4. Hit ENTER in either of the windows.

The previously pasted commands should execute in both windows.

Results

Run in either iTerm2 window

Either of the two windows can succeed without issues; after a refresh, you should see:

REFRESH TABLE demo.nyc.tbl_A;
SELECT * FROM demo.nyc.tbl_A;

1001	1	10
1002	1	20
1003	2	100
1004	2	200
1005	2	30
1006	1	300

The other iTerm2 window would show an error stack like:

23/10/17 21:13:38 WARN Tasks: Retrying task after failure: Commit failed: Requirement failed: branch main has changed: expected id 7472540111209715110 != 1241237555039253666
org.apache.iceberg.exceptions.CommitFailedException: Commit failed: Requirement failed: branch main has changed: expected id 7472540111209715110 != 1241237555039253666
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:80)
	at org.apache.iceberg.rest.ErrorHandlers$CommitErrorHandler.accept(ErrorHandlers.java:71)
...
	at org.apache.iceberg.rest.RESTClient.post(RESTClient.java:112)
	at org.apache.iceberg.rest.RESTTableOperations.commit(RESTTableOperations.java:144)
	at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:390)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:364)
	at org.apache.iceberg.BaseOverwriteFiles.commit(BaseOverwriteFiles.java:31)
	at org.apache.iceberg.spark.source.SparkWrite.commitOperation(SparkWrite.java:219)
	at org.apache.iceberg.spark.source.SparkWrite.access$1300(SparkWrite.java:84)
	at org.apache.iceberg.spark.source.SparkWrite$CopyOnWriteOperation.commitWithSerializableIsolation(SparkWrite.java:443)

This is Iceberg retrying the failed commit, which eventually succeeds. On refreshing the table, you would see the same values as in the other window.
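
To see that both MERGEs committed as separate snapshots, Iceberg's snapshots metadata table can be queried; a quick inspection sketch:

SELECT committed_at, snapshot_id, parent_id, operation
FROM demo.nyc.tbl_A.snapshots
ORDER BY committed_at;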

However, after the above retries Iceberg lets the second commit pass. Now the row with id 1005 has value 30 and the row with id 1006 has value 300, meaning both MERGEs read the original table state. That is a concurrent execution that is INCORRECT under serializable isolation. The values should be either:

Case 1: The MERGE INTO for id 1005 goes through first, followed by the one for id 1006:
1005  2  30  => sum(10, 20)
1006  1  330 => sum(100, 200, 30)
Here the table would look like:
SELECT * FROM demo.nyc.tbl_A;

1001	1	10
1002	1	20
1003	2	100
1004	2	200
1005	2	30
1006	1	330

OR

Case 2: The MERGE INTO for id 1006 goes through first, followed by the one for id 1005:
1006  1  300 => sum(100, 200)
1005  2  330 => sum(10, 20, 300)
Here the table would look like:
SELECT * FROM demo.nyc.tbl_A;

1001	1	10
1002	1	20
1003	2	100
1004	2	200
1005	2	330
1006	1	300
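
As a sanity check, the serial expectation can be reproduced by resetting the table to the original four rows (see Setup) and running the two MERGEs back to back in a single session; executed sequentially, the second MERGE sees the first one's row and Case 1 falls out:

MERGE INTO demo.nyc.tbl_A t USING (
  SELECT 1005 as id, 2 as class, SUM(value) as value FROM demo.nyc.tbl_A WHERE class = 1
) u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);

MERGE INTO demo.nyc.tbl_A t USING (
  SELECT 1006 as id, 1 as class, SUM(value) as value FROM demo.nyc.tbl_A WHERE class = 2
) u
ON t.id = u.id
WHEN MATCHED THEN UPDATE SET t.value = u.value
WHEN NOT MATCHED THEN INSERT (id, class, value) VALUES (u.id, u.class, u.value);

SELECT * FROM demo.nyc.tbl_A ORDER BY id;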

Troubleshooting Iceberg docker setup

While running this test repeatedly, if you re-run the commands from scratch, you can often hit the following exception:

Issue 1:

org.apache.iceberg.exceptions.ServiceFailureException: Server error: NotFoundException: Location does not exist: s3://warehouse/nyc/tbl_A/metadata/****.metadata.json

This happens when you accidentally kill the docker containers, bring them back up, and run a command against an existing table. The data in the MinIO cluster has been deleted, but the iceberg-rest container (the catalog) still holds cached metadata. It is not possible to test the same table's commands until this is fixed. To fix it, follow these steps:

1. docker compose down
2. rm -rf warehouse/ notebooks/ 
3. docker compose up

Conclusion: Serializable isolation in Iceberg doesn't work as documented.

@paulpaul1076

Retested this with the latest version of Iceberg and Nessie catalog, couldn't reproduce. Btw, I don't think the catalog that you use in this demo is recommended for production, on its docker hub page they talk about using this for experimentation, so maybe the serializable isolation feature isn't implemented in it at all. On the github page of this catalog there are barely any commits, too.
