Skip to content

Instantly share code, notes, and snippets.

@aaronj1335
Last active November 24, 2021 22:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aaronj1335/da1c1795c7678dfc7ef5de96a3ae0a59 to your computer and use it in GitHub Desktop.
Save aaronj1335/da1c1795c7678dfc7ef5de96a3ae0a59 to your computer and use it in GitHub Desktop.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.io.PrintWriter
import java.net.HttpURLConnection
import java.net.ServerSocket
import java.net.URL
import java.nio.CharBuffer
import java.util.concurrent.Executors
data class Request(val line: String, val headers: List<String>, val body: String)
data class Response(val body: String, val code: Int = 200) {
val message = if (code == 200) "ok" else "uh oh"
}
fun CoroutineScope.launchServer(handler: (Request) -> Response): Flow<Pair<Request, Response>> = flow {
val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
launch(dispatcher) {
ServerSocket(8080).use { serverSocket ->
while (true) {
val client = serverSocket.accept()
launch(Executors.newFixedThreadPool(4).asCoroutineDispatcher()) {
val input = client.getInputStream().bufferedReader()
val output = PrintWriter(client.getOutputStream())
val requestLine = input.readLine()
val headers = input.lineSequence().takeWhile { it.isNotBlank() }.toList()
val buffer = CharBuffer.allocate(8192)
input.read(buffer)
buffer.flip()
val body = buffer.toString()
val request = Request(requestLine, headers, body)
val response = handler(request)
output.print("HTTP/1.0 ${response.code} ${response.message}\n\n${response.body}")
output.flush()
client.close()
launch(dispatcher) {
emit(Pair(request, response))
}
}
}
}
}
}
fun CoroutineScope.launchAsyncRequest(body: String, wait: Long = 1000) = launch(Dispatchers.IO) {
delay(wait)
try {
(URL("http://localhost:8080").openConnection() as HttpURLConnection).apply {
requestMethod = "GET"
doOutput = true
val writer = outputStream.bufferedWriter()
writer.write(body)
writer.flush()
val code = responseCode
val responseBody = inputStream.bufferedReader().readText()
println("$body result [$responseCode]: $responseBody")
}
} catch (t: Throwable) {
println("failed to make <$body> request: $t")
}
}
fun main(args: Array<String>) = runBlocking {
val requests = launchServer {
if (it.body.contains("ghi")) {
Response("Tid${Thread.currentThread().id}(${it.body}) GOT IT")
} else {
Response("Tid${Thread.currentThread().id}(${it.body})")
}
}.shareIn(this, SharingStarted.Eagerly)
launchAsyncRequest("abc", 1000)
launchAsyncRequest("def", 2000)
launchAsyncRequest("ghi", 3000)
launchAsyncRequest("jkl", 3000)
launchAsyncRequest("mno", 4000)
// I want this to:
//
// 1. Suspend while collecting each emitted value.
// 2. Continue to collect until seeing a request with "ghi".
// 3. At that point continue.
// 4. Cancel the flow (thereby closing the server's socket).
//
// But instead it suspends, I don't see any of the "collected" logs, and it never resumes or cancels the flow.
requests
.takeWhile { (_, response) -> !response.body.contains("ghi") }
.onEach { System.err.println("collected $it") }
.collect()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment