Skip to content

Instantly share code, notes, and snippets.

@rubanm
Created April 2, 2015 22:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save rubanm/d9cfc6148a2ab3827224 to your computer and use it in GitHub Desktop.
Save rubanm/d9cfc6148a2ab3827224 to your computer and use it in GitHub Desktop.
Finding webhdfs endpoint for a file with HA namenode
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.hdfs.DFSUtil
import org.apache.hadoop.mapred.JobConf
import org.slf4j.LoggerFactory
import java.net.{ HttpURLConnection, InetSocketAddress, URL }
import scala.collection.JavaConverters._
import scala.util.Try
/**
 * Detects the currently active namenode of an HA HDFS nameservice and returns its
 * WebHDFS base url (the namenode's http info server followed by `/webhdfs/v1`).
 *
 * Every namenode configured for the nameservice is probed with a WebHDFS `LISTSTATUS`
 * request; the first one that answers HTTP 200 is taken to be the active one. This relies
 * on non-active namenodes answering with some other status (or failing to connect).
 *
 * Usage:
 * HdfsUtil.webhdfsUrl("/path/to/file", conf)
 */
object HdfsUtil {
  private val log = LoggerFactory.getLogger(this.getClass)

  // Connect and read timeout for each namenode probe, so that an unreachable namenode
  // host cannot stall the search for the active one for the OS-level TCP timeout.
  private val ProbeTimeoutMs = 10000

  // Builds the WebHDFS base url of the namenode whose RPC address is `addr`,
  // by converting the RPC address to its configured http info server.
  private def webhdfsBaseUrl(addr: InetSocketAddress, conf: JobConf): String =
    s"${DFSUtil.getInfoServer(addr, conf, "http")}/webhdfs/v1"

  // Returns true when a LISTSTATUS request against `baseUrl` answers HTTP 200.
  // LISTSTATUS on the root is used as a cheap ping; any IOException (including
  // timeouts and malformed urls) counts as "not active".
  private def answersOk(baseUrl: String): Boolean = {
    val checkUrl = s"$baseUrl/?OP=LISTSTATUS"
    log.debug(s"Checking namenode status: $checkUrl")
    try {
      val conn = new URL(checkUrl).openConnection.asInstanceOf[HttpURLConnection]
      try {
        conn.setConnectTimeout(ProbeTimeoutMs)
        conn.setReadTimeout(ProbeTimeoutMs)
        conn.setRequestMethod("GET")
        conn.connect()
        val resp = conn.getResponseCode
        val msg = conn.getResponseMessage
        log.debug(s"Received response code $resp: $msg")
        resp == HttpURLConnection.HTTP_OK
      } finally {
        // release the connection even when reading the response fails
        conn.disconnect()
      }
    } catch {
      case e: java.io.IOException =>
        log.info(s"Failed to connect to $checkUrl: $e")
        false
    }
  }

  // Finds the active namenode among `addrs` and returns its WebHDFS base url.
  // Namenodes are probed one at a time in iteration order, stopping at the first hit.
  private def findActiveNnUrl(addrs: Iterable[InetSocketAddress], conf: JobConf): Option[String] =
    addrs.iterator.map(webhdfsBaseUrl(_, conf)).find(answersOk)

  /**
   * Returns the WebHDFS base url of the currently active namenode serving `pathString`.
   *
   * The path is resolved on the file system it belongs to, and the authority of the
   * resolved URI is used as the HA nameservice ID. Note that `pathString` is only used
   * to pick the nameservice; it is NOT appended to the returned url.
   *
   * @param pathString path whose nameservice should be looked up; relative or
   *                   scheme-less paths use the default file system from `conf`
   * @param conf       Hadoop configuration holding the HA namenode settings
   * @return `Success(url)`, or `Failure` when the path cannot be resolved, the resolved
   *         path has no authority, the nameservice has no HA namenodes configured, or
   *         no configured namenode answers the probe with HTTP 200
   */
  def webhdfsUrl(pathString: String, conf: JobConf): Try[String] = Try {
    val path = new Path(pathString)
    // Use the path's own file system rather than the default one, so fully qualified
    // paths on another (federated) nameservice can be resolved as well.
    val resolved = path.getFileSystem(conf).resolvePath(path)
    val federatedName = Option(resolved.toUri.getAuthority).getOrElse(
      sys.error(s"Resolved path $resolved has no nameservice authority"))
    val nnRpcAddrs = DFSUtil.getHaNnRpcAddresses(conf).asScala
    // for the nameservice: namenode id -> rpc address of each configured namenode
    val nnMap = nnRpcAddrs.getOrElse(
      federatedName,
      sys.error(s"No namenodes found for federated name $federatedName in current configuration $nnRpcAddrs"))
    val nn = findActiveNnUrl(nnMap.asScala.values, conf).getOrElse(
      sys.error(s"No active namenode found for federated name: $federatedName"))
    log.info(s"Found active namenode $nn")
    nn
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment