Skip to content

Instantly share code, notes, and snippets.

@Spasi
Created October 12, 2014 18:56
Show Gist options
  • Save Spasi/4052e4e8c8d88a7325fb to your computer and use it in GitHub Desktop.
Save Spasi/4052e4e8c8d88a7325fb to your computer and use it in GitHub Desktop.
// Copyright 2014 Cognitect. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS-IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// This is a Kotlin port of https://github.com/cognitect-labs/transducers-java
package transducers
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.ThreadLocalRandom
import java.util.ArrayList
/**
* A reducing step function.
* @param <R> Type of first argument and return value
* @param <T> Type of input to reduce
*/
public trait StepFunction<R, T> {
/**
* Applies the reducing function to the current result and
* the new input, returning a new result.
*
* A reducing function can indicate that no more input
* should be processed by setting the value of reduced to
* true. This causes the reduction process to complete,
* returning the most recent result.
* @param result The current result value
* @param input New input to process
* @param reduced A boolean value which can be set to true
* to stop the reduction process
* @return A new result value
*/
public fun apply(result: R, input: T, reduced: AtomicBoolean): R
}
/**
* A complete reducing function. Extends a single reducing step
* function and adds a zero-arity function for initializing a new
* result and a single-arity function for processing the final
* result after the reduction process has completed.
* @param <R> Type of first argument and return value
* @param <T> Type of input to reduce
*/
public trait ReducingFunction<R, T> : StepFunction<R, T> {
/**
* Returns a newly initialized result.
* @return a new result
*/
public fun apply(): R = throw UnsupportedOperationException()
/**
* Completes processing of a final result.
* @param result the final reduction result
* @return the completed result
*/
public fun apply(result: R): R = result
}
/**
* Abstract base class for implementing a reducing function that chains to
* another reducing function. Zero-arity and single-arity overloads of apply
* delegate to the chained reducing function. Derived classes must implement
* the three-arity overload of apply, and may implement either of the other
* two overloads as required.
* @param <R> Type of first argument and return value of the reducing functions
* @param <A> Input type of reducing function being chained to
* @param <B> Input type of this reducing function
*/
public abstract class ReducingFunctionOn<R, A, B>(val rf: ReducingFunction<R, in A>) : ReducingFunction<R, B> {
override fun apply() = rf.apply()
override fun apply(result: R) = rf apply result
}
/**
* A Transducer transforms a reducing function of one type into a
* reducing function of another (possibly the same) type, applying
* mapping, filtering, flattening, etc. logic as desired.
* @param <B> The type of data processed by an input process
* @param <C> The type of data processed by the transduced process
*/
public trait Transducer<B, C> {
/**
* Transforms a reducing function of B into a reducing function
* of C.
* @param rf The input reducing function
* @param <R> The result type of both the input and the output
* reducing functions
* @return The transformed reducing function
*/
public fun <R> apply(rf: ReducingFunction<R, in B>): ReducingFunction<R, C>
/**
* Composes a transducer with another transducer, yielding
* a new transducer.
* @param right the transducer to compose with this transducer
* @param <A> the type of input processed by the reducing function
* the composed transducer returns when applied
* @return A new composite transducer
*/
public fun <A> comp(right: Transducer<A, in B>): Transducer<A, C> = object : Transducer<A, C> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = this@Transducer.apply(right apply rf)
}
}
/**
* Applies given reducing function to current result and each T in input, using
* the result returned from each reduction step as input to the next step. Returns
* final result.
* @param f a reducing function
* @param result an initial result value
* @param input the input to process
* @param reduced a boolean flag that can be set to indicate that the reducing process
* should stop, even though there is still input to process
* @param <R> the type of the result
* @param <T> the type of each item in input
* @return the final reduced result
*/
public fun <R, T> reduce(f: ReducingFunction<R, in T>, result: R, input: Iterable<T>, reduced: AtomicBoolean = AtomicBoolean()): R {
var ret = result
for ( t in input ) {
ret = f.apply(ret, t, reduced)
if ( reduced.get() )
break
}
return f.apply(ret)
}
public fun <R, T> completing(sf: StepFunction<R, in T>): ReducingFunction<R, in T> =
if ( sf is ReducingFunction )
sf
else
object : ReducingFunction<R, T> {
override fun apply(result: R, input: T, reduced: AtomicBoolean) = sf.apply(result, input, reduced)
}
/**
* Reduces input using transformed reducing function. Transforms reducing function by applying
* transducer. Reducing function must implement zero-arity apply that returns initial result
* to start reducing process.
* @param xf a transducer (or composed transducers) that transforms the reducing function
* @param rf a reducing function
* @param input the input to reduce
* @param <R> return type
* @param <A> type of input expected by reducing function
* @param <B> type of input and type accepted by reducing function returned by transducer
* @return result of reducing transformed input
*/
public fun <R, A, B> transduce(xf: Transducer<A, B>, rf: ReducingFunction<R, in A>, input: Iterable<B>): R = reduce(xf apply rf, rf.apply(), input)
/**
* Reduces input using transformed reducing function. Transforms reducing function by applying
* transducer. Step function is converted to reducing function if necessary. Accepts initial value
* for reducing process as argument.
* @param xf a transducer (or composed transducers) that transforms the reducing function
* @param rf a reducing function
* @param init an initial value to start reducing process
* @param input the input to reduce
* @param <R> return type
* @param <A> type expected by reducing function
* @param <B> type of input and type accepted by reducing function returned by transducer
* @return result of reducing transformed input
*/
public fun <R, A, B> transduce(xf: Transducer<A, B>, rf: StepFunction<R, in A>, init: R, input: Iterable<B>): R = reduce(xf apply completing(rf), init, input)
public fun <R : MutableCollection<A>, A, B> into(xf: Transducer<A, B>, init: R, input: Iterable<B>): R =
transduce(xf, object : ReducingFunction<R, A> {
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
result add input
return result
}
}, init, input)
/**
* Composes a transducer with another transducer, yielding a new transducer that
* @param left left hand transducer
* @param right right hand transducer
*/
public fun <A, B, C> compose(left: Transducer<B, C>, right: Transducer<A, B>): Transducer<A, C> = left comp right
/**
* Creates a transducer that transforms a reducing function by applying a mapping
* function to each input.
* @param f a mapping function from one type to another (can be the same type)
* @param <A> input type of input reducing function
* @param <B> input type of output reducing function
* @return a new transducer
*/
public fun <A, B> map(f: (B) -> A): Transducer<A, B> = object : Transducer<A, B> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, B>(rf) {
override fun apply(result: R, input: B, reduced: AtomicBoolean) = rf.apply(result, f(input), reduced)
}
}
/**
* Creates a transducer that transforms a reducing function by applying a
* predicate to each input and processing only those inputs for which the
* predicate is true.
* @param p a predicate function
* @param <A> input type of input and output reducing functions
* @return a new transducer
*/
public fun <A> filter(p: (A) -> Boolean): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
override fun apply(result: R, input: A, reduced: AtomicBoolean) = if ( p(input) ) rf.apply(result, input, reduced) else result
}
}
/**
* Creates a transducer that transforms a reducing function by accepting
* an iterable of the expected input type and reducing it
* @param <A> input type of input reducing function
* @param <B> input type of output reducing function
* @return a new transducer
*/
public fun <A, B : Iterable<A>> cat(): Transducer<A, B> = object : Transducer<A, B> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, B>(rf) {
override fun apply(result: R, input: B, reduced: AtomicBoolean) = reduce(rf, result, input, reduced)
}
}
/**
* Creates a transducer that transforms a reducing function using
* a composition of map and cat.
* @param f a mapping function from one type to another (can be the same type)
* @param <A> input type of input reducing function
* @param <B> output type of output reducing function and iterable of input type
* of input reducing function
* @param <C> input type of output reducing function
* @return a new transducer
*/
public fun<A, B : Iterable<A>, C> mapcat(f: (C) -> B): Transducer<A, C> = map(f) comp cat()
/**
* Creates a transducer that transforms a reducing function by applying a
* predicate to each input and not processing those inputs for which the
* predicate is true.
* @param p a predicate function
* @param <A> input type of input and output reducing functions
* @return a new transducer
*/
public fun <A> remove(p: (A) -> Boolean): Transducer<A, A> = filter { !p(it) }
/*
object: Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object: ReducingFunctionOn<R, A, A>(rf) {
override fun apply(result: R, input: A, reduced: AtomicBoolean) = if ( !p(input) ) rf.apply(result, input, reduced) else result
}
}
*/
/**
* Creates a transducer that transforms a reducing function such that
* it only processes n inputs, then the reducing process stops.
* @param n the number of inputs to process
* @param <A> input type of input and output reducing functions
* @return a new transducer
*/
public fun <A> take(n: Long): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
private var taken = 0L
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
var ret = result
if ( taken < n ) {
ret = rf.apply(result, input, reduced)
taken++
} else
reduced set true
return ret
}
}
}
/**
* Creates a transducer that transforms a reducing function such that
* it processes inputs as long as the provided predicate returns true.
* If the predicate returns false, the reducing process stops.
* @param p a predicate used to test inputs
* @param <A> input type of input and output reducing functions
* @return a new transducer
*/
public fun <A> takeWhile(p: (A) -> Boolean): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
var ret = result
if ( p(input) )
ret = rf.apply(ret, input, reduced)
else
reduced set true
return ret
}
}
}
/**
* Creates a transducer that transforms a reducing function such that
* it skips n inputs, then processes the rest of the inputs.
* @param n the number of inputs to skip
* @param <A> input type of input and output reducing functions
* @return a new transducer
*/
public fun <A> drop(n: Long): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
private var dropped = 0L
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
var ret = result
if ( dropped < n )
dropped++
else
ret = rf.apply(ret, input, reduced)
return ret
}
}
}
/**
* Creates a transducer that transforms a reducing function such that
* it skips inputs as long as the provided predicate returns true.
* Once the predicate returns false, the rest of the inputs are
* processed.
* @param p a predicate used to test inputs
* @param <A> input type of input and output reducing functions
* @return a new transducer
*/
public fun <A> dropWhile(p: (A) -> Boolean): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
private var drop = true
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
if ( drop && p(input) )
return result
drop = false
return rf.apply(result, input, reduced)
}
}
}
/**
* Creates a transducer that transforms a reducing function such that
* it processes every nth input.
* @param n The frequence of inputs to process (e.g., 3 processes every third input).
* @param <A> The input type of the input and output reducing functions
* @return a new transducer
*/
public fun <A> takeNth(n: Long): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
private var nth = 0L
override fun apply(result: R, input: A, reduced: AtomicBoolean): R = if ((nth++ % n) == 0L) rf.apply(result, input, reduced) else result
}
}
/**
* Creates a transducer that transforms a reducing function such that
* inputs that are keys in the provided map are replaced by the corresponding
* value in the map.
* @param smap a map of replacement values
* @param <A> the input type of the input and output reducing functions
* @return a new transducer
*/
public fun <A> replace(smap: Map<A, A>): Transducer<A, A> = map { if ( smap.containsKey(it) ) smap[it]!! else it }
/**
* Creates a transducer that transforms a reducing function by applying a
* function to each input and processing the resulting value, ignoring values
* that are null.
* @param f a function for processing inputs
* @param <A> the input type of the input and output reducing functions
* @return a new transducer
*/
public fun <A : Any> keep(f: (A) -> A?): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
val _input = f(input)
return if ( _input != null ) rf.apply(result, _input, reduced) else result
}
}
}
/**
* Creates a transducer that transforms a reducing function by applying a
* function to each input and processing the resulting value, ignoring values
* that are null.
* @param f a function for processing inputs
* @param <A> the input type of the input and output reducing functions
* @return a new transducer
*/
public fun <A : Any> keepIndexed(f: (Long, A) -> A?): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
private var n = 0L
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
val _input = f(++n, input)
return if ( _input != null ) rf.apply(result, _input, reduced) else result
}
}
}
/**
* Creates a transducer that transforms a reducing function such that
* consecutive identical input values are removed, only a single value
* is processed.
* @param <A> the input type of the input and output reducing functions
* @return a new transducer
*/
public fun <A : Any> dedupe(): Transducer<A, A> = object : Transducer<A, A> {
override fun <R> apply(rf: ReducingFunction<R, in A>) = object : ReducingFunctionOn<R, A, A>(rf) {
var prior: A? = null
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
var ret = result
if ( prior != input ) {
prior = input
ret = rf.apply(ret, input, reduced)
}
return ret
}
}
}
/**
* Creates a transducer that transforms a reducing function such that
* it has the specified probability of processing each input.
* @param prob the probability between expressed as a value between 0 and 1.
* @param <A> the input type of the input and output reducing functions
* @return a new transducer
*/
public fun <A : Any> randomSample(prob: Double): Transducer<A, A> = filter { ThreadLocalRandom.current().nextDouble() < prob }
/**
* Creates a transducer that transforms a reducing function that processes
* iterables of input into a reducing function that processes individual inputs
* by gathering series of inputs for which the provided partitioning function returns
* the same value, only forwarding them to the next reducing function when the value
* the partitioning function returns for a given input is different from the value
* returned for the previous input.
* @param f the partitioning function
* @param <A> the input type of the input and output reducing functions
* @param <P> the type returned by the partitioning function
* @return a new transducer
*/
public fun <A, P> partitionBy(f: (A) -> P): Transducer<Iterable<A>, A> = object : Transducer<Iterable<A>, A> {
override fun <R> apply(rf: ReducingFunction<R, in Iterable<A>>) = object : ReducingFunction<R, A> {
val part = ArrayList<A>()
val mark: Any = Unit
var prior = mark
override fun apply(): R = rf.apply()
override fun apply(result: R): R {
var ret = result
if ( part.isNotEmpty() ) {
ret = rf.apply(result, ArrayList(part), AtomicBoolean())
part.clear()
}
return rf apply ret
}
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
val p = f(input)
if ( prior identityEquals mark || prior equals p ) {
prior = p
part add input
return result
}
val copy = ArrayList(part)
prior = p
part.clear()
val ret = rf.apply(result, copy, reduced)
if ( !reduced.get() )
part add input
return ret
}
}
}
/**
* Creates a transducer that transforms a reducing function that processes
* iterables of input into a reducing function that processes individual inputs
* by gathering series of inputs into partitions of a given size, only forwarding
* them to the next reducing function when enough inputs have been accrued. Processes
* any remaining buffered inputs when the reducing process completes.
* @param n the size of each partition
* @param <A> the input type of the input and output reducing functions
* @return a new transducer
*/
public fun <A> partitionAll(n: Int): Transducer<Iterable<A>, A> = object : Transducer<Iterable<A>, A> {
override fun <R> apply(rf: ReducingFunction<R, in Iterable<A>>) = object : ReducingFunction<R, A> {
val part = ArrayList<A>()
override fun apply(): R = rf.apply()
override fun apply(result: R): R {
var ret = result
if ( part.isNotEmpty() ) {
ret = rf.apply(result, ArrayList(part), AtomicBoolean())
part.clear()
}
return rf apply ret
}
override fun apply(result: R, input: A, reduced: AtomicBoolean): R {
part add input
return if ( n == part.size )
try {
rf.apply(result, ArrayList(part), reduced)
} finally {
part.clear()
}
else
result
}
}
}
@xgrommx
Copy link

xgrommx commented Dec 12, 2014

Do you planned create a library for kotlin? Also could you please to add some examples how can I use it in several cases.

@hastebrot
Copy link

Updated this example and its tests to Kotlin 1.0.0 (https://gist.github.com/hastebrot/aa7b5366309d42270cc1). Thanks for the great code!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment