Skip to content

Instantly share code, notes, and snippets.

@kangaechu
Last active December 1, 2018 14:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kangaechu/c6ae299aa75d6e609b11f608f2edd666 to your computer and use it in GitHub Desktop.
PySpark day2
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# CSVファイルを扱う"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## CSVファイルの読み込み"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"spark = SparkSession.builder.getOrCreate()\n",
"# https://gihyo.jp/book/2015/978-4-7741-7631-4/support\n",
"df = spark.read.csv(\"click_data_sample.csv\", header=True)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Rename the columns before parsing: it was unclear how to reference\n",
"# dotted column names in to_timestamp's first argument, so replace\n",
"# '.' with '_' in every column name and rebuild the frame with toDF.\n",
"new_cols = [c.replace('.', '_') for c in df.columns]\n",
"df = df.toDF(*new_cols)"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------+-------+-----------+\n",
"| click_at|user_id|campaign_id|\n",
"+-------------------+-------+-----------+\n",
"|2015-04-27 20:40:40| 144012|Campaign077|\n",
"|2015-04-27 00:27:55| 24485|Campaign063|\n",
"|2015-04-27 00:28:13| 24485|Campaign063|\n",
"|2015-04-27 00:33:42| 24485|Campaign038|\n",
"|2015-04-27 01:00:04| 24485|Campaign063|\n",
"+-------------------+-------+-----------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"df.show(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 型の確認"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[('click_at', 'string'), ('user_id', 'string'), ('campaign_id', 'string')]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.dtypes"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- click_at: string (nullable = true)\n",
" |-- user_id: string (nullable = true)\n",
" |-- campaign_id: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Datetime型に変換"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------------+-------+-----------+-------------------+\n",
"| click_at|user_id|campaign_id| parsed|\n",
"+-------------------+-------+-----------+-------------------+\n",
"|2015-04-27 20:40:40| 144012|Campaign077|2015-04-27 20:40:40|\n",
"|2015-04-27 00:27:55| 24485|Campaign063|2015-04-27 00:27:55|\n",
"|2015-04-27 00:28:13| 24485|Campaign063|2015-04-27 00:28:13|\n",
"|2015-04-27 00:33:42| 24485|Campaign038|2015-04-27 00:33:42|\n",
"|2015-04-27 01:00:04| 24485|Campaign063|2015-04-27 01:00:04|\n",
"+-------------------+-------+-----------+-------------------+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import to_timestamp\n",
"\n",
"df = df.withColumn(\"parsed\", to_timestamp('click_at', 'yyyy-MM-dd HH:mm:ss'))\n",
"df.show(5)"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- click_at: string (nullable = true)\n",
" |-- user_id: string (nullable = true)\n",
" |-- campaign_id: string (nullable = true)\n",
" |-- parsed: timestamp (nullable = true)\n",
"\n"
]
}
],
"source": [
"df.printSchema()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 時間ごとに集計"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-------------+-----+\n",
"| hour|count|\n",
"+-------------+-----+\n",
"|2015-04-27 00| 2322|\n",
"|2015-04-27 01| 1571|\n",
"|2015-04-27 02| 943|\n",
"|2015-04-27 03| 631|\n",
"|2015-04-27 04| 438|\n",
"+-------------+-----+\n",
"only showing top 5 rows\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import date_format\n",
"\n",
"# Clicks per hour: group directly on the hour-truncated timestamp.\n",
"# A row count needs no projected columns, so the intermediate\n",
"# select of 'user_id' is unnecessary.\n",
"df.groupBy(date_format('parsed', 'yyyy-MM-dd HH').alias('hour')).count().sort('hour').show(5)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment