Skip to content

Instantly share code, notes, and snippets.

@twiceyuan
Created December 24, 2023 15:58
Show Gist options
  • Save twiceyuan/f745e3c0a99c84618978b3da53407a4b to your computer and use it in GitHub Desktop.
Save twiceyuan/f745e3c0a99c84618978b3da53407a4b to your computer and use it in GitHub Desktop.
[ByteBufferSource] 直接接收 ByteArray 作为数据来源的 okio Source 实现 #Kotlin #Okio
/**
* 直接接收 ByteArray 作为数据来源的 okio Source 实现
*/
class ByteBufferSource : Source {
private val buffer = Buffer()
private val queue: BlockingQueue<ByteArray> = ArrayBlockingQueue(1024)
private val isClosed = AtomicBoolean(false)
fun onDataReceived(data: ByteArray) {
queue.put(data)
}
override fun read(sink: Buffer, byteCount: Long): Long {
// 检查是否有可用数据或流是否已经关闭
while (buffer.size < byteCount && !isClosed.get()) {
val data = queue.poll()
if (data != null) {
buffer.write(data)
} else if (isClosed.get()) {
break // 如果没有数据并且流已关闭,则跳出循环
}
}
return if (buffer.size == 0L && isClosed.get()) {
-1 // 流结束
} else {
buffer.read(sink, minOf(buffer.size, byteCount))
}
}
override fun timeout() = buffer.timeout()
override fun close() {
isClosed.set(true)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment