View echo.hs
import Network.Socket hiding (send, sendTo, recv, recvFrom) | |
import Network.Socket.ByteString | |
import Control.Concurrent | |
port = 4242 | |
incomingBufferSize = 4096 | |
main = do | |
-- Listen | |
sock <- socket AF_INET Stream 0 |
View echoDelay.hs
import Network.Socket hiding (send, sendTo, recv, recvFrom) | |
import Network.Socket.ByteString | |
import Control.Concurrent | |
port = 4242 | |
incomingBufferSize = 4096 | |
delayMicroseconds = 500000 | |
main = do | |
-- Listen |
View sudoku.hs
import Control.Lens | |
import Control.Monad.Loops | |
import Control.Monad.State.Strict | |
import Data.List | |
import Data.List.Split | |
import Data.Maybe | |
data SudokuValue = S1 | S2 | S3 | S4 | S5 | S6 | S7 | S8 | S9 deriving (Eq, Enum) | |
instance Show SudokuValue where | |
show s = show $ fromJust (s `elemIndex` [S1 ..]) + 1 |
View normalise_environment.py
def normalise_environment(key_values): | |
''' Converts denormalised dict of (string -> string) pairs, where the first string | |
is treated as a path into a nested list/dictionary structure | |
{ | |
"FOO__1__BAR": "setting-1", | |
"FOO__1__BAZ": "setting-2", | |
"FOO__2__FOO": "setting-3", | |
"FOO__2__BAR": "setting-4", | |
"FIZZ": "setting-5", |
View aws_sig_v4_headers.py
import datetime | |
import hashlib | |
import hmac | |
import urllib.parse | |
def aws_sig_v4_headers(access_key_id, secret_access_key, pre_auth_headers, | |
service, region, host, method, path, query, payload): | |
algorithm = 'AWS4-HMAC-SHA256' |
View aws_sig_v4_headers_aiohttp_s3_put.py
import asyncio | |
import os | |
import aiohttp | |
from aws_sig_v4_headers import aws_sig_v4_headers | |
async def main(): | |
method = 'PUT' | |
host = 's3-eu-west-1.amazonaws.com' |
View asyncio_read_write_lock.py
import asyncio | |
import collections | |
import contextlib | |
class Read(asyncio.Future):
    """Future subclass that carries a compatibility rule against ``Write``."""

    @staticmethod
    def is_compatible(holds):
        """Return True only when the ``Write`` entry of ``holds`` is falsy.

        NOTE(review): presumably ``holds`` maps mode classes to how many of
        each are currently held -- confirm against the caller.
        """
        write_entry = holds[Write]
        return not write_entry
class Write(asyncio.Future): |
View path_lock_global_exclusive.py
import asyncio | |
import contextlib | |
class PathLock(): | |
def __init__(self): | |
self.lock = asyncio.Lock() | |
@contextlib.asynccontextmanager | |
async def __call__(self, read, write): |
View path_lock_global_read_write.py
import asyncio | |
import contextlib | |
from fifolock import FifoLock | |
class Read(asyncio.Future):
    """Future subclass that carries a compatibility rule against ``Write``."""

    @staticmethod
    def is_compatible(holds):
        """Return True only when the ``Write`` entry of ``holds`` is falsy.

        NOTE(review): presumably ``holds`` maps mode classes to how many of
        each are currently held -- confirm against the caller.
        """
        write_entry = holds[Write]
        return not write_entry
View path_lock_local_read_write.py
import asyncio | |
import contextlib | |
import weakref | |
from fifolock import FifoLock | |
class Read(asyncio.Future):
    """Future subclass that carries a compatibility rule against ``Write``."""

    @staticmethod
    def is_compatible(holds):
        """Return True only when the ``Write`` entry of ``holds`` is falsy.

        NOTE(review): presumably ``holds`` maps mode classes to how many of
        each are currently held -- confirm against the caller.
        """
        write_entry = holds[Write]
        return not write_entry
OlderNewer