Maria Karanasou (mkaranasou)
mkaranasou / pyspark_ml_isolation_forest.py
Last active November 7, 2019 14:12
How to use the Isolation Forest from https://github.com/titicaca/spark-iforest in Pyspark - Spark ML
from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark_iforest.ml.iforest import IForest, IForestModel
import tempfile
conf = SparkConf()
conf.set('spark.jars', '/full/path/to/spark-iforest/target/spark-iforest-2.4.0.jar')
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()
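
# A minimal usage sketch, not part of the original gist: it assumes the spark-iforest
# wrapper follows Spark ML conventions (a 'features' vector column in, anomaly score and
# prediction columns out) and that the jar configured above is on the classpath.
# Check the spark-iforest README for the exact parameter and column names.
df = spark.createDataFrame([(1.0, 0.2), (1.1, 0.3), (10.0, 9.5)], ['f1', 'f2'])
assembler = VectorAssembler(inputCols=['f1', 'f2'], outputCol='features')
assembled = assembler.transform(df)

iforest = IForest(contamination=0.1)   # fraction of rows expected to be anomalous
model = iforest.fit(assembled)         # returns an IForestModel
model.transform(assembled).show()      # adds the anomaly score / prediction columns

# assuming the model supports the standard Spark ML writer/reader, it can be persisted:
model_path = tempfile.mkdtemp() + '/iforest.model'
model.save(model_path)
loaded_model = IForestModel.load(model_path)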
mkaranasou / pyspark_scikit_isolation_forest.py
Last active October 27, 2021 08:29
How to use Scikit's Isolation Forest in Pyspark - udf and broadcast variables
import numpy as np

from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F, types as T
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

np.random.seed(42)
conf = SparkConf()
spark_session = SparkSession.builder \
    .config(conf=conf) \
    .appName('test') \
    .getOrCreate()
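
# A sketch of the broadcast-variable / udf pattern the gist describes. The toy training
# data and column names below are illustrative, not the original gist's.
X_train = np.random.randn(100, 2)
clf = IsolationForest(contamination=0.1, random_state=42)
clf.fit(X_train)

# ship the fitted scikit-learn model to the executors once, as a broadcast variable
broadcast_clf = spark_session.sparkContext.broadcast(clf)


@F.udf(returnType=T.DoubleType())
def anomaly_score(f1, f2):
    # each executor reads the fitted model from the broadcast variable
    return float(broadcast_clf.value.decision_function([[f1, f2]])[0])


df = spark_session.createDataFrame([(1.0, 0.5), (10.0, 20.0)], ['f1', 'f2'])
df.withColumn('anomaly_score', anomaly_score(F.col('f1'), F.col('f2'))).show()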
mkaranasou / from_pandas_to_koalas.py
Last active October 28, 2019 10:47
From pandas to Koalas
import pandas as pd
from databricks import koalas as ks
from pyspark.sql import SparkSession, functions as F
# define the data - example taken from https://koalas.readthedocs.io/en/latest/getting_started/10min.html
data = {'a': [1, 2, 3, 4, 5, 6],
        'b': [100, 200, 300, 400, 500, 600],
        'c': ["one", "two", "three", "four", "five", "six"]}
index = [10, 20, 30, 40, 50, 60]
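
# A short sketch of the comparison the gist walks through, based on the Koalas
# 10-minutes guide linked above:
pdf = pd.DataFrame(data=data, index=index)   # plain pandas, lives in driver memory
kdf = ks.DataFrame(data=data, index=index)   # Koalas, backed by a Spark DataFrame

print(pdf.head())
print(kdf.head())   # same pandas-like API, but evaluated by Spark

# an existing pandas DataFrame can also be converted, and the Spark DataFrame extracted:
kdf_from_pdf = ks.from_pandas(pdf)
sdf = kdf_from_pdf.to_spark()
sdf.select(F.col('a'), F.col('b')).show()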
mkaranasou / example_null_column_returned_from_udf.py
Created October 17, 2019 17:22
Example of a udf returning a null column
from pyspark import SparkConf
from pyspark.sql import SparkSession, functions as F, types as T
conf = SparkConf()
spark_session = SparkSession.builder \
    .config(conf=conf) \
    .appName('test') \
    .getOrCreate()
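
# A sketch of the pitfall this gist demonstrates (column names and values here are
# illustrative): when the value a udf returns does not match its declared returnType,
# Spark does not raise an error - it silently produces a null column.
df = spark_session.createDataFrame([(1,), (2,), (3,)], ['num'])


@F.udf(returnType=T.StringType())
def add_one_wrong_type(x):
    return x + 1   # an int where StringType was declared -> the column comes back null


@F.udf(returnType=T.IntegerType())
def add_one_correct_type(x):
    return x + 1   # matches the declared IntegerType -> the values come through


df.withColumn('wrong', add_one_wrong_type(F.col('num'))) \
  .withColumn('correct', add_one_correct_type(F.col('num'))) \
  .show()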
mkaranasou / pyspark_set_allowed_memory.py
Last active October 17, 2019 17:32
Setting Pyspark's memory consumption
from pyspark import SparkConf
from pyspark.sql import SparkSession
# depending on your setup:
# - if you are running the Spark app locally, set the driver memory to something your system can handle
# - if you are running on a cluster, then also set the executor memory if necessary (this depends on how your cluster is configured)
conf = SparkConf()
conf.set('spark.executor.memory', '16g')
conf.set('spark.driver.memory', '8g')
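
# A minimal sketch of building the session with the configuration above. The values are
# illustrative - size them to your machine or cluster. Note that spark.driver.memory only
# takes effect if it is set before the driver JVM starts, i.e. before the first
# SparkSession/SparkContext is created in the process (or via spark-submit/spark-defaults).
spark = SparkSession.builder \
    .config(conf=conf) \
    .appName('memory-config-example') \
    .getOrCreate()

print(spark.sparkContext.getConf().get('spark.driver.memory'))
print(spark.sparkContext.getConf().get('spark.executor.memory'))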
mkaranasou / test_feature_a_to_b_ratio.py
Last active October 13, 2019 13:07
Example of pyspark unittest test case for feature a to b ratio
from pyspark.sql.utils import AnalysisException
from pyspark_unittesting import SparkSQLTestCase
# FeatureAToBRatio is defined in pyspark_feature_a_to_b_ratio_example.py (see below)
from pyspark_feature_a_to_b_ratio_example import FeatureAToBRatio


class TestFeatureAToBRatio(SparkSQLTestCase):
    def setUp(self):
        super(TestFeatureAToBRatio, self).setUp()
        self.feature = FeatureAToBRatio()
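
    # A sketch of what the test methods could look like. The assertions follow the
    # FeatureAToBRatio example further down; the original gist's tests may differ.
    # self.sqlCtx is assumed to be provided by the spark-testing-base SQLTestCase.
    def test_calculate_adds_the_ratio_column(self):
        df = self.sqlCtx.createDataFrame([(10.0, 2.0), (4.0, 2.0)], ['a', 'b'])
        result = self.feature.calculate(df).collect()
        self.assertAlmostEqual(result[0][self.feature.feature_name], 5.0)
        self.assertAlmostEqual(result[1][self.feature.feature_name], 2.0)

    def test_calculate_fails_when_a_column_is_missing(self):
        df = self.sqlCtx.createDataFrame([(10.0,)], ['a'])   # no column b
        with self.assertRaises(AnalysisException):
            self.feature.calculate(df).collect()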
mkaranasou / pyspark_feature_a_to_b_ratio_example.py
Last active October 13, 2019 11:09
An example feature class
from pyspark.sql import functions as F


class FeatureAToBRatio(object):
    feature_name = 'a_to_b_ratio'
    default_value = 0.

    def calculate(self, df):
        """
        Given a dataframe that contains columns a and b,
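        adds a column with the ratio of a to b.
        :param pyspark.sql.DataFrame df: the dataframe to enrich
        :return: the dataframe with the a_to_b_ratio column added
        """
        # A sketch of one possible body, not necessarily the original gist's exact
        # implementation: fall back to default_value when the ratio is null (e.g. b is 0)
        return df.withColumn(
            self.feature_name,
            F.coalesce(F.col('a') / F.col('b'), F.lit(self.default_value))
        )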
mkaranasou / pyspark_unittesting.py
Last active November 6, 2019 12:51
Unittesting pyspark
import traceback

from sparktestingbase.sqltestcase import SQLTestCase


class SparkSQLTestCase(SQLTestCase):
    def getConf(self):
        from pyspark import SparkConf
        conf = SparkConf()
        conf.set(
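            # a sketch of settings typically applied for fast local test runs -
            # the exact keys the original gist sets here may differ
            'spark.sql.shuffle.partitions', '4')
        conf.set('spark.ui.enabled', 'false')   # no need for the Spark UI in tests
        return conf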
mkaranasou / constructor_env_variables.py
Created October 5, 2019 12:26
The function to be provided as an environment variable resolver
def constructor_env_variables(loader, node):
    """
    Extracts the environment variable from the node's value
    :param yaml.Loader loader: the yaml loader
    :param node: the current node in the yaml
    :return: the parsed string that contains the value of the environment
    variable
    """
    value = loader.construct_scalar(node)
    match = pattern.findall(value)
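    # A sketch of how the resolver can continue and be registered. The `pattern` regex,
    # the `!ENV` tag and the ${VAR} syntax are assumptions based on the usual convention;
    # the original gist may differ in the details.
    if match:
        full_value = value
        for g in match:
            # replace each ${VAR} placeholder with the environment variable's value,
            # leaving the placeholder name in place if the variable is not set
            full_value = full_value.replace('${%s}' % g, os.environ.get(g, g))
        return full_value
    return value


import os
import re
import yaml

pattern = re.compile(r'.*?\$\{(\w+)\}.*?')
loader = yaml.SafeLoader
# any scalar matching the pattern is implicitly tagged !ENV and run through the resolver
loader.add_implicit_resolver('!ENV', pattern, None)
loader.add_constructor('!ENV', constructor_env_variables)

config = yaml.load('data_path: ${HOME}/data', Loader=loader)
print(config)   # e.g. {'data_path': '/home/me/data'}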
mkaranasou / pyspark_index_with_row_num_non_sortable_data.py
Last active March 9, 2021 09:14
Adding indexes to a dataframe with row_num if your data is NOT sortable
# First add a column using F.monotonically_increasing_id().
# This will add monotonically increasing 64-bit integers like this:
>>> df_final = df_final.withColumn("monotonically_increasing_id", F.monotonically_increasing_id())
+--------+---+-----+-------+-------+----------+---------------------------+
|      _1| _2|index|column1|column2|row_number|monotonically_increasing_id|
+--------+---+-----+-------+-------+----------+---------------------------+
|  [1, 2]|  0|    0|      1|      2|         1|                          0|
|[15, 21]|  1|    1|     15|     21|         2|                          1|
+--------+---+-----+-------+-------+----------+---------------------------+
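
# The id above is unique and increasing but not consecutive, so to get a proper 1..N
# index, add a row_number over a window ordered by it - a sketch of the follow-up step.
# Note: a window with no partitioning moves all rows to a single partition, which is
# fine for small or medium dataframes but can become a bottleneck on very large ones.
>>> from pyspark.sql import Window
>>> w = Window.orderBy(F.col("monotonically_increasing_id"))
>>> df_final = df_final.withColumn("row_num", F.row_number().over(w))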