Skip to content

Instantly share code, notes, and snippets.

@smarden1
Last active December 26, 2015 08:59
Show Gist options
  • Save smarden1/7126014 to your computer and use it in GitHub Desktop.
Save smarden1/7126014 to your computer and use it in GitHub Desktop.
Sketch of providing access to the FlowProcess from within Scalding user functions.
/**
 * Process-wide holder for the `FlowProcess` of the currently running operation.
 *
 * User functions inside a Scalding job do not receive the `FlowProcess` as an argument.
 * This object lets them reach counters, configuration and keep-alive anyway.
 *
 * An operation must call [[setFlowProcess]] before invoking user code. Every accessor
 * other than [[isDefined]] / [[isEmpty]] throws `IllegalStateException` until that has
 * happened.
 *
 * NOTE(review): this is a single mutable slot in a singleton object. It is therefore
 * shared by everything running in the same JVM/classloader. That is presumably fine when
 * each task runs in its own JVM. If several operations run concurrently in one JVM
 * (e.g. a local mode), they would overwrite each other's value — confirm before relying
 * on it there.
 */
object RichFlowProcess extends java.io.Serializable {
  // None until an operation registers its FlowProcess. @transient because the
  // FlowProcess is task-local state that must never be serialized along with this
  // singleton.
  @transient private var _flowProcess: Option[FlowProcess[_]] = None

  /**
   * Registers `fp` as the current FlowProcess, replacing any previous one.
   * A `null` argument clears the registration instead of storing `Some(null)`.
   */
  def setFlowProcess(fp: FlowProcess[_]): Unit =
    _flowProcess = Option(fp)

  /**
   * Returns the registered FlowProcess.
   *
   * Uses an explicit throw rather than `assert`, so the check survives builds that
   * elide assertions.
   *
   * @throws IllegalStateException if no operation has registered a FlowProcess
   */
  def flowProcess(): FlowProcess[_] =
    _flowProcess.getOrElse(throw new IllegalStateException(
      "flow process is undefined: your operation needs to set the flow process"))

  /** Increments counter `group`/`counter` by `amount` on the registered FlowProcess. */
  def incrementCounter(group: String, counter: String, amount: Int): Unit =
    flowProcess().increment(group, counter, amount)

  /**
   * Returns the value of the `map.input.file` configuration property.
   *
   * NOTE(review): newer Hadoop versions expose this as `mapreduce.map.input.file` —
   * confirm which key your Hadoop version sets.
   */
  def getCurrentFileName(): String =
    getConfigProperty("map.input.file")

  /** Forwards a keep-alive signal to the registered FlowProcess. */
  def keepAlive(): Unit =
    flowProcess().keepAlive()

  /**
   * Returns the string value of configuration property `key`.
   * The result comes straight from `FlowProcess.getStringProperty` and may be `null`
   * when the property is unset; prefer [[getConfigPropertyOption]].
   */
  def getConfigProperty(key: String): String =
    flowProcess().getStringProperty(key)

  /** Like [[getConfigProperty]], but maps a missing (`null`) value to `None`. */
  def getConfigPropertyOption(key: String): Option[String] =
    Option(getConfigProperty(key))

  /** True once a FlowProcess has been registered. */
  def isDefined(): Boolean =
    _flowProcess.isDefined

  /** True while no FlowProcess is registered. */
  def isEmpty(): Boolean =
    _flowProcess.isEmpty
}
/**
 * Example Cascading `Function` that applies a Scala function to each input tuple.
 *
 * It shows how an operation makes its `FlowProcess` available to user code through
 * `RichFlowProcess`.
 *
 * @param fn      per-tuple user function. It is `@transient`; the `MeatLocker` copy
 *                below is what is used at run time.
 * @param fields  fields passed to `BaseOperation`
 * @param conv    converts the incoming argument tuple to `S`
 * @param set     converts the function result `T` back into a tuple
 */
class MapFunction[S, T](@transient fn: S => T, fields: Fields,
    conv: TupleConverter[S], set: TupleSetter[T])
  extends BaseOperation[Any](fields) with Function[Any] {

  // fn itself is @transient; this wrapper is what is expected to carry the function
  // through serialization of the operation.
  val lockedFn = MeatLocker(fn)

  /**
   * Processes one tuple. It bumps a demo counter directly, publishes the FlowProcess to
   * RichFlowProcess, then runs the user function and emits its result.
   */
  def operate(flowProcess: FlowProcess[_], functionCall: FunctionCall[Any]): Unit = {
    // Direct counter access: possible here because operate receives the FlowProcess.
    flowProcess.increment("mygroup", "static_counter", 1)
    // Every operation would need this line. It must run before the user function so
    // that RichFlowProcess calls made inside that function succeed.
    RichFlowProcess.setFlowProcess(flowProcess)
    val result = lockedFn.get(conv(functionCall.getArguments))
    functionCall.getOutputCollector.add(set(result))
  }
}
/**
 * Example job. Each `'in` value is passed through unchanged as `'out`, and the
 * ("mygroup", "mycounter") counter is incremented through RichFlowProcess for every
 * value. The result is written to `Tsv("test")`.
 *
 * NOTE(review): `MyData` and `.maps` are not defined in this sketch. `maps` is presumably
 * a custom pipe method that runs the function inside an operation like MapFunction,
 * which registers the FlowProcess — confirm. RichFlowProcess throws if nothing
 * registered one.
 */
class TestFlowProcessJob(args : Args) extends Job(args) {
  MyData()
    .maps('in, 'out) { (value: String) =>
      RichFlowProcess.incrementCounter("mygroup", "mycounter", 1)
      value
    }
    .write(Tsv("test"))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment