Skip to content

Instantly share code, notes, and snippets.

@unclechu
Created December 28, 2021 14:12
Show Gist options
  • Save unclechu/03b15a7c822f3e3560c8cb8ed3c2f508 to your computer and use it in GitHub Desktop.
Save unclechu/03b15a7c822f3e3560c8cb8ed3c2f508 to your computer and use it in GitHub Desktop.
Haskell script that reproduces race condition in PostgreSQL database
#! /usr/bin/env nix-shell
#! nix-shell --pure -i runhaskell
#! nix-shell -E "with import (fetchTarball{url=\"https://github.com/NixOS/nixpkgs/archive/d887ac7aee92e8fc54dde9060d60d927afae9d69.tar.gz\";sha256=\"1bpgfv45b1yvrgpwdgc4fm4a6sav198yd41bsrvlmm3jn2wi6qx5\";}) {}; let hs=haskellPackages.ghcWithPackages(p:[p.postgresql-simple p.async p.turtle p.text p.process p.time]); in mkShell{buildInputs=[postgresql hs cacert];}"
{-
Haskell script that reproduces race condition in PostgreSQL database.
This is not considered a bug but rather an expected behavior,
just something you should be aware of.
This test demonstrates that if you have two transactions and data reads in both
for the same entity, and you are going to use that read data for a following
update you get a race condition thus you get some data lost or database state
inconsistency. This implies that you read data into the client state (but still
being inside a transaction for both read and update). Updates are blocking but
selects are not.
This test should demonstrate the importance of ensuring that the operation is
atomic on the database level, and not on client level.
So you should write a single SQL query in the way it is atomic.
You should also avoid having any local client-side state that is not meant for
read-only operations. Any update operation that uses data from client-side is
a candidate for a race condition.
To skip all the boilerplate you need to look at only these functions:
* runTransactionA
* runTransactionB
* runStateTest
P.S. The Nix pin above is from release-21.11 branch.
-}
{-# LANGUAGE UnicodeSyntax #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
import Prelude hiding (FilePath, log)
import Data.Text (unpack)
import Data.Bifunctor (first)
import Data.Function (fix)
import Data.Time.Clock (getCurrentTime, diffUTCTime)
import Control.Concurrent.Async hiding (wait)
import Control.Concurrent.MVar
import Control.Exception (SomeException, catch)
import Database.PostgreSQL.Simple
import Database.PostgreSQL.Simple.SqlQQ
import System.Process
import Turtle hiding (proc, procs, inproc, echo, err, printf, eprintf, stdout, stderr, view)
import qualified Turtle
main ∷ IO ()
main = do
currentDir ← pwd
pgdata ←
let
key = "PGDATA"
defaultValue = currentDir <> ".pgdata"
in
need key >>=
maybe
(defaultValue <$ export key (format fp defaultValue))
(pure . fromText)
export "PGHOST" (format fp pgdata)
pgdatabase ←
let
key = "PGDATABASE"
defaultValue = "test-race-condition"
in
need key >>= maybe (defaultValue <$ export key defaultValue) pure
loggerBus ∷ LoggerBus ← newEmptyMVar
let
runWithDbServer m =
withAsync (runLogger loggerBus) $ \loggerThread →
withAsync (runPostgreSqlServer loggerBus pgdata) $ \dbThread →
withAsync (sleep 1.0 >> m) $ \mThread → do
waitEitherCancel dbThread mThread
sendToLoggerBus loggerBus Cancel
wait loggerThread
runTheMainTest = do
addInitialData loggerBus pgdatabase
runTransactions
runStateTest loggerBus
runTransactions = void . runConcurrently $ (,)
<$> Concurrently (runTransactionA loggerBus)
<*> Concurrently (runTransactionB loggerBus)
runPsql cmd args = do
echoToLoggerBus loggerBus "Entering psql REPL..."
createProcess (proc (unpack cmd) (unpack <$> args)) >>= \case
(_, _, _, procHandle) → void $ waitForProcess procHandle
arguments >>= \case
[] → runWithDbServer runTheMainTest
x@"psql" : xs → runWithDbServer $ runPsql x xs
_ → fail "Incorrect arguments"
runPostgreSqlServer ∷ LoggerBus → FilePath → IO ()
runPostgreSqlServer loggerBus pgdata = do
echo' "Running PostgreSQL server..."
pgdataExists ← testdir pgdata
unless pgdataExists $ do
echo' $ format ("Creating database directory "%w%"...") (format fp pgdata)
stdout' $ inproc' "pg_ctl" ["initdb", "-D", format fp pgdata] empty
echo' "Running the PostgreSQL daemon..."
stdout' $ inproc' "postgres" ["-F", "-h", mempty, "-k", format fp pgdata] empty
where
echo' = echoToLoggerBus loggerBus
inproc' = loggedInproc loggerBus
stdout' = sh . (echo' . lineToText =<<)
addInitialData ∷ LoggerBus → Text → IO ()
addInitialData loggerBus pgdatabase = do
echo' "Create initial data (if not exists)..."
echo' $ format ("Checking if "%w%" database already exists...") pgdatabase
alreadyHasDb ←
let
reducer acc x = acc || case cut "|" (lineToText x) of
[] → False
x : _ → x == pgdatabase
in
empty
& inproc' "psql" ["-ltqA"]
& grep (begins (chars <* "|"))
& reduce (Fold reducer False id)
unless alreadyHasDb $ do
echo' $ format ("Creating "%w%" database...") pgdatabase
stdout' $ inproc' "createdb" ["--", pgdatabase] empty
echo' "Connecting to the database for creating initial data..."
conn ← connectPostgreSQL ""
echo' "(Re-)creating the test table and counters..."
withTransaction conn $
void $ execute_ conn [sql|
drop table if exists test_counter;
create table test_counter
( counter_id integer primary key
, counter integer not null default 0
);
insert into test_counter (counter_id) values (1);
|]
where
echo' = echoToLoggerBus loggerBus
inproc' = loggedInproc loggerBus
stdout' = sh . (echo' . lineToText =<<)
runTransactionA ∷ LoggerBus → IO ()
runTransactionA loggerBus = do
echo' "Running transaction A..."
echo' "Transaction A: Connecting to the database..."
conn ← connectPostgreSQL ""
echo' "Transaction A: Making a transaction..."
withTransaction conn $ do
[Only (x ∷ Int)] ←
query_ conn "select counter from test_counter where counter_id = 1"
sleep 1.0
void $ execute conn [sql|
update test_counter set counter = ? where counter_id = 1
|] (Only (succ x))
where
echo' = echoToLoggerBus loggerBus
runTransactionB ∷ LoggerBus → IO ()
runTransactionB loggerBus = do
echo' "Running transaction B..."
echo' "Transaction B: Connecting to the database..."
conn ← connectPostgreSQL ""
echo' "Transaction B: Making a transaction..."
withTransaction conn $ do
sleep 0.5
[Only (x ∷ Int)] ←
query_ conn "select counter from test_counter where counter_id = 1"
sleep 2.0
void $ execute conn [sql|
update test_counter set counter = ? where counter_id = 1
|] (Only (succ x))
where
echo' = echoToLoggerBus loggerBus
runStateTest ∷ LoggerBus → IO ()
runStateTest loggerBus = do
echo' "Running state test..."
echo' "Connecting to the database..."
conn ← connectPostgreSQL ""
echo' "Selecting the records from the database…"
result ← query_ conn "select counter_id, counter from test_counter"
-- The result is 1, there *IS* race condition
echo' $ format
( "Result is "%w%" "
% "(counter id#1 should have value 2 if everything is fine "
% "or 1 in case of race condition)"
)
(first (("id#" <>) . show) <$> (result ∷ [(Int, Int)]))
where
echo' = echoToLoggerBus loggerBus
-- * Helpers
echoToLoggerBus ∷ MonadIO m ⇒ LoggerBus → Text → m ()
echoToLoggerBus loggerBus = sendToLoggerBus loggerBus . Info
errToLoggerBus ∷ MonadIO m ⇒ LoggerBus → Text → m ()
errToLoggerBus loggerBus = sendToLoggerBus loggerBus . Error
sendToLoggerBus ∷ MonadIO m ⇒ LoggerBus → LoggerMsg → m ()
sendToLoggerBus loggerBus = liftIO . void . async . putMVar loggerBus
loggedInproc ∷ LoggerBus → Text → [Text] → Shell Line → Shell Line
loggedInproc loggerBus cmd args input =
inprocWithErr cmd args input >>=
either ((>> mzero) . errToLoggerBus loggerBus . lineToText) pure
type LoggerBus = MVar LoggerMsg
data LoggerMsg
= Info Text
| Error Text
| Cancel
| Done
runLogger ∷ MonadIO m ⇒ LoggerBus → m ()
runLogger loggerBus =
liftIO . (False &) . fix $ \again isStopping →
let
withTimeout = fmap (either id id) . race (sleep timeout >> timeoutLog)
doneLog = resolveLogWrite $ Left "Logger reached its end, exiting..."
timeoutLog = resolveLogWrite $ Left "Logger stopping timeout, exiting..."
f = (if isStopping then withTimeout else id) $ do
takeMVar loggerBus >>= \case
Info x → resolveLogWrite (Right x) >> again isStopping
Error x → resolveLogWrite (Left x) >> again isStopping
Done → doneLog
Cancel → do
errToLoggerBus loggerBus
"Finalizing the logger after receiving Cancel command..."
sendToLoggerBus loggerBus Done
again True
in
f `catch` \(e ∷ SomeException) →
if isStopping
then do
resolveLogWrite . Left $
format
("Secondary exception "%w%" for logger, exiting immediately...")
e
else do
errToLoggerBus loggerBus $
format
("Finalizing the logger after catching this exception: "%w%"...")
e
sendToLoggerBus loggerBus Done
again True
where
resolveLogWrite =
either
(Turtle.eprintf ("[ERROR] "%s%"\n"))
(Turtle.printf ("[INFO] "%s%"\n"))
timeout = 5.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment