Skip to content

Instantly share code, notes, and snippets.

@snehamehrin
Created August 6, 2020 12:38
Show Gist options
  • Save snehamehrin/64b6eefe6c40fae4cd96033aa05477d5 to your computer and use it in GitHub Desktop.
Save snehamehrin/64b6eefe6c40fae4cd96033aa05477d5 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "cc4eb79938e24d63adae65ec26b5db71",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"#Import All Functions\n",
"from pyspark.sql import SQLContext\n",
"from pyspark.sql import functions as F\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek, col\n",
"from pyspark.sql.types import TimestampType\n",
"\n",
"from pyspark.sql import functions as F\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import unix_timestamp, to_date, date_format, month, year, dayofyear, dayofweek, col\n",
"from pyspark.sql.types import TimestampType\n",
"from pyspark.sql.types import DateType\n",
"from datetime import datetime\n",
"from datetime import timedelta"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "e6b8e28486814edbbd1dc5947435eae9",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Session created"
]
}
],
"source": [
"#Create a Spark Session\n",
"spark = SparkSession.builder.appName('Stack Overflow ML').getOrCreate()\n",
"print('Session created')"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "0d570b14f75445a1b6e267a85081ae63",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
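],
"source": [
"#Function to build the S3 path pattern for the previous day's files\n",
"def get_latest_file_name():\n",
" \"\"\"\n",
" Build and return the S3 path pattern (ending in a wildcard) for the previous day's year/month/day folder; this function only formats the path string and does not connect to S3 itself\n",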
" \"\"\"\n",
" Previous_Date = datetime.today() -timedelta(days=1)\n",
" year=Previous_Date.strftime ('%Y')\n",
" month=Previous_Date.strftime ('%m')\n",
" day=Previous_Date.strftime ('%d')\n",
" file_folder=\"s3://stack-overflow-bucket/StackOverFlow/year=\"+'{}'.format(year)+\"/month=\"+'{:0>2}'.format(month)+\"/day=\"+'{:0>2}'.format(day)+\"*\"\n",
" return file_folder\n"
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "d95dbbef113f42e5bfe9b1b74fd92cb1",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"The file name to be picked by spark is s3://stack-overflow-bucket/StackOverFlow/year=2020/month=08/day=05*"
]
}
],
"source": [
"#Check if the file name returned is correct\n",
"filename=get_latest_file_name()\n",
"print(\"The file name to be picked by spark is \",filename)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "56f98d1429d04ac49174ed7b1cb169a4",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+-------------+-----------+----------+-----+----------+\n",
"|answer_count|creation_date|is_answered|questionid|score|view_count|\n",
"+------------+-------------+-----------+----------+-----+----------+\n",
"| 1| 1567335230| False| 24377| 22| 472|\n",
"| 12| 1586607010| True| 5338| 25| 229|\n",
"| 11| 1555075543| False| 98145| 19| 414|\n",
"| 2| 1586533933| True| 35783| 18| 597|\n",
"| 15| 1562105692| False| 2162| 41| 663|\n",
"+------------+-------------+-----------+----------+-----+----------+\n",
"only showing top 5 rows"
]
}
],
"source": [
"stack = sc.textFile('{}'.format(filename))\n",
"stack.take(5)\n",
"#Convert it into a dataframe\n",
"df = spark.read.json(stack)\n",
"df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "858aaf22b498439fbbe3a6a43fde37b4",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"VBox()"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"#Drop duplicate rows, keeping one row per questionid\n",
"df_duplicates=df.dropDuplicates(['questionid'])\n",
"#Convert the Unix timestamp into a date string (in the pattern, MM is month-of-year; lowercase mm would be minute-of-hour)\n",
"df_duplicates=df_duplicates.withColumn(\"creation_date\", F.from_unixtime(\"creation_date\", \"yyyy-MM-dd\"))\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "PySpark",
"language": "",
"name": "pysparkkernel"
},
"language_info": {
"codemirror_mode": {
"name": "python",
"version": 2
},
"mimetype": "text/x-python",
"name": "pyspark",
"pygments_lexer": "python2"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment