@chrismay
Last active June 18, 2023 21:32
pyspark parallel http reqs
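Groups 10,000 ids into 500 buckets, then a PySpark UDF fires each bucket's HTTP requests concurrently with aiohttp and times the whole run.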
import aiohttp
import asyncio
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list, udf

spark = SparkSession.builder.getOrCreate()

# 10,000 request ids (range is exclusive at the top end).
data = range(1, 10001)
df = spark.createDataFrame([(i,) for i in data], ["value"])
# Bucket the ids into 500 groups of ~20, so each UDF call sends one batch of requests.
grouped_df = (
    df.withColumn("mod", col("value") % 500)
    .groupBy("mod")
    .agg(collect_list("value").alias("values"))
)


async def fetch(url, session):
    try:
        async with session.get(url) as response:
            content = await response.read()
            return (url, "OK", content)
    except Exception as e:
        print(e)
        return (url, "ERROR", str(e))


async def run(url_list):
    # One client session shared across every request in the batch.
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.ensure_future(fetch(url, session)) for url in url_list]
        return await asyncio.gather(*tasks)


def request_multi(counter_list):
    # Runs on the executors: one event loop per call, all requests for the group in flight at once.
    url_list = [f"http://localhost:8080/?req={i}" for i in counter_list]
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(run(url_list))
    finally:
        loop.close()


multi_req_udf = udf(request_multi)


def noop(*args):
    pass


start = time.perf_counter()
results = grouped_df.withColumn("result", multi_req_udf("values"))
# foreach with a no-op action seems to be enough to force execution of all rows.
results.foreach(noop)
duration = time.perf_counter() - start
print("10000 reqs took", duration, "s")