Created
April 2, 2015 22:49
-
-
Save rubanm/d9cfc6148a2ab3827224 to your computer and use it in GitHub Desktop.
Finding the WebHDFS endpoint for a file on a cluster with an HA NameNode
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.hadoop.fs.{ FileSystem, Path }
import org.apache.hadoop.hdfs.DFSUtil
import org.apache.hadoop.mapred.JobConf
import org.slf4j.LoggerFactory
import java.io.IOException
import java.net.{ HttpURLConnection, InetSocketAddress, URL }
import scala.collection.JavaConverters._
import scala.util.Try
/**
 * Detects the currently active namenode and returns its webhdfs url for the specified path.
 *
 * Usage:
 *   HdfsUtil.webhdfsUrl("/path/to/file", conf)
 */
object HdfsUtil {
  private val log = LoggerFactory.getLogger(this.getClass)

  // Upper bound (ms) on connecting to / reading from a namenode during a probe.
  // Without it, a hung namenode host would block the caller indefinitely.
  private val PingTimeoutMs: Int = 10000

  /**
   * Probes each namenode and returns the webhdfs base url of the first one that
   * answers a LISTSTATUS request with HTTP 200.
   *
   * @param addrs namenode RPC addresses to probe, in iteration order
   * @param conf  configuration used to map each RPC address to its HTTP info server
   * @return `Some(url)` (ending in `/webhdfs/v1`) for the first responsive namenode, else `None`
   */
  private def findActiveNnUrl(addrs: Iterable[InetSocketAddress], conf: JobConf): Option[String] =
    addrs
      .map { addr =>
        // convert rpc address to the namenode's http info server address
        val httpUrl = DFSUtil.getInfoServer(addr, conf, "http").toString
        s"$httpUrl/webhdfs/v1"
      }
      .find(isResponsive)

  /**
   * Pings a webhdfs base url.
   *
   * @param url webhdfs base url, e.g. `http://host:port/webhdfs/v1`
   * @return true if the LISTSTATUS probe returns HTTP 200; false for any other
   *         status code or any I/O failure (including timeouts)
   */
  private def isResponsive(url: String): Boolean = {
    // liststatus used as ping operation
    val checkUrl = s"$url/?OP=LISTSTATUS"
    log.debug(s"Checking namenode status: $checkUrl")
    try {
      val conn = new URL(checkUrl).openConnection.asInstanceOf[HttpURLConnection]
      try {
        conn.setRequestMethod("GET")
        conn.setConnectTimeout(PingTimeoutMs)
        conn.setReadTimeout(PingTimeoutMs)
        conn.connect()
        val resp = conn.getResponseCode
        val msg = conn.getResponseMessage
        log.debug(s"Received response code $resp: $msg")
        resp == HttpURLConnection.HTTP_OK
      } finally {
        // release the connection even when connect/getResponseCode throws
        conn.disconnect()
      }
    } catch {
      case e: IOException =>
        log.info(s"Failed to connect to $checkUrl: ${e.getMessage}")
        false
    }
  }

  /**
   * Returns the webhdfs url of the active namenode serving `pathString`.
   *
   * The path is resolved through the default FileSystem of `conf`; the authority of
   * the resolved URI is used as the nameservice id, and each namenode configured for
   * that nameservice (per `DFSUtil.getHaNnRpcAddresses`) is probed over HTTP.
   *
   * @param pathString path whose namenode should be located
   * @param conf       Hadoop configuration containing the HA nameservice definitions
   * @return `Success(url)` ending in `/webhdfs/v1`, or a `Failure` when the resolved
   *         path has no authority, the nameservice is not configured, or no
   *         configured namenode answers the probe
   */
  def webhdfsUrl(pathString: String, conf: JobConf): Try[String] = Try {
    val fs = FileSystem.get(conf)
    val resolved = fs.resolvePath(new Path(pathString)).toUri
    // for hdfs://<nameservice>/some/path the authority is the nameservice id
    val federatedName = Option(resolved.getAuthority)
      .getOrElse(sys.error(s"Resolved path $resolved has no nameservice authority"))

    // nameservice id -> (namenode id -> rpc address), for each configured nameservice
    val nnRpcAddrs = DFSUtil.getHaNnRpcAddresses(conf).asScala
    val nnMap = nnRpcAddrs.getOrElse(
      federatedName,
      sys.error(s"No namenodes found for federated name $federatedName in current configuration $nnRpcAddrs"))

    val nn = findActiveNnUrl(nnMap.asScala.values, conf)
      .getOrElse(sys.error(s"No active namenode found for federated name: $federatedName"))
    log.info(s"Found active namenode $nn")
    nn
  }
}
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment