Skip to content

Instantly share code, notes, and snippets.

@weldpua2008
Created February 17, 2019 15:28
Show Gist options
  • Save weldpua2008/e634acb13a8639fa2b5de65842d44884 to your computer and use it in GitHub Desktop.
Save weldpua2008/e634acb13a8639fa2b5de65842d44884 to your computer and use it in GitHub Desktop.
Counts words of the file in German
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" Counts words of the file with German dictionary."""
__author__ = "Valeriy Soloviov"
__copyright__ = "Copyright 2019"
# Prepare file:
# iconv -f ISO-8859-15 -t UTF-8 ~/anna_k.txt > ~/anna_k_utf8.txt
from pyspark.sql import SparkSession
from pyspark.sql.types import (StringType,IntegerType,StructType,StructField)
import os
import shutil
from operator import add
# Log the driver's working directory to help diagnose path problems.
print('working directory:'+os.getcwd())

# Reuse a running Spark session if there is one, otherwise start a new one.
spark = SparkSession.builder.appName('AnnaK').getOrCreate()

# "~" is expanded only by a shell; Spark receives the string verbatim, so the
# home directory must be resolved here for the path to match the file
# produced by the iconv step described at the top of this script.
input_path = os.path.expanduser("~/anna_k_utf8.txt")

# NOTE: an earlier, disabled attempt read the non-converted file by passing
# "encoding"/"charset" = "iso-8859-1" options to spark.read; the script now
# relies on the UTF-8 copy instead.
# Each row of the text source has a single column; keep only that string.
lines = spark.read.text(input_path).rdd.map(lambda r: r[0])
# Punctuation treated as a word separator: every listed character is turned
# into a space before the line is split.
_SEPARATORS = str.maketrans({mark: ' ' for mark in ',.«»;!*?:_'})

# Tokens beginning with any of these characters are not counted.
_SKIPPED_PREFIXES = ('#', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                     '\\', '*', '=', '>', '-')


def _split_words(line):
    """Return the tokens of one text line.

    Separator punctuation is replaced by spaces in a single pass, then the
    line is split on any run of whitespace. Splitting on all whitespace
    (not only ' ') keeps tab-separated words apart and guarantees that the
    tokens are non-empty and already trimmed, so the filters below test
    exactly the string that is used as the counting key.
    """
    return line.translate(_SEPARATORS).split()


def _is_countable(word):
    """Return True for tokens longer than 3 characters without a skipped prefix."""
    return len(word) > 3 and not word.startswith(_SKIPPED_PREFIXES)


# (word, occurrences) pairs.
# NOTE: an earlier, disabled variant additionally dropped tokens whose first
# character is lowercase.
# NOTE(review): sortByKey(False) orders the pairs by word, descending - not by
# count. Confirm that this is the intended ordering of the output.
counts = (
    lines.flatMap(_split_words)
    .filter(_is_countable)
    .map(lambda word: (word, 1))
    .reduceByKey(add)
    .sortByKey(False)
)
# Plain-text dump of the (word, count) pairs.
# "~" is not expanded by os.path.exists, shutil.rmtree or Spark, so resolve
# the home directory once and use the same absolute path for the cleanup and
# for the write.
dirpath = os.path.expanduser("~/anna_k_res.txt")
# Remove the output of a previous run first; it is deleted as a directory
# tree, and the write is not attempted over an existing target.
if os.path.exists(dirpath):
    shutil.rmtree(dirpath)
counts.saveAsTextFile(dirpath)

# The former driver-side `counts.collect()` was dropped: its result was only
# consumed by the disabled debugging loop below, and collecting forces the
# whole result into driver memory.
# Debugging aids (disabled):
# for (word, count) in counts.collect():
#     print("%s: %i" % (word, count))
# counts.toDF(("Work", "Counts")).show()
# Write CSV
# The cleanup and the write must target the same directory. Previously the
# cleanup checked the literal, unexpanded '~/anna_k_csv_out' while the write
# went to 'file:////Users/anna_k_csv_out', so stale output was never removed.
csv_dir = os.path.expanduser('~/anna_k_csv_out')
if os.path.exists(csv_dir):
    shutil.rmtree(csv_dir)

# coalesce(1) merges the data into one partition before writing.
# The separator is ','. A contradictory "delimiter" = "~" option was set
# before, but the explicit sep=',' passed to csv() is expected to take
# precedence (verify against the Spark version in use), so the dead option is
# removed and the effective output format is kept.
(
    counts.toDF(("Work", "Counts"))
    .coalesce(1)
    .write.option("header", "false")
    .option("charset", "UTF-8")
    .csv('file://' + csv_dir, sep=',')
)

# Release the Spark session now that all outputs are written.
spark.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment