@sanp
Last active March 13, 2017 15:59
Lightning Talk for Centro Tech Team on 3/10/17
# Parse JSON data with this one weird trick!
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SQLContext

# Set up a basic Spark session
conf = (SparkConf()
        .setAppName('My App')
        .set('spark.executor.memory', '10g'))
sc = SparkContext(conf=conf)
sql_context = SQLContext(sc)
spark_session = sql_context.sparkSession
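# Note: on Spark 2.x you can also build the session directly and skip the
# SQLContext detour -- a minimal sketch, assuming Spark >= 2.0:
#
#   from pyspark.sql import SparkSession
#   spark_session = (SparkSession.builder
#                    .appName('My App')
#                    .config('spark.executor.memory', '10g')
#                    .getOrCreate())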
# Sample Data
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])
df.show()
# >>> df.show()
# +--------------------+
# | json|
# +--------------------+
# |{"header":{"id":1...|
# |{"header":{"id":1...|
# |{"header":{"id":4...|
# +--------------------+
# Create a SQL temp view of the data
df.createOrReplaceTempView('df')
# Parse it - step by step:
# 1. Just the top level JSON
q = """
select
json
from df
"""
result = spark_session.sql(q)
result.show()
# >>> result.show()
# +--------------------+
# | json|
# +--------------------+
# |{"header":{"id":1...|
# |{"header":{"id":1...|
# |{"header":{"id":4...|
# +--------------------+
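# Aside: to pull a single key, get_json_object with a JSONPath expression
# works too; json_tuple is handier when extracting several keys in one pass.
# A sketch against the same df view:
#
#   spark_session.sql(
#       "select get_json_object(json, '$.header.id') as header_id from df"
#   ).show()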
# 2. Add the next level of nested data
q = """
select
a.json
, b.header
, b.body
from df a
lateral view json_tuple(a.json, 'header', 'body') b
as header, body
"""
result = spark_session.sql(q)
result.show()
# >>> result.show()
# +--------------------+--------------------+--------------------+
# | json| header| body|
# +--------------------+--------------------+--------------------+
# |{"header":{"id":1...|{"id":12345,"foo"...|{"id":111000,"nam...|
# |{"header":{"id":1...|{"id":12346,"foo"...|{"id":111002,"nam...|
# |{"header":{"id":4...|{"id":43256,"foo"...|{"id":20192,"name...|
# +--------------------+--------------------+--------------------+
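# Note that json_tuple always returns string columns, so header and body
# above are still plain JSON strings -- which is exactly why they can be fed
# into further json_tuple calls below. Confirm with result.printSchema().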
# 3. Go deeper into the nesting
q = """
select
a.json
, b.header
, c.id
, c.foo
from df a
lateral view json_tuple(a.json, 'header', 'body') b
as header, body
lateral view json_tuple(b.header, 'id', 'foo') c
as id, foo
"""
result = spark_session.sql(q)
result.show()
# >>> result.show()
# +--------------------+--------------------+-----+------+
# | json| header| id| foo|
# +--------------------+--------------------+-----+------+
# |{"header":{"id":1...|{"id":12345,"foo"...|12345| bar|
# |{"header":{"id":1...|{"id":12346,"foo"...|12346| baz|
# |{"header":{"id":4...|{"id":43256,"foo"...|43256|foobaz|
# +--------------------+--------------------+-----+------+
# ...
# 4. Fully parse out everything
q = """
select
c.header_id
, c.foo
, d.body_id
, d.name
, e.id as sub_json_id
, f.col1
, f.col2
from df a
lateral view json_tuple(a.json, 'header', 'body') b
as header, body
lateral view json_tuple(b.header, 'id', 'foo') c
as header_id, foo
lateral view json_tuple(b.body, 'id', 'name', 'sub_json') d
as body_id, name, sub_json
lateral view json_tuple(d.sub_json, 'id', 'sub_sub_json') e
as id, sub_sub_json
lateral view json_tuple(e.sub_sub_json, 'col1', 'col2') f
as col1, col2
"""
result = spark_session.sql(q)
result.show()
# >>> result.show()
# +---------+------+-------+------+-----------+----+--------------+
# |header_id| foo|body_id| name|sub_json_id|col1| col2|
# +---------+------+-------+------+-----------+----+--------------+
# | 12345| bar| 111000|foobar| 54321| 20| somethong|
# | 12346| baz| 111002|barfoo| 23456| 30|something else|
# | 43256|foobaz| 20192|bazbar| 39283| 50| another thing|
# +---------+------+-------+------+-----------+----+--------------+
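# One caveat: every column json_tuple extracts is a string, so cast numeric
# fields before doing arithmetic on them. A minimal sketch:
#
#   result = result.withColumn('col1', result['col1'].cast('int'))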
# Now you can parse anything!
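# A schema-first alternative, assuming Spark >= 2.1: parse the whole string
# with from_json and an explicit schema to get typed columns up front.
# A sketch covering just the header:
#
#   from pyspark.sql.functions import from_json
#   from pyspark.sql.types import LongType, StringType, StructField, StructType
#
#   header_schema = StructType([
#       StructField('id', LongType()),
#       StructField('foo', StringType()),
#   ])
#   schema = StructType([StructField('header', header_schema)])
#   typed = df.select(from_json(df['json'], schema).alias('parsed'))
#   typed.select('parsed.header.id', 'parsed.header.foo').show()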
@ddrscott

I couldn't stop myself:

SELECT
  header_id,
  foo,
  body_id,
  name,
  sub_json_id,
  col1,
  col2
FROM (
  VALUES
    ($${"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}$$),
    ($${"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}$$),
    ($${"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}$$)
) as strings(raw)
JOIN LATERAL (
  -- Convert string to JSON
  SELECT strings.raw::json AS json
) jsons ON true
JOIN LATERAL (
  -- Extract first level
  SELECT
    jsons.json->'body' AS body,
    jsons.json->'header' AS header
) t1 ON true
JOIN LATERAL (
  -- Extract next levels
  SELECT
    header->>'id' AS header_id,
    header->>'foo' AS foo,
    body->>'id' AS body_id,
    body->>'name' AS name,
    body->'sub_json' AS sub_json
) t2 ON true
JOIN LATERAL (
  -- Extract next levels
  SELECT
    sub_json->>'id' AS sub_json_id,
    sub_json#>>'{sub_sub_json,col1}' AS col1,
    sub_json#>>'{sub_sub_json,col2}' AS col2
) t3 ON true
 header_id |  foo   | body_id |  name  | sub_json_id | col1 |      col2
-----------+--------+---------+--------+-------------+------+----------------
 12345     | bar    | 111000  | foobar | 54321       | 20   | somethong
 12346     | baz    | 111002  | barfoo | 23456       | 30   | something else
 43256     | foobaz | 20192   | bazbar | 39283       | 50   | another thing
(3 rows)

@sanp (Author) commented Mar 13, 2017

awesome!
