Skip to content

Instantly share code, notes, and snippets.

@ruescasd
Last active July 2, 2021 20:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ruescasd/3b58f99ad0217d782edf58e64e7e1b33 to your computer and use it in GitHub Desktop.
Save ruescasd/3b58f99ad0217d782edf58e64e7e1b33 to your computer and use it in GitHub Desktop.
[package]
name = "b3"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.7.3"
sha2 = "0.9.1"
hex = "0.4.3"
r2d2 = "0.8.9"
r2d2_postgres = "0.18.0"
/*
Measured throughput of this program (rows inserted per second):
no lock: 896.861 / s
lock: 881.834 / s
*/
use std::thread;
use rand::{thread_rng, Rng};
use rand::distributions::Alphanumeric;
use std::sync::{Arc, Mutex};
use sha2::{Digest, Sha512};
use r2d2;
use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager};
/// Benchmarks concurrent, hash-chained single-row inserts into the `test` table.
///
/// The table is recreated, then `WORKERS` threads each call [`insert`]
/// `INSERTS_PER_WORKER` times with a random payload. When every worker has
/// finished, the overall throughput is printed as `<rate> / s`.
///
/// # Panics
///
/// Panics if the connection string is invalid, the database cannot be reached,
/// the table cannot be recreated, or a worker thread panics.
fn main() {
    const WORKERS: usize = 10;
    const INSERTS_PER_WORKER: usize = 100;

    // Chain head shared by all workers; `insert` treats the empty string as
    // "no row has been chained yet".
    let mtx: Arc<Mutex<String>> = Arc::new(Mutex::new(String::new()));
    let manager = PostgresConnectionManager::new(
        "host=localhost port=5433 user=b3".parse().unwrap(),
        NoTls,
    );
    let pool = r2d2::Pool::new(manager).unwrap();

    // Scope the setup connection so it goes back to the pool before the
    // workers start competing for connections.
    {
        let mut client = pool.get().unwrap();
        // IF EXISTS: the very first run has no table to drop.
        client.execute("DROP TABLE IF EXISTS test", &[]).unwrap();
        client
            .batch_execute(
                "
                CREATE TABLE test (
                    id SERIAL PRIMARY KEY,
                    content TEXT NOT NULL,
                    h1 TEXT NOT NULL,
                    h2 TEXT NOT NULL
                )
                ",
            )
            .unwrap();
    }

    let now = std::time::Instant::now();
    let handles: Vec<_> = (0..WORKERS)
        .map(|_| {
            let mtx = Arc::clone(&mtx);
            let pool = pool.clone();
            thread::spawn(move || {
                for _ in 0..INSERTS_PER_WORKER {
                    insert(Arc::clone(&mtx), pool.clone(), random_string());
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // The rate is derived from the same constants that drive the workers so
    // the two cannot drift apart.
    let total_inserts = (WORKERS * INSERTS_PER_WORKER) as f64;
    let rate = total_inserts / now.elapsed().as_secs_f64();
    println!("{:.3} / s", rate);

    // Uncomment to dump the table after the run:
    // let mut client = pool.get().unwrap();
    // for row in client.query("SELECT id, content, h1, h2 FROM test order by id", &[]).unwrap() {
    //     let id: i32 = row.get(0);
    //     let content: &str = row.get(1);
    //     let h1: &str = row.get(2);
    //     let h2: &str = row.get(3);
    //     println!("found: {} {} {} {}", id, content, h1, h2);
    // }
}
/// Chains `content` onto the current head and stores the resulting row.
///
/// The row's `h2` is computed by [`hash`] from `content` and the current head
/// held in `mtx`; while the head is still empty, `content` itself is used as
/// the head. The head advances to the new `h2` only after the row has been
/// inserted successfully.
///
/// The lock is held across the database call. The earlier approach published
/// the new head first, released the lock, and restored the old head on
/// failure; that restore could overwrite heads published by other threads in
/// the meantime, and rows could be inserted in a different order than they
/// were chained. Holding the lock serializes the inserts, which is the price
/// of an intact chain.
///
/// A failed insert is reported on stderr and leaves the head untouched.
///
/// # Panics
///
/// Panics if no connection can be obtained from `pool` or if the mutex is
/// poisoned.
pub fn insert(mtx: Arc<Mutex<String>>, pool: r2d2::Pool<PostgresConnectionManager<NoTls>>, content: String) {
    // Check out the connection before locking so that waiting on the pool
    // never happens while the head is locked.
    let mut client = pool.get().unwrap();
    let mut head_guard = mtx.lock().unwrap();

    let (h1, h2): (String, String) = {
        let head = if head_guard.is_empty() {
            content.as_str()
        } else {
            head_guard.as_str()
        };
        hash(content.clone(), head.to_owned())
    };

    // uncomment for no lock
    // let head = String::from("fixed");

    let result = client.execute(
        "INSERT INTO test (content, h1, h2) VALUES ($1, $2, $3)",
        &[&content, &h1, &h2],
    );
    match result {
        Ok(_) => *head_guard = h2,
        Err(err) => eprintln!("insert failed, chain head left unchanged: {}", err),
    }
}
/// Returns a freshly generated string of 128 characters drawn from the
/// `Alphanumeric` distribution using the thread-local RNG.
fn random_string() -> String {
    const LENGTH: usize = 128;

    let symbols = thread_rng().sample_iter(&Alphanumeric);
    symbols.take(LENGTH).map(char::from).collect()
}
/// Computes the two hex-encoded SHA-512 digests stored with a row.
///
/// Returns `(h1, h2)` where `h1` is the digest of `content` and `h2` is the
/// digest of the hex text of `h1` followed by `head`.
pub fn hash(content: String, head: String) -> (String, String) {
    let content_digest = hex::encode(Sha512::digest(content.as_bytes()));

    let link_digest = {
        let mut state = Sha512::new();
        state.update(content_digest.as_bytes());
        state.update(head.as_bytes());
        hex::encode(state.finalize())
    };

    (content_digest, link_digest)
}
/*
Measured throughput of this program (entries per second):
508.388 / s
*/
#[macro_use]
extern crate quick_error;
use std::thread;
use std::io::Write;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use rand::{thread_rng, Rng};
use rand::distributions::Alphanumeric;
use std::time::SystemTime;
use sha2::{Digest, Sha512};
use r2d2;
use r2d2::PooledConnection;
use r2d2_postgres::{postgres::NoTls, PostgresConnectionManager, postgres::Error};
quick_error! {
/// Errors that `write` can return while storing a batch of entries.
#[derive(Debug)]
pub enum WriteError {
/// Variant without payload; nothing visible in this file constructs it.
Empty{}
/// Error reported by the PostgreSQL client. `from()` lets `?` convert it.
PsqlError(err: Error) {
from()
}
/// I/O error raised while streaming row data. `from()` lets `?` convert it.
IOError(err: std::io::Error) {
from()
}
}
}
/// One buffered record waiting to be written to the database.
struct Entry {
    /// Payload text of the record.
    content: String,
    /// Second string supplied at construction time.
    h: String,
}

impl Entry {
    /// Builds an entry from its two field values.
    fn new(content: String, h: String) -> Self {
        Self { content, h }
    }
}
/// A batch of buffered entries together with the chain head they start from.
struct Entries {
    /// Head value the first entry of this batch is chained onto.
    head: String,
    /// Entries collected since the batch was created.
    entries: Vec<Entry>,
    /// Wall-clock time at which the batch was created.
    time: SystemTime,
}

impl Entries {
    /// Assembles a batch from its three field values.
    fn new(head: String, entries: Vec<Entry>, time: SystemTime) -> Self {
        Self {
            head,
            entries,
            time,
        }
    }
}

impl Default for Entries {
    /// An empty batch whose head is `"ROOT"`, timestamped with the current time.
    fn default() -> Self {
        Self::new("ROOT".to_owned(), Vec::new(), SystemTime::now())
    }
}
fn main() {
let mtx: Arc<Mutex<Entries>> = Arc::new(Mutex::new(Entries::default()));
let manager = PostgresConnectionManager::new(
"host=localhost port=5433 user=b3".parse().unwrap(),
NoTls,
);
let pool = r2d2::Pool::new(manager).unwrap();
let mut client = pool.get().unwrap();
client.execute("drop table test", &[]).unwrap();
client.batch_execute("
CREATE TABLE test (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
h1 TEXT NOT NULL,
h2 TEXT NOT NULL
)
").unwrap();
let now = std::time::Instant::now();
let handles: Vec<_> = (0..10).into_iter().map(|_| {
let mtx = mtx.clone();
thread::spawn(move || {
for _ in 0..100 {
let mut e = mtx.lock().unwrap();
let entry = Entry::new(random_string(), random_string());
e.entries.push(entry);
drop(e);
thread::sleep(Duration::from_millis(10));
}
})
}).collect();
let mtx = mtx.clone();
let mut hash_conn = pool.get().unwrap();
let hasher = thread::spawn(move || {
for _ in 0..20 {
let mut e = mtx.lock().unwrap();
let elapsed = e.time.elapsed().unwrap().as_millis();
let count = e.entries.len();
if count == 0 {
break;
}
if count > 50 || (elapsed > 500 && count > 0) {
print!("Running write [count={}] [elapsed={}]..", count, elapsed);
let now = std::time::Instant::now();
let result = write(&mut hash_conn, &e.entries, e.head.clone());
if result.is_ok() {
*e = Entries::default();
}
else {
println!("result is {:?}", result);
}
println!("done[{}ms]", now.elapsed().as_millis());
}
drop(e);
thread::sleep(Duration::from_millis(200));
}
});
hasher.join().unwrap();
for h in handles {
h.join().unwrap();
}
let rate = (1000.0 * 1000.0) / (now.elapsed().as_millis() as f32);
println!("{:.3} / s", rate);
/*for row in client.query("SELECT id, content, h1, h2 FROM test order by id", &[]).unwrap() {
let id: i32 = row.get(0);
let content: &str = row.get(1);
let h1: &str = row.get(2);
let h2: &str = row.get(3);
println!("found: {} {} {} {}", id, content, h1, h2);
}*/
}
/// Stores `entries` in the `test` table with a single `COPY ... FROM stdin`.
///
/// Entries are chained in order: each row's digests come from [`hash`] applied
/// to the entry's content and the running head, which starts at `head` and
/// becomes the row's `h2` afterwards.
///
/// An empty slice returns `Ok(())` without contacting the database.
///
/// # Errors
///
/// Returns [`WriteError`] if starting the copy, streaming a row, or finishing
/// the copy fails.
fn write(client: &mut PooledConnection<PostgresConnectionManager<NoTls>>, entries: &[Entry], head: String) -> Result<(), WriteError> {
    if entries.is_empty() {
        return Ok(());
    }

    let mut writer = client.copy_in("COPY test(content, h1, h2) FROM stdin")?;
    let mut head = head;
    for entry in entries {
        let (h1, h2) = hash(&entry.content, &head);
        // Only the content needs escaping; the digests are hex text.
        let row = format!("{}\t{}\t{}\n", escape_copy_text(&entry.content), h1, h2);
        writer.write_all(row.as_bytes())?;
        head = h2;
    }
    writer.finish()?;
    Ok(())
}

/// Escapes `text` for use as a column value in PostgreSQL's `COPY` text format,
/// where backslash, tab, newline and carriage return are otherwise structural.
fn escape_copy_text(text: &str) -> String {
    let mut escaped = String::with_capacity(text.len());
    for ch in text.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '\t' => escaped.push_str("\\t"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            other => escaped.push(other),
        }
    }
    escaped
}
/// Produces a new 128-character string sampled from the `Alphanumeric`
/// distribution with the thread-local RNG.
fn random_string() -> String {
    const LENGTH: usize = 128;

    let rng = thread_rng();
    rng.sample_iter(&Alphanumeric)
        .take(LENGTH)
        .map(char::from)
        .collect()
}
/// Computes the two hex-encoded SHA-512 digests stored with a row.
///
/// Returns `(h1, h2)` where `h1` is the digest of `content` and `h2` is the
/// digest of the hex text of `h1` followed by `head`.
///
/// The parameters are string slices, so callers can pass `&String`, `&str`, or
/// a literal without allocating.
pub fn hash(content: &str, head: &str) -> (String, String) {
    let h1 = hex::encode(Sha512::digest(content.as_bytes()));

    let mut chained = Sha512::new();
    chained.update(h1.as_bytes());
    chained.update(head.as_bytes());
    let h2 = hex::encode(chained.finalize());

    (h1, h2)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment