Skip to content

Instantly share code, notes, and snippets.

@dnene
Last active October 22, 2021 11:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dnene/1995fe2566a8e282552177e93d51fb77 to your computer and use it in GitHub Desktop.
Save dnene/1995fe2566a8e282552177e93d51fb77 to your computer and use it in GitHub Desktop.
This simulates passenger and cab traffic at the pickup point at the airport
package tech.dnene.trials4.airporttaxi
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.LocalDateTime
import java.util.*
/*
* Dependencies required in build.gradle.kts
*
* dependencies {
* implementation(kotlin("stdlib"))
* implementation("org.jetbrains.kotlinx","kotlinx-coroutines-core","1.5.1")
* implementation("com.github.javafaker:javafaker:1.0.2")
* implementation("org.slf4j", "slf4j-api","1.7.31")
* implementation("ch.qos.logback", "logback-classic", "1.2.6")
* }
*
*/
/**
* This program simulates the arrival of passengers and cabs at a pickup point at an airport.
* It can be used to track assignment delay (for passengers and cabs) under different assumptions of their
* arrival frequencies
*
* Inspired by https://medium.com/@anandpillai/uber-go-scheduling-at-bangalore-airport-a-simulation-using-go-50d94f7fafb9
*
*/
data class Passenger(val name: String, val age: Int, val address: String)
data class Cab(val registration: String, val driverName: String)
data class Assignment(val pas: Passenger, val pasTime: LocalDateTime, val cab: Cab, val cabTime: LocalDateTime) {
val assignmentTime = LocalDateTime.now()
val pasDelay = Duration.between(pasTime, assignmentTime)
val cabDelay = Duration.between(cabTime, assignmentTime)
}
val log = LoggerFactory.getLogger("simulate")
/**
* Faker for generating random passenger and cab instances
*/
class Faker(val stateCode: String) {
private val faker = com.github.javafaker.Faker(Locale("en-IN"))
private fun rtoDistrict() = (1..34).random()
private fun alphaChars() = List(2) { ('A'..'Z').random() }.joinToString("")
private fun registrationNumber(state: String) = "$state-${rtoDistrict()} ${alphaChars()} ${(0..9999).random()}"
fun passenger() = Passenger(faker.name().fullName(), (18..78).random(), faker.address().fullAddress())
fun cab() = Cab(registrationNumber(stateCode), faker.name().fullName())
}
/**
** This is a generator which in turn returns a function. The inner function returns an amount of delay in seconds
*
* Allows different policies for delayed generation. Implemented just two .. one for a random delay, and one for
* a fixed delay.
*/
fun rangeDelayGenerator(minSeconds: Int, maxSeconds: Int) = { -> (minSeconds * 1000L..maxSeconds * 1000L).random() }
fun fixedDelayGenerator(seconds: Int) = { -> (seconds * 1000L).toLong() }
/**
* This function is a producer. It implicitly creates a channel, and repeatedly calls the lambda `block` passed to it
* interspersed by the delay as specified by the `delayGenerator` function, and then submits the returned value from
* the `block` to the new channel it just created.
* It returns the receiving end of the channel which a consumer can then monitor
*/
suspend fun <T> CoroutineScope.traffic(delayGenerator: () -> Long, block: () -> T) = produce {
while (isActive && !isClosedForSend) {
delay(delayGenerator())
send(block())
}
}
/**
* A producer which pulls data from one channel, and resubmits it along with timestamp into another channel which is
* implicitly created. Returns the receiving end of the channel which a further downstream consumer can use
*/
suspend fun <T> CoroutineScope.queuer(incoming: ReceiveChannel<T>) = produce {
while (isActive && !isClosedForSend) {
val item = incoming.receive();
val now = LocalDateTime.now()
log.debug("Queued $item at $now")
send(item to now)
}
}
/**
* This function pulls one item each from two channels, and when both are received, creates an assignment between them
*/
suspend fun assignPassengerToCab(
pasQueue: ReceiveChannel<Pair<Passenger, LocalDateTime>>,
cabQueue: ReceiveChannel<Pair<Cab, LocalDateTime>>,
maxCount: Int,
) {
var processedCount = 0
while (!pasQueue.isClosedForReceive && !cabQueue.isClosedForReceive && processedCount < maxCount) {
// wait until you get a passenger (will return existing one if already queued)
val pa = pasQueue.receive()
// wait until you get a cab (will return existing one if already queued)
val ca = cabQueue.receive()
// Create an assignment between the two
val a = Assignment(pa.first, pa.second, ca.first, ca.second)
log.debug("Assigned ${a.pas.name} -> ${a.cab.registration} with delay ${a.pasDelay}")
processedCount++
}
log.debug("Maximum assignments done .. exiting")
// Once all the required count of assignments are done, start shutting down. First cancel both queues
pasQueue.cancel()
cabQueue.cancel()
}
/**
* Just a main
*/
fun main(args: Array<String>) {
val faker = Faker("KA")
val maxAssignments = 5
runBlocking {
// Create a producer which emits a new passenger based on specified delay policy and posts it to the channel
val passengerTraffic = traffic(rangeDelayGenerator(5, 20)) { faker.passenger() }
// Create a consumer/producer who notes a new passenger and resubmits them to another channel along with the time
val passengerArrivals = queuer(passengerTraffic)
// Create a producer which emits a new cab based on specified delay policy and posts it to the channel
val cabTraffic = traffic(fixedDelayGenerator(20)) { faker.cab() }
// Create a consumer/producer who notes a new cab and resubmits it to another channel along with the time
val cabArrivals = queuer(cabTraffic)
// Monitor queues and assign passenger to cab based on FIFO
assignPassengerToCab(passengerArrivals, cabArrivals, maxAssignments)
// The program wont exit yet, need to clean up the open channels
listOf(passengerTraffic, cabTraffic, passengerArrivals, cabArrivals).forEach { it.cancel() }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment