Skip to content

Instantly share code, notes, and snippets.

@pabloem
Created October 6, 2016 16:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save pabloem/1755254018999c64900d99e29259c8aa to your computer and use it in GitHub Desktop.
Save pabloem/1755254018999c64900d99e29259c8aa to your computer and use it in GitHub Desktop.
## -*- coding: utf-8 -*-
import json
import sys
import time
import apache_beam as beam
from apache_beam.io.fileio import CompressionTypes as ct
from apache_beam.utils.options import PipelineOptions
from beam_utils.sources import CsvFileSource
class InegiPipelineOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input', dest='inputFile',
default='data/2015/*csv',
help="Input file to process.")
parser.add_argument('--output', dest='outputFile',
default='outputFile_inegi',
help='Prefix for the output text files')
#parser.add_argument('--runner', dest='runner',
#default='DirectPipelineRunner')
opts = InegiPipelineOptions(flags=sys.argv)
p = beam.Pipeline(options=opts)
# Ahora leemos el archivo de entrada
pairSA = (
p
| 'read_files' >> beam.io.Read(
CsvFileSource(opts.inputFile,
compression_type = ct.UNCOMPRESSED))
# Filtramos filas que no tienen las columnas que nos interesan
| 'filter_rows' >> beam.Filter(lambda x: ('Entidad federativa' in x and
'Nombre de clase de la actividad' in x))
# Ahora filtramos las columnas que nos interesan
| 'filter_columns' >> beam.Map(
lambda x: tuple((x['Entidad federativa'].strip(),
x['Nombre de clase de la actividad'].strip())))
# Finalmente contamos cuantos negocios hay del mismo tipo en cada estado
| 'count_pairs' >> beam.combiners.Count.PerElement()
)
# Primero tenemos que hacer el negocio la CLAVE en una tupla de CLAVE-VALOR
# Por eso lo organizamos en una estructura de (negocio, (estado, conteo))
# Donde negocio es la CLAVE
sortByBiz = (
pairSA
|'business as key' >> beam.Map(lambda x: (x[0][1], (x[0][0], x[1])))
# Ahora agrupamos por CLAVE, de manera que obtengamos una
# coleccion de (negocio, [(estado, conteo), (estado, conteo),..])
| 'group by key' >> beam.GroupByKey()
# Y dentro de esa coleccion ordenamos por CONTEO
# y juntamos los resultados - para despues guardarlos
| 'sort then join' >> beam.FlatMap(
lambda x: [(x[0], y[0], y[1])
for y in sorted(x[1], key=lambda y:y[1])])
)
# Finalmente escribimos a un archivo de salida
(sortByBiz
| "jsonize" >> beam.Map(lambda x: json.dumps(x))
| "write out" >> beam.Write(beam.io.TextFileSink(opts.outputFile))
)
st_time = time.time()
p.run()
print("---- Runtime: {} seconds ----".format(time.time() - st_time))
mkdir bigdata-inegi ; cd bigdata-inegi
mkdir data ; cd data
wget --no-check-certificate http://storage.googleapis.com/noogler-projects.appspot.com/denue-2015.tar.gz
tar -xvzf denue-2015.tar.gz
cd ..
virtualenv venv
source venv/bin/activate
git clone https://github.com/apache/incubator-beam.git
cd incubator-beam ; git checkout python-sdk
cd sdks/python ; python setup.py install --root /
cd ../../../
cp ../analisis.py .
pip install google-cloud-dataflow
pip install beam_utils
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment