int DOWNSTREAM_INIT = Integer.MIN_VALUE >>> 1
int UPSTREAM_INIT = Integer.MIN_VALUE >>> 2
int SUBSCRIPTION_LOCK = Integer.MIN_VALUE >>> 3
AtomicInteger contenders = new AtomicInteger(Integer.MIN_VALUE)
private boolean halfInit(int mask) {
int c = contenders.getAndUpdate(v -> v < 0 ? v | mask : v);
return c < 0 && (c & mask) == 0;
}
@Override
public void onSubscribe(Subscription subscription) {
if (!halfInit(DOWNSTREAM_INIT)) {
// ...
return;
}
// ...
deferredInit();
}
@Override
public void subscribe(Subscriber<? super ReadableBodyPart> subscriber) {
if (!halfInit(UPSTREAM_INIT)) {
// ...
return;
}
// ...
deferredInit();
}
private void deferredInit() {
if (contenders.addAndGet(SUBSCRIPTION_LOCK) > 0) {
// ...
}
}
contenders = 0b1000 // initial state
downstream_init = 0b1000 >> 1 // 0b0100
upstream_init = 0b1000 >> 2 // 0b0010
subscription_offset = 0b1000 >> 3 // 0b0001
// updating state is done with OR mask
0b1000 | 0b0100 = 0b1100
0b1000 | 0b0010 = 0b1010
0b1010 | 0b0100 = 0b1110
// updating to the same state is idempotent
0b1100 | 0b0100 = 0b1100
0b1010 | 0b0010 = 0b1010
// testing the change of state is done with AND mask == 0
0b1000 & 0b0100 = 0b0000 // true
0b1000 & 0b0010 = 0b0000 // true
0b1010 & 0b0100 = 0b0000 // true
0b1100 & 0b0010 = 0b0000 // true
0b1100 & 0b0100 = 0b0100 // false
0b1010 & 0b0010 = 0b0010 // false
// offset is added after each change of state
0b1100 + 0b0001 = 0b1101
0b1010 + 0b0001 = 0b1011
// updating state with offset added
0b1101 | 0b0010 = 0b1111
0b1011 | 0b0100 = 0b1111
// offset is added one last time
0b1111 + 0b0001 = 0b0000 // overflow
// now contenders == 0 and can be used to track demand