Created
January 9, 2015 18:40
-
-
Save jhedev/77fd6b8e9ef1fe69391b to your computer and use it in GitHub Desktop.
Experimentations with conduit and data analysis (inspired by http://twdkz.wordpress.com/2013/05/31/data-analysis-with-monoids/)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{-# LANGUAGE TypeFamilies #-} | |
{-# LANGUAGE OverloadedStrings #-} | |
{-# LANGUAGE FlexibleContexts #-} | |
import Control.Applicative | |
import Control.Monad.Trans.Resource | |
import Control.Monad.IO.Class | |
import qualified Data.ByteString.Char8 as BSC | |
import Data.Conduit | |
import qualified Data.Conduit.Combinators as C | |
import qualified Data.Conduit.List as CL | |
import qualified Data.Conduit.Binary as CB | |
import Data.Monoid | |
import Data.Foldable | |
-- | Accumulator for the smallest value of a collection.
--
-- 'MinEmpty' represents "no value yet"; it is the identity used by the
-- 'Monoid' instance for this type.
data Min a
  = Min a
  | MinEmpty
  deriving (Show)
-- | Accumulator for the largest value of a collection.
--
-- 'MaxEmpty' represents "no value yet"; it is the identity used by the
-- 'Monoid' instance for this type.
data Max a
  = Max a
  | MaxEmpty
  deriving (Show)
-- | An element count, wrapped in its own type so that it can have an
-- additive 'Monoid' instance.
newtype Count
  = Count Int
  deriving (Show)
-- | Accumulator for an arithmetic mean: a running total paired with the
-- number of values that went into it.
newtype Mean a
  = Mean (Sum a, Count)
  deriving (Show)
-- | Combining two 'Min' accumulators keeps the smaller value.
-- 'MinEmpty' is the identity: combined with anything it yields the other
-- operand unchanged.
instance Ord a => Monoid (Min a) where
  mempty = MinEmpty
  mappend lhs rhs = case (lhs, rhs) of
    (MinEmpty, _)  -> rhs
    (_, MinEmpty)  -> lhs
    (Min x, Min y) -> Min (min x y)
-- | Combining two 'Max' accumulators keeps the larger value.
-- 'MaxEmpty' is the identity: combined with anything it yields the other
-- operand unchanged.
instance Ord a => Monoid (Max a) where
  mempty = MaxEmpty
  mappend lhs rhs = case (lhs, rhs) of
    (MaxEmpty, _)  -> rhs
    (_, MaxEmpty)  -> lhs
    (Max x, Max y) -> Max (max x y)
-- | Counts combine by addition; the empty count is zero.
instance Monoid Count where
  mempty = Count 0
  Count lhs `mappend` Count rhs = Count (lhs + rhs)
-- | Two partial means are merged by merging their (total, count) pairs
-- component-wise, using the 'Monoid' instances of 'Sum' and 'Count'.
instance (Num a, Fractional a) => Monoid (Mean a) where
  mempty = Mean (mempty, mempty)
  Mean lhs `mappend` Mean rhs = Mean (lhs `mappend` rhs)
-- | A monoidal accumulator that can be turned into a final result value.
class Monoid a => Aggregation a where
  -- | The type of the final value extracted from an accumulator of type @a@.
  type AggResult a :: *

  -- | Extract the final result from an accumulator.
  aggResult :: a -> AggResult a
-- | The result of a 'Mean' is its total divided by its sample count.
-- The count is not checked for zero before dividing.
instance Fractional a => Aggregation (Mean a) where
  type AggResult (Mean a) = a
  aggResult (Mean (total, samples)) = getSum total / fromIntegral sampleCount
    where
      Count sampleCount = samples
-- | The result of a 'Sum' accumulator is the wrapped total.
instance Num a => Aggregation (Sum a) where
  type AggResult (Sum a) = a
  aggResult = getSum
-- | The result of a 'Product' accumulator is the wrapped product.
instance Num a => Aggregation (Product a) where
  type AggResult (Product a) = a
  aggResult = getProduct
-- | The result of a 'Min' accumulator is the value it holds.
--
-- 'MinEmpty' holds no value, and @AggResult (Min a)@ is a plain @a@, so there
-- is nothing to return for it. That case is reported with a descriptive
-- 'error' rather than being left as a missing equation.
instance Ord a => Aggregation (Min a) where
  type AggResult (Min a) = a
  aggResult (Min a)  = a
  aggResult MinEmpty = error "aggResult: MinEmpty has no minimum"
-- | The result of a 'Max' accumulator is the value it holds.
--
-- 'MaxEmpty' holds no value, and @AggResult (Max a)@ is a plain @a@, so there
-- is nothing to return for it. That case is reported with a descriptive
-- 'error' rather than being left as a missing equation.
instance Ord a => Aggregation (Max a) where
  type AggResult (Max a) = a
  aggResult (Max a)  = a
  aggResult MaxEmpty = error "aggResult: MaxEmpty has no maximum"
-- | The result of a 'Count' accumulator is the wrapped 'Int'.
instance Aggregation Count where
  type AggResult Count = Int
  aggResult counter = total
    where
      Count total = counter
-- | A pair of accumulators yields the pair of their individual results.
instance (Aggregation a, Aggregation b) => Aggregation (a, b) where
  type AggResult (a, b) = (AggResult a, AggResult b)
  aggResult pair = case pair of
    (x, y) -> (aggResult x, aggResult y)
-- | A triple of accumulators yields the triple of their individual results.
instance (Aggregation a, Aggregation b, Aggregation c) => Aggregation (a, b, c) where
  type AggResult (a, b, c) = (AggResult a, AggResult b, AggResult c)
  aggResult triple = case triple of
    (x, y, z) -> (aggResult x, aggResult y, aggResult z)
-- | Reads the file @digits.dat@ and passes its bytes through 'CB.lines',
-- so that downstream receives one 'BSC.ByteString' per line.
src :: Source (ResourceT IO) BSC.ByteString
src = fileBytes $= CB.lines
  where
    fileBytes = C.sourceFile "digits.dat"
-- | Turns each incoming line into the 'Integer' it starts with.
--
-- Lines are parsed with 'BSC.readInteger'; whatever follows the number on a
-- line is ignored. A line that does not start with an integer is skipped and
-- processing continues with the next line, so one malformed line (for example
-- a blank one) no longer ends the stream and hides all the data after it.
transform :: Conduit BSC.ByteString (ResourceT IO) Integer
transform = CL.mapMaybe parseLine
  where
    -- Keep only the parsed number, dropping the unparsed remainder.
    parseLine = fmap fst . BSC.readInteger
-- | A 'ZipSink' that feeds every input to both 'C.sum' and 'C.length' and
-- packages the two results as a 'Mean' accumulator.
meanZ :: (Num a, Monad m) => ZipSink a m (Mean a)
meanZ = toMean <$> ZipSink C.sum <*> ZipSink C.length
  where
    toMean total samples = Mean (Sum total, Count samples)
-- | A 'ZipSink' that runs 'C.sum' over the input and wraps the result in 'Sum'.
sumZ :: (Num a, Monad m) => ZipSink a m (Sum a)
sumZ = fmap Sum (ZipSink C.sum)
-- | A 'ZipSink' that runs 'C.product' over the input and wraps the result
-- in 'Product'.
productZ :: (Num a, Monad m) => ZipSink a m (Product a)
productZ = fmap Product (ZipSink C.product)
-- | Converts an optional value into a 'Min' accumulator:
-- 'Nothing' becomes 'MinEmpty', @'Just' x@ becomes @'Min' x@.
mkMin :: Ord a => Maybe a -> Min a
mkMin Nothing  = MinEmpty
mkMin (Just x) = Min x
-- | Converts an optional value into a 'Max' accumulator:
-- 'Nothing' becomes 'MaxEmpty', @'Just' x@ becomes @'Max' x@.
mkMax :: Ord a => Maybe a -> Max a
mkMax Nothing  = MaxEmpty
mkMax (Just x) = Max x
-- | A 'ZipSink' that runs 'C.minimum' over the input and converts its
-- 'Maybe' result into a 'Min' accumulator with 'mkMin'.
minZ :: (Ord a, Monad m) => ZipSink a m (Min a)
minZ = fmap mkMin (ZipSink C.minimum)
-- | A 'ZipSink' that runs 'C.maximum' over the input and converts its
-- 'Maybe' result into a 'Max' accumulator with 'mkMax'.
maxZ :: (Ord a, Monad m) => ZipSink a m (Max a)
maxZ = fmap mkMax (ZipSink C.maximum)
-- | Runs a source through a conduit into an aggregating sink and returns the
-- final result of the accumulator the sink produced.
aggPipeM
  :: (Aggregation c, Monad m)
  => Source m a       -- ^ producer of raw values
  -> Conduit a m b    -- ^ transformation applied to each value
  -> Sink b m c       -- ^ sink that builds the accumulator
  -> m (AggResult c)  -- ^ result extracted with 'aggResult'
aggPipeM source stage sink =
  (source $= stage $$ sink) >>= return . aggResult
-- | Like 'aggPipeM', but for pipelines that live in 'ResourceT': the whole
-- pipeline is run inside 'runResourceT' and the extracted result is returned
-- in the underlying monad.
aggPipeIO'
  :: (Aggregation c, MonadBaseControl IO m, MonadIO m)
  => Source (ResourceT m) a     -- ^ producer of raw values
  -> Conduit a (ResourceT m) b  -- ^ transformation applied to each value
  -> Sink b (ResourceT m) c     -- ^ sink that builds the accumulator
  -> m (AggResult c)            -- ^ result extracted with 'aggResult'
aggPipeIO' source stage sink = runResourceT (aggPipeM source stage sink)
-- Some test functions | |
-- | Mean of the integers produced by 'src' and 'transform'.
--
-- Each 'Integer' is converted with 'fromIntegral' before being averaged. The
-- signature fixes the numeric type to 'Double' explicitly instead of leaving
-- the choice to type defaulting.
meanPipeline :: IO Double
meanPipeline = aggPipeIO' src transform (CL.map fromIntegral =$ getZipSink meanZ)
-- | Smallest of the integers produced by 'src' and 'transform'.
--
-- If no integer reaches the sink the accumulator is 'MinEmpty', for which
-- 'aggResult' has no value; forcing the returned result then throws.
minPipeline :: IO Integer
minPipeline = aggPipeIO' src transform (getZipSink minZ)
-- | Largest of the integers produced by 'src' and 'transform'.
--
-- If no integer reaches the sink the accumulator is 'MaxEmpty', for which
-- 'aggResult' has no value; forcing the returned result then throws.
maxPipeline :: IO Integer
maxPipeline = aggPipeIO' src transform (getZipSink maxZ)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment