Last active
August 8, 2020 00:33
-
-
Save markbahnman/2c557be43087dcb135dc081c547d7200 to your computer and use it in GitHub Desktop.
Testing out streaming JSON-lines parsing in Rust (with Go and Python comparisons)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package]
name = "json_rust"
version = "0.1.0"
authors = ["mark"]
# 2018 edition; `#[macro_use] extern crate` in main.rs is optional but valid here.
edition = "2018"

[dependencies]
# serde with the `derive` feature enables #[derive(Deserialize)] directly.
serde = { version = "1.0.104", features = ["derive"] }
# serde_derive is pulled in explicitly as well because main.rs uses
# `#[macro_use] extern crate serde_derive` — redundant with the feature
# above, but harmless.
serde_derive = "1.0.104"
serde_json = "1.0.44"
# Streaming gzip decompression (GzDecoder).
flate2 = "1.0"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main

import (
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
)

// serpData is one search-result entry within a SERP record.
type serpData struct {
	OrganicRank int32  `json:"organic_rank"`
	Domain      string `json:"domain"`
}

// enhancedSerp is one JSONL record: the SERP entries plus the query volume.
type enhancedSerp struct {
	Serp   []serpData `json:"serp"`
	Volume int32      `json:"volume"`
}

// sumVolumes decodes a stream of concatenated JSON records from r and
// returns the sum of their "volume" fields. Decode failures are printed
// and the (possibly zero-valued) record still contributes its Volume,
// matching the original loop's behavior.
func sumVolumes(r io.Reader) int32 {
	decoder := json.NewDecoder(r)
	var count int32
	for decoder.More() {
		var s enhancedSerp
		if err := decoder.Decode(&s); err != nil {
			fmt.Println("error:", err)
		}
		count += s.Volume
	}
	return count
}

func main() {
	file, err := os.Open("./test_file.jsonl.gz")
	if err != nil {
		// Include err so the failure reason isn't lost (original dropped it).
		log.Fatalf("Could not open test file: %v", err)
	}
	// Fix: the original never closed either handle. gz.Close also
	// verifies the gzip checksum was fully read.
	defer file.Close()

	gz, err := gzip.NewReader(file)
	if err != nil {
		log.Fatalf("Error with gzip stream: %v", err)
	}
	defer gz.Close()

	fmt.Println("Done: ", sumVolumes(gz))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zlib | |
import json | |
class StreamHandler:
    """Streams newline-delimited records out of a local gzip file.

    Decompression is chunked, so the whole file is never held in memory.
    """

    def __init__(self, file):
        # Path to a .jsonl.gz file on disk.
        self.file = file

    def stream(self):
        """Yield one raw line (bytes, newline included) at a time.

        Returns a generator object; consumption drives the streaming
        decompression lazily.
        """
        # Fix: the original opened the file and never closed it (fd
        # leak). `with` closes the handle when the generator finishes
        # or is garbage-collected/closed.
        with open(self.file, "rb") as fs:
            dstream = self.__decompress(fs)
            for line in self.__iterlines(dstream):
                yield line

    # Code taken from:
    # https://gist.github.com/chekunkov/1ebcb461c4afd4d98cd4bf3893ce2059
    def __decompress(self, body):
        # 16 + MAX_WBITS tells zlib to expect a gzip header/trailer.
        d = zlib.decompressobj(16 + zlib.MAX_WBITS)
        # Iterating a binary file yields arbitrary \n-split chunks of
        # *compressed* bytes; decompressobj handles any chunking.
        for chunk in body:
            yield d.decompress(chunk)
        yield d.flush()

    # Code taken from:
    # https://gist.github.com/chekunkov/1ebcb461c4afd4d98cd4bf3893ce2059
    def __iterlines(self, decompressed_stream):
        """Re-chunk a byte stream into newline-terminated lines."""
        buf = b''
        # keep reading chunks of bytes into the buffer
        for chunk in decompressed_stream:
            buf += chunk
            start = 0
            # process all lines within the current buffer
            while True:
                # find() returns -1 when absent, so `end` is 0 (falsy)
                # when no newline remains in the buffer.
                end = buf.find(b'\n', start) + 1
                if end:
                    yield buf[start:end]
                    start = end
                else:
                    # no more newlines => keep the tail and read more data
                    buf = buf[start:]
                    break
        # process the last (unterminated) line, too
        if buf:
            yield buf
# Drive the streaming reader over the test corpus and total the
# per-record query volume.
handler = StreamHandler(file="./test_file.jsonl.gz")
volume = sum(json.loads(record)['volume'] for record in handler.stream())
print("Done: ", volume)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[macro_use] | |
extern crate serde_derive; | |
use flate2::read::GzDecoder; | |
use std::fs::File; | |
use std::io::prelude::*; | |
use std::io::BufReader; | |
/// One organic search result within a SERP record.
#[derive(Deserialize, Debug)]
pub struct Serp {
    // Presumably the result's position in the organic listings —
    // semantics not verifiable from this file.
    pub organic_rank: i32,
    pub domain: String,
}
/// One JSONL record: the list of SERP entries plus the query volume
/// that `main` sums over the file.
#[derive(Deserialize, Debug)]
pub struct QuerySetRow {
    pub serp: Vec<Serp>,
    pub volume: i32,
}
fn main() { | |
let file = File::open("./test_file.jsonl.gz").expect("could not open file"); | |
let mut reader = BufReader::new(GzDecoder::new(file)); | |
let mut volume = 0; | |
let mut line = String::new(); | |
loop { | |
let num_bytes = reader.read_line(&mut line); | |
match num_bytes { | |
Ok(0) => break, | |
Err(e) => println!("{:#?}", e), | |
Ok(_) => { | |
let record: QuerySetRow = | |
serde_json::from_str(&line).expect("Failed to deserialize line"); | |
volume += record.volume; | |
line.clear(); | |
} | |
} | |
} | |
println!("Done: {}", volume); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note: running these on my MacBook with a large, gzipped file produced times of roughly 7 s in Rust and 22 s with Go and Python