Skip to content

Instantly share code, notes, and snippets.

@nezarfadle
Last active November 28, 2020 07:19
Show Gist options
  • Save nezarfadle/1b9f926dcc56499e337430f7a9722ba6 to your computer and use it in GitHub Desktop.
Save nezarfadle/1b9f926dcc56499e337430f7a9722ba6 to your computer and use it in GitHub Desktop.
<?php

/**
 * Receiver of the values that an observable delivers.
 *
 * In this file, Observable::notify() calls run() once for each value it
 * decides to deliver.
 */
interface IObserver
{
    /**
     * Handle one delivered value.
     *
     * @param mixed $o The delivered value; no type is enforced by this interface.
     */
    public function run( $o );
}

/**
 * Contract for an object that a stream pushes values into.
 */
interface IObservable
{
    /**
     * Accept one value pushed by a stream.
     *
     * @param mixed $streamObject The pushed value; no type is enforced by this interface.
     */
    public function notify( $streamObject );
    /**
     * Attach the stream that will supply values.
     *
     * No return type is declared here; the Observable implementation in this
     * file returns itself so calls can be chained.
     *
     * @param IStream $stream The source of values.
     */
    public function from( IStream $stream );
}

/**
 * Source of values that can be started against an observable.
 */
interface IStream
{
    /**
     * Start the stream against the given observable.
     *
     * In this file, CounterStream implements this by calling
     * $observable->notify() once per value.
     *
     * @param IObservable $observable The object that receives the stream's values.
     */
    public function start( IObservable $observable );    
}

/**
 * Minimal observable pipeline.
 *
 * Values pushed by an IStream are checked against every registered filter,
 * transformed by every registered map (in registration order) and then handed
 * to the subscribed observer.
 *
 * Typical use is fluent: from() -> filter()/map() -> subscribe(). The
 * configuration is cleared at the end of every subscribe() call, so a single
 * instance can be reused for several pipelines.
 */
class Observable implements IObservable
{
    /** @var IObserver|null Observer of the subscription currently running, if any. */
    private $observer;

    /** @var callable|null Deferred starter built by from() and invoked by subscribe(). */
    private $stream;

    /** @var callable[] Predicates; a value is delivered only when all of them accept it. */
    private $filters = [];

    /** @var callable[] Transformations applied to accepted values, in registration order. */
    private $maps = [];

    /**
     * Tell whether a value is accepted by every registered filter.
     *
     * With no filters registered, every value is accepted.
     *
     * @param mixed $streamObject Value to test.
     * @return bool True when no filter rejects the value.
     */
    private function passesFilters($streamObject)
    {
        foreach ($this->filters as $filter) {
            if (!$filter($streamObject)) {
                return false;
            }
        }

        return true;
    }

    /**
     * Run a value through every registered map, feeding each result to the next.
     *
     * Only callables ever reach $this->maps (map() rejects anything else), so
     * no further callable check is needed here.
     *
     * @param mixed $streamObject Value to transform.
     * @return mixed The transformed value.
     */
    private function applyMaps($streamObject)
    {
        foreach ($this->maps as $map) {
            $streamObject = $map($streamObject);
        }

        return $streamObject;
    }

    /**
     * Drop the observer, stream, filters and maps so the instance can be reconfigured.
     */
    private function reset()
    {
        $this->observer = null;
        $this->stream = null;
        $this->filters = [];
        $this->maps = [];
    }

    /**
     * Register a predicate that values must satisfy to be delivered.
     *
     * A non-callable argument is ignored and the pipeline is left unchanged.
     *
     * @param mixed $filter Callable receiving the value and returning a truthy/falsy result.
     * @return $this
     */
    public function filter($filter)
    {
        if (is_callable($filter)) {
            $this->filters[] = $filter;
        }

        return $this;
    }

    /**
     * Register a transformation applied to every accepted value.
     *
     * A non-callable argument is ignored and the pipeline is left unchanged.
     *
     * @param mixed $fn Callable receiving the value and returning its replacement.
     * @return $this
     */
    public function map($fn)
    {
        if (is_callable($fn)) {
            $this->maps[] = $fn;
        }

        return $this;
    }

    /**
     * Start the configured stream and deliver its values to the observer.
     *
     * The configuration is cleared afterwards, including when the stream,
     * a filter, a map or the observer throws, so a failed run cannot leak
     * stale filters or maps into the next pipeline built on this instance.
     *
     * @param IObserver $observer Receiver of the filtered, mapped values.
     * @throws \LogicException When no stream was configured with from().
     */
    public function subscribe(IObserver $observer)
    {
        if ($this->stream === null) {
            throw new \LogicException('No stream configured: call from() before subscribe().');
        }

        $this->observer = $observer;

        try {
            ($this->stream)();
        } finally {
            $this->reset();
        }
    }

    /**
     * Set the stream that subscribe() will start.
     *
     * The stream is not started here. When subscribe() runs, the stream is
     * started against a clone of this object taken at that moment, so the
     * clone carries the observer, filters and maps configured by then.
     *
     * @param IStream $stream Source of values.
     * @return Observable This instance, for chaining.
     */
    public function from(IStream $stream): Observable
    {
        $this->stream = function () use ($stream) {
            return $stream->start(clone $this);
        };

        return $this;
    }

    /**
     * Receive one value from the stream and deliver it if it qualifies.
     *
     * The value is delivered at most once: only when every filter accepts it,
     * and after all maps have been applied. Nothing happens when no observer
     * is subscribed.
     *
     * @param mixed $streamObject Value pushed by the stream.
     */
    public function notify($streamObject)
    {
        if ($this->observer === null) {
            return;
        }

        if (!$this->passesFilters($streamObject)) {
            return;
        }

        $this->observer->run($this->applyMaps($streamObject));
    }
}

/**
 * Stream that pushes a run of consecutive integers into an observable.
 *
 * Constructed without arguments it emits 1 through 10. Other bounds can be
 * supplied to the constructor.
 */
class CounterStream implements IStream
{
    /** @var int First integer emitted. */
    private $first;

    /** @var int Last integer emitted (inclusive). */
    private $last;

    /**
     * @param int $first First integer to emit.
     * @param int $last  Last integer to emit, inclusive. When it is smaller
     *                   than $first the stream emits nothing.
     */
    public function __construct(int $first = 1, int $last = 10)
    {
        $this->first = $first;
        $this->last = $last;
    }

    /**
     * Emit every integer from the first to the last bound, in ascending order.
     *
     * @param IObservable $observable Receiver; notify() is called once per integer.
     */
    public function start(IObservable $observable)
    {
        for ($value = $this->first; $value <= $this->last; $value++) {
            $observable->notify($value);
        }
    }
}


// Demo: run two pipelines over the same counter stream and print the results.

// One observer serves both pipelines; it echoes each delivered value on its own line.
$printer = new class() implements IObserver {
    public function run($data)
    {
        echo $data, "\r\n";
    }
};

$stream = new Observable();

// Pipeline 1: values greater than 5, each wrapped in an <h1> element.
$aboveFive = static function ($num) {
    return $num > 5;
};
$asHeading = static function ($num) {
    return "<h1> $num </h1>";
};

$stream
    ->from(new CounterStream())
    ->filter($aboveFive)
    ->map($asHeading)
    ->subscribe($printer);

echo "-----------------------------------------\r\n";

// Pipeline 2: values up to and including 5, each wrapped in an <li> element.
$atMostFive = static function ($num) {
    return $num <= 5;
};
$asListItem = static function ($num) {
    return "<li> $num </li>";
};

$stream
    ->from(new CounterStream())
    ->filter($atMostFive)
    ->map($asListItem)
    ->subscribe($printer);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment