@vepetkov
Created September 4, 2018 11:10
Python HDFS + Parquet (hdfs3, PyArrow + libhdfs, HdfsCLI + Knox)
##################################################################
## Native hdfs access (only on the cluster)
# conda install -c conda-forge libhdfs3=2.3.0=1 hdfs3 --yes
import hdfs3
import pandas as pd
nameNodeHost = 'hadoopnn1.localdomain'
nameNodeIPCPort = 8020
hdfs = hdfs3.HDFileSystem(nameNodeHost, port=nameNodeIPCPort)
# ls remote folder
hdfs.ls('/user/hdfs', detail=False)
# pandas csv using a file descriptor
with hdfs.open('/user/hdfs/user_stats.log') as f:
    df = pd.read_csv(f, nrows=1000)
# read a Parquet file into a PyArrow Table using a file descriptor
import pyarrow.parquet as pq
with hdfs.open('/user/hdfs/user_loc_data.parquet') as f:
    table = pq.read_table(f)
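# A rough sketch (not part of the original gist): hdfs3 can also write to HDFS
# and upload local files; the paths below are just placeholders
with hdfs.open('/user/hdfs/notes.txt', 'wb') as f:
    f.write(b'written via hdfs3')
hdfs.put('/tmp/local_stats.csv', '/user/hdfs/local_stats.csv')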
##################################################################
## PyArrow
# libhdfs.so path
import os
import subprocess
from subprocess import PIPE
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
cmd = ["locate", "-l", "1", "libhdfs.so"]
libhdfsso_path = subprocess.Popen(cmd, stdout=PIPE).stdout.read().rstrip()
os.environ["ARROW_LIBHDFS_DIR"] = os.path.dirname(libhdfsso_path.decode("utf-8"))
# classpath
cmd = ["/usr/bin/hdfs", "classpath", "--glob"]
hadoop_cp = subprocess.Popen(cmd, stdout=PIPE).stdout.read().rstrip()
os.environ["CLASSPATH"] = hadoop_cp.decode("utf-8")
nameNodeHost = 'hadoopnn1.localdomain'
nameNodeIPCPort = 8020
fs = pa.hdfs.connect(nameNodeHost, nameNodeIPCPort, user='hdfs')
fs.ls('/user/hdfs/')
# Either use a Python file as with hdfs3 or
# define a pyarrow dataset (much faster access)
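# A sketch of the file-object route mentioned above (not in the original gist):
# pq.read_table accepts any file-like object, e.g. one returned by fs.open
with fs.open('/user/hdfs/user_loc_data.parquet', 'rb') as f:
    table = pq.read_table(f)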
dataset = pq.ParquetDataset('/user/hdfs/user_loc_data.parquet', filesystem=fs)
table = dataset.read(nthreads=10)
# print some info about the table
table.num_columns
table.num_rows
table.schema
# Convert the PyArrow Table to Pandas DF
df = table.to_pandas()
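# Sketch (not part of the original gist): writing an Arrow Table back to HDFS
# as Parquet; the output path is just a placeholder
out_table = pa.Table.from_pandas(df)
with fs.open('/user/hdfs/user_loc_data_out.parquet', 'wb') as f:
    pq.write_table(out_table, f)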
##################################################################
## Access using Knox Gateway and the HdfsCLI
# conda install -c conda-forge python-hdfs
from hdfs import InsecureClient, HdfsError
import pandas as pd
nameNodeHost = 'hadoopnn1.localdomain'
nameNodeHttpPort = 50070
webhdfsUrl = f'http://{nameNodeHost}:{nameNodeHttpPort}'
client = InsecureClient(webhdfsUrl, user='hdfs')
with client.read('/user/hdfs/user_stats.log', encoding='utf-8') as f:
    df = pd.read_csv(f, nrows=1000)
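# Sketch (not part of the original gist): HdfsCLI can also write over WebHDFS;
# the target path is just a placeholder
with client.write('/user/hdfs/user_stats_sample.csv', encoding='utf-8', overwrite=True) as writer:
    df.to_csv(writer, index=False)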