Skip to content

Instantly share code, notes, and snippets.

# Najprv nacitame potrebne kniznice
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Pre pouzivanie Structured Streamingu budeme pouzivat objekt Spark Session
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
# Najprv im importujeme potrebne moduly
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Vytvorime objekt Spark kontext a vytvorime Streaming kontext. Ten (okrem parametra Spark kontextu) definuje aj parameter, ktory specifikuje velkost okna pre mikro-davky. V tomto pripade
# V tomto pripade StreamingContext(sc, 1) vytvori Streaming kontext zo Spark kontextu sc a s intervalom mikro-davok 1 sekunda
sc = SparkContext(appName = "NetworkWordCount")
ssc = StreamingContext(sc, 1)
# do skriptu si naimportujeme typ SparkSession z modulu ‘pyspark.sql‘
from pyspark.sql import SparkSession
# vytvoríme objekt ‘spark‘ a ako parameter ‘appName‘ nastavíme názov aplikácie (v distribuovanom prostredí môže naraz bežať
# viacero aplikácií, ktoré je potrebné pomenovať aby sme ich vedeli rozlíšiť)
spark = SparkSession.builder.appName("example28").getOrCreate()
# ďalej už môžeme používať objekt rozhrania ‘spark‘ na vytvorenie a spracovanie dátových rámcov
# nasledujúce príkazy predspracujú dáta o sieťovej komunikácii medzi zariadeniami z predchádzajúceho cvičenia
# stiahneme si dáta z minulého cvičenia z internetu a uložíme ich do pracovného adresára
import urllib
urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
# ‘raw_data’ je RDD kolekcia riadkov (reťazcov) načítaných z textového súboru
raw_data = sc.textFile("./kddcup.data_10_percent.gz")
# dáta sú v dátových rámcoch reprezentované po riadkoch ako objekty typu ‘Row’
# naimportujeme potrebné moduly
from pyspark.sql import Row
import urllib
# stiahneme dáta a načítame ich ako zoznamy reťazcov
urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
raw_data = sc.textFile("./kddcup.data_10_percent.gz")
csv_data = raw_data.map(lambda x: x.split(","))
# vytvoríme dátový rámec zo zvolených atribútov
# naimportujeme si potrebné typy
from pyspark.sql import Row
import urllib
# stiahneme si dáta z internetu a uložíme ich do pracovného adresára
urllib.urlretrieve("http://people.tuke.sk/martin.sarnovsky/tsvd/files/iris.csv", "iris.csv")
# ‘iris’ je jednoduchá dátová množina pre klasifikáciu do troch tried
# príklady popisujú tri druhy rastlín (kosatcov) podľa rozmerov ich kvetov (štyry vstupné číselné atribúty)
# dáta sú v dátových rámcoch reprezentované po riadkoch ako objekty typu ‘Row’
from pyspark.sql import Row
# načítame si textový súbor po riadkoch a každý riadok rozdelíme na jednotlivé hodnoty (reťazce oddelené čiarkou)
csv_data = raw_data.map(lambda line: line.split(","))
# pre každý riadok (zoznam oddelených reťazcov) vytvoríme objekt typu ‘Row’, dáta zadáme ako zoznam pomenovaných parametrov, pričom
# názov parametra bude zodpovedať názvu dátového atribútu, pre číselné atribúty skonvertujeme hodnoty z reťazca na desatinné číslo
df_data = csv_data.map(lambda line: Row(
duration = float(line[0]),
# najprv si importujeme potrebné typy
import numpy
from pyspark.mllib.regression import LabeledPoint
# objekt typu LabeledPoint je zložený z hodnoty cieľového atribútu a z vektora hodnôt vstupných atribútov (príznakov)
# ako vektor hodnôt vstupných atribútov sa používa špeciálny typ číselných vektorov z knižnice NumPy, ktorý je optimalizovaný
# pre numerické výpočty, vektory je možné vytvoriť z číselných zoznamov pomocou funkcie ‘numpy.array’
# nasledujúci príkaz vytvorí jeden príklad s cieľovým atribútom s hodnotou 1.0 a 3 vstupnými atribútmi s hodnotami [2.0, 3.0, 4.0]
point = LabeledPoint(1.0, numpy.array([2.0, 3.0, 4.0]))
# najprv si stiahneme priamo v Pythone súbor dátovej množiny z internetu a uložíme ho do pracovného adresára
import urllib
urllib.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
# dátová množina obsahuje záznamy o sieťovej komunikácii medzi zariadeniami
# jednotlivé komunikačné spojenia sú charakterizované množinou atribútov (nominálnych aj numerických – pozri
# http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), posledným atribútom je typ komunikácie (‘normal.‘ popisuje štandardnú
# komunikáciu, ostatné rôzne typy útokov)
# dáta načítame ako RDD kolekciu zo súboru a zobrazíme prvých 5 záznamov
# do skriptu si naimportujeme typ SparkContext z modulu ‘pyspark‘
from pyspark import SparkContext
# vytvoríme objekt kontextu a ako parameter ‘appName‘ nastavíme názov aplikácie (v distribuovanom prostredí môže naraz bežať
# viacero aplikácií, ktoré je potrebné pomenovať aby sme ich vedeli rozlíšiť)
sc = SparkContext(appName="example23")
# ďalej už môžeme používať kontext na spracovanie dát