Created
March 15, 2023 20:59
-
-
Save aavogt/d0bf8a9042c5e796857d96ad9eb23e25 to your computer and use it in GitHub Desktop.
streaming-osm with binary and zstd cache
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE BlockArguments #-} | |
{-# LANGUAGE LambdaCase #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE TemplateHaskell #-} | |
{-# LANGUAGE StandaloneDeriving #-} | |
{-# LANGUAGE DeriveGeneric #-} | |
{-# LANGUAGE PartialTypeSignatures #-} | |
{-# LANGUAGE ScopedTypeVariables #-} | |
{-# LANGUAGE TypeFamilies #-} | |
{-# LANGUAGE FlexibleContexts #-} | |
{-# OPTIONS_GHC -Wno-partial-type-signatures #-} | |
module OSM ( main ) where | |
import Codec.Compression.Zstd | |
import Control.Lens hiding ((:>)) | |
import Control.Lens.Unsound (lensProduct) | |
import Control.Monad | |
import Control.Monad.Trans.Resource (runResourceT, MonadResource (liftResourceT)) | |
import Data.Binary | |
import Data.BoundingBox | |
import Data.ByteString.Builder (Builder) | |
import Data.Maybe | |
import GHC.Generics (Generic) | |
import IPPrint | |
import Linear | |
import qualified Codec.Compression.Zstd.Streaming as Zstd | |
import qualified Data.ByteString as B | |
import qualified Streaming.Binary as SB | |
import qualified Streaming.Internal as S | |
import qualified Streaming.Prelude as S | |
import Streaming | |
import Streaming.ByteString as Q hiding (concatBuilders) | |
import Streaming.Internal | |
import Streaming.Osm as Osm | |
import Streaming.Osm.Types | |
import System.Directory (doesFileExist, getFileSize) | |
-- Template Haskell: generate lenses for the record fields of 'Node' and
-- 'Block' in a single declaration splice. The definitions below rely on the
-- generated 'lat', 'lng' and 'OSM.nodes' lenses.
$(mconcat <$> traverse makeLenses [''Node, ''Block])
-- | Stream the nodes of an OSM extract that fall inside 'bb', caching them in
-- @nodes.zstd@ via 'zstdChunkCache', and pretty-print every node.
main :: IO ()
main = do
  -- https://download.geofabrik.de/north-america/canada/
  let -- Debug probe: print the current size of the cache file next to each
      -- node that is about to be written to the cache.
      reportCacheSize node = do
        sz <- liftIO $ getFileSize "nodes.zstd"
        liftIO $ pprint ("pre", sz, node) -- sz is always 0!
        return node
      cachedNodes =
        zstdChunkCache "nodes.zstd"
          . S.mapM reportCacheSize
          . S.filter (inBB1 bb)
          . Osm.nodes
          . blocks
          $ blobs "ontario-latest.osm.pbf"
  runResourceT $ S.mapM_ (liftIO . pprint) cachedNodes
  -- The stream's own return value (whatever 'zstdChunkCache' ends with) is
  -- discarded here.
  return ()
-- define lat0,lon0,lat1 to say which part of the pbf to keep
--
-- Bounding box used by 'main' to filter nodes: a box built with 'sizePos'
-- from @(0, V2 lat0 lon0)@ and then grown with 'inflate' by
-- @abs (lat0 - lat1)@.
-- NOTE(review): lat0, lon0 and lat1 are not defined in the visible source;
-- they must be supplied before this module compiles.
bb = inflate (abs (lat0 - lat1)) (sizePos 0 # (0, V2 lat0 lon0))
-- | Does the node's position, taken as @V2 lat lng@, lie inside the box?
inBB1 :: Box V2 Double -> Node -> Bool
inBB1 box node = position `isInside` box
  where
    position = V2 (node ^. lat) (node ^. lng)
-- | Block-level variant of 'inBB1': only the first node of the block is
-- tested. A block without nodes is reported as outside the box.
inBB :: Box V2 Double -> Block -> Bool
inBB bb b = case b ^? OSM.nodes . _head of
  Nothing -> False
  Just n -> V2 (n ^. lat) (n ^. lng) `isInside` bb
-- 'Generic' is derived standalone for the OSM record types so that the
-- body-less 'Binary' instances below can use the class's default methods.
-- NOTE(review): neither the types nor the classes are defined in this module,
-- so these are orphan instances.
deriving instance Generic Info
deriving instance Generic Node

instance Binary Info
instance Binary Node
-- TODO invalidate cache by taking `x :: ExpQ` instead of `eval`

-- | Cache the elements of a stream in the file @p@.
--
-- If @p@ does not exist, @x@ is run to completion and written to @p@ with
-- 'encodeChunksZip' (in groups of 10). In every case the resulting stream is
-- then decoded from @p@; when the file already exists @x@ is never run.
--
-- NOTE(review): the only validity check is 'doesFileExist', so a file left
-- behind by an interrupted first run would be read back as if it were complete.
zstdChunkCache :: (Binary a, MonadIO m, MonadResource m) => FilePath -> Stream (Of a) m () -> Stream (Of a) m _
zstdChunkCache p x = effect $ do
  cached <- liftIO (doesFileExist p)
  unless cached (Q.writeFile p (encodeChunksZip 10 x))
  pure (decodeChunksZip (Q.readFile p))
-- | Serialise a stream of values into a zstd-compressed byte stream.
--
-- The input is cut into groups of @n@ elements with 'chunksOf'; each group is
-- collected into a list and written with 'SB.encode', and the concatenated
-- chunks are passed through 'compressStream' with the argument 3.
encodeChunksZip :: (Binary a, MonadIO m) => Int -> Stream (Of a) m () -> ByteStream m ()
encodeChunksZip n items =
  Q.fromChunks (compressStream 3 (S.concats (S.mapsM encodeGroup (chunksOf n items))))
  where
    -- Collect one group into a list and turn it into encoded chunks.
    encodeGroup grp = encodeList <$> S.toList grp
    -- Emit the encoded list, then continue with the value the group returned.
    encodeList (xs :> rest) = Q.toChunks (rest <$ SB.encode xs)
-- | Inverse of 'encodeChunksZip': decompress the byte stream, decode it as a
-- sequence of lists with 'SB.decoded', and flatten those lists into a stream
-- of individual elements. The return value is whatever 'SB.decoded' returns.
decodeChunksZip :: forall a m. (Binary a, MonadIO m) => Q.ByteStream m () -> Stream (Of a) m _
decodeChunksZip compressed =
  S.concat (asLists (SB.decoded (Q.fromChunks (decompressStream (Q.toChunks compressed)))))
  where
    -- Identity used only to pin the decoded element type to @[a]@, so that
    -- 'S.concat' flattens lists of the requested element type.
    asLists :: Stream (Of [a]) m r -> Stream (Of [a]) m r
    asLists = id
-- * streaming-zstd | |
-- | Decompress a stream of zstd-compressed chunks: start a decompression
-- state machine with 'Zstd.decompress' and drive it with 'resultToStream'.
decompressStream :: MonadIO m => Stream (Of B.ByteString) m () -> Stream (Of B.ByteString) m ()
decompressStream input = effect $ do
  start <- liftIO Zstd.decompress
  pure (resultToStream input start)
-- | Compress a stream of chunks with zstd: start a compression state machine
-- with @'Zstd.compress' n@ and drive it with 'resultToStream'.
compressStream :: MonadIO m => Int -> Stream (Of B.ByteString) m () -> Stream (Of B.ByteString) m ()
compressStream n input = effect $ do
  start <- liftIO (Zstd.compress n)
  pure (resultToStream input start)
-- | Drive a zstd streaming state machine ('Zstd.Result') with the chunks of
-- @input@, yielding every chunk the state machine produces.
--
-- * 'Zstd.Done' yields the final chunk and ends the stream.
-- * 'Zstd.Error' prints the two messages to stdout and ends the stream
--   without yielding anything further.
-- * 'Zstd.Produce' yields a chunk and continues with the same input.
-- * 'Zstd.Consume' is fed the next non-empty input chunk, or 'B.empty' once
--   the input is exhausted.
--
-- Empty chunks inside @input@ are skipped rather than fed to 'Zstd.Consume':
-- the zstd streaming API treats an empty 'B.ByteString' as the end-of-input
-- marker, so forwarding one would finish the frame early and silently drop
-- the rest of the input.
resultToStream :: MonadIO m => Stream (Of B.ByteString) m () -> Zstd.Result -> Stream (Of B.ByteString) m ()
resultToStream input = \case
  Zstd.Done bs -> S.yield bs
  Zstd.Error msg1 msg2 -> liftIO $ putStrLn $ "Error: " ++ msg1 ++ " - " ++ msg2
  Zstd.Produce bs ioNext -> S.cons bs (liftIO ioNext >>= resultToStream input)
  wanting@(Zstd.Consume next) -> case input of
    Step (bs :> rest)
      -- An empty chunk carries no data; feeding it would signal end of input.
      | B.null bs -> resultToStream rest wanting
      | otherwise -> liftIO (next bs) >>= resultToStream rest
    -- Run the pending effect to expose more input, still waiting to consume.
    Effect ms -> effect (fmap (\more -> resultToStream more wanting) ms)
    -- Input exhausted: the empty chunk tells zstd to finish.
    Return () -> liftIO (next B.empty) >>= resultToStream (Return ())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment