Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@hoffrocket
Created August 29, 2012 20:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hoffrocket/3518541 to your computer and use it in GitHub Desktop.
Save hoffrocket/3518541 to your computer and use it in GitHub Desktop.
MongoDB streaming dump in Scala
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.EOFException
import java.io.InputStream
import java.io.OutputStream
import org.bson.BSONCallback
import org.bson.BSONObject
import com.mongodb.DBCallback
import com.mongodb.DBCollection
import com.mongodb.DBCursor
import com.mongodb.DBDecoder
import com.mongodb.DBDecoderFactory
import com.mongodb.DBObject
import com.mongodb.DefaultDBDecoder
import com.mongodb.Mongo
/**
 * Dumps the raw BSON bytes of every document returned by a MongoDB query
 * straight into an OutputStream, without materializing DBObjects.
 *
 * This works by installing a custom [[DBDecoder]] on the cursor. Instead of
 * building a DBObject, it copies each length-prefixed BSON document from the
 * driver's input stream to the target stream and returns null.
 */
object StreamingDump {

  /**
   * Creates [[StreamingDBDecoder]]s that all write to the same stream.
   *
   * @param os destination for the raw BSON bytes; never closed by this class
   */
  class StreamingDBDecoderFactory(os: OutputStream) extends DBDecoderFactory {
    def create(): DBDecoder = new StreamingDBDecoder(os)
  }

  /**
   * A DBDecoder that copies each BSON document verbatim to `os`.
   *
   * Only `decode(InputStream, DBCollection)` is supported. Every other
   * decoding entry point throws UnsupportedOperationException.
   * NOTE(review): this relies on the driver decoding query results through
   * decode(InputStream, DBCollection) — confirm for the driver version in use.
   *
   * @param os destination for the raw BSON bytes; never closed by this class
   */
  class StreamingDBDecoder(os: OutputStream) extends DBDecoder {
    // Copy buffer reused for every document, so an instance must not be
    // used by concurrent callers.
    val buffer: Array[Byte] = new Array[Byte](4096)

    /**
     * Copies one BSON document from `in` to `os`.
     *
     * The first 4 bytes are the document's total length (prefix included),
     * stored little endian. The prefix is written through unchanged, followed
     * by exactly the remaining bytes of the document.
     *
     * @return always null: the document is written to `os`, not materialized
     * @throws EOFException if `in` ends before the whole document was read
     * @throws java.io.IOException if the length prefix is smaller than 4
     */
    def decode(in: InputStream, collection: DBCollection): DBObject = {
      val ch1 = in.read()
      val ch2 = in.read()
      val ch3 = in.read()
      val ch4 = in.read()
      // read() returns -1 at end of stream. OR-ing the four results is
      // negative if any of them was -1.
      if ((ch1 | ch2 | ch3 | ch4) < 0)
        throw new EOFException()
      // little endian
      val size = (ch4 << 24) + (ch3 << 16) + (ch2 << 8) + ch1
      // The prefix counts its own 4 bytes, so a smaller value (or a negative
      // one from a set high bit) cannot be a valid document. Reject it before
      // writing anything, rather than silently desynchronising the stream.
      if (size < 4)
        throw new java.io.IOException("invalid BSON document length: " + size)
      os.write(ch1)
      os.write(ch2)
      os.write(ch3)
      os.write(ch4)
      // already read 4 bytes
      var remaining = size - 4
      while (remaining > 0) {
        val n = in.read(buffer, 0, math.min(remaining, buffer.length))
        // -1 means the stream ended mid-document. Without this check the
        // counter would grow and os.write would be called with a length of -1.
        if (n < 0)
          throw new EOFException("stream ended with " + remaining + " bytes of the current document unread")
        os.write(buffer, 0, n)
        remaining -= n
      }
      null
    }

    // The remaining DBDecoder entry points are intentionally unsupported.

    def getDBCallback(collection: DBCollection): DBCallback = {
      throw new UnsupportedOperationException("Not implemented")
    }

    def decode(bytes: Array[Byte], collection: DBCollection): DBObject = {
      throw new UnsupportedOperationException("Not implemented")
    }

    def readObject(bytes: Array[Byte]): BSONObject = {
      throw new UnsupportedOperationException("Not implemented")
    }

    def readObject(in: InputStream): BSONObject = {
      throw new UnsupportedOperationException("Not implemented")
    }

    def decode(bytes: Array[Byte], callback: BSONCallback): Int = {
      throw new UnsupportedOperationException("Not implemented")
    }

    def decode(in: InputStream, callback: BSONCallback): Int = {
      throw new UnsupportedOperationException("Not implemented")
    }
  }

  /**
   * Demo entry point.
   *
   * Dumps collection `test.test` from a local mongod into memory, then
   * decodes the captured bytes with the stock decoder and prints them to
   * verify that the dump round-trips.
   */
  def main(args: Array[String]): Unit = {
    val mongo = new Mongo("127.0.0.1", 27017)
    val os = new ByteArrayOutputStream()
    try {
      val coll: DBCollection = mongo.getDB("test").getCollection("test")
      val cursor: DBCursor = coll.find()
      try {
        cursor.setDecoderFactory(new StreamingDBDecoderFactory(os))
        // Drain the whole cursor. The decoder copies each document into os;
        // the value returned by next() is null and is ignored.
        while (cursor.hasNext()) {
          cursor.next()
        }
      } finally {
        cursor.close()
      }
    } finally {
      mongo.close()
    }

    // test that actually worked
    val decoder = new DefaultDBDecoder
    val is = new ByteArrayInputStream(os.toByteArray())
    // Test available() *before* each read. This keeps the final document
    // and makes an empty dump produce an empty list instead of a read on an
    // empty stream.
    val results = Iterator.continually(())
      .takeWhile(_ => is.available() > 0)
      .map(_ => decoder.readObject(is))
      .toList
    println(results)
  }
}
@hoffrocket
Copy link
Author

Note: when the output stream is a file, this code seems to be faster than the `mongodump` command-line tool.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment