@carlosmarin
Last active May 4, 2018 23:50
# pyspark --driver-class-path "/opt/mongodb-spark-connector/mongodb-driver-3.7.0.jar:/opt/mongodb-spark-connector/mongodb-driver-core-3.7.0.jar:/opt/mongodb-spark-connector/bson-3.7.0.jar:/opt/mongodb-spark-connector/mongo-spark-connector_2.10-1.1.0.jar"
import getpass
import os
import string

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

try:
    input = raw_input  # Python 2.x compatibility
except NameError:
    pass

try:
    from urllib import quote_plus  # Python 2
except ImportError:
    from urllib.parse import quote_plus  # Python 3
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0 pyspark-shell'
MONGO_HOST = input("Mongo hostname: ")
MONGO_USER = input("Mongo username: ")
MONGO_PASSWORD = getpass.getpass('Mongo password: ')
DATABASE = input("Database name: ")
COLLECTION = input("Collection name: ")
if '@' in MONGO_USER:
    MONGO_USER = quote_plus(MONGO_USER)  # escape reserved characters so the URI parses correctly
# LDAP
MONGO_URI = string.Template("mongodb://${MONGO_USER}:${MONGO_PASSWORD}@${MONGO_HOST}:27315/${DATABASE}?ssl=true&authMechanism=PLAIN&authSource=$$external").substitute(locals())
# Kerberos
# MONGO_URI = string.Template("mongodb://${MONGO_USER}@${MONGO_HOST}:27315/${DATABASE}?ssl=true&authMechanism=GSSAPI&authSource=$$external").substitute(locals())
conf = SparkConf() \
    .setAppName("pyspark mongo") \
    .setMaster("local[*]") \
    .set("spark.mongodb.input.uri", MONGO_URI) \
    .set("spark.mongodb.input.database", DATABASE) \
    .set("spark.mongodb.input.collection", COLLECTION)
sc.stop()  # stop the SparkContext the pyspark shell created
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)

# Load the collection as a DataFrame through the MongoDB Spark connector
df = sql_context.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.count()  # number of documents read
df.show()   # preview the first rows
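
# A minimal follow-up sketch: querying the loaded DataFrame. The field
# name 'status' below is hypothetical -- substitute a field that actually
# exists in your collection.
df.printSchema()  # inspect the schema the connector inferred

# Simple DataFrame filters can be pushed down to MongoDB by the connector
df.filter(df["status"] == "A").show()

# Or register the DataFrame as a temp table and use Spark SQL (Spark 1.x API)
df.registerTempTable("mongo_docs")
sql_context.sql("SELECT COUNT(*) AS n FROM mongo_docs").show()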