Skip to content

Instantly share code, notes, and snippets.

@iokasimov
Created December 10, 2018 19:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iokasimov/f67c1a117f806d63522c85f66742aa1f to your computer and use it in GitHub Desktop.
Save iokasimov/f67c1a117f806d63522c85f66742aa1f to your computer and use it in GitHub Desktop.
{-# LANGUAGE LambdaCase, PackageImports, RankNTypes, OverloadedStrings, TypeApplications #-}
import "async" Control.Concurrent.Async (async, race, wait, withAsync)
import "base" Control.Applicative (Applicative (pure))
import "base" Control.Concurrent (threadDelay)
import "base" Control.Monad (Monad, forever, void)
import "base" Data.Function (($), (.), flip)
import "base" Data.Traversable (Traversable (traverse))
import "binary" Data.Binary (Binary (put, get), getWord8, putWord8, decodeOrFail, encode)
import "bytestring" Data.ByteString.Lazy (ByteString, fromStrict)
import "hedgehog" Hedgehog (Group (Group), Property, checkParallel, checkSequential, property, success, failure, withTests)
import "hedgehog" Hedgehog.Internal.Property (TestLimit (TestLimit))
import "network-simple" Network.Simple.TCP (HostPreference (Host), Socket, connect, recv, sendLazy, serve)
import "observable" Control.Observable (Observable, obs, dispatch, notify, subscribe, follow, watch, (.:~.), (.:~*), (*:~*))
import "transformers" Control.Monad.Trans.Cont (ContT (..))
import "transformers" Control.Monad.Trans.Class (lift)
--------------------------------------------------------------------------------
-- Nodes' messages
-- | Instruction the administrator sends back to a worker.
data Command = Do | Wait

-- | Wire format: a single tag byte, @1@ for 'Do' and @2@ for 'Wait'.
instance Binary Command where
  put Do = putWord8 1
  put Wait = putWord8 2
  get = getWord8 >>= \case
    1 -> pure Do
    2 -> pure Wait
    -- Report an unknown tag through the decoder so that 'decodeOrFail'
    -- yields 'Left' instead of crashing on a non-exhaustive pattern match.
    tag -> fail ("Command: unknown tag " ++ show tag)
-- | Message a worker sends to the administrator.
data Query = More | Done

-- | Wire format: a single tag byte, @1@ for 'More' and @2@ for 'Done'.
instance Binary Query where
  put More = putWord8 1
  put Done = putWord8 2
  get = getWord8 >>= \case
    1 -> pure More
    2 -> pure Done
    -- Report an unknown tag through the decoder so that 'decodeOrFail'
    -- yields 'Left' instead of crashing on a non-exhaustive pattern match.
    tag -> fail ("Query: unknown tag " ++ show tag)
--------------------------------------------------------------------------------
-- Socket utils
-- | Read one chunk of at most 1024 bytes from the socket and decode it as a
-- single message.
--
-- A chunk that fails to decode is reported with 'print' and the read is
-- retried. When 'recv' yields 'Nothing' an 'IOError' is raised: according to
-- the network-simple documentation that value means the remote end closed
-- the connection, so retrying would only spin forever without ever
-- producing a message.
--
-- NOTE(review): any bytes left over after the first decoded message in a
-- chunk are discarded; this assumes one message per 'recv' — confirm.
receive :: forall msg . Binary msg => Socket -> IO msg
receive socket = recv socket 1024 >>= maybe closed decoding
  where
    closed = ioError (userError "receive: connection closed by peer")
    decoding bytes = case decodeOrFail (fromStrict bytes) of
      Left err -> print err *> receive socket
      Right (_, _, x) -> pure x
-- | Serialise a message with its 'Binary' instance and write the resulting
-- lazy bytes to the socket.
send :: forall msg . Binary msg => Socket -> msg -> IO ()
send socket message = sendLazy socket (encode message)
--------------------------------------------------------------------------------
-- Nodes observables and observers
-- | Observable built from a TCP server on 127.0.0.1:9000. 'serve' is wrapped
-- in 'ContT' and only the socket half of the pair it hands to its callback
-- is exposed to observers.
administrator :: Observable IO Socket ()
administrator = dispatch (fst <$> ContT (serve (Host "127.0.0.1") "9000"))
-- | Handle one worker request: read a 'Query' from the socket, log it, and
-- answer 'More' with 'Do' and 'Done' with 'Wait'.
administer :: Socket -> IO ()
administer socket = do
  query <- receive @Query socket
  case query of
    More -> print "Master: you got the task." *> send socket Do
    Done -> print "Master: well done!" *> send socket Wait
-- | Observable built from a TCP client connection to 127.0.0.1:9000.
-- 'connect' is wrapped in 'ContT' and only the socket half of the pair it
-- hands to its callback is exposed to observers.
worker :: Observable IO Socket ()
worker = dispatch (fst <$> ContT (connect "127.0.0.1" "9000"))
-- | One worker round: send 'More', read the 'Command' that comes back, and
-- act on it. 'Do' is logged, followed by a one-second delay and a 'Done'
-- message; 'Wait' is logged and followed by a one-second delay only.
work :: Socket -> IO ()
work socket = do
  send socket More
  command <- receive @Command socket
  case command of
    Do -> print "Worker: copy that..." *> threadDelay 1000000 *> send socket Done
    Wait -> print "Worker: okay, I will wait..." *> threadDelay 1000000
--------------------------------------------------------------------------------
-- Property testing
-- | Outcome of a time-limited observation: 'Alive' carries a value, 'Dead'
-- carries none. In this file 'Dead' is produced when the timer wins the race.
data Beaten a = Alive a | Dead
-- | Race @notify observable handle@ against a timer of @limit@ microseconds
-- (the unit 'threadDelay' uses). The result is 'Alive' with whatever the
-- notification returned if it finishes first, and 'Dead' if the timer does.
alive :: Int -> Observable IO a r -> (a -> IO r) -> IO (Beaten r)
alive limit observable handle =
  either (const Dead) Alive <$> race (threadDelay limit) (notify observable handle)
-- | Keep racing @notify observable handle@ against a timer of @limit@
-- microseconds until the timer wins a round.
--
-- Every round the notification wins is followed by another round; the
-- outcome of those later rounds is discarded. The overall result is
-- therefore 'Alive' with the first round's value if that round beat the
-- timer, and 'Dead' if the very first round timed out.
heartbeat :: Int -> Observable IO a r -> (a -> IO r) -> IO (Beaten r)
heartbeat limit observable handle =
  race (threadDelay limit) (notify observable handle) >>= \case
    Left _ -> pure Dead
    Right result -> heartbeat limit observable handle *> pure (Alive result)
-- | Probe message exchanged by the property tests.
data Check = Check

-- | Wire format: the single tag byte @1@.
instance Binary Check where
  put Check = putWord8 1
  get = getWord8 >>= \case
    1 -> pure Check
    -- Report an unknown tag through the decoder so that 'decodeOrFail'
    -- yields 'Left' instead of crashing on a non-exhaustive pattern match.
    tag -> fail ("Check: unknown tag " ++ show tag)
-- | Property (run once): with an administrator running in the background, a
-- worker notification must complete before the time limit.
--
-- The argument is handed straight to 'alive', so despite its name it is a
-- number of microseconds.
--
-- The administrator runs under 'withAsync' so that it is cancelled as soon
-- as the check finishes; a bare 'async' would leave the server thread
-- running (and port 9000 bound) after the property has ended.
administrator_starts_and_reply :: Int -> Property
administrator_starts_and_reply seconds = withTests (TestLimit 1) . property $
  lift probe >>= \case { Alive _ -> success; Dead -> failure }
  where
    probe = withAsync (administrator .:~. (const $ pure ())) $ \_ ->
      alive seconds worker (const $ pure ())
-- | Property (run once): an administrator that sends 'Check' to its
-- connections is started in the background, and a worker must receive that
-- 'Check' before the time limit.
--
-- The argument is handed straight to 'threadDelay' and 'alive', so despite
-- its name it is a number of microseconds.
--
-- The administrator runs under 'withAsync' so that it is cancelled as soon
-- as the check finishes; a bare 'async' would leave the server thread
-- running (and port 9000 bound) after the property has ended.
worker_receives_administrator_message :: Int -> Property
worker_receives_administrator_message seconds = withTests (TestLimit 1) . property $
  lift probe >>= \case { Alive _ -> success; Dead -> failure }
  where
    probe = withAsync (administrator .:~* (flip send Check)) $ \_ -> do
      threadDelay seconds -- Give the administrator some time to come up
      alive seconds (obs . notify worker $ void . receive @Check) pure
--------------------------------------------------------------------------------
-- | Run the property group, then start the administrator and a worker in
-- background threads and keep the main thread alive indefinitely.
main :: IO ()
main = do
  -- Both properties start a server on 127.0.0.1:9000, so they must not run
  -- concurrently: 'checkParallel' is documented as unsafe for properties
  -- that share a resource, hence the sequential runner.
  void . checkSequential $ Group "Property test"
    [ ("Administrator starts and reply in 1 second", administrator_starts_and_reply 1000000)
    , ("Worker receive administrator's message in 1 second", worker_receives_administrator_message 1000000)
    ]
  print "For each connected worker we run message listening loop"
  void . async $ administrator *:~* administer -- watch administrator administer
  print "Let's give a second to up administrator" *> threadDelay 1000000
  print "Worker starts working as soon as it connects to administrator"
  void . async $ worker .:~* work -- chase worker work
  -- Block the main thread forever so the background threads keep running.
  forever $ threadDelay 1000000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment