-
-
Save iokasimov/f67c1a117f806d63522c85f66742aa1f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE LambdaCase, PackageImports, RankNTypes, OverloadedStrings, TypeApplications #-} | |
import "async" Control.Concurrent.Async (async, race, wait) | |
import "base" Control.Applicative (Applicative (pure)) | |
import "base" Control.Concurrent (threadDelay) | |
import "base" Control.Monad (Monad, forever, void) | |
import "base" Data.Function (($), (.), flip) | |
import "base" Data.Traversable (Traversable (traverse)) | |
import "binary" Data.Binary (Binary (put, get), getWord8, putWord8, decodeOrFail, encode) | |
import "bytestring" Data.ByteString.Lazy (ByteString, fromStrict) | |
import "hedgehog" Hedgehog (Group (Group), Property, checkParallel, property, success, failure, withTests) | |
import "hedgehog" Hedgehog.Internal.Property (TestLimit (TestLimit)) | |
import "network-simple" Network.Simple.TCP (HostPreference (Host), Socket, connect, recv, sendLazy, serve) | |
import "observable" Control.Observable (Observable, obs, dispatch, notify, subscribe, follow, watch, (.:~.), (.:~*), (*:~*)) | |
import "transformers" Control.Monad.Trans.Cont (ContT (..)) | |
import "transformers" Control.Monad.Trans.Class (lift) | |
-------------------------------------------------------------------------------- | |
-- Nodes' messages | |
data Command = Do | Wait | |
instance Binary Command where | |
put Do = putWord8 . fromIntegral $ 1 | |
put Wait = putWord8 . fromIntegral $ 2 | |
get = getWord8 >>= \case | |
1 -> pure Do | |
2 -> pure Wait | |
data Query = More | Done | |
instance Binary Query where | |
put More = putWord8 . fromIntegral $ 1 | |
put Done = putWord8 . fromIntegral $ 2 | |
get = getWord8 >>= \case | |
1 -> pure More | |
2 -> pure Done | |
-------------------------------------------------------------------------------- | |
-- Socket utils | |
receive :: forall msg . Binary msg => Socket -> IO msg | |
receive socket = recv socket 1024 >>= maybe (receive socket) decoding where | |
decoding bytes = case decodeOrFail . fromStrict $ bytes of | |
Left err -> print err *> receive socket | |
Right (_, _, x) -> pure x | |
send :: forall msg . Binary msg => Socket -> msg -> IO () | |
send socket = sendLazy socket . encode | |
-------------------------------------------------------------------------------- | |
-- Nodes observables and observers | |
administrator :: Observable IO Socket () | |
administrator = dispatch . (<$>) fst . ContT $ serve (Host "127.0.0.1") "9000" | |
administer :: Socket -> IO () | |
administer socket = receive @Query socket >>= interpret where | |
interpret :: Query -> IO () | |
interpret More = print "Master: you got the task." *> send socket Do | |
interpret Done = print "Master: well done!" *> send socket Wait | |
worker :: Observable IO Socket () | |
worker = dispatch . (<$>) fst . ContT $ connect "127.0.0.1" "9000" | |
work :: Socket -> IO () | |
work socket = send socket More *> receive @Command socket >>= interpret where | |
interpret :: Command -> IO () | |
interpret Do = print "Worker: copy that..." *> threadDelay 1000000 *> send socket Done | |
interpret Wait = print "Worker: okay, I will wait..." *> threadDelay 1000000 | |
-------------------------------------------------------------------------------- | |
-- Property testing | |
data Beaten a = Alive a | Dead | |
-- | Listen first event from action untill time limit is up | |
alive :: Int -> Observable IO a r -> (a -> IO r) -> IO (Beaten r) | |
alive limit observable handle = race (threadDelay limit) (notify observable handle) | |
>>= either (const . pure $ Dead) (pure . Alive) | |
-- | Listen every event one by one from action untill limit time between events is up | |
heartbeat :: Int -> Observable IO a r -> (a -> IO r) -> IO (Beaten r) | |
heartbeat limit observable handle = race (threadDelay limit) (notify observable handle) | |
>>= either (const . pure $ Dead) ((*>) (heartbeat limit observable handle) . pure . Alive) | |
data Check = Check | |
instance Binary Check where | |
put Check = putWord8 . fromIntegral $ 1 | |
get = getWord8 >>= \case { 1 -> pure Check } | |
administrator_starts_and_reply :: Int -> Property | |
administrator_starts_and_reply seconds = withTests (TestLimit 1) . property $ do | |
lift . async $ administrator .:~. (const $ pure ()) | |
lift (alive seconds worker $ const $ pure ()) >>= | |
\case { Alive _ -> success; Dead -> failure } | |
worker_receives_administrator_message :: Int -> Property | |
worker_receives_administrator_message seconds = withTests (TestLimit 1) . property $ do | |
lift . async $ administrator .:~* (flip send Check) | |
lift $ threadDelay seconds -- Give some time to administrator to up | |
lift (alive seconds (obs . notify worker $ void . receive @Check) pure) >>= | |
\case { Alive _ -> success; Dead -> failure } | |
-------------------------------------------------------------------------------- | |
main = do | |
checkParallel $ Group "Property test" $ | |
("Administrator starts and reply in 1 second", administrator_starts_and_reply 1000000) : | |
("Worker receive administrator's message in 1 second", worker_receives_administrator_message 1000000) : [] | |
print "For each connected worker we run message listening loop" | |
async $ administrator *:~* administer -- watch administrator administer | |
print "Let's give a second to up administrator" *> threadDelay 1000000 | |
print "Worker starts working as soon as it connects to administrator" | |
async $ worker .:~* work -- chase worker work | |
forever $ threadDelay 1000000 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment