💸 Credit Card - Fraud Investigation

By Anthony Vilarim Caliani


This is an experiment with Spark array functions.
In this example I'm using the Fraudulent Transactions Data dataset from Kaggle, so thanks to Chitwan Manchanda for sharing it.

How to use it?

If you want to execute it, follow the steps below.

Dataset Download

Download the dataset from Kaggle, then place the downloaded file in the ./data/raw/ folder.
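If you have the Kaggle CLI configured, something like the snippet below should do both steps at once. The dataset slug is my best guess, so double-check it on the dataset's Kaggle page.

# 👇 Assumes a configured Kaggle CLI; verify the dataset slug first
kaggle datasets download -d chitwanmanchanda/fraudulent-transactions-data \
  --path ./data/raw --unzip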

Python Dependencies

Now, install the Python dependencies.
You must have Java installed for PySpark to work correctly.
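Spark 3.3 runs on Java 8, 11, or 17, so a quick sanity check first:

# 👇 Sanity check - any of Java 8, 11 or 17 will do
java -version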

# 👇 Setting PyEnv version
pyenv local 3.10.6

# 👇 Virtual Environment
python -m venv .venv \
  && source .venv/bin/activate \
  && python -m pip install --upgrade pip

# 👇 Dependencies
pip install pyspark==3.3.1
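
# 👇 Optional sanity check - PySpark should now be importable
python -c "import pyspark; print(pyspark.__version__)"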

Execute the Script

spark-submit main.py
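
If Spark picks up a different master from your environment, you can pin local mode explicitly; "local[*]" just means "use all available cores".

# 👇 Optional: run locally on all cores
spark-submit --master "local[*]" main.py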

main.py

from contextlib import contextmanager

from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as f


@contextmanager
def spark_session() -> SparkSession:
    """Create a SparkSession and make sure it is stopped afterwards."""
    spark = SparkSession.builder.appName("app").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    yield spark
    spark.stop()


def read(spark: SparkSession) -> DataFrame:
    return spark.read.csv("data/raw", header=True, inferSchema=True)


def info(df: DataFrame, title: str) -> None:
    print(title)
    print(f"Records: {df.count()}")
    df.printSchema()
    df.show(5, truncate=False)


def pre_processing(data: DataFrame) -> DataFrame:
    # Group the transactions by origin account, collecting them into an array
    # of structs, and keep only accounts that mix fraudulent and legit ones.
    return (
        data
        .select(f.col("nameOrig").alias("name_origin"),
                f.col("type"),
                f.col("amount"),
                f.col("isFraud").alias("is_fraud"))
        .groupBy("name_origin")
        .agg(f.count("*").alias("n_transactions"),
             f.sum("is_fraud").alias("n_frauds"),
             f.collect_list(f.struct("type", "amount", "is_fraud")).alias("transactions"))
        .filter(f.col("n_frauds") > 0)
        .filter(f.col("n_transactions") != f.col("n_frauds"))
        .orderBy(f.desc("n_transactions"), f.col("name_origin"))
    )


def update_struct_field(column: Column) -> Column:
    # Cast the struct's 0/1 "is_fraud" flag into a proper boolean.
    return column.withField(
        "is_fraud",
        column.getField("is_fraud") == 1
    )


def not_fraud(column: Column) -> Column:
    return ~column.getField("is_fraud")


def calculate_transactions(data: DataFrame) -> DataFrame:
    # Array functions at work: "transform" rewrites every struct in the array,
    # then "filter" keeps only the non-fraudulent transactions.
    return (
        data
        .withColumn("transactions", f.transform("transactions", update_struct_field))
        .withColumn("transactions", f.filter("transactions", not_fraud))
    )


if __name__ == "__main__":
    with spark_session() as ss:
        df = read(ss)
        info(df, "🥩 Raw Dataset")
        df = pre_processing(df)
        info(df, "🧹 Pre Processing")
        df = calculate_transactions(df)
        info(df, "🤘 Let's Rock")
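
To see the two array functions in isolation, here is a minimal toy sketch (the data is made up, not from the Kaggle dataset): f.transform rewrites each struct in the array, and f.filter drops the flagged ones, mirroring what calculate_transactions does above.

# demo.py - illustrative only
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("array-demo").getOrCreate()
df = spark.createDataFrame(
    [("C1", [("PAYMENT", 10.0, 1), ("CASH_OUT", 25.0, 0)])],
    "name_origin string, transactions array<struct<type:string,amount:double,is_fraud:int>>",
)
df = (
    df
    # 👇 Rewrite each struct: turn the 0/1 flag into a boolean
    .withColumn("transactions", f.transform(
        "transactions",
        lambda t: t.withField("is_fraud", t.getField("is_fraud") == 1)))
    # 👇 Keep only the non-fraudulent entries
    .withColumn("transactions", f.filter(
        "transactions",
        lambda t: ~t.getField("is_fraud")))
)
df.show(truncate=False)  # "C1" keeps only the CASH_OUT transaction
spark.stop()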