Skip to content

Instantly share code, notes, and snippets.

@onelittlenightmusic
Created August 17, 2017 06:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save onelittlenightmusic/0049d766a6d11999ed8ef2747aeab704 to your computer and use it in GitHub Desktop.
Save onelittlenightmusic/0049d766a6d11999ed8ef2747aeab704 to your computer and use it in GitHub Desktop.
My first Apache-beam application with Python
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# My first Apache Beam application with Python\n",
"\n",
"## Requirements (前提)\n",
"\n",
"- Installation (下記がインストールされていること)\n",
"  - Python 2.7\n",
"  - Apache Beam\n",
"    - Refer to the [Apache Beam Python SDK Quickstart](https://beam.apache.org/get-started/quickstart-py/) for installation instructions (インストール方法は左記参照)\n",
"- My environment\n",
"  - Windows 10, Bash on Windows (Ubuntu 16.04)\n",
"  - Conda installed"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"import apache_beam as beam"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/root/.pyenv/versions/anaconda3-4.1.1/envs/python2/lib/python2.7/site-packages/jupyter_client/jsonutil.py:67: DeprecationWarning: Interpreting naive datetime as local 2017-08-17 15:10:43.424933. Please add timezone info to timestamps.\n",
" new_obj[k] = extract_dates(v)\n"
]
}
],
"source": [
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"p = beam.Pipeline(options=PipelineOptions())"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/root/.pyenv/versions/anaconda3-4.1.1/envs/python2/lib/python2.7/site-packages/jupyter_client/jsonutil.py:67: DeprecationWarning: Interpreting naive datetime as local 2017-08-17 15:10:44.928746. Please add timezone info to timestamps.\n",
" new_obj[k] = extract_dates(v)\n"
]
}
],
"source": [
"lines = (p\n",
" | beam.Create([\n",
" 'To be, or not to be: that is the question: ',\n",
" 'Whether \\'tis nobler in the mind to suffer ',\n",
" 'The slings and arrows of outrageous fortune, ',\n",
" 'Or to take arms against a sea of troubles, ']))\n"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/root/.pyenv/versions/anaconda3-4.1.1/envs/python2/lib/python2.7/site-packages/jupyter_client/jsonutil.py:67: DeprecationWarning: Interpreting naive datetime as local 2017-08-17 15:10:46.767488. Please add timezone info to timestamps.\n",
" new_obj[k] = extract_dates(v)\n"
]
}
],
"source": [
"word_lengths = lines | beam.Map(len)"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/root/.pyenv/versions/anaconda3-4.1.1/envs/python2/lib/python2.7/site-packages/jupyter_client/jsonutil.py:67: DeprecationWarning: Interpreting naive datetime as local 2017-08-17 15:10:48.640549. Please add timezone info to timestamps.\n",
" new_obj[k] = extract_dates(v)\n"
]
},
{
"data": {
"text/plain": [
"<PCollection[Map(len).None] at 0x7f6b69c92810>"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"word_lengths"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/root/.pyenv/versions/anaconda3-4.1.1/envs/python2/lib/python2.7/site-packages/jupyter_client/jsonutil.py:67: DeprecationWarning: Interpreting naive datetime as local 2017-08-17 14:55:42.156964. Please add timezone info to timestamps.\n",
" new_obj[k] = extract_dates(v)\n"
]
}
],
"source": [
"class ExtractWordsFn(beam.DoFn):\n",
"    \"\"\"DoFn that prints every element it receives.\n",
"\n",
"    `process` has no return statement, so it only writes to stdout.\n",
"    \"\"\"\n",
"\n",
"    def process(self, element):\n",
"        # Print the incoming element as-is.\n",
"        print(element)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/root/.pyenv/versions/anaconda3-4.1.1/envs/python2/lib/python2.7/site-packages/jupyter_client/jsonutil.py:67: DeprecationWarning: Interpreting naive datetime as local 2017-08-17 15:10:56.426725. Please add timezone info to timestamps.\n",
" new_obj[k] = extract_dates(v)\n"
]
}
],
"source": [
"p_end = word_lengths | beam.ParDo(ExtractWordsFn())"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/root/.pyenv/versions/anaconda3-4.1.1/envs/python2/lib/python2.7/site-packages/jupyter_client/jsonutil.py:67: DeprecationWarning: Interpreting naive datetime as local 2017-08-17 15:10:58.036077. Please add timezone info to timestamps.\n",
" new_obj[k] = extract_dates(v)\n"
]
},
{
"data": {
"text/plain": [
"<apache_beam.runners.direct.direct_runner.DirectPipelineResult at 0x7f6b69c92950>"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"43\n",
"42\n",
"45\n",
"43\n"
]
}
],
"source": [
"p.run()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [python2]",
"language": "python",
"name": "Python [python2]"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.13"
}
},
"nbformat": 4,
"nbformat_minor": 1
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment