Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dineshdharme/99f90b5d787a24ac3bd8a4f546998033 to your computer and use it in GitHub Desktop.
Save dineshdharme/99f90b5d787a24ac3bd8a4f546998033 to your computer and use it in GitHub Desktop.
A demo PySpark script showing how to make parallel API calls with Spark.
Here's a helpful example of using DataFrames to make parallel API calls.
import json
import sys
from pyspark.sql import SQLContext
import requests
from pyspark.sql.functions import *
from pyspark.sql.types import *
# SparkContext was previously only reachable through the
# `from pyspark.sql.functions import *` star import; import it explicitly.
from pyspark import SparkContext

# `local[*]` starts one worker thread per available core. Plain `local` uses a
# single thread, so the partitions below would be processed one after another
# and the API calls would not actually run in parallel.
sc = SparkContext('local[*]')
sqlContext = SQLContext(sc)

# Toy input: one row per post id whose data will be fetched from the API.
data1 = [[1], [2]]
columns1 = ["postId"]
df1 = sqlContext.createDataFrame(data=data1, schema=columns1)
print("Given dataframe")
df1.show(n=100, truncate=False)

## Repartition into a suitable number of partitions: each partition is handled
## by one task, so effective parallelism is at most min(partitions, cores).
## Increase the number of partitions if you still get OOM errors.
df1 = df1.repartition(4).cache()
def _fetch_first_item(session, url, params, timeout):
    """Return the first element of the JSON array served at *url*, or None.

    None is returned when the response status is not HTTP 200 or when the
    returned array is empty (no record matched *params*).
    """
    response = session.get(url, params=params, timeout=timeout)
    if response.status_code != 200:
        return None
    items = response.json()
    # An empty array is a valid 200 answer; indexing it blindly raised
    # IndexError and failed the whole Spark task.
    return items[0] if items else None


def my_method_for_schema(post_id, timeout=10):
    """Fetch one post plus a fixed set of comments and return them as JSON.

    Plain-Python function, callable directly on the driver. The script calls
    it once to obtain a sample response for schema inference.

    :param post_id: id of the post to fetch (int or numeric string).
    :param timeout: seconds to wait for each HTTP request; without a timeout
        a stalled connection would block the calling task indefinitely.
    :return: JSON string of the form
        ``{"post_response": {...}, "comments_response": [...]}``, or ``""``
        if any request returns a non-200 status or no matching record.
    """
    post_url = "https://jsonplaceholder.typicode.com/posts/"
    comments_url = "https://jsonplaceholder.typicode.com/comments"
    # Demo values: the post lookup is filtered to userId 1, and the same three
    # comments of post 1 are fetched regardless of post_id.
    comment_ids = [1, 2, 3]

    # One session for all requests so the HTTP connection is reused.
    with requests.Session() as session:
        post_response = _fetch_first_item(
            session, post_url, {"id": [post_id], "userId": 1}, timeout
        )
        if post_response is None:
            return ""

        comments_retrieved = []
        for com_id in comment_ids:
            comment = _fetch_first_item(
                session, comments_url, {"id": com_id, "postId": 1}, timeout
            )
            if comment is None:
                return ""
            comments_retrieved.append(comment)

    overall_json = {"post_response": post_response, "comments_response": comments_retrieved}
    return json.dumps(overall_json)
@udf
def my_method_copy_for_pyspark(post_id):
    """Spark UDF wrapper around ``my_method_for_schema``.

    Calling a ``@udf`` object on the driver builds a Column expression instead
    of running the function, which is why a plain-Python version exists at
    all. Delegating to it, rather than keeping a second line-for-line copy of
    the request logic, guarantees that the sample used for schema inference
    and the per-row UDF results come from the same code.

    :param post_id: value of the ``postId`` column for the current row.
    :return: the JSON string produced by ``my_method_for_schema`` (``""`` on
        failure); ``@udf`` without an explicit return type declares a string.
    """
    return my_method_for_schema(post_id)
## EXPLICIT SCHEMA REPRESENTATION
# Expected shape of the JSON string returned by the API-calling UDF.
# Every field is declared nullable: from_json forces the parsed schema to be
# nullable anyway (Spark >= 2.3), and a missing key or a failed call ("")
# yields nulls, so `nullable=False` on title/body only misdescribed the data.
post_schema = StructType([
    StructField('userId', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('title', StringType(), True),
    StructField('body', StringType(), True),
])

comments_schema = StructType([
    StructField('postId', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('email', StringType(), True),
    StructField('body', StringType(), True),
])

# Top-level object: one post plus the list of its fetched comments.
schema_struct = StructType([
    StructField('post_response', post_schema, True),
    StructField('comments_response', ArrayType(comments_schema, True), True),
])
# `my_method_for_schema` is the plain-Python counterpart of the UDF, so it can
# be called once on the driver to retrieve a sample API response; the sample
# is then used to infer the JSON schema needed to parse the results into a
# struct.
retrived_value = my_method_for_schema("1")

print("API RESPONSE")
result_df = df1.withColumn("api_response", my_method_copy_for_pyspark(col("postId")))
# Cache so the actions below reuse the fetched API results instead of
# re-running the UDF (and every HTTP request) for each show().
result_df.cache().show(n=30, truncate=False)

print("USING EXPLICIT STRUCT SCHEMA")
first_method_df = result_df.withColumn("api_parsed_struct", from_json(col("api_response"), schema_struct)).drop("api_response")
first_method_df.show(n=30, truncate=False)

print("USING SCHEMA FROM ONE TEST API CALL")
if retrived_value:
    # The inferred schema orders struct fields alphabetically, so its layout
    # differs from the explicit schema above.
    second_method_df = result_df.withColumn("api_parsed_struct", from_json(col("api_response"), schema_of_json(retrived_value))).drop("api_response")
    second_method_df.show(n=30, truncate=False)
else:
    # The sample call returns "" on failure, and schema_of_json cannot infer
    # a schema from an empty string; report it instead of an analysis error.
    print("Sample API call failed; cannot infer a schema from an empty response.")
Output:
Given dataframe
+------+
|postId|
+------+
|1 |
|2 |
+------+
API RESPONSE

|postId|api_response |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|2 |{"post_response": {"userId": 1, "id": 2, "title": "qui est esse", "body": "est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla"}, "comments_response": [{"postId": 1, "id": 1, "name": "id labore ex et quam laborum", "email": "Eliseo@gardner.biz", "body": "laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium"}, {"postId": 1, "id": 2, "name": "quo vero reiciendis velit similique earum", "email": "Jayne_Kuhic@sydney.com", "body": "est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et"}, {"postId": 1, "id": 3, "name": "odio adipisci rerum aut animi", "email": "Nikita@garfield.biz", "body": "quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione"}]} |
|1 |{"post_response": {"userId": 1, "id": 1, "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"}, "comments_response": [{"postId": 1, "id": 1, "name": "id labore ex et quam laborum", "email": "Eliseo@gardner.biz", "body": "laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium"}, {"postId": 1, "id": 2, "name": "quo vero reiciendis velit similique earum", "email": "Jayne_Kuhic@sydney.com", "body": "est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et"}, {"postId": 1, "id": 3, "name": "odio adipisci rerum aut animi", "email": "Nikita@garfield.biz", "body": "quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione"}]}|

USING EXPLICIT STRUCT SCHEMA

|postId|api_parsed_struct |

|2 |{{1, 2, qui est esse, est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla}, [{1, 1, id labore ex et quam laborum, Eliseo@gardner.biz, laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium}, {1, 2, quo vero reiciendis velit similique earum, Jayne_Kuhic@sydney.com, est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et}, {1, 3, odio adipisci rerum aut animi, Nikita@garfield.biz, quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione}]} |
|1 |{{1, 1, sunt aut facere repellat provident occaecati excepturi optio reprehenderit, quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto}, [{1, 1, id labore ex et quam laborum, Eliseo@gardner.biz, laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium}, {1, 2, quo vero reiciendis velit similique earum, Jayne_Kuhic@sydney.com, est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et}, {1, 3, odio adipisci rerum aut animi, Nikita@garfield.biz, quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione}]}|

USING SCHEMA FROM ONE TEST API CALL

|postId|api_parsed_struct |

|2 |{[{laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium, Eliseo@gardner.biz, 1, id labore ex et quam laborum, 1}, {est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et, Jayne_Kuhic@sydney.com, 2, quo vero reiciendis velit similique earum, 1}, {quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione, Nikita@garfield.biz, 3, odio adipisci rerum aut animi, 1}], {est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla, 2, qui est esse, 1}} |
|1 |{[{laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium, Eliseo@gardner.biz, 1, id labore ex et quam laborum, 1}, {est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et, Jayne_Kuhic@sydney.com, 2, quo vero reiciendis velit similique earum, 1}, {quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione, Nikita@garfield.biz, 3, odio adipisci rerum aut animi, 1}], {quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto, 1, sunt aut facere repellat provident occaecati excepturi optio reprehenderit, 1}}|

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment