@bigsnarfdude
Created July 1, 2014 05:03
Load Spark SQL from File, JSON file, or arrays
{
"metadata": {
"name": "",
"signature": "sha256:bd3b1a4d62b88d99f662b693b788f433bbf49f9f1d7f66187514a7a3a04e21d8"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Experiments to get json files loaded into SQL table"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"cat sql.py"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"#\r\n",
"# Licensed to the Apache Software Foundation (ASF) under one or more\r\n",
"# contributor license agreements. See the NOTICE file distributed with\r\n",
"# this work for additional information regarding copyright ownership.\r\n",
"# The ASF licenses this file to You under the Apache License, Version 2.0\r\n",
"# (the \"License\"); you may not use this file except in compliance with\r\n",
"# the License. You may obtain a copy of the License at\r\n",
"#\r\n",
"# http://www.apache.org/licenses/LICENSE-2.0\r\n",
"#\r\n",
"# Unless required by applicable law or agreed to in writing, software\r\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\r\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r\n",
"# See the License for the specific language governing permissions and\r\n",
"# limitations under the License.\r\n",
"#\r\n",
"\r\n",
"from pyspark.rdd import RDD, PipelinedRDD\r\n",
"from pyspark.serializers import BatchedSerializer, PickleSerializer\r\n",
"\r\n",
"from py4j.protocol import Py4JError\r\n",
"\r\n",
"__all__ = [\"SQLContext\", \"HiveContext\", \"LocalHiveContext\", \"TestHiveContext\", \"SchemaRDD\", \"Row\"]\r\n",
"\r\n",
"\r\n",
"class SQLContext:\r\n",
" \"\"\"Main entry point for SparkSQL functionality.\r\n",
"\r\n",
" A SQLContext can be used create L{SchemaRDD}s, register L{SchemaRDD}s as\r\n",
" tables, execute SQL over tables, cache tables, and read parquet files.\r\n",
" \"\"\"\r\n",
"\r\n",
" def __init__(self, sparkContext, sqlContext = None):\r\n",
" \"\"\"Create a new SQLContext.\r\n",
"\r\n",
" @param sparkContext: The SparkContext to wrap.\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> sqlCtx.inferSchema(srdd) # doctest: +IGNORE_EXCEPTION_DETAIL\r\n",
" Traceback (most recent call last):\r\n",
" ...\r\n",
" ValueError:...\r\n",
"\r\n",
" >>> bad_rdd = sc.parallelize([1,2,3])\r\n",
" >>> sqlCtx.inferSchema(bad_rdd) # doctest: +IGNORE_EXCEPTION_DETAIL\r\n",
" Traceback (most recent call last):\r\n",
" ...\r\n",
" ValueError:...\r\n",
"\r\n",
" >>> allTypes = sc.parallelize([{\"int\" : 1, \"string\" : \"string\", \"double\" : 1.0, \"long\": 1L,\r\n",
" ... \"boolean\" : True}])\r\n",
" >>> srdd = sqlCtx.inferSchema(allTypes).map(lambda x: (x.int, x.string, x.double, x.long,\r\n",
" ... x.boolean))\r\n",
" >>> srdd.collect()[0]\r\n",
" (1, u'string', 1.0, 1, True)\r\n",
" \"\"\"\r\n",
" self._sc = sparkContext\r\n",
" self._jsc = self._sc._jsc\r\n",
" self._jvm = self._sc._jvm\r\n",
" self._pythonToJavaMap = self._jvm.PythonRDD.pythonToJavaMap\r\n",
"\r\n",
" if sqlContext:\r\n",
" self._scala_SQLContext = sqlContext\r\n",
"\r\n",
" @property\r\n",
" def _ssql_ctx(self):\r\n",
" \"\"\"Accessor for the JVM SparkSQL context.\r\n",
"\r\n",
" Subclasses can override this property to provide their own\r\n",
" JVM Contexts.\r\n",
" \"\"\"\r\n",
" if not hasattr(self, '_scala_SQLContext'):\r\n",
" self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())\r\n",
" return self._scala_SQLContext\r\n",
"\r\n",
" def inferSchema(self, rdd):\r\n",
" \"\"\"Infer and apply a schema to an RDD of L{dict}s.\r\n",
"\r\n",
" We peek at the first row of the RDD to determine the fields names\r\n",
" and types, and then use that to extract all the dictionaries. Nested\r\n",
" collections are supported, which include array, dict, list, set, and\r\n",
" tuple.\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> srdd.collect() == [{\"field1\" : 1, \"field2\" : \"row1\"}, {\"field1\" : 2, \"field2\": \"row2\"},\r\n",
" ... {\"field1\" : 3, \"field2\": \"row3\"}]\r\n",
" True\r\n",
"\r\n",
" >>> from array import array\r\n",
" >>> srdd = sqlCtx.inferSchema(nestedRdd1)\r\n",
" >>> srdd.collect() == [{\"f1\" : array('i', [1, 2]), \"f2\" : {\"row1\" : 1.0}},\r\n",
" ... {\"f1\" : array('i', [2, 3]), \"f2\" : {\"row2\" : 2.0}}]\r\n",
" True\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(nestedRdd2)\r\n",
" >>> srdd.collect() == [{\"f1\" : [[1, 2], [2, 3]], \"f2\" : set([1, 2]), \"f3\" : (1, 2)},\r\n",
" ... {\"f1\" : [[2, 3], [3, 4]], \"f2\" : set([2, 3]), \"f3\" : (2, 3)}]\r\n",
" True\r\n",
" \"\"\"\r\n",
" if (rdd.__class__ is SchemaRDD):\r\n",
" raise ValueError(\"Cannot apply schema to %s\" % SchemaRDD.__name__)\r\n",
" elif not isinstance(rdd.first(), dict):\r\n",
" raise ValueError(\"Only RDDs with dictionaries can be converted to %s: %s\" %\r\n",
" (SchemaRDD.__name__, rdd.first()))\r\n",
"\r\n",
" jrdd = self._pythonToJavaMap(rdd._jrdd)\r\n",
" srdd = self._ssql_ctx.inferSchema(jrdd.rdd())\r\n",
" return SchemaRDD(srdd, self)\r\n",
"\r\n",
" def registerRDDAsTable(self, rdd, tableName):\r\n",
" \"\"\"Registers the given RDD as a temporary table in the catalog.\r\n",
"\r\n",
" Temporary tables exist only during the lifetime of this instance of\r\n",
" SQLContext.\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> sqlCtx.registerRDDAsTable(srdd, \"table1\")\r\n",
" \"\"\"\r\n",
" if (rdd.__class__ is SchemaRDD):\r\n",
" jschema_rdd = rdd._jschema_rdd\r\n",
" self._ssql_ctx.registerRDDAsTable(jschema_rdd, tableName)\r\n",
" else:\r\n",
" raise ValueError(\"Can only register SchemaRDD as table\")\r\n",
"\r\n",
" def parquetFile(self, path):\r\n",
" \"\"\"Loads a Parquet file, returning the result as a L{SchemaRDD}.\r\n",
"\r\n",
" >>> import tempfile, shutil\r\n",
" >>> parquetFile = tempfile.mkdtemp()\r\n",
" >>> shutil.rmtree(parquetFile)\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> srdd.saveAsParquetFile(parquetFile)\r\n",
" >>> srdd2 = sqlCtx.parquetFile(parquetFile)\r\n",
" >>> sorted(srdd.collect()) == sorted(srdd2.collect())\r\n",
" True\r\n",
" \"\"\"\r\n",
" jschema_rdd = self._ssql_ctx.parquetFile(path)\r\n",
" return SchemaRDD(jschema_rdd, self)\r\n",
"\r\n",
"\r\n",
" def jsonFile(self, path):\r\n",
" \"\"\"Loads a text file storing one JSON object per line,\r\n",
" returning the result as a L{SchemaRDD}.\r\n",
" It goes through the entire dataset once to determine the schema.\r\n",
"\r\n",
" >>> import tempfile, shutil\r\n",
" >>> jsonFile = tempfile.mkdtemp()\r\n",
" >>> shutil.rmtree(jsonFile)\r\n",
" >>> ofn = open(jsonFile, 'w')\r\n",
" >>> for json in jsonStrings:\r\n",
" ... print>>ofn, json\r\n",
" >>> ofn.close()\r\n",
" >>> srdd = sqlCtx.jsonFile(jsonFile)\r\n",
" >>> sqlCtx.registerRDDAsTable(srdd, \"table1\")\r\n",
" >>> srdd2 = sqlCtx.sql(\"SELECT field1 AS f1, field2 as f2, field3 as f3 from table1\")\r\n",
" >>> srdd2.collect() == [{\"f1\": 1, \"f2\": \"row1\", \"f3\":{\"field4\":11}},\r\n",
" ... {\"f1\": 2, \"f2\": \"row2\", \"f3\":{\"field4\":22}},\r\n",
" ... {\"f1\": 3, \"f2\": \"row3\", \"f3\":{\"field4\":33}}]\r\n",
" True\r\n",
" \"\"\"\r\n",
" jschema_rdd = self._ssql_ctx.jsonFile(path)\r\n",
" return SchemaRDD(jschema_rdd, self)\r\n",
"\r\n",
" def jsonRDD(self, rdd):\r\n",
" \"\"\"Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}.\r\n",
" It goes through the entire dataset once to determine the schema.\r\n",
"\r\n",
" >>> srdd = sqlCtx.jsonRDD(json)\r\n",
" >>> sqlCtx.registerRDDAsTable(srdd, \"table1\")\r\n",
" >>> srdd2 = sqlCtx.sql(\"SELECT field1 AS f1, field2 as f2, field3 as f3 from table1\")\r\n",
" >>> srdd2.collect() == [{\"f1\": 1, \"f2\": \"row1\", \"f3\":{\"field4\":11}},\r\n",
" ... {\"f1\": 2, \"f2\": \"row2\", \"f3\":{\"field4\":22}},\r\n",
" ... {\"f1\": 3, \"f2\": \"row3\", \"f3\":{\"field4\":33}}]\r\n",
" True\r\n",
" \"\"\"\r\n",
" def func(split, iterator):\r\n",
" for x in iterator:\r\n",
" if not isinstance(x, basestring):\r\n",
" x = unicode(x)\r\n",
" yield x.encode(\"utf-8\")\r\n",
" keyed = PipelinedRDD(rdd, func)\r\n",
" keyed._bypass_serializer = True\r\n",
" jrdd = keyed._jrdd.map(self._jvm.BytesToString())\r\n",
" jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())\r\n",
" return SchemaRDD(jschema_rdd, self)\r\n",
"\r\n",
" def sql(self, sqlQuery):\r\n",
" \"\"\"Return a L{SchemaRDD} representing the result of the given query.\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> sqlCtx.registerRDDAsTable(srdd, \"table1\")\r\n",
" >>> srdd2 = sqlCtx.sql(\"SELECT field1 AS f1, field2 as f2 from table1\")\r\n",
" >>> srdd2.collect() == [{\"f1\" : 1, \"f2\" : \"row1\"}, {\"f1\" : 2, \"f2\": \"row2\"},\r\n",
" ... {\"f1\" : 3, \"f2\": \"row3\"}]\r\n",
" True\r\n",
" \"\"\"\r\n",
" return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self)\r\n",
"\r\n",
" def table(self, tableName):\r\n",
" \"\"\"Returns the specified table as a L{SchemaRDD}.\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> sqlCtx.registerRDDAsTable(srdd, \"table1\")\r\n",
" >>> srdd2 = sqlCtx.table(\"table1\")\r\n",
" >>> sorted(srdd.collect()) == sorted(srdd2.collect())\r\n",
" True\r\n",
" \"\"\"\r\n",
" return SchemaRDD(self._ssql_ctx.table(tableName), self)\r\n",
"\r\n",
" def cacheTable(self, tableName):\r\n",
" \"\"\"Caches the specified table in-memory.\"\"\"\r\n",
" self._ssql_ctx.cacheTable(tableName)\r\n",
"\r\n",
" def uncacheTable(self, tableName):\r\n",
" \"\"\"Removes the specified table from the in-memory cache.\"\"\"\r\n",
" self._ssql_ctx.uncacheTable(tableName)\r\n",
"\r\n",
"\r\n",
"class HiveContext(SQLContext):\r\n",
" \"\"\"A variant of Spark SQL that integrates with data stored in Hive.\r\n",
"\r\n",
" Configuration for Hive is read from hive-site.xml on the classpath.\r\n",
" It supports running both SQL and HiveQL commands.\r\n",
" \"\"\"\r\n",
"\r\n",
" @property\r\n",
" def _ssql_ctx(self):\r\n",
" try:\r\n",
" if not hasattr(self, '_scala_HiveContext'):\r\n",
" self._scala_HiveContext = self._get_hive_ctx()\r\n",
" return self._scala_HiveContext\r\n",
" except Py4JError as e:\r\n",
" raise Exception(\"You must build Spark with Hive. Export 'SPARK_HIVE=true' and run \" \\\r\n",
" \"sbt/sbt assembly\" , e)\r\n",
"\r\n",
" def _get_hive_ctx(self):\r\n",
" return self._jvm.HiveContext(self._jsc.sc())\r\n",
"\r\n",
" def hiveql(self, hqlQuery):\r\n",
" \"\"\"\r\n",
" Runs a query expressed in HiveQL, returning the result as a L{SchemaRDD}.\r\n",
" \"\"\"\r\n",
" return SchemaRDD(self._ssql_ctx.hiveql(hqlQuery), self)\r\n",
"\r\n",
" def hql(self, hqlQuery):\r\n",
" \"\"\"\r\n",
" Runs a query expressed in HiveQL, returning the result as a L{SchemaRDD}.\r\n",
" \"\"\"\r\n",
" return self.hiveql(hqlQuery)\r\n",
"\r\n",
"\r\n",
"class LocalHiveContext(HiveContext):\r\n",
" \"\"\"Starts up an instance of hive where metadata is stored locally.\r\n",
"\r\n",
" An in-process metadata data is created with data stored in ./metadata.\r\n",
" Warehouse data is stored in in ./warehouse.\r\n",
"\r\n",
" >>> import os\r\n",
" >>> hiveCtx = LocalHiveContext(sc)\r\n",
" >>> try:\r\n",
" ... supress = hiveCtx.hql(\"DROP TABLE src\")\r\n",
" ... except Exception:\r\n",
" ... pass\r\n",
" >>> kv1 = os.path.join(os.environ[\"SPARK_HOME\"], 'examples/src/main/resources/kv1.txt')\r\n",
" >>> supress = hiveCtx.hql(\"CREATE TABLE IF NOT EXISTS src (key INT, value STRING)\")\r\n",
" >>> supress = hiveCtx.hql(\"LOAD DATA LOCAL INPATH '%s' INTO TABLE src\" % kv1)\r\n",
" >>> results = hiveCtx.hql(\"FROM src SELECT value\").map(lambda r: int(r.value.split('_')[1]))\r\n",
" >>> num = results.count()\r\n",
" >>> reduce_sum = results.reduce(lambda x, y: x + y)\r\n",
" >>> num\r\n",
" 500\r\n",
" >>> reduce_sum\r\n",
" 130091\r\n",
" \"\"\"\r\n",
"\r\n",
" def _get_hive_ctx(self):\r\n",
" return self._jvm.LocalHiveContext(self._jsc.sc())\r\n",
"\r\n",
"\r\n",
"class TestHiveContext(HiveContext):\r\n",
"\r\n",
" def _get_hive_ctx(self):\r\n",
" return self._jvm.TestHiveContext(self._jsc.sc())\r\n",
"\r\n",
"\r\n",
"# TODO: Investigate if it is more efficient to use a namedtuple. One problem is that named tuples\r\n",
"# are custom classes that must be generated per Schema.\r\n",
"class Row(dict):\r\n",
" \"\"\"A row in L{SchemaRDD}.\r\n",
"\r\n",
" An extended L{dict} that takes a L{dict} in its constructor, and\r\n",
" exposes those items as fields.\r\n",
"\r\n",
" >>> r = Row({\"hello\" : \"world\", \"foo\" : \"bar\"})\r\n",
" >>> r.hello\r\n",
" 'world'\r\n",
" >>> r.foo\r\n",
" 'bar'\r\n",
" \"\"\"\r\n",
"\r\n",
" def __init__(self, d):\r\n",
" d.update(self.__dict__)\r\n",
" self.__dict__ = d\r\n",
" dict.__init__(self, d)\r\n",
"\r\n",
"\r\n",
"class SchemaRDD(RDD):\r\n",
" \"\"\"An RDD of L{Row} objects that has an associated schema.\r\n",
"\r\n",
" The underlying JVM object is a SchemaRDD, not a PythonRDD, so we can\r\n",
" utilize the relational query api exposed by SparkSQL.\r\n",
"\r\n",
" For normal L{pyspark.rdd.RDD} operations (map, count, etc.) the\r\n",
" L{SchemaRDD} is not operated on directly, as it's underlying\r\n",
" implementation is an RDD composed of Java objects. Instead it is\r\n",
" converted to a PythonRDD in the JVM, on which Python operations can\r\n",
" be done.\r\n",
" \"\"\"\r\n",
"\r\n",
" def __init__(self, jschema_rdd, sql_ctx):\r\n",
" self.sql_ctx = sql_ctx\r\n",
" self._sc = sql_ctx._sc\r\n",
" self._jschema_rdd = jschema_rdd\r\n",
"\r\n",
" self.is_cached = False\r\n",
" self.is_checkpointed = False\r\n",
" self.ctx = self.sql_ctx._sc\r\n",
" self._jrdd_deserializer = self.ctx.serializer\r\n",
"\r\n",
" @property\r\n",
" def _jrdd(self):\r\n",
" \"\"\"Lazy evaluation of PythonRDD object.\r\n",
"\r\n",
" Only done when a user calls methods defined by the\r\n",
" L{pyspark.rdd.RDD} super class (map, filter, etc.).\r\n",
" \"\"\"\r\n",
" if not hasattr(self, '_lazy_jrdd'):\r\n",
" self._lazy_jrdd = self._toPython()._jrdd\r\n",
" return self._lazy_jrdd\r\n",
"\r\n",
" @property\r\n",
" def _id(self):\r\n",
" return self._jrdd.id()\r\n",
"\r\n",
" def saveAsParquetFile(self, path):\r\n",
" \"\"\"Save the contents as a Parquet file, preserving the schema.\r\n",
"\r\n",
" Files that are written out using this method can be read back in as\r\n",
" a SchemaRDD using the L{SQLContext.parquetFile} method.\r\n",
"\r\n",
" >>> import tempfile, shutil\r\n",
" >>> parquetFile = tempfile.mkdtemp()\r\n",
" >>> shutil.rmtree(parquetFile)\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> srdd.saveAsParquetFile(parquetFile)\r\n",
" >>> srdd2 = sqlCtx.parquetFile(parquetFile)\r\n",
" >>> sorted(srdd2.collect()) == sorted(srdd.collect())\r\n",
" True\r\n",
" \"\"\"\r\n",
" self._jschema_rdd.saveAsParquetFile(path)\r\n",
"\r\n",
" def registerAsTable(self, name):\r\n",
" \"\"\"Registers this RDD as a temporary table using the given name.\r\n",
"\r\n",
" The lifetime of this temporary table is tied to the L{SQLContext}\r\n",
" that was used to create this SchemaRDD.\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> srdd.registerAsTable(\"test\")\r\n",
" >>> srdd2 = sqlCtx.sql(\"select * from test\")\r\n",
" >>> sorted(srdd.collect()) == sorted(srdd2.collect())\r\n",
" True\r\n",
" \"\"\"\r\n",
" self._jschema_rdd.registerAsTable(name)\r\n",
"\r\n",
" def insertInto(self, tableName, overwrite = False):\r\n",
" \"\"\"Inserts the contents of this SchemaRDD into the specified table.\r\n",
"\r\n",
" Optionally overwriting any existing data.\r\n",
" \"\"\"\r\n",
" self._jschema_rdd.insertInto(tableName, overwrite)\r\n",
"\r\n",
" def saveAsTable(self, tableName):\r\n",
" \"\"\"Creates a new table with the contents of this SchemaRDD.\"\"\"\r\n",
" self._jschema_rdd.saveAsTable(tableName)\r\n",
"\r\n",
" def schemaString(self):\r\n",
" \"\"\"Returns the output schema in the tree format.\"\"\"\r\n",
" return self._jschema_rdd.schemaString()\r\n",
"\r\n",
" def printSchema(self):\r\n",
" \"\"\"Prints out the schema in the tree format.\"\"\"\r\n",
" print self.schemaString()\r\n",
"\r\n",
" def count(self):\r\n",
" \"\"\"Return the number of elements in this RDD.\r\n",
"\r\n",
" Unlike the base RDD implementation of count, this implementation\r\n",
" leverages the query optimizer to compute the count on the SchemaRDD,\r\n",
" which supports features such as filter pushdown.\r\n",
"\r\n",
" >>> srdd = sqlCtx.inferSchema(rdd)\r\n",
" >>> srdd.count()\r\n",
" 3L\r\n",
" >>> srdd.count() == srdd.map(lambda x: x).count()\r\n",
" True\r\n",
" \"\"\"\r",
"\r\n",
" return self._jschema_rdd.count()\r\n",
"\r\n",
" def _toPython(self):\r\n",
" # We have to import the Row class explicitly, so that the reference Pickler has is\r\n",
" # pyspark.sql.Row instead of __main__.Row\r\n",
" from pyspark.sql import Row\r\n",
" jrdd = self._jschema_rdd.javaToPython()\r\n",
" # TODO: This is inefficient, we should construct the Python Row object\r\n",
" # in Java land in the javaToPython function. May require a custom\r\n",
" # pickle serializer in Pyrolite\r\n",
" return RDD(jrdd, self._sc, BatchedSerializer(\r\n",
" PickleSerializer())).map(lambda d: Row(d))\r\n",
"\r\n",
" # We override the default cache/persist/checkpoint behavior as we want to cache the underlying\r\n",
" # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class\r\n",
" def cache(self):\r\n",
" self.is_cached = True\r\n",
" self._jschema_rdd.cache()\r\n",
" return self\r\n",
"\r\n",
" def persist(self, storageLevel):\r\n",
" self.is_cached = True\r\n",
" javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)\r\n",
" self._jschema_rdd.persist(javaStorageLevel)\r\n",
" return self\r\n",
"\r\n",
" def unpersist(self):\r\n",
" self.is_cached = False\r\n",
" self._jschema_rdd.unpersist()\r\n",
" return self\r\n",
"\r\n",
" def checkpoint(self):\r\n",
" self.is_checkpointed = True\r\n",
" self._jschema_rdd.checkpoint()\r\n",
"\r\n",
" def isCheckpointed(self):\r\n",
" return self._jschema_rdd.isCheckpointed()\r\n",
"\r\n",
" def getCheckpointFile(self):\r\n",
" checkpointFile = self._jschema_rdd.getCheckpointFile()\r\n",
" if checkpointFile.isDefined():\r\n",
" return checkpointFile.get()\r\n",
" else:\r\n",
" return None\r\n",
"\r\n",
" def coalesce(self, numPartitions, shuffle=False):\r\n",
" rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)\r\n",
" return SchemaRDD(rdd, self.sql_ctx)\r\n",
"\r\n",
" def distinct(self):\r\n",
" rdd = self._jschema_rdd.distinct()\r\n",
" return SchemaRDD(rdd, self.sql_ctx)\r\n",
"\r\n",
" def intersection(self, other):\r\n",
" if (other.__class__ is SchemaRDD):\r\n",
" rdd = self._jschema_rdd.intersection(other._jschema_rdd)\r\n",
" return SchemaRDD(rdd, self.sql_ctx)\r\n",
" else:\r\n",
" raise ValueError(\"Can only intersect with another SchemaRDD\")\r\n",
"\r\n",
" def repartition(self, numPartitions):\r\n",
" rdd = self._jschema_rdd.repartition(numPartitions)\r\n",
" return SchemaRDD(rdd, self.sql_ctx)\r\n",
"\r\n",
" def subtract(self, other, numPartitions=None):\r\n",
" if (other.__class__ is SchemaRDD):\r\n",
" if numPartitions is None:\r\n",
" rdd = self._jschema_rdd.subtract(other._jschema_rdd)\r\n",
" else:\r\n",
" rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)\r\n",
" return SchemaRDD(rdd, self.sql_ctx)\r\n",
" else:\r\n",
" raise ValueError(\"Can only subtract another SchemaRDD\")\r\n",
"\r\n",
"def _test():\r\n",
" import doctest\r\n",
" from array import array\r\n",
" from pyspark.context import SparkContext\r\n",
" globs = globals().copy()\r\n",
" # The small batch size here ensures that we see multiple batches,\r\n",
" # even in these small test examples:\r\n",
" sc = SparkContext('local[4]', 'PythonTest', batchSize=2)\r\n",
" globs['sc'] = sc\r\n",
" globs['sqlCtx'] = SQLContext(sc)\r\n",
" globs['rdd'] = sc.parallelize([{\"field1\" : 1, \"field2\" : \"row1\"},\r\n",
" {\"field1\" : 2, \"field2\": \"row2\"}, {\"field1\" : 3, \"field2\": \"row3\"}])\r\n",
" jsonStrings = ['{\"field1\": 1, \"field2\": \"row1\", \"field3\":{\"field4\":11}}',\r\n",
" '{\"field1\" : 2, \"field2\": \"row2\", \"field3\":{\"field4\":22}}',\r\n",
" '{\"field1\" : 3, \"field2\": \"row3\", \"field3\":{\"field4\":33}}']\r\n",
" globs['jsonStrings'] = jsonStrings\r\n",
" globs['json'] = sc.parallelize(jsonStrings)\r\n",
" globs['nestedRdd1'] = sc.parallelize([\r\n",
" {\"f1\" : array('i', [1, 2]), \"f2\" : {\"row1\" : 1.0}},\r\n",
" {\"f1\" : array('i', [2, 3]), \"f2\" : {\"row2\" : 2.0}}])\r\n",
" globs['nestedRdd2'] = sc.parallelize([\r\n",
" {\"f1\" : [[1, 2], [2, 3]], \"f2\" : set([1, 2]), \"f3\" : (1, 2)},\r\n",
" {\"f1\" : [[2, 3], [3, 4]], \"f2\" : set([2, 3]), \"f3\" : (2, 3)}])\r\n",
" (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)\r\n",
" globs['sc'].stop()\r\n",
" if failure_count:\r\n",
" exit(-1)\r\n",
"\r\n",
"\r\n",
"if __name__ == \"__main__\":\r\n",
" _test()\r\n",
"\r\n"
]
}
],
"prompt_number": 1
},
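{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `Row` class documented above is an extended `dict` whose items are also exposed as attributes. A minimal sketch, assuming the local `sql.py` listed by `cat sql.py` is importable from the working directory:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Minimal sketch of the Row wrapper documented above.\n",
"# Assumes the local sql.py listed above is on the import path.\n",
"from sql import Row\n",
"\n",
"r = Row({\"hello\": \"world\", \"foo\": \"bar\"})\n",
"print r.hello   # attribute access -> 'world'\n",
"print r[\"foo\"]  # plain dict access -> 'bar'"
],
"language": "python",
"metadata": {},
"outputs": []
},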
{
"cell_type": "code",
"collapsed": false,
"input": [
"from sql import *"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 2
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"import tempfile, shutil"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 3
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from array import array\n",
"from pyspark.sql import SQLContext"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 4
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sqlCtx = SQLContext(sc)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 5
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"srdd = sqlCtx.jsonFile('jsonFile')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 23
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"srdd.collect() == [{\"f1\": 1, \"f2\": \"row1\", \"f3\":{\"field4\":11}},\n",
" {\"f1\": 2, \"f2\": \"row2\", \"f3\":{\"field4\":22}},\n",
" {\"f1\": 3, \"f2\": \"row3\", \"f3\":{\"field4\":33}}]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 26,
"text": [
"True"
]
}
],
"prompt_number": 26
},
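{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `jsonFile` docstring above also shows querying the loaded data through the catalog. A minimal sketch, assuming `sqlCtx` and the `srdd` from the previous cells; the table name is arbitrary:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: register the SchemaRDD loaded from 'jsonFile' and query it with SQL,\n",
"# mirroring the jsonFile docstring above. Assumes srdd and sqlCtx from the\n",
"# previous cells; \"json_table\" is an arbitrary name.\n",
"sqlCtx.registerRDDAsTable(srdd, \"json_table\")\n",
"srdd2 = sqlCtx.sql(\"SELECT f1, f2, f3 FROM json_table WHERE f1 > 1\")\n",
"srdd2.collect()"
],
"language": "python",
"metadata": {},
"outputs": []
},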
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#https://www.mail-archive.com/commits@spark.apache.org/msg01502.html"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from array import array\n",
"from pyspark.sql import SQLContext"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 25
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"sqlCtx = SQLContext(sc)\n",
"rdd = sc.parallelize([\n",
" {\"f1\" : array('i', [1, 2]), \"f2\" : {\"row1\" : 1.0}},\n",
" {\"f1\" : array('i', [2, 3]), \"f2\" : {\"row2\" : 2.0}}])"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 21
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"srdd = sqlCtx.inferSchema(rdd)\n",
"srdd.collect() == [{\"f1\" : array('i', [1, 2]), \"f2\" : {\"row1\" : 1.0}},\n",
" {\"f1\" : array('i', [2, 3]), \"f2\" : {\"row2\" : 2.0}}]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 22,
"text": [
"True"
]
}
],
"prompt_number": 22
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"rdd = sc.parallelize([\n",
" {\"f1\" : [[1, 2], [2, 3]], \"f2\" : set([1, 2]), \"f3\" : (1, 2)},\n",
" {\"f1\" : [[2, 3], [3, 4]], \"f2\" : set([2, 3]), \"f3\" : (2, 3)}])"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 23
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"srdd = sqlCtx.inferSchema(rdd)\n",
"srdd.collect() == [{\"f1\" : [[1, 2], [2, 3]], \"f2\" : set([1, 2]), \"f3\" : (1, 2)},\n",
" {\"f1\" : [[2, 3], [3, 4]], \"f2\" : set([2, 3]), \"f3\" : (2, 3)}]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 26,
"text": [
"True"
]
}
],
"prompt_number": 26
},
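{
"cell_type": "markdown",
"metadata": {},
"source": [
"`jsonRDD` (documented in sql.py above) builds a SchemaRDD straight from an RDD of JSON strings, with no file involved. A minimal sketch, assuming `sc` and `sqlCtx` from the cells above; the strings mirror the doctest fixtures:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch of jsonRDD, per the docstring in sql.py above: one JSON object per string.\n",
"# Assumes sc and sqlCtx from earlier cells; the strings mirror the doctest fixtures.\n",
"jsonStringsRDD = sc.parallelize(['{\"field1\": 1, \"field2\": \"row1\", \"field3\": {\"field4\": 11}}',\n",
"                                 '{\"field1\": 2, \"field2\": \"row2\", \"field3\": {\"field4\": 22}}'])\n",
"srddJson = sqlCtx.jsonRDD(jsonStringsRDD)\n",
"srddJson.collect()"
],
"language": "python",
"metadata": {},
"outputs": []
},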
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Load Spark SQL from JSON files"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"jsonFile\n",
"!cat 'jsonFile'"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"{\"f1\": 1, \"f2\": \"row1\", \"f3\":{\"field4\":11}}\r\n",
"{\"f1\": 2, \"f2\": \"row2\", \"f3\":{\"field4\":22}}\r\n",
"{\"f1\": 3, \"f2\": \"row3\", \"f3\":{\"field4\":33}}\r\n"
]
}
],
"prompt_number": 29
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"jsonStrings = !cat jsonFile"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 15
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"ofn = open(jsonFile, 'w')"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 16
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"for json in jsonStrings:\n",
" print>>ofn, json"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 17
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"json"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 18,
"text": [
"'{\"f1\": 3, \"f2\": \"row3\", \"f3\":{\"field4\":33}}'"
]
}
],
"prompt_number": 18
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"ofn.close()\n",
"!cat '/var/folders/dj/92mp96m54d90mdpqlmwbz1m40000gn/T/tmpMFXhlP'"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"{'f1': 1, 'f2': 'row1', 'f3': {'field4': 11}}\r\n",
"{'f1': 2, 'f2': 'row2', 'f3': {'field4': 22}}\r\n",
"{'f1': 3, 'f2': 'row3', 'f3': {'field4': 33}}\r\n"
]
}
],
"prompt_number": 19
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"srdd = sqlCtx.jsonFile(jsonFile)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 20
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"srdd.collect() == [{\"f1\": 1, \"f2\": \"row1\", \"f3\":{\"field4\":11}},\n",
" {\"f1\": 2, \"f2\": \"row2\", \"f3\":{\"field4\":22}},\n",
" {\"f1\": 3, \"f2\": \"row3\", \"f3\":{\"field4\":33}}]"
],
"language": "python",
"metadata": {},
"outputs": [
{
"metadata": {},
"output_type": "pyout",
"prompt_number": 22,
"text": [
"True"
]
}
],
"prompt_number": 22
},
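{
"cell_type": "markdown",
"metadata": {},
"source": [
"The docstrings above also describe a Parquet round trip via `saveAsParquetFile` and `parquetFile`. A minimal sketch, assuming the `srdd` just loaded; the mkdtemp/rmtree dance follows the doctest, since the save path must not exist yet:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch of the Parquet round trip described in the docstrings above.\n",
"# Assumes srdd and sqlCtx from earlier cells; mkdtemp/rmtree follows the doctest\n",
"# because saveAsParquetFile expects a path that does not exist yet.\n",
"import tempfile, shutil\n",
"parquetPath = tempfile.mkdtemp()\n",
"shutil.rmtree(parquetPath)\n",
"srdd.saveAsParquetFile(parquetPath)\n",
"srdd2 = sqlCtx.parquetFile(parquetPath)\n",
"sorted(srdd.collect()) == sorted(srdd2.collect())"
],
"language": "python",
"metadata": {},
"outputs": []
},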
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#http://people.apache.org/~pwendell/spark-1.0.1-rc1-docs/sql-programming-guide.html"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from pyspark.sql import SQLContext\n",
"sqlContext = SQLContext(sc)"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 30
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"lines = sc.textFile(\"../../examples/src/main/resources/people.txt\")\n",
"parts = lines.map(lambda l: l.split(\",\"))\n",
"people = parts.map(lambda p: {\"name\": p[0], \"age\": int(p[1])})\n"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 33
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"schemaPeople = sqlContext.inferSchema(people)\n",
"schemaPeople.registerAsTable(\"people\")"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 34
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"teenagers = sqlContext.sql(\"SELECT name FROM people WHERE age >= 13 AND age <= 19\")\n"
],
"language": "python",
"metadata": {},
"outputs": [],
"prompt_number": 35
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"teenNames = teenagers.map(lambda p: \"Name: \" + p.name)\n",
"for teenName in teenNames.collect():\n",
" print teenName"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Name: Justin\n"
]
}
],
"prompt_number": 36
},
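{
"cell_type": "markdown",
"metadata": {},
"source": [
"sql.py above also exposes the catalog through `table`, `cacheTable`, and `uncacheTable`. A minimal sketch, assuming the `people` table registered in the cells above:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: pull the registered table back as a SchemaRDD and cache it in memory,\n",
"# per the table/cacheTable docstrings above. Assumes the \"people\" table from the\n",
"# previous cells.\n",
"peopleTable = sqlContext.table(\"people\")\n",
"sqlContext.cacheTable(\"people\")\n",
"peopleTable.count()\n",
"sqlContext.uncacheTable(\"people\")"
],
"language": "python",
"metadata": {},
"outputs": []
},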
{
"cell_type": "code",
"collapsed": false,
"input": [
"!cat ../../examples/src/main/resources/people.txt "
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"Michael, 29\r\n",
"Andy, 30\r\n",
"Justin, 19\r\n"
]
}
],
"prompt_number": 37
},
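{
"cell_type": "markdown",
"metadata": {},
"source": [
"`printSchema` and the optimizer-backed `count` are also documented in sql.py above. A minimal sketch, assuming `schemaPeople` from the cells above:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: inspect the inferred schema and count rows through the optimizer,\n",
"# per the printSchema/count docstrings above. Assumes schemaPeople from earlier.\n",
"schemaPeople.printSchema()\n",
"schemaPeople.count()"
],
"language": "python",
"metadata": {},
"outputs": []
},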
{
"cell_type": "code",
"collapsed": false,
"input": [],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
}
]
}