Skip to content

Instantly share code, notes, and snippets.

@jhedev
Created January 9, 2015 18:40
Show Gist options
  • Save jhedev/77fd6b8e9ef1fe69391b to your computer and use it in GitHub Desktop.
Save jhedev/77fd6b8e9ef1fe69391b to your computer and use it in GitHub Desktop.
-- Experiments with conduit and data analysis (inspired by http://twdkz.wordpress.com/2013/05/31/data-analysis-with-monoids/)
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
import Control.Applicative
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class
import qualified Data.ByteString.Char8 as BSC
import Data.Conduit
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import Data.Monoid
import Data.Foldable
-- | Accumulator for the smallest value seen so far.
--
-- 'MinEmpty' represents "no value yet" and serves as the identity of the
-- 'Monoid' instance defined for this type.
data Min a
  = Min a
  | MinEmpty
  deriving Show
-- | Accumulator for the largest value seen so far.
--
-- 'MaxEmpty' represents "no value yet" and serves as the identity of the
-- 'Monoid' instance defined for this type.
data Max a
  = Max a
  | MaxEmpty
  deriving Show
-- | Number of elements counted, wrapped in its own type so that it can have
-- an additive 'Monoid' instance.
newtype Count
  = Count Int
  deriving Show
-- | Accumulator for an arithmetic mean: a running total paired with the
-- number of elements that contributed to it.
newtype Mean a
  = Mean (Sum a, Count)
  deriving Show
-- | Combining two 'Min' accumulators keeps the smaller value; 'MinEmpty' is
-- the identity on either side.
instance (Ord a) => Monoid (Min a) where
  mempty = MinEmpty

  mappend lhs rhs = case (lhs, rhs) of
    (MinEmpty, _)  -> rhs
    (_, MinEmpty)  -> lhs
    (Min x, Min y) -> Min (x `min` y)
-- | Combining two 'Max' accumulators keeps the larger value; 'MaxEmpty' is
-- the identity on either side.
instance (Ord a) => Monoid (Max a) where
  mempty = MaxEmpty

  mappend lhs rhs = case (lhs, rhs) of
    (MaxEmpty, _)  -> rhs
    (_, MaxEmpty)  -> lhs
    (Max x, Max y) -> Max (x `max` y)
-- | Counts combine by addition, starting from zero.
instance Monoid Count where
  mempty = Count 0

  Count lhs `mappend` Count rhs = Count (lhs + rhs)
-- | Two 'Mean' accumulators combine component-wise: the totals via 'Sum' and
-- the element counts via 'Count'.
instance (Num a, Fractional a) => Monoid (Mean a) where
  mempty = Mean (mempty, mempty)

  Mean lhs `mappend` Mean rhs = Mean (lhs `mappend` rhs)
-- | A monoidal accumulator that can be turned into a final result once all
-- input has been folded into it.
class Monoid a => Aggregation a where
  -- | Type of the final value produced from an accumulator of type @a@.
  type AggResult a :: *

  -- | Convert a finished accumulator into its final value.
  aggResult :: a -> AggResult a
-- | A 'Mean' accumulator finalises to its total divided by its element count.
instance (Fractional a) => Aggregation (Mean a) where
  type AggResult (Mean a) = a

  aggResult (Mean (Sum total, Count size)) = total / divisor
    where
      divisor = fromIntegral size
-- | A 'Sum' accumulator finalises to the wrapped total.
instance (Num a) => Aggregation (Sum a) where
  type AggResult (Sum a) = a

  aggResult = getSum
-- | A 'Product' accumulator finalises to the wrapped product.
instance (Num a) => Aggregation (Product a) where
  type AggResult (Product a) = a

  aggResult = getProduct
-- | A 'Min' accumulator finalises to the smallest value folded into it.
--
-- The result type is a bare @a@, so there is nothing sensible to return for
-- 'MinEmpty' (no input at all). That case raises an error with a descriptive
-- message rather than an anonymous pattern-match failure.
instance (Ord a) => Aggregation (Min a) where
  type AggResult (Min a) = a

  aggResult (Min smallest) = smallest
  aggResult MinEmpty =
    error "aggResult: minimum of an empty input is undefined"
-- | A 'Max' accumulator finalises to the largest value folded into it.
--
-- The result type is a bare @a@, so there is nothing sensible to return for
-- 'MaxEmpty' (no input at all). That case raises an error with a descriptive
-- message rather than an anonymous pattern-match failure.
instance (Ord a) => Aggregation (Max a) where
  type AggResult (Max a) = a

  aggResult (Max largest) = largest
  aggResult MaxEmpty =
    error "aggResult: maximum of an empty input is undefined"
-- | A 'Count' accumulator finalises to the plain 'Int' it wraps.
instance Aggregation Count where
  type AggResult Count = Int

  aggResult counter = case counter of
    Count total -> total
-- | A pair of aggregations is itself an aggregation; each component is
-- finalised independently.
instance (Aggregation a, Aggregation b) => Aggregation (a, b) where
  type AggResult (a, b) = (AggResult a, AggResult b)

  aggResult (x, y) = (resX, resY)
    where
      resX = aggResult x
      resY = aggResult y
-- | A triple of aggregations is itself an aggregation; each component is
-- finalised independently.
instance (Aggregation a, Aggregation b, Aggregation c) => Aggregation (a, b, c) where
  type AggResult (a, b, c) = (AggResult a, AggResult b, AggResult c)

  aggResult (x, y, z) = (resX, resY, resZ)
    where
      resX = aggResult x
      resY = aggResult y
      resZ = aggResult z
-- | Stream the file @digits.dat@ and split it into lines with 'CB.lines'.
src :: Source (ResourceT IO) BSC.ByteString
src = C.sourceFile inputPath $= CB.lines
  where
    inputPath = "digits.dat"
-- | Parse each incoming line as an 'Integer' and emit the parsed value.
--
-- Parsing uses 'BSC.readInteger'; whatever remains of the line after the
-- number is discarded. Lines on which 'BSC.readInteger' returns 'Nothing'
-- are skipped, so a single malformed or blank line no longer terminates the
-- stream and silently drops all the input that follows it.
transform :: Conduit BSC.ByteString (ResourceT IO) Integer
transform = CL.mapMaybe parseLine
  where
    -- Keep only the parsed number, dropping the unparsed remainder.
    parseLine = fmap fst . BSC.readInteger
-- | 'ZipSink' that feeds the same input to a summing sink and a counting
-- sink, packaging both results as a 'Mean' accumulator.
meanZ :: (Num a, Monad m) => ZipSink a m (Mean a)
meanZ = fmap Mean (liftA2 (,) total count)
  where
    total = fmap Sum (ZipSink C.sum)
    count = fmap Count (ZipSink C.length)
-- | 'ZipSink' that sums its input and wraps the total in 'Sum'.
sumZ :: (Num a, Monad m) => ZipSink a m (Sum a)
sumZ = fmap Sum (ZipSink C.sum)
-- | 'ZipSink' that multiplies its input together and wraps the result in
-- 'Product'.
productZ :: (Num a, Monad m) => ZipSink a m (Product a)
productZ = fmap Product (ZipSink C.product)
-- | Convert an optional value into a 'Min' accumulator: 'Nothing' becomes
-- 'MinEmpty', @Just x@ becomes @Min x@.
mkMin :: (Ord a) => Maybe a -> Min a
mkMin Nothing  = MinEmpty
mkMin (Just x) = Min x
-- | Convert an optional value into a 'Max' accumulator: 'Nothing' becomes
-- 'MaxEmpty', @Just x@ becomes @Max x@.
mkMax :: (Ord a) => Maybe a -> Max a
mkMax Nothing  = MaxEmpty
mkMax (Just x) = Max x
-- | 'ZipSink' that runs 'C.minimum' over its input and converts the optional
-- result into a 'Min' accumulator via 'mkMin'.
minZ :: (Ord a, Monad m) => ZipSink a m (Min a)
minZ = fmap mkMin (ZipSink C.minimum)
-- | 'ZipSink' that runs 'C.maximum' over its input and converts the optional
-- result into a 'Max' accumulator via 'mkMax'.
maxZ :: (Ord a, Monad m) => ZipSink a m (Max a)
maxZ = fmap mkMax (ZipSink C.maximum)
-- | Run a source through a conduit into a sink that yields an aggregation
-- accumulator, then finalise that accumulator with 'aggResult'.
aggPipeM
  :: (Aggregation c, Monad m)
  => Source m a        -- ^ producer of raw input
  -> Conduit a m b     -- ^ transformation applied to every element
  -> Sink b m c        -- ^ consumer building the accumulator
  -> m (AggResult c)
aggPipeM source stage sink =
  (source $= stage $$ sink) >>= return . aggResult
-- | Like 'aggPipeM', but for pipelines living in 'ResourceT': the pipeline is
-- executed under 'runResourceT' and the resulting accumulator is finalised
-- with 'aggResult' in the underlying monad.
aggPipeIO'
  :: (Aggregation c, MonadBaseControl IO m, MonadIO m)
  => Source (ResourceT m) a        -- ^ producer of raw input
  -> Conduit a (ResourceT m) b     -- ^ transformation applied to every element
  -> Sink b (ResourceT m) c        -- ^ consumer building the accumulator
  -> m (AggResult c)
aggPipeIO' source stage sink =
  runResourceT (source $= stage $$ sink) >>= return . aggResult
-- Some test functions

-- | Mean of the integers produced by 'src' and 'transform'.
--
-- Each 'Integer' is converted with 'fromIntegral' before it reaches 'meanZ'.
-- The explicit signature pins the result to 'Double', the type numeric
-- defaulting would otherwise choose implicitly.
meanPipeline :: IO Double
meanPipeline = aggPipeIO' src transform (CL.map fromIntegral =$ getZipSink meanZ)
-- | Smallest integer produced by 'src' and 'transform', computed with 'minZ'.
minPipeline :: IO Integer
minPipeline = aggPipeIO' src transform (getZipSink minZ)
-- | Largest integer produced by 'src' and 'transform', computed with 'maxZ'.
maxPipeline :: IO Integer
maxPipeline = aggPipeIO' src transform (getZipSink maxZ)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment