Last active
December 26, 2015 08:59
-
-
Save smarden1/7126014 to your computer and use it in GitHub Desktop.
sketch of providing access to the FlowProcess from within scalding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Singleton holder that exposes the current `FlowProcess` to user code that
 * does not receive it as a parameter (e.g. the body of a function passed to a
 * pipe operation).
 *
 * An operation must call [[setFlowProcess]] before user code invokes any of the
 * accessors below; otherwise they fail with an `AssertionError`.
 *
 * The holder is a single mutable slot shared by everything in the JVM that
 * touches this object; no synchronization is performed here.
 */
object RichFlowProcess extends java.io.Serializable {
  // Marked @transient because this is a runtime handle, not data that should
  // travel with the object if it is ever serialized.
  // NOTE(review): confirm whether FlowProcess implementations are serializable.
  @transient private var _flowProcess : Option[FlowProcess[_]] = None

  /**
   * Registers the flow process that subsequent accessor calls will use.
   *
   * @param fp the flow process handed to the running operation; a `null`
   *           value clears the holder instead of storing `Some(null)`
   */
  def setFlowProcess(fp : FlowProcess[_]) : Unit =
    _flowProcess = Option(fp)

  /**
   * Returns the registered flow process.
   *
   * The check is done with an explicit throw rather than `assert`, because
   * `assert` calls can be elided at compile time, which would turn a missing
   * flow process into an uninformative `None.get` failure.
   *
   * @throws AssertionError if no flow process has been registered
   */
  def flowProcess() : FlowProcess[_] =
    _flowProcess.getOrElse(
      throw new AssertionError(
        "assertion failed: flow process is undefined: your operation needs to set the flow process"))

  /**
   * Increments the named counter on the registered flow process.
   *
   * @param group   counter group name
   * @param counter counter name within the group
   * @param amount  amount to add to the counter
   */
  def incrementCounter(group : String, counter : String, amount : Int) : Unit =
    flowProcess().increment(group, counter, amount)

  /** Returns the value of the `map.input.file` configuration property. */
  def getCurrentFileName() : String =
    getConfigProperty("map.input.file")

  /** Forwards a keep-alive call to the registered flow process. */
  def keepAlive() : Unit =
    flowProcess().keepAlive()

  /**
   * Looks up a configuration property as a string on the registered flow process.
   *
   * @param key property name
   * @return whatever `getStringProperty` returns for `key`
   */
  def getConfigProperty(key : String) : String =
    flowProcess().getStringProperty(key)

  /** True when a flow process has been registered. */
  def isDefined() : Boolean =
    _flowProcess.isDefined

  /** True when no flow process has been registered. */
  def isEmpty() : Boolean =
    _flowProcess.isEmpty
}
// example Operation | |
/**
 * Example operation: applies `fn` to each incoming tuple and emits the result,
 * publishing the current `FlowProcess` through `RichFlowProcess` first so that
 * `fn` can reach it.
 *
 * @param fn     user function applied to each converted argument tuple
 * @param fields fields passed to the `BaseOperation` constructor
 * @param conv   converts the incoming argument tuple into an `S`
 * @param set    converts the resulting `T` into the tuple that is emitted
 */
class MapFunction[S,T](@transient fn : S => T, fields : Fields,
    conv : TupleConverter[S], set : TupleSetter[T])
  extends BaseOperation[Any](fields) with Function[Any] {

  // The function is wrapped rather than held directly; the constructor
  // parameter itself is @transient.
  val lockedFn = MeatLocker(fn)

  /**
   * Handles one function call: bumps a fixed counter, registers the flow
   * process, then maps the arguments and adds the result to the collector.
   */
  def operate(flowProcess : FlowProcess[_], functionCall : FunctionCall[Any]) : Unit = {
    flowProcess.increment("mygroup", "static_counter", 1)

    // Every operation would need this extra line: it must run before the user
    // function so that RichFlowProcess is populated when the function executes.
    RichFlowProcess.setFlowProcess(flowProcess)

    val userFn = lockedFn.get
    val input = conv(functionCall.getArguments)
    val output = userFn(input)
    functionCall.getOutputCollector.add(set(output))
  }
}
// example job | |
/**
 * Example job: passes each input value through unchanged while incrementing a
 * counter via `RichFlowProcess` from inside the user function, then writes the
 * result as TSV to "test".
 */
class TestFlowProcessJob(args : Args) extends Job(args) {
  MyData()
    .maps('in, 'out) { line : String =>
      // Reaches the flow process without it being passed into the closure.
      RichFlowProcess.incrementCounter("mygroup", "mycounter", 1)
      line
    }
    .write(Tsv("test"))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment