Skip to content

Instantly share code, notes, and snippets.

@willtemperley
Created June 28, 2023 12:18
Show Gist options
  • Save willtemperley/0dc0f24adbd98b061d6d1aaffafa515d to your computer and use it in GitHub Desktop.
Save willtemperley/0dc0f24adbd98b061d6d1aaffafa515d to your computer and use it in GitHub Desktop.
Iterator over binary delimited protobuf input stream
//
// ProtoIter.swift
//
// Created by Will Temperley on 28/06/2023.
// Varint decoding adapted from SwiftProtobuf
//
import Foundation
import SwiftProtobuf
extension InputStream {
func readData(maxLength length: Int) throws -> Data {
var buffer = [UInt8](repeating: 0, count: length)
let result = self.read(&buffer, maxLength: buffer.count)
if result < 0 {
throw self.streamError ?? POSIXError(.EIO)
} else {
return Data(buffer.prefix(result))
}
}
}
/**
Iterates over a binary-delimited protobuf input stream
*/
class MessageIterator<M: Message>: IteratorProtocol {
let inputStream: InputStream
var pointer: Int = 0
let bufferLength: Int
var buffer: Data
var endOfStream = false
init(inputStream: InputStream, bufferLength: Int = 32768) throws {
self.inputStream = inputStream
self.bufferLength = bufferLength
buffer = try inputStream.readData(maxLength: bufferLength)
}
func next() -> M? {
do {
if (endOfStream) {
return nil
}
let messageLength = try decodeVarint()
var message = M()
let data = try read(for: Int(messageLength))
try message.merge(serializedData: data)
return message
} catch {
print(error)
return nil
}
}
/**
Read the required data quantity from the buffer,
loading more from the stream if necessary
*/
func read(for nBytes: Int) throws -> Data {
let end = pointer + nBytes
if (end < buffer.count) {
let messageData = buffer[pointer..<end]
pointer += nBytes
return messageData
} else {
let prefix = buffer[pointer..<buffer.count]
let remainingByteCount = nBytes - prefix.count
var toLoad = bufferLength
if (remainingByteCount > bufferLength) {
let nPages = Int(ceil(Float(remainingByteCount) / Float(bufferLength)))
toLoad = nPages * bufferLength
}
//more data required
buffer = try inputStream.readData(maxLength: toLoad)
if (buffer.isEmpty) {
endOfStream = true
}
let suffix = buffer[0..<remainingByteCount]
pointer = remainingByteCount
return prefix + suffix
}
}
//Adapted from SwiftProtobuf BinaryDecoder
func decodeVarint() throws -> UInt64 {
let slice: Data = try read(for: 1)
var c = slice[slice.startIndex]
if c & 0x80 == 0 {
return UInt64(c)
}
var value = UInt64(c & 0x7f)
var shift = UInt64(7)
while true {
if shift > 63 {
throw BinaryDecodingError.malformedProtobuf
}
let slice: Data = try read(for: 1)
c = slice[slice.startIndex]
value |= UInt64(c & 0x7f) << shift
if c & 0x80 == 0 {
return value
}
shift += 7
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment