Last active
April 24, 2022 13:49
-
-
Save scalactic/46d0e2e85f7f08b2f8f70c61d56cb21d to your computer and use it in GitHub Desktop.
Fraud Detection on ATM Card Skimming: A case study
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} | |
import org.apache.flink.configuration.Configuration | |
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction | |
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} | |
import org.apache.flink.streaming.api.scala._ | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer | |
import org.apache.flink.util.Collector | |
import org.apache.hadoop.conf.{Configuration => HadoopConf} | |
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} | |
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Get, Put} | |
import play.api.libs.json._ | |
import scala.collection.JavaConverters._ | |
import scala.util.{Failure, Success, Try} | |
/** A single card transaction observed at an ATM.
  *
  * Field names are intentionally upper-case to match the wire/record format
  * used throughout this file — do not rename them.
  *
  * @param ATMID     identifier of the ATM where the card was used
  * @param CARD_NO   card number, kept as a string (suffix digits are reused
  *                  downstream to build HBase row keys)
  * @param TIMESTAMP transaction time as an epoch value (units not shown here — verify with producer)
  */
final case class AtmTransaction(
  ATMID: Long,
  CARD_NO: String,
  TIMESTAMP: Long
)
/** Static metadata for one ATM, delivered on the control stream.
  *
  * @param ATMID     identifier of the ATM (join key with [[AtmTransaction]])
  * @param LATITUDE  ATM latitude
  * @param LONGITUDE ATM longitude
  */
final case class Atm(
  ATMID: Long,
  LATITUDE: Double,
  LONGITUDE: Double
)
/** A three-card combination seen at one ATM: this is the value persisted to
  * HBase as JSON (play-json `Reads`/`Writes` for it are derived in the job
  * object below, so the field names double as JSON keys — do not rename).
  *
  * @param ATMID     ATM where the combination was observed
  * @param LATITUDE  latitude of that ATM
  * @param LONGITUDE longitude of that ATM
  * @param CARDS     the card numbers of the combination (sorted by the producer)
  */
final case class AtmUsage(
  ATMID: Long,
  LATITUDE: Double,
  LONGITUDE: Double,
  CARDS: List[String]
)
/** Alert emitted when the same card combination is found at two different ATMs:
  * the output record of the job, published to Kafka.
  *
  * @param UUID       unique alert id (random UUID string)
  * @param TIMESTAMP  alert creation time (processing time, epoch millis)
  * @param ATMID      ATM of the current (triggering) usage
  * @param PREV_USAGE the earlier usage of the same card triple, read back from HBase
  * @param CURR_USAGE the usage that triggered this alert
  */
final case class AtmAlert(
  UUID: String,
  TIMESTAMP: Long,
  ATMID: Long,
  PREV_USAGE: AtmUsage,
  CURR_USAGE: AtmUsage
)
// Flink streaming job: detects ATM card skimming by noticing the same
// three-card combination being used at two different ATMs.
// NOTE(review): `extends App` relies on DelayedInit and has initialization-order
// pitfalls; a plain `def main(args: Array[String]): Unit` entry point is safer.
object AtmCardSkimmingDetection extends App {
  // We create a Flink runtime environment.
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  // Consume atm transactions from kafka. In Scala, ??? marks something still to
  // be implemented (it throws NotImplementedError if evaluated).
  val atmTrxSource: DataStream[AtmTransaction] = ??? // env.fromSource(???)
  // Consume atm information (id + coordinates) from kafka.
  val atmSource: DataStream[Atm] = ??? // env.fromSource(...)

  // play-json codecs for AtmUsage: used below to serialize usages into HBase
  // cell values and to parse them back on read.
  implicit val readsAtmUsage: Reads[AtmUsage] = Json.reads[AtmUsage]
  implicit val writesAtmUsage: Writes[AtmUsage] = Json.writes[AtmUsage]

  // HBase connection configuration. The table stores previously seen
  // three-card combinations.
  val hbaseConf: HadoopConf = HBaseConfiguration.create
  val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
  val hbaseTableName: String = ???
  val tripleCardsTable = connection.getTable(TableName.valueOf(hbaseTableName))
  // NOTE(review): `connection`/`tripleCardsTable` are created here on the driver,
  // but flatMap1 runs on task managers — confirm these are usable there (the
  // connection is usually opened inside the operator's open() instead).

  // Keyed two-input operator: input 1 = transactions, input 2 = ATM metadata
  // (control stream). Both streams are keyed by ATMID before connect() below,
  // so the ValueStates here are scoped per ATM.
  class AtmTrxTripleFlatMap extends RichCoFlatMapFunction[AtmTransaction, Atm, AtmAlert] {
    // Buffer of recent transactions for the current ATM key (null until first use).
    private var atmTrxList: ValueState[List[AtmTransaction]] = _
    // Latest known metadata (coordinates) for the current ATM key.
    private var atm: ValueState[Atm] = _
    private val modColFamily: Int = ??? // hbase colFamily identifier (modulus for bucketing)

    override def open(parameters: Configuration): Unit = {
      // initialize keyed states.
      atmTrxList = getRuntimeContext.getState(new ValueStateDescriptor("atmTrxList", createTypeInformation[List[AtmTransaction]]))
      atm = getRuntimeContext.getState(new ValueStateDescriptor("atm", createTypeInformation[Atm]))
    }

    // Checks whether the new atm trx card number is already in the buffer or not.
    private def isCardExist(cardNo: String): Boolean = atmTrxList.value.exists(_.CARD_NO == cardNo)

    // flatMap1 is called for every transaction of the keyed trx stream.
    override def flatMap1(value: AtmTransaction, out: Collector[AtmAlert]): Unit = {
      atmTrxList.value match {
        case null => atmTrxList.update(List(value)) // first trx for this ATM: seed the buffer.
        case _ =>
          // Detects a still-null `atm` state via the NPE thrown by `.ATMID` on null.
          // NOTE(review): `Option(atm.value)` would express this null check more
          // idiomatically than catching the exception with Try.
          Try(atm.value.ATMID) match {
            case Failure(_) =>
              // We need the atm information to build usages. If the 'atm' state
              // is still null, just skip this transaction.
              println(s"Atm: ${value.ATMID} isn't available!")
            case Success(_) =>
              val _atm: Atm = atm.value
              val hourAgo: Long = ??? // calculate 1 hour ago from latest trx.
              if (value.TIMESTAMP >= hourAgo) {
                if (isCardExist(value.CARD_NO)) { // card already buffered: only evict expired entries.
                  // NOTE(review): the buffered entry keeps its old TIMESTAMP and the
                  // new transaction is dropped here — confirm that is intended.
                  atmTrxList.update(atmTrxList.value.filter(_.TIMESTAMP >= hourAgo))
                } else {
                  // New card for this ATM: take every 2-card combination from the
                  // in-window buffer and append the new trx to each, yielding the
                  // candidate three-card combinations with their HBase Get/Put.
                  val distinctTriples: List[(List[String], Get, Put)] = atmTrxList
                    .value
                    .filter(_.TIMESTAMP >= hourAgo) // keep only trxs inside the window.
                    .toSet
                    .subsets(2) // every 2-element subset of the buffer.
                    .map(l => l.+(value).toList.sortBy(_.CARD_NO)) // append new trx; sort for a stable key order.
                    .map { triple =>
                      // Row id: last 2 characters of each cardNo, concatenated.
                      val rowId: Array[Byte] = triple.map(_.CARD_NO.takeRight(2)).mkString.getBytes
                      // Column family: sum of the 2-char suffixes (as ints) mod modColFamily.
                      // NOTE(review): `.toInt` throws if a card-number suffix is not numeric.
                      val colFam: Array[Byte] = (triple.map(_.CARD_NO.takeRight(2).toInt).sum % modColFamily).toString.getBytes
                      val colName: Array[Byte] = triple.map(_.CARD_NO).mkString.getBytes // col name => card1card2card3
                      (
                        triple.map(l => l.CARD_NO), // card list
                        new Get(rowId).addColumn(colFam, colName), // lookup for a previous usage of this triple.
                        new Put(rowId).addColumn(colFam, colName, // record the current usage as JSON.
                          Json.toJson[AtmUsage](AtmUsage(value.ATMID, _atm.LATITUDE, _atm.LONGITUDE, triple.map(l => l.CARD_NO))).toString.getBytes
                        )
                      )
                    }
                    .toList

                  // Card triples are ready: batch-GET them from HBase.
                  Try(
                    tripleCardsTable.get(distinctTriples.map(_._2).asJava)
                  ) match {
                    case Success(tripleCards) =>
                      tripleCards
                        .toList
                        .zip(distinctTriples.map(_._1)) // pair each GET result with its card list.
                        .filter(!_._1.isEmpty) // non-empty result => this triple has a previous usage.
                        .map(l => (
                          // Parse the stored JSON cell back into an AtmUsage.
                          // NOTE(review): `.get` on JsResult throws on malformed cell data.
                          Json.fromJson[AtmUsage](Json.parse(l._1.value.map(_.toChar).mkString)).get,
                          l._2 // cardList
                        ))
                        .filter(l => l._1.ATMID != value.ATMID) // alert only if the previous usage was at a different atm.
                        .map(l => AtmAlert(
                          java.util.UUID.randomUUID.toString, // alert UUID
                          System.currentTimeMillis, // alert timestamp (processing time)
                          value.ATMID, // alert atmid
                          l._1, // previous card triple on atm 1
                          AtmUsage(value.ATMID, _atm.LATITUDE, _atm.LONGITUDE, l._2) // current card triple on atm 2
                        ))
                        .foreach(l => out.collect(l))
                    case Failure(exception) => println(s"Can't reach to HBase while GET -> $exception")
                  }

                  // Put the new card triple combinations to HBase (latest usage wins).
                  Try(
                    tripleCardsTable.put(distinctTriples.map(_._3).asJava)
                  ) match {
                    case Success(_) => // everything is fine, do nothing.
                    case Failure(exception) => println(s"Can't PUT triples to HBase -> $exception")
                  }

                  // Add the new atm trx to state.
                  // NOTE(review): appends to the UNFILTERED list, so expired entries
                  // linger in state until the next eviction above.
                  atmTrxList.update(atmTrxList.value.:+(value))
                }
              }
          }
      }
    }

    // flatMap2 is called for each Atm record of the control stream.
    override def flatMap2(value: Atm, out: Collector[AtmAlert]): Unit = {
      atm.update(value) // take atm information from the control stream.
    }
  }

  // Alert producer to kafka (left to be implemented).
  val producer: FlinkKafkaProducer[AtmAlert] = ???

  /*
   We have two different sources => atmTrxSource and atmSource.
   We keyBy both on ATMID so each atm trx is co-partitioned with its atm information.
  */
  atmTrxSource.keyBy(_.ATMID)
    .connect(atmSource.keyBy(_.ATMID))
    .flatMap(new AtmTrxTripleFlatMap) // Then for each atm trx we apply the RichCoFlatMapFunction above.
    .addSink(producer) // Publish alerts to kafka.

  // Start flink application.
  env.execute("flink-atm-skimming")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment