@markbahnman
Last active August 8, 2020 00:33
Testing out json rust parsing
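The three programs below (Rust with serde_json, Go with encoding/json, and Python with zlib + json) stream-decompress the same gzipped JSON Lines file and sum a volume field across all records. Judging from the struct definitions they share, each line of test_file.jsonl.gz is assumed to look roughly like the following (the values are illustrative, not taken from the real file):

{"serp": [{"organic_rank": 1, "domain": "example.com"}], "volume": 120}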
[package]
name = "json_rust"
version = "0.1.0"
authors = ["mark"]
edition = "2018"

[dependencies]
serde = { version = "1.0.104", features = ["derive"] }
serde_derive = "1.0.104"
serde_json = "1.0.44"
flate2 = "1.0"
package main

import (
	"compress/gzip"
	"encoding/json"
	"fmt"
	"log"
	"os"
)

func main() {
	file, err := os.Open("./test_file.jsonl.gz")
	if err != nil {
		log.Fatal("Could not open test file")
	}

	type serpData struct {
		OrganicRank int32  `json:"organic_rank"`
		Domain      string `json:"domain"`
	}
	type enhancedSerp struct {
		Serp   []serpData `json:"serp"`
		Volume int32      `json:"volume"`
	}

	gz, err := gzip.NewReader(file)
	if err != nil {
		log.Fatal("Error with gzip stream")
	}

	decoder := json.NewDecoder(gz)
	var count int32 = 0
	for decoder.More() {
		var s enhancedSerp
		if err := decoder.Decode(&s); err != nil {
			fmt.Println("error:", err)
		}
		count += s.Volume
	}
	fmt.Println("Done: ", count)
}
import zlib
import json


class StreamHandler:
    def __init__(self, file):
        self.file = file

    # Open the gzipped file and decompress it as a stream.
    # Uses `yield` so the file is processed incrementally;
    # stream() returns a generator object.
    def stream(self):
        fs = open(self.file, "rb")
        dstream = self.__decompress(fs)
        for line in self.__iterlines(dstream):
            yield line

    # Code taken from:
    # https://gist.github.com/chekunkov/1ebcb461c4afd4d98cd4bf3893ce2059
    def __decompress(self, body):
        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
        for chunk in body:
            yield d.decompress(chunk)
        yield d.flush()

    # Code taken from:
    # https://gist.github.com/chekunkov/1ebcb461c4afd4d98cd4bf3893ce2059
    def __iterlines(self, decompressed_stream):
        buf = b''
        # keep reading chunks of bytes into the buffer
        for chunk in decompressed_stream:
            buf += chunk
            start = 0
            # process all lines within the current buffer
            while True:
                end = buf.find(b'\n', start) + 1
                if end:
                    yield buf[start:end]
                    start = end
                else:
                    # no more newlines => break out to read more data into the buffer
                    buf = buf[start:]
                    break
        # process the last line, too
        if buf:
            yield buf


volume = 0
s = StreamHandler(file="./test_file.jsonl.gz")
for jsonl in s.stream():
    enhanced_serp = json.loads(jsonl)
    volume += enhanced_serp['volume']
print("Done: ", volume)
#[macro_use]
extern crate serde_derive;

use flate2::read::GzDecoder;
use std::fs::File;
use std::io::prelude::*;
use std::io::BufReader;

#[derive(Deserialize, Debug)]
pub struct Serp {
    pub organic_rank: i32,
    pub domain: String,
}

#[derive(Deserialize, Debug)]
pub struct QuerySetRow {
    pub serp: Vec<Serp>,
    pub volume: i32,
}

fn main() {
    let file = File::open("./test_file.jsonl.gz").expect("could not open file");
    let mut reader = BufReader::new(GzDecoder::new(file));
    let mut volume = 0;
    let mut line = String::new();

    // Read the decompressed stream one line at a time, deserializing each
    // JSON record and accumulating its volume field.
    loop {
        let num_bytes = reader.read_line(&mut line);
        match num_bytes {
            Ok(0) => break,
            Err(e) => println!("{:#?}", e),
            Ok(_) => {
                let record: QuerySetRow =
                    serde_json::from_str(&line).expect("Failed to deserialize line");
                volume += record.volume;
                line.clear();
            }
        }
    }
    println!("Done: {}", volume);
}
Note: running these on my MacBook with a large gzipped file produced times of around 7 seconds in Rust and around 22 seconds with both Go and Python.
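The note doesn't say how the runs were timed. A minimal sketch of one way to time the Rust version from inside the program, assuming the same test_file.jsonl.gz path and a release build (cargo run --release), is to wrap the streaming read loop with std::time::Instant; this version only counts lines rather than deserializing, just to show where the measurement would go:

use flate2::read::GzDecoder;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::time::Instant;

fn main() {
    // Sketch only: count decompressed lines instead of deserializing them,
    // timing the streaming read loop with Instant.
    let file = File::open("./test_file.jsonl.gz").expect("could not open file");
    let reader = BufReader::new(GzDecoder::new(file));

    let start = Instant::now();
    let lines = reader.lines().filter_map(Result::ok).count();
    println!("read {} lines in {:.2?}", lines, start.elapsed());
}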
