Skip to content

Instantly share code, notes, and snippets.

@aavogt
Created March 15, 2023 20:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aavogt/d0bf8a9042c5e796857d96ad9eb23e25 to your computer and use it in GitHub Desktop.
Save aavogt/d0bf8a9042c5e796857d96ad9eb23e25 to your computer and use it in GitHub Desktop.
streaming-osm with binary and zstd cache
{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# OPTIONS_GHC -Wno-partial-type-signatures #-}
module OSM ( main ) where
import Codec.Compression.Zstd
import Control.Lens hiding ((:>))
import Control.Lens.Unsound (lensProduct)
import Control.Monad
import Control.Monad.Trans.Resource (runResourceT, MonadResource (liftResourceT), register, unprotect)
import Data.Binary
import Data.BoundingBox
import Data.ByteString.Builder (Builder)
import Data.Maybe
import GHC.Generics (Generic)
import IPPrint
import Linear
import qualified Codec.Compression.Zstd.Streaming as Zstd
import qualified Data.ByteString as B
import qualified Streaming.Binary as SB
import qualified Streaming.Internal as S
import qualified Streaming.Prelude as S
import Streaming
import Streaming.ByteString as Q hiding (concatBuilders)
import Streaming.Internal
import Streaming.Osm as Osm
import Streaming.Osm.Types
import System.Directory (doesFileExist, getFileSize, removeFile)
-- Template Haskell: generate lenses for the record fields of 'Node' and
-- 'Block'. This module relies on the generated 'lat' and 'lng' (on Node) and
-- 'OSM.nodes' (on Block).
-- NOTE(review): makeLenses only generates lenses for underscore-prefixed
-- fields; this assumes the streaming-osm records are declared that way —
-- confirm against Streaming.Osm.Types.
makeLenses ''Node
makeLenses ''Block
-- | Stream the nodes of an OSM extract that lie inside 'bb', caching them in a
-- zstd-compressed file and pretty-printing each one.
--
-- When the cache file does not exist yet, the filtered node stream is
-- consumed and written to it. When it already exists, the cached nodes are
-- decoded instead (see 'zstdChunkCache').
main :: IO ()
main = do
  -- Source extract: https://download.geofabrik.de/north-america/canada/
  let cachePath = "nodes.zstd" :: FilePath
      -- Debug probe: print the cache file's current size next to every node
      -- before it is handed to the cache writer.
      -- NOTE(review): the size reportedly stays 0 while writing, presumably
      -- because the zstd compressor and the file handle buffer their output;
      -- confirm before relying on this value.
      probe x = do
        sz <- liftIO (getFileSize cachePath)
        -- The annotation is required: under OverloadedStrings a bare "pre"
        -- inside a tuple passed to a Show-polymorphic printer has an
        -- ambiguous IsString type.
        liftIO (pprint ("pre" :: String, sz, x))
        return x
      cachedNodes =
        blobs "ontario-latest.osm.pbf"
          & blocks
          & Osm.nodes
          & S.filter (inBB1 bb)
          & S.mapM probe
          & zstdChunkCache cachePath
  runResourceT $ S.mapM_ (liftIO . pprint) cachedNodes
-- Bounding box of the region to keep from the .pbf file. It is used as a
-- 'Box V2 Double' by 'inBB1' in 'main'.
-- lat0, lon0 and lat1 are NOT defined in this file: define them (e.g. as
-- top-level Double constants) to choose which part of the pbf to keep.
-- The box is built with 'sizePos 0 # (0, V2 lat0 lon0)' and then grown with
-- 'inflate' by |lat0 - lat1|. No second longitude value takes part.
-- NOTE(review): presumably this yields a zero-size box anchored at
-- (lat0, lon0), widened by the latitude span in every direction — confirm
-- against the Data.BoundingBox documentation for sizePos/inflate.
bb = sizePos 0 # (0, V2 lat0 lon0) & inflate (abs (lat0 - lat1))
-- | Whether a node's @V2 lat lng@ coordinate lies inside the given box.
inBB1 :: Box V2 Double -> Node -> Bool
inBB1 box node = isInside (V2 (view lat node) (view lng node)) box
-- | Block-level filter: whether the /first/ node of a block lies inside the
-- box.
--
-- Only the head of the block's node list is tested, and blocks with no
-- nodes give 'False'. A block whose first node falls outside the box is
-- rejected even if later nodes fall inside.
inBB :: Box V2 Double -> Block -> Bool
inBB box = anyOf (OSM.nodes . _head) insideBox
  where
    insideBox n = V2 (n ^. lat) (n ^. lng) `isInside` box
-- Orphan instances: Node and Info are not declared in this module
-- (presumably they come from Streaming.Osm.Types).
-- Derive Generic first, so that the empty Binary instances below can use
-- Data.Binary's Generic-based default put/get. This lets nodes be
-- serialized into the zstd cache. Binary Info is presumably needed because
-- Node contains Info values — confirm against Streaming.Osm.Types.
deriving instance Generic Node
deriving instance Generic Info

instance Binary Node
instance Binary Info
-- TODO invalidate cache by taking `x :: ExpQ` instead of `eval`
-- | Cache the elements of @x@ in the zstd-compressed file @p@ and stream them
-- back from that file.
--
-- If @p@ does not exist, @x@ is consumed completely and written to @p@ (in
-- groups of 10 elements, see 'encodeChunksZip') before anything is yielded.
-- If @p@ already exists, @x@ is never run and the file is trusted as-is.
-- Nothing checks that it still corresponds to @x@ (see the TODO).
--
-- While @p@ is being written, a ResourceT cleanup that deletes it is
-- registered, and it is cancelled once the write completes. If the write is
-- aborted by an exception, the truncated file is removed when the enclosing
-- 'runResourceT' unwinds. Otherwise the next run would find it and treat it
-- as a complete cache.
zstdChunkCache :: (Binary a, MonadIO m, MonadResource m) => FilePath -> Stream (Of a) m () -> Stream (Of a) m _
zstdChunkCache p x = effect $ do
  exists <- liftIO $ doesFileExist p
  unless exists $ do
    -- Check existence first so the cleanup itself cannot fail when the
    -- file was never created (e.g. openFile failed).
    cleanup <- register $ do
      partial <- doesFileExist p
      when partial (removeFile p)
    Q.writeFile p $ encodeChunksZip 10 x
    -- The write finished: the file is complete, so keep it.
    void (unprotect cleanup)
  return $ decodeChunksZip $ Q.readFile p
-- | Serialize a stream of 'Binary' values into a zstd-compressed byte stream.
--
-- The input is cut into groups of @n@ consecutive elements. Each group is
-- encoded as one 'Binary' @[a]@ value, and the concatenated encodings are
-- compressed with zstd at level 3. 'decodeChunksZip' reads this format back.
encodeChunksZip :: (Binary a, MonadIO m) => Int -> Stream (Of a) m () -> ByteStream m ()
encodeChunksZip n items =
  Q.fromChunks (compressStream 3 (S.concats (S.mapsM encodeGroup (chunksOf n items))))
  where
    -- Collect one group into a list, then stream its binary encoding as
    -- strict chunks while passing the group's return value through.
    encodeGroup = fmap (\(xs :> r) -> Q.toChunks (r <$ SB.encode xs)) . S.toList
-- | Inverse of 'encodeChunksZip'. Decompress a zstd byte stream, decode the
-- sequence of 'Binary' @[a]@ groups it contains, and flatten them into a
-- stream of @a@.
--
-- The stream's return value is whatever 'SB.decoded' returns when decoding
-- stops (presumably leftover input and a decode result — see
-- streaming-binary). Callers that discard it will not notice a decode
-- failure.
decodeChunksZip :: forall a m. (Binary a, MonadIO m) => Q.ByteStream m () -> Stream (Of a) m _
decodeChunksZip compressed = S.concat (asGroups (SB.decoded plain))
  where
    plain = Q.fromChunks (decompressStream (Q.toChunks compressed))
    -- Identity that pins the decoded element type to [a], so that
    -- SB.decoded knows which Binary instance to use.
    asGroups :: Stream (Of [a]) m r -> Stream (Of [a]) m r
    asGroups = id
-- * streaming-zstd
-- | Decompress a stream of zstd-compressed strict chunks. A fresh zstd
-- decompressor is created when the stream is first run.
decompressStream :: MonadIO m => Stream (Of B.ByteString) m () -> Stream (Of B.ByteString) m ()
decompressStream input = effect $ do
  decoder <- liftIO Zstd.decompress
  return (resultToStream input decoder)
-- | Compress a stream of strict chunks with zstd at compression level @n@.
-- A fresh zstd compressor is created when the stream is first run.
compressStream :: MonadIO m => Int -> Stream (Of B.ByteString) m () -> Stream (Of B.ByteString) m ()
compressStream n input =
  effect (fmap (resultToStream input) (liftIO (Zstd.compress n)))
-- | Drive a zstd streaming 'Zstd.Result' state machine. It feeds chunks from
-- @input@ to the (de)compressor and yields every chunk the machine produces.
--
-- * 'Zstd.Produce': the output chunk is yielded, then the next result is run.
-- * 'Zstd.Consume': the next input chunk is fed in, and effects in @input@
--   are run as needed. When @input@ is exhausted, an empty 'B.ByteString' is
--   sent, which the zstd streaming API treats as end-of-input. For the same
--   reason, empty chunks occurring /inside/ @input@ are skipped rather than
--   forwarded. Forwarding one would end the frame early and silently drop
--   the rest of the input.
-- * 'Zstd.Done': the final chunk is yielded and any remaining input is
--   ignored.
-- * 'Zstd.Error': raised as an 'IOError'. Merely ending the stream would make
--   a corrupt or truncated cache look like a short but valid one.
resultToStream :: MonadIO m => Stream (Of B.ByteString) m () -> Zstd.Result -> Stream (Of B.ByteString) m ()
resultToStream _ (Zstd.Done bs) = S.yield bs
resultToStream _ (Zstd.Error msg1 msg2) =
  liftIO $ ioError $ userError $ "Error: " ++ msg1 ++ " - " ++ msg2
resultToStream input (Zstd.Produce bs ioNext) =
  S.cons bs (liftIO ioNext >>= resultToStream input)
resultToStream (Step (bs :> rest)) consume@(Zstd.Consume next)
  | B.null bs = resultToStream rest consume
  | otherwise = liftIO (next bs) >>= resultToStream rest
resultToStream (Effect ms) consume@(Zstd.Consume _) =
  effect (fmap (`resultToStream` consume) ms)
resultToStream (Return ()) (Zstd.Consume next) =
  liftIO (next B.empty) >>= resultToStream (Return ())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment