Skip to content

Instantly share code, notes, and snippets.

@scalactic
Last active April 24, 2022 13:49
Show Gist options
  • Save scalactic/46d0e2e85f7f08b2f8f70c61d56cb21d to your computer and use it in GitHub Desktop.
Fraud Detection on ATM Card Skimming: A case study
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.util.Collector
import org.apache.hadoop.conf.{Configuration => HadoopConf}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put}
import play.api.libs.json._
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
// Getting transactions from each Atm.
/** A single card transaction observed at an ATM, consumed from the transaction stream.
  *
  * @param ATMID     identifier of the ATM where the card was used
  * @param CARD_NO   the card number seen in the transaction
  * @param TIMESTAMP event time of the transaction (presumably epoch millis — TODO confirm with producer)
  */
case class AtmTransaction(ATMID: Long, CARD_NO: String, TIMESTAMP: Long)
// Getting each Atm information
/** Static information about one ATM, consumed from the control/reference stream.
  *
  * @param ATMID     unique identifier of the ATM
  * @param LATITUDE  latitude of the ATM's location
  * @param LONGITUDE longitude of the ATM's location
  */
case class Atm(ATMID: Long, LATITUDE: Double, LONGITUDE: Double)
// A triple card combination on an Atm.
/** A combination of three cards used at the same ATM within the detection window.
  *
  * Serialized to/from JSON (play-json) as the HBase cell payload.
  *
  * @param ATMID     ATM on which the card triple was observed
  * @param LATITUDE  latitude of that ATM
  * @param LONGITUDE longitude of that ATM
  * @param CARDS     the three card numbers forming the combination
  */
case class AtmUsage(ATMID: Long, LATITUDE: Double, LONGITUDE: Double, CARDS: List[String])
// Two same card combinations are found for different Atms.
/** Fraud alert emitted when the same three-card combination reappears at a different ATM.
  *
  * @param UUID       unique identifier of this alert
  * @param TIMESTAMP  wall-clock time the alert was produced (epoch millis)
  * @param ATMID      ATM where the repeated combination was just observed
  * @param PREV_USAGE the earlier sighting of the card triple (from HBase)
  * @param CURR_USAGE the current sighting of the card triple
  */
case class AtmAlert(UUID: String, TIMESTAMP: Long, ATMID: Long, PREV_USAGE: AtmUsage, CURR_USAGE: AtmUsage)
// Flink job entry point: joins the keyed ATM-transaction stream with the keyed ATM-info
// stream, buffers recent transactions per ATM, and looks up three-card combinations in
// HBase to flag card-skimming (same card triple seen at two different ATMs).
// NOTE(review): extending App has initialization-order pitfalls; a plain
// `def main(args: Array[String]): Unit` entry point would be safer.
object AtmCardSkimmingDetection extends App {
// We create a Flink runtime environment.
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Consume atm transactions from kafka. In scala, ??? indicates something will be implemented.
val atmTrxSource: DataStream[AtmTransaction] = ??? // env.fromSource(???)
// Consume atm information from kafka.
val atmSource: DataStream[Atm] = ??? // env.fromSource(...)
// Scala Json to case class transformations using play-json library.
// These implicits are picked up by Json.toJson / Json.fromJson below.
implicit val readsAtmUsage: Reads[AtmUsage] = Json.reads[AtmUsage]
implicit val writesAtmUsage: Writes[AtmUsage] = Json.writes[AtmUsage]
// Hbase connection configurations.
// NOTE(review): this connection is created on the driver; in a real deployment it must be
// created per task (e.g. in open()) because HBase Connection is not serializable — verify.
val hbaseConf: HadoopConf = HBaseConfiguration.create
val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
val hbaseTableName: String = ???
val tripleCardsTable = connection.getTable(TableName.valueOf(hbaseTableName))
// Two-input operator: input 1 = transactions, input 2 = ATM reference data, output = alerts.
// Both inputs are keyed by ATMID, so the ValueStates below are scoped per ATM.
class AtmTrxTripleFlatMap extends RichCoFlatMapFunction[AtmTransaction, Atm, AtmAlert] {
// Buffer of recent transactions for the current ATM key (null until first trx).
private var atmTrxList: ValueState[List[AtmTransaction]] = _
// Latest reference info for the current ATM key (null until flatMap2 fires).
private var atm: ValueState[Atm] = _
private val modColFamily: Int = ??? // hbase colFamily identifier (modulus for bucketing column families)
override def open(parameters: Configuration): Unit = {
// initialize keyed states.
atmTrxList = getRuntimeContext.getState(new ValueStateDescriptor("atmTrxList", createTypeInformation[List[AtmTransaction]]))
atm = getRuntimeContext.getState(new ValueStateDescriptor("atm", createTypeInformation[Atm]))
}
// Checks whether new atm trx cardno is already in the buffer or not.
private def isCardExist(cardNo: String): Boolean = atmTrxList.value.exists(_.CARD_NO == cardNo)
// flatMap1 is being called for each keyed atm trx.
override def flatMap1(value: AtmTransaction, out: Collector[AtmAlert]): Unit = {
atmTrxList.value match {
// First transaction ever seen for this ATM key: seed the buffer and emit nothing.
case null => atmTrxList.update(List(value))
case _ =>
// NOTE(review): Try around atm.value.ATMID uses the NPE as a null check;
// Option(atm.value) would express the intent more directly.
Try(atm.value.ATMID) match {
case Failure(_) =>
// we need the atm information to calculate distances; if the 'atm' state is not set yet, just skip this trx.
println(s"Atm: ${value.ATMID} isn't available!")
case Success(_) =>
val _atm: Atm = atm.value
val hourAgo: Long = ??? // calculate 1 hour ago from latest trx. Assumes TIMESTAMP is epoch millis — TODO confirm.
// Late/out-of-window transactions are dropped entirely (no buffer update, no alert).
if (value.TIMESTAMP >= hourAgo) {
if (isCardExist(value.CARD_NO)) { // card is already in buffer: just evict expired entries.
// NOTE(review): the new trx itself is NOT appended here, so the buffered entry for
// this card keeps its older timestamp — confirm this is intended.
atmTrxList.update(atmTrxList.value.filter(_.TIMESTAMP >= hourAgo))
} else {
// new trx value is not in buffer yet.
// so, get binary card combinations from atmTrxList state and append new trx to each combination to get triple combinations.
// (subsets(2) of the in-window buffer + the new trx => every triple containing the new card.)
val distinctTriples: List[(List[String], Get, Put)] = atmTrxList
.value
.filter(_.TIMESTAMP >= hourAgo)
.toSet
.subsets(2)
.map(l => l.+(value).toList.sortBy(_.CARD_NO)) //append new trx to each subset; sort by card no so the triple key is order-independent.
.map { triple =>
val rowId: Array[Byte] = triple.map(_.CARD_NO.takeRight(2)).mkString.getBytes // row key: concat of last 2 characters of each cardNo
val colFam: Array[Byte] = (triple.map(_.CARD_NO.takeRight(2).toInt).sum % modColFamily).toString.getBytes // sum last-2-digit suffixes, mod => column family bucket. Assumes suffixes are numeric — TODO confirm.
val colName: Array[Byte] = triple.map(_.CARD_NO).mkString.getBytes // col name => card1card2card3
(
triple.map(l => l.CARD_NO), // card list
new Get(rowId).addColumn(colFam, colName),
new Put(rowId).addColumn(colFam, colName,
Json.toJson[AtmUsage](AtmUsage(value.ATMID, _atm.LATITUDE, _atm.LONGITUDE, triple.map(l => l.CARD_NO))).toString.getBytes
)
)
}
.toList
// Card triples are ready, query HBase for each triple.
Try(
tripleCardsTable.get(distinctTriples.map(_._2).asJava) // batch GET; results come back in the same order as the Gets
) match {
case Success(tripleCards) =>
tripleCards
.toList
.zip(distinctTriples.map(_._1)) // zip each triple result with their cardList (relies on HBase preserving Get order).
.filter(!_._1.isEmpty) // filter empty result, if result is not empty, then triple has a previous usage.
.map(l => (
// NOTE(review): byte-by-byte toChar decode assumes a single-byte charset payload;
// the Put above used platform-default getBytes — confirm both sides agree.
Json.fromJson[AtmUsage](Json.parse(l._1.value.map(_.toChar).mkString)).get, // parse HBase GET result to case class.
l._2 // cardList
))
.filter(l => l._1.ATMID != value.ATMID) // fraudster withdraws cards usually at a distant atm; same-ATM repeats are not alerts.
.map(l => AtmAlert(
java.util.UUID.randomUUID.toString, // alert UUID
System.currentTimeMillis, // alert timestamp (processing time, not event time)
value.ATMID, // alert atmid
l._1, // previous card triple on atm 1
AtmUsage(value.ATMID, _atm.LATITUDE, _atm.LONGITUDE, l._2) // current card triple on atm 2
))
.foreach(l => out.collect(l))
case Failure(exception) => println(s"Can't reach to HBase while GET -> $exception")
}
// Put new card triple combinations to HBase (upserts overwrite any previous sighting of the triple).
Try(
tripleCardsTable.put(distinctTriples.map(_._3).asJava)
) match {
case Success(_) => // everything is fine, do nothing.
case Failure(exception) => println(s"Can't PUT triples to HBase -> $exception")
}
// add new atm trx to state
// NOTE(review): :+ on List is O(n); buffer is expected to stay small (1h window) so acceptable.
atmTrxList.update(atmTrxList.value.:+(value))
}
}
}
}
}
// flatMap2 is being called for each atm.
override def flatMap2(value: Atm, out: Collector[AtmAlert]): Unit = {
atm.update(value) // take atm information from control stream; overwrites any previous value for this key.
}
}
// Alert producer to kafka.
val producer: FlinkKafkaProducer[AtmAlert] = ???
/*
We have two different sources => atmTrxSource and atmSource.
We need to do keyBy partitioning on ATMID to join each atm trx with the atm information,
so both states in AtmTrxTripleFlatMap are scoped to the same ATM.
*/
atmTrxSource.keyBy(_.ATMID)
.connect(atmSource.keyBy(_.ATMID))
.flatMap(new AtmTrxTripleFlatMap) // Then for each atm trx we apply extended RichCoFlatMapFunction.
.addSink(producer) // Publish alerts to kafka
// Start flink application
env.execute("flink-atm-skimming")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment