Skip to content

Instantly share code, notes, and snippets.

@Bill
Last active July 26, 2019 23:39
Show Gist options
  • Save Bill/448ebc71037abe96653c465ee8e04421 to your computer and use it in GitHub Desktop.
Save Bill/448ebc71037abe96653c465ee8e04421 to your computer and use it in GitHub Desktop.
Demonstrate a Java-callable wrapper on a Kotlin coroutine
import static java.lang.Thread.sleep;
import java.time.LocalDateTime;
import java.util.Collection;
public class CallCoroutineFromJava {
public static void main(String[] args) throws InterruptedException {
final JavaCallableCoroutineScope jccs = new JavaCallableCoroutineScope();
final Thread contentProducer = new Thread(() -> {
try {
while (true) {
sleep(10);
final String content = LocalDateTime.now().toString();
DeferredKt.log2("sending content: " + content);
jccs.submitStuff(content);
}
} catch (InterruptedException e) {
}
});
final Thread snapshotter = new Thread(() -> {
try {
while (true) {
sleep(100);
DeferredKt.log2("sending snapshot request (synchronously)");
final Collection<String> snapshot = jccs.snapshot();
DeferredKt.log2("got snapshot (synchronously): " + snapshot);
}
} catch (InterruptedException e) {
}
});
contentProducer.setDaemon(true);
snapshotter.setDaemon(true);
contentProducer.start();
snapshotter.start();
Thread.sleep(10_000);
}
}
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.selects.select
import java.time.LocalDateTime
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
/**
* This is a coroutine builder---similar to the one in deferred.kt
* It's usable from other coroutines, but not so much from plain old Java code, since
* you gotta communicate with it via channels and the interface to those is in terms
* of suspend functions---not so handy from plain old Java.
*/
fun CoroutineScope.theCoroutine(
submissionChannel: Channel<String>,
snapshotRequestChannel: Channel<CompletableDeferred<Collection<String>>>) = launch {
val submissions = mutableListOf<String>()
val clearContent = ticker(50)
while(true) {
select<Unit> {
submissionChannel.onReceive {
log2("received submission: $it")
submissions.add(it)
}
snapshotRequestChannel.onReceive {
log2("received snapshot request")
it.complete(submissions.toList())
}
clearContent.onReceive {
log2("clearing content")
submissions.clear()
}
}
}
}
/**
* This is a class for handy access to theCoroutine from Java
* It's a CoroutineScope so that its public functions have coroutine builders e.g.
* launch and async, available. Those need a CoroutineContext, which is established
* in the constructor for this class. In this way the coroutine(s) we construct
* adhere to the the structured concurrency protocol. Part of that entails ensuring that
* coroutines we spin up---the primary one in init, and the others in the public
* functions---are all run in the same coroutine scope.
*
* TODO: more words about why it's important to run in a a common coroutine scope
*/
class JavaCallableCoroutineScope(
override val coroutineContext: CoroutineContext = EmptyCoroutineContext) : CoroutineScope {
// TODO: figure out if I need to create a new context or if it's ok to reuse parent context directly
val submissionChannel: Channel<String> = Channel()
val snapshotRequestChannel: Channel<CompletableDeferred<Collection<String>>> =
Channel()
init {
theCoroutine(submissionChannel, snapshotRequestChannel)
}
// a fire-and-forget example
fun submitStuff(stuff: String) = launch {
submissionChannel.send(stuff)
}
/**
* a request-response example
* launch a coroutine to request and wait for the async result---return that result
*/
fun snapshot(): Collection<String> = runBlocking {
with(CompletableDeferred<Collection<String>>()) {
log2("sending snapshot request (deferred)")
snapshotRequestChannel.send(this)
await().also {
log2("got deferred snapshot value: $it")
}
}
}
}
/*
See Java class CallCoroutineFromJava for invocation from Java.
Compare those Java thread definitions to the Kotlin coroutines below.
*/
fun main() = runBlocking {
val jccs = JavaCallableCoroutineScope()
val contentProducer = launch {
while(true) {
delay(10)
val content = LocalDateTime.now().toString()
log2("sending content: $content")
jccs.submitStuff(content)
}
}
val snapshotter = launch {
while(true) {
delay(100)
log2("sending snapshot request (synchronously)")
val snapshot = jccs.snapshot()
log2("got snapshot (synchronously): $snapshot")
}
}
delay(10_000)
coroutineContext.cancelChildren()
}
@Bill
Copy link
Author

Bill commented Jul 24, 2019

output is like:

clearing content in thread Thread[DefaultDispatcher-worker-1,5,main]
sending content: 2019-07-24T16:21:57.512718 in thread Thread[main,5,main]
received submission: 2019-07-24T16:21:57.512718 in thread Thread[DefaultDispatcher-worker-2,5,main]
sending content: 2019-07-24T16:21:57.530276 in thread Thread[main,5,main]
received submission: 2019-07-24T16:21:57.530276 in thread Thread[DefaultDispatcher-worker-2,5,main]
sending content: 2019-07-24T16:21:57.544779 in thread Thread[main,5,main]
received submission: 2019-07-24T16:21:57.544779 in thread Thread[DefaultDispatcher-worker-4,5,main]
clearing content in thread Thread[DefaultDispatcher-worker-3,5,main]
sending content: 2019-07-24T16:21:57.555573 in thread Thread[main,5,main]
received submission: 2019-07-24T16:21:57.555573 in thread Thread[DefaultDispatcher-worker-3,5,main]
sending content: 2019-07-24T16:21:57.567629 in thread Thread[main,5,main]
received submission: 2019-07-24T16:21:57.567629 in thread Thread[DefaultDispatcher-worker-3,5,main]
sending snapshot request (synchronously) in thread Thread[main,5,main]
sending snapshot request (deferred) in thread Thread[main,5,main]
received snapshot request in thread Thread[DefaultDispatcher-worker-3,5,main]
sending content: 2019-07-24T16:21:57.578044 in thread Thread[main,5,main]
got deferred snapshot value: [2019-07-24T16:21:57.555573, 2019-07-24T16:21:57.567629] in thread Thread[main,5,main]
got snapshot (synchronously): [2019-07-24T16:21:57.555573, 2019-07-24T16:21:57.567629] in thread Thread[main,5,main]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment