Last active
January 2, 2020 11:56
-
-
Save sadikovi/5f71e022e062150ecb9077a0442d16a2 to your computer and use it in GitHub Desktop.
HDFS notification system example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.URI | |
import org.apache.hadoop.conf.Configuration | |
import org.apache.hadoop.fs._ | |
import org.apache.hadoop.hdfs.client._ | |
import org.apache.hadoop.hdfs.inotify._ | |
val url = new URI("hdfs://localhost:8020") | |
val conf = new Configuration(false) | |
val dfs = new HdfsAdmin(url, conf) | |
val stream = dfs.getInotifyEventStream() | |
val thread = new Thread { | |
private def eventToStr(event: Event): String = event match { | |
case s: Event.AppendEvent => s"APPEND(path=${s.getPath})" | |
case s: Event.CloseEvent => s"CLOSE(filesize=${s.getFileSize}, path=${s.getPath}, ts=${s.getTimestamp})" | |
case s: Event.CreateEvent => s"CREATE(group=${s.getGroupName}, owner=${s.getOwnerName}, path=${s.getPath})" | |
case s: Event.MetadataUpdateEvent => s"METADATA(acls=${s.getAcls()}, group=${s.getGroupName}, owner=${s.getOwnerName}, path=${s.getPath}, atime=${s.getAtime}, perms=${s.getPerms})" | |
case s: Event.RenameEvent => s"RENAME(ts=${s.getTimestamp}, src=${s.getSrcPath}, dst=${s.getDstPath})" | |
case s: Event.UnlinkEvent => s"UNLINK(ts=${s.getTimestamp}, path=${s.getPath})" | |
case other => s"Invalid event $event" | |
} | |
override def run(): Unit = { | |
while (true) { | |
val batch = stream.poll() | |
if (batch != null) { | |
// process event | |
println(s"Batch $batch (transaction ${batch.getTxid}):") | |
for (event <- batch.getEvents) { | |
println(s"+ ${eventToStr(event)}") | |
} | |
} else { | |
Thread.sleep(500) | |
} | |
} | |
} | |
} | |
thread.start() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment