Skip to content

Instantly share code, notes, and snippets.

@RajaniCode
Last active December 17, 2019 06:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save RajaniCode/5652b1ffeab2806e76df997b250258e5 to your computer and use it in GitHub Desktop.
Save RajaniCode/5652b1ffeab2806e76df997b250258e5 to your computer and use it in GitHub Desktop.
PySpark
./spark-1.6.1/bin/pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/nasa.eva?readPreference=primaryPreferred" \
--conf "spark.mongodb.output.uri=mongodb://127.0.0.1/nasa.astronautTotals" \
--packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0
# SparkContext available as sc, HiveContext available as sqlContext.
>>> dir(sc)
['PACKAGE_EXTENSIONS', '__class__', '__delattr__', '__dict__', '__doc__', '__enter__', '__exit__', '__format__', '__getattribute__', '__getnewargs__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_accumulatorServer', '_active_spark_context', '_batchSize', '_callsite', '_checkpointFile', '_conf', '_dictToJavaMap', '_do_init', '_ensure_initialized', '_gateway', '_getJavaStorageLevel', '_initialize_context', '_javaAccumulator', '_jsc', '_jvm', '_lock', '_next_accum_id', '_pickled_broadcast_vars', '_python_includes', '_temp_dir', '_unbatched_serializer', 'accumulator', 'addFile', 'addPyFile', 'appName', 'applicationId', 'binaryFiles', 'binaryRecords', 'broadcast', 'cancelAllJobs', 'cancelJobGroup', 'clearFiles', 'defaultMinPartitions', 'defaultParallelism', 'dump_profiles', 'emptyRDD', 'environment', 'getLocalProperty', 'getOrCreate', 'hadoopFile', 'hadoopRDD', 'master', 'newAPIHadoopFile', 'newAPIHadoopRDD', 'parallelize', 'pickleFile', 'profiler_collector', 'pythonExec', 'pythonVer', 'range', 'runJob', 'sequenceFile', 'serializer', 'setCheckpointDir', 'setJobGroup', 'setLocalProperty', 'setLogLevel', 'setSystemProperty', 'show_profiles', 'sparkHome', 'sparkUser', 'startTime', 'statusTracker', 'stop', 'textFile', 'union', 'version', 'wholeTextFiles']
>>> dir(sqlContext)
['__class__', '__delattr__', '__dict__', '__doc__', '__format__', '__getattribute__', '__hash__', '__init__', '__module__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_createFromLocal', '_createFromRDD', '_get_hive_ctx', '_inferSchema', '_inferSchemaFromList', '_instantiatedContext', '_jsc', '_jvm', '_sc', '_scala_SQLContext', '_ssql_ctx', 'applySchema', 'cacheTable', 'clearCache', 'createDataFrame', 'createExternalTable', 'dropTempTable', 'getConf', 'getOrCreate', 'inferSchema', 'jsonFile', 'jsonRDD', 'load', 'newSession', 'parquetFile', 'range', 'read', 'refreshTable', 'registerDataFrameAsTable', 'registerFunction', 'setConf', 'sql', 'table', 'tableNames', 'tables', 'udf', 'uncacheTable']
>>> dir(pyspark)
['Accumulator', 'AccumulatorParam', 'BasicProfiler', 'Broadcast', 'HiveContext', 'MarshalSerializer', 'PickleSerializer', 'Profiler', 'RDD', 'Row', 'SQLContext', 'SchemaRDD', 'SparkConf', 'SparkContext', 'SparkFiles', 'SparkJobInfo', 'SparkStageInfo', 'StatusTracker', 'StorageLevel', '__all__', '__builtins__', '__doc__', '__file__', '__name__', '__package__', '__path__', 'accumulators', 'broadcast', 'cloudpickle', 'conf', 'context', 'files', 'heapq3', 'java_gateway', 'join', 'profiler', 'rdd', 'rddsampler', 'resultiterable', 'serializers', 'shuffle', 'since', 'sql', 'statcounter', 'status', 'storagelevel', 'traceback_utils']
>>> dir(pyspark.sql)
['Column', 'DataFrame', 'DataFrameNaFunctions', 'DataFrameReader', 'DataFrameStatFunctions', 'DataFrameWriter', 'GroupedData', 'HiveContext', 'Row', 'SQLContext', 'SchemaRDD', 'Window', 'WindowSpec', '__all__', '__builtins__', '__doc__', '__file__', '__name__', '__package__', '__path__', 'absolute_import', 'column', 'context', 'dataframe', 'functions', 'group', 'readwriter', 'types', 'utils', 'window']
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
sqlcontext = SQLContext(sc)
schema = StructType([StructField("_id", StringType()),
StructField("name", StringType()),
StructField("minutes", StringType())])
def project(doc):
return {"_id": str(doc["_id"]),
"name": str(doc["name"]),
"minutes": str(doc["minutes"])}
data_rdd = sc.emptyRDD()
projected_rdd = data_rdd.map(project)
train_df = sqlcontext.jsonRDD(projected_rdd, schema)
train_df.first()
> db.eva.find()
{ "_id" : ObjectId("58d5308fe55f952cb603f1e2"), "EVA #" : 1, "Country" : "USA", "Crew" : "Ed White", "Vehicle" : "Gemini IV", "Date" : "06/03/1965", "Duration" : "0:36", "Purpose" : "First U.S. EVA. Used HHMU and took photos. Gas flow cooling of 25ft umbilical overwhelmed by vehicle ingress work and helmet fogged. Lost overglove. Jettisoned thermal gloves and helmet sun visor" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1e3"), "EVA #" : 2, "Country" : "USA", "Crew" : "David Scott", "Vehicle" : "Gemini VIII", "Date" : "March 16-17, 1966", "Duration" : "0:00", "Purpose" : "HHMU EVA cancelled before starting by stuck on vehicle thruster that ended mission early" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1e4"), "EVA #" : 3, "Country" : "USA", "Crew" : "Eugene Cernan", "Vehicle" : "Gemini IX-A", "Date" : "06/05/1966", "Duration" : "2:07", "Purpose" : "Inadequate restraints, stiff 25ft umbilical and high workloads exceeded suit vent loop cooling capacity and caused fogging. Demo called off of tethered astronaut maneuvering unit" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1e5"), "EVA #" : 4, "Country" : "USA", "Crew" : "Mike Collins", "Vehicle" : "Gemini X", "Date" : "07/19/1966", "Duration" : "0:50", "Purpose" : "Standup EVA. UV photos of stars. Ended by eye irritation that impaired vision" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1e6"), "EVA #" : 5, "Country" : "USA", "Crew" : "Mike Collins", "Vehicle" : "Gemini X", "Date" : "07/20/1966", "Duration" : "0:39", "Purpose" : "Retrieved MMOD experiment from docked Agena. Used HHMU. Lost camera and retrieved experiment. EVA ended early by unrelated spacecraft problem" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1e7"), "EVA #" : 6, "Country" : "USA", "Crew" : "Richard Gordon", "Vehicle" : "Gemini XI", "Date" : "09/13/1966", "Duration" : "0:44", "Purpose" : "Attached tether between Agena and Gemini. EVA ended early due to fatigue, overheating & eye sweat" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1e8"), "EVA #" : 7, "Country" : "USA", "Crew" : "Richard Gordon", "Vehicle" : "Gemini XI", "Date" : "09/14/1966", "Duration" : "2:10", "Purpose" : "Standup EVA. Took star photos. Agena tether ops" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1e9"), "EVA #" : 8, "Country" : "USA", "Crew" : "Buzz Aldrin", "Vehicle" : "Gemini XII", "Date" : "11/12/1966", "Duration" : "2:29", "Purpose" : "Standup EVA. Science tasks. Took star photos" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1ea"), "EVA #" : 9, "Country" : "USA", "Crew" : "Buzz Aldrin", "Vehicle" : "Gemini XII", "Date" : "11/13/1966", "Duration" : "2:06", "Purpose" : "Attached tether between Agena and Gemini. UV photos of stars. Waist tether and Dutch shoe eval" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1eb"), "EVA #" : 10, "Country" : "USA", "Crew" : "Buzz Aldrin", "Vehicle" : "Gemini XII", "Date" : "11/14/1966", "Duration" : "0:55", "Purpose" : "Standup EVA. Jettisoned equipment. Took photos" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1ec"), "EVA #" : 11, "Country" : "USA", "Crew" : "David Scott", "Vehicle" : "Apollo 9", "Date" : "03/06/1969", "Duration" : "0:47", "Purpose" : "Standup EVA from crew module. Retrieved thermal experiment samples" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1ed"), "EVA #" : 12, "Country" : "USA", "Crew" : "Russ Schweickart", "Vehicle" : "Apollo 9", "Date" : "03/06/1969", "Duration" : "0:51", "Purpose" : "Lunar module based. Took photos. Evaluated foot restraint and handrails. Retrieved thermal experiment samples. First use of PLSS followed by recharge demo after EVA" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1ee"), "EVA #" : 13, "Country" : "USA", "Crew" : "Neil Armstrong Buzz Aldrin", "Vehicle" : "Apollo 11", "Date" : "07/20/1969", "Duration" : "2:32", "Purpose" : "First to walk on the moon. Some trouble getting out small hatch. 46.3 lb of geologic material collected. EASEP seismograph and laser reflector exp deployed. Solar wind exp deployed & retrieved. 400 ft (120m) circuit on foot. Dust issue post EVA" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1ef"), "EVA #" : 14, "Country" : "USA", "Crew" : "Neil Armstrong Buzz Aldrin", "Vehicle" : "Apollo 11", "Date" : "07/20/1969", "Duration" : "0:05", "Purpose" : "Jettison suit backpacks and equip to lighten ascent" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1f0"), "EVA #" : 15, "Country" : "USA", "Crew" : "Allen Bean Pete Conrad", "Vehicle" : "Apollo 12", "Date" : "11/19/1969", "Duration" : "3:39", "Purpose" : "Collected 75.6 lb of geologic material. ALSEP exp deployed. 6000 ft (1800m) circuit on foot" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1f1"), "EVA #" : 16, "Country" : "USA", "Crew" : "Allen Bean Pete Conrad", "Vehicle" : "Apollo 12", "Date" : "11/20/1969", "Duration" : "3:48", "Purpose" : "Retrieved parts of Surveyor 3 spacecraft." }
{ "_id" : ObjectId("58d5308fe55f952cb603f1f2"), "EVA #" : 17, "Country" : "USA", "Crew" : "Allen Bean Pete Conrad", "Vehicle" : "Apollo 12", "Date" : "11/20/1969", "Duration" : "0:05", "Purpose" : "Jettison suit backpacks and equip to lighten ascent" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1f3"), "EVA #" : 18, "Country" : "USA", "Crew" : "Ed Mitchell Alan Shepard", "Vehicle" : "Apollo 14", "Date" : "02/05/1971", "Duration" : "4:48", "Purpose" : "Collected 94.4 lb of geologic material. ALSEP and laser reflector exp deployed. Hiked up to 0.9 miles (1.5km) from lunar module. Used MET rickshaw" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1f4"), "EVA #" : 19, "Country" : "USA", "Crew" : "Ed Mitchell Alan Shepard", "Vehicle" : "Apollo 14", "Date" : "02/06/1971", "Duration" : "4:34", "Purpose" : "Sought but did not quite reach crater. Golf demo" }
{ "_id" : ObjectId("58d5308fe55f952cb603f1f5"), "EVA #" : 20, "Country" : "USA", "Crew" : "Ed Mitchell Alan Shepard", "Vehicle" : "Apollo 14", "Date" : "02/06/1971", "Duration" : "0:05", "Purpose" : "Jettison suit backpacks and equip to lighten ascent" }
Type "it" for more
>
root
|-- Country: string (nullable = true)
|-- Crew: string (nullable = true)
|-- Date: string (nullable = true)
|-- Duration: string (nullable = true)
|-- EVA #: conflict (nullable = true)
|-- Purpose: string (nullable = true)
|-- Vehicle: string (nullable = true)
|-- _id: string (nullable = true)
evasRdd = sc.parallelize([(1, "USA", "Ed White", "Gemini IV", "06/03/1965", "0:36", "First U.S. EVA. Used HHMU and took photos. Gas flow cooling of 25ft umbilical overwhelmed by vehicle ingress work and helmet fogged. Lost overglove. Jettisoned thermal gloves and helmet sun visor"),
(2, "USA", "David Scott", "Gemini VIII", "March 16-17, 1966", "0:00", "HHMU EVA cancelled before starting by stuck on vehicle thruster that ended mission early"),
(3, "USA", "Eugene Cernan", "Gemini IX-A", "06/05/1966", "2:07", "Inadequate restraints, stiff 25ft umbilical and high workloads exceeded suit vent loop cooling capacity and caused fogging. Demo called off of tethered astronaut maneuvering unit"),
(4, "USA", "Mike Collins", "Gemini X", "07/19/1966", "0:50", "Standup EVA. UV photos of stars. Ended by eye irritation that impaired vision"),
(5, "USA", "Mike Collins", "Gemini X", "07/20/1966", "0:39", "Retrieved MMOD experiment from docked Agena. Used HHMU. Lost camera and retrieved experiment. EVA ended early by unrelated spacecraft problem"),
(6, "USA", "Richard Gordon", "Gemini XI", "09/13/1966", "0:44", "Attached tether between Agena and Gemini. EVA ended early due to fatigue, overheating & eye sweat"),
(7, "USA", "Richard Gordon", "Gemini XI", "09/14/1966", "2:10", "Standup EVA. Took star photos. Agena tether ops"),
(8, "USA", "Buzz Aldrin", "Gemini XII", "11/12/1966", "2:29", "Standup EVA. Science tasks. Took star photos"),
(9, "USA", "Buzz Aldrin", "Gemini XII", "11/13/1966", "2:06", "Attached tether between Agena and Gemini. UV photos of stars. Waist tether and Dutch shoe eval"),
(10, "USA", "Buzz Aldrin", "Gemini XII", "11/14/1966", "0:55", "Standup EVA. Jettisoned equipment. Took photos"),
(11, "USA", "David Scott", "Apollo 9", "03/06/1969", "0:47", "Standup EVA from crew module. Retrieved thermal experiment samples"),
(12, "USA", "Russ Schweickart", "Apollo 9", "03/06/1969", "0:51", "Lunar module based. Took photos. Evaluated foot restraint and handrails. Retrieved thermal experiment samples. First use of PLSS followed by recharge demo after EVA"),
(13, "USA", "Neil Armstrong Buzz Aldrin", "Apollo 11", "07/20/1969", "2:32", "First to walk on the moon. Some trouble getting out small hatch. 46.3 lb of geologic material collected. EASEP seismograph and laser reflector exp deployed. Solar wind exp deployed & retrieved. 400 ft (120m) circuit on foot. Dust issue post EVA"),
(14, "USA", "Neil Armstrong Buzz Aldrin", "Apollo 11", "07/20/1969", "0:05", "Jettison suit backpacks and equip to lighten ascent"),
(15, "USA", "Allen Bean Pete Conrad", "Apollo 12", "11/19/1969", "3:39", "Collected 75.6 lb of geologic material. ALSEP exp deployed. 6000 ft (1800m) circuit on foot"),
(16, "USA", "Allen Bean Pete Conrad", "Apollo 12", "11/20/1969", "3:48", "Retrieved parts of Surveyor 3 spacecraft."),
(17, "USA", "Allen Bean Pete Conrad", "Apollo 12", "11/20/1969", "0:05", "Jettison suit backpacks and equip to lighten ascent"),
(18, "USA", "Ed Mitchell Alan Shepard", "Apollo 14", "02/05/1971", "4:48", "Collected 94.4 lb of geologic material. ALSEP and laser reflector exp deployed. Hiked up to 0.9 miles (1.5km) from lunar module. Used MET rickshaw"),
(19, "USA", "Ed Mitchell Alan Shepard", "Apollo 14", "02/06/1971", "4:34", "Sought but did not quite reach crater. Golf demo"),
(20, "USA", "Ed Mitchell Alan Shepard", "Apollo 14", "02/06/1971", "0:05", "Jettison suit backpacks and equip to lighten ascent")])
evas = sqlContext.createDataFrame(evasRdd, ["EVA #", "Country", "Crew", "Vehicle", "Date", "Duration", "Purpose"])
evas.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.printSchema()
[
namesRdd = sc.parallelize([("Mike Gernhardt", 1396), ("Alvin Drew", 768), ("Leroy Chiao", 2176), ("Alexei Leonov", 12), ("Rick Mastracchio", 2311),
("Alexandr Viktorenko", 1184), ("Dave Williams", 1067), ("Chris Cassidy", 1875), ("Kathy Thornton", 1270), ("Yuri Onufrenko", 1830)])
names = sqlContext.createDataFrame(namesRdd, ["name", "minutes"])
names.write.format("com.mongodb.spark.sql.DefaultSource").mode("overwrite").save()
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment