Skip to content

Instantly share code, notes, and snippets.

@sadikovi
Last active January 2, 2020 11:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sadikovi/5f71e022e062150ecb9077a0442d16a2 to your computer and use it in GitHub Desktop.
Save sadikovi/5f71e022e062150ecb9077a0442d16a2 to your computer and use it in GitHub Desktop.
HDFS notification system example
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.hdfs.client._
import org.apache.hadoop.hdfs.inotify._
val url = new URI("hdfs://localhost:8020")
val conf = new Configuration(false)
val dfs = new HdfsAdmin(url, conf)
val stream = dfs.getInotifyEventStream()
val thread = new Thread {
private def eventToStr(event: Event): String = event match {
case s: Event.AppendEvent => s"APPEND(path=${s.getPath})"
case s: Event.CloseEvent => s"CLOSE(filesize=${s.getFileSize}, path=${s.getPath}, ts=${s.getTimestamp})"
case s: Event.CreateEvent => s"CREATE(group=${s.getGroupName}, owner=${s.getOwnerName}, path=${s.getPath})"
case s: Event.MetadataUpdateEvent => s"METADATA(acls=${s.getAcls()}, group=${s.getGroupName}, owner=${s.getOwnerName}, path=${s.getPath}, atime=${s.getAtime}, perms=${s.getPerms})"
case s: Event.RenameEvent => s"RENAME(ts=${s.getTimestamp}, src=${s.getSrcPath}, dst=${s.getDstPath})"
case s: Event.UnlinkEvent => s"UNLINK(ts=${s.getTimestamp}, path=${s.getPath})"
case other => s"Invalid event $event"
}
override def run(): Unit = {
while (true) {
val batch = stream.poll()
if (batch != null) {
// process event
println(s"Batch $batch (transaction ${batch.getTxid}):")
for (event <- batch.getEvents) {
println(s"+ ${eventToStr(event)}")
}
} else {
Thread.sleep(500)
}
}
}
}
thread.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment