Skip to content

Instantly share code, notes, and snippets.

@carsonfarmer
Last active January 1, 2024 02:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save carsonfarmer/852d4a520462b0be496354aaa2b9614e to your computer and use it in GitHub Desktop.
Save carsonfarmer/852d4a520462b0be496354aaa2b9614e to your computer and use it in GitHub Desktop.
SQLite and CAS: An Experimental Edge in Edge Compute Platforms
[package]
name = "rust-sqlite-test"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.72"
byteorder = "1.4.3"
cid = "0.10.1"
fvm_ipld_amt = "0.6.1"
fvm_ipld_blockstore = "0.2.0"
fvm_ipld_encoding = "0.4.0"
rand = "0.8.5"
rusqlite = "0.29.0"
sqlite-vfs = "0.2.0"
cargo new --lib rust-sqlite-test
cargo add rusqlite
cargo add sqlite-vfs
cargo add anyhow
cargo add rand
cargo add byteorder
cargo add cid
cargo add fvm_ipld_blockstore
cargo add fvm_ipld_amt
cargo add fvm_ipld_encoding
// Much of this code is adapted from:
// https://github.com/rkusa/wasm-sqlite/blob/main/wasm/src/vfs.rs, and
// https://github.com/sanderpick/builtin-actors/blob/sander/tableland-actor2/actors/tableland/src/vfs.rs
use byteorder::{BigEndian, ByteOrder};
use cid::Cid;
use fvm_ipld_amt::{Amt, Error as AmtError};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::BytesDe;
use sqlite_vfs::{LockKind, OpenKind, OpenOptions, Vfs};
use std::fmt::Debug;
use std::io::{self, ErrorKind};
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Mutex};
/// Copies a borrowed byte slice into an owned [`BytesDe`], the value type
/// stored in the page AMT.
fn tbytes(bz: &[u8]) -> BytesDe {
    let owned: Vec<u8> = Vec::from(bz);
    BytesDe(owned)
}
/// A SQLite VFS that keeps the main database as page-sized byte chunks in an
/// [`Amt`] stored on top of a [`Blockstore`].
///
/// Cloning is cheap: every field except `page_size` is an `Arc`, so clones
/// share the same lock bookkeeping and the same AMT.
#[derive(Debug, Clone)]
pub struct PagesVfs<BS>
where
    BS: Blockstore + Send + Clone,
{
    /// Lock bookkeeping handed (by `Arc` clone) to every `Connection` opened
    /// from this VFS.
    lock_state: Arc<Mutex<LockState>>,
    /// Page store; connections write page `offset / page_size` at that AMT index.
    state: Arc<Mutex<Amt<BytesDe, BS>>>,
    // This _could_ be a generic value, but since we're reading from the initial_bytes
    // we'll keep it dynamic like this.
    page_size: usize,
}
/// Lock bookkeeping shared by all connections of one [`PagesVfs`].
#[derive(Debug, Default)]
struct LockState {
    /// Number of connections currently holding a `Shared` lock.
    read: usize,
    /// Writer slot: `None` when no writer is recorded, `Some(false)` while a
    /// connection holds `Reserved`, `Some(true)` once a connection has asked
    /// for `Exclusive` (it may still be parked in `Pending`).
    write: Option<bool>,
}
/// A single open file handle produced by [`PagesVfs::open`]: either the main
/// database or its rollback journal.
pub struct Connection<BS>
where
    BS: Blockstore + Send + Clone,
{
    /// Lock bookkeeping shared with every other connection of the same VFS.
    lock_state: Arc<Mutex<LockState>>,
    /// The lock level this connection currently holds.
    lock: LockKind,
    /// Shared page store (the same AMT the owning `PagesVfs` holds).
    state: Arc<Mutex<Amt<BytesDe, BS>>>,
    /// Which file this handle represents; journal handles use `cache` instead
    /// of the AMT for reads and writes.
    kind: OpenKind,
    // We can use an in-memory cursor for the journal (and wal files)
    // Ideally, we store off the root CID on success in the blockstore,
    // and then we can simply ignore the journal/wal files and rollback
    // to past root CIDs on failure.
    cache: Cursor<Vec<u8>>,
    /// Database page size in bytes, copied from the owning `PagesVfs`.
    page_size: usize,
}
impl<BS> PagesVfs<BS>
where
BS: Blockstore + Send + Clone,
{
pub fn new(db: BS, initial_bytes: &[u8]) -> Self {
let mut amt = Amt::new(db);
// Assumes the SQLite file header is well formed
let page_size = match BigEndian::read_u16(&initial_bytes[16..18]) {
1 => 65_536,
x => x as u32,
};
// The largest power of 2 a u16 can hold _is_ 32_768
if page_size < 512 || !page_size.is_power_of_two() {
panic!("invalid page size: {}", page_size);
}
// Split the initial_bytes into chunks and add them to the Amt
for chunk in initial_bytes.chunks(page_size as usize) {
amt.set(0, tbytes(chunk)).unwrap();
}
PagesVfs {
lock_state: Arc::new(Mutex::new(Default::default())),
state: Arc::new(Mutex::new(amt)),
page_size: page_size as usize,
}
}
pub fn root(&self) -> Result<Cid, AmtError> {
self.state.lock().unwrap().flush()
}
// Just for testing purposes.
pub fn state(&self) -> Arc<Mutex<Amt<BytesDe, BS>>> {
self.state.clone()
}
}
impl<BS> Vfs for PagesVfs<BS>
where
BS: Blockstore + Send + Clone,
{
type Handle = Connection<BS>;
fn open(&self, db: &str, opts: OpenOptions) -> Result<Self::Handle, io::Error> {
match opts.kind {
OpenKind::MainDb => {}
OpenKind::MainJournal => {}
_ => {
return Err(io::Error::new(
ErrorKind::PermissionDenied,
"only main database supported",
));
}
}
// Always open the same database for now.
if !db.starts_with("main.db") {
return Err(io::Error::new(
ErrorKind::NotFound,
format!("unexpected database name `{db}`; expected `main.db`"),
));
}
Ok(Connection {
lock_state: self.lock_state.clone(),
lock: LockKind::None,
state: self.state.clone(),
kind: opts.kind,
cache: Cursor::default(),
page_size: self.page_size,
})
}
fn delete(&self, _db: &str) -> Result<(), io::Error> {
// Here we could update our root hash to be the root hash of "state",
// otherwise, on rollback, we would simply keep our old root hash.
Ok(())
}
fn exists(&self, db: &str) -> Result<bool, io::Error> {
Ok(db == "main.db" && self.state.lock().unwrap().count() > 0)
}
fn temporary_name(&self) -> String {
String::from("main.db")
}
fn random(&self, buffer: &mut [i8]) {
rand::Rng::fill(&mut rand::thread_rng(), buffer);
}
fn sleep(&self, duration: std::time::Duration) -> std::time::Duration {
std::thread::sleep(duration);
// From: https://github.com/rkusa/sqlite-vfs/blob/main/test-vfs/src/vfs.rs#L158
// Well, this function is only supposed to sleep at least `n_micro`μs, but there are
// tests that expect the return to match exactly `n_micro`. As those tests are flaky as
// a result, we are cheating here.
duration
}
}
impl<BS> sqlite_vfs::DatabaseHandle for Connection<BS>
where
BS: Blockstore + Send + Clone,
{
// We aren't going to worry about WAL files... but we could!
type WalIndex = sqlite_vfs::WalDisabled;
fn size(&self) -> Result<u64, io::Error> {
if self.kind == OpenKind::MainJournal {
return Ok(self.cache.get_ref().len() as u64);
}
let size = self.page_count() * self.page_size;
Ok(size as u64)
}
fn read_exact_at(&mut self, buf: &mut [u8], offset: u64) -> Result<(), io::Error> {
if self.kind == OpenKind::MainJournal {
self.cache.seek(SeekFrom::Start(offset))?;
self.cache.read_exact(buf)?;
return Ok(());
}
let index = offset as usize / self.page_size;
let offset = offset as usize % self.page_size;
let data = self.get_page(index as u32);
if data.len() < buf.len() + offset {
return Err(ErrorKind::UnexpectedEof.into());
}
buf.copy_from_slice(&data[offset..offset + buf.len()]);
Ok(())
}
fn write_all_at(&mut self, buf: &[u8], offset: u64) -> Result<(), io::Error> {
if self.kind == OpenKind::MainJournal {
self.cache.seek(SeekFrom::Start(offset))?;
self.cache.write_all(buf)?;
return Ok(());
}
if offset as usize % self.page_size > 0 {
return Err(io::Error::new(
ErrorKind::Other,
"unexpected write across page boundaries",
));
}
let index = offset as usize / self.page_size;
let page = buf.try_into().map_err(|_| {
io::Error::new(
ErrorKind::Other,
format!(
"unexpected write size {}; expected {}",
buf.len(),
self.page_size
),
)
})?;
self.put_page(index as u32, page);
Ok(())
}
fn sync(&mut self, _data_only: bool) -> Result<(), io::Error> {
if self.kind == OpenKind::MainJournal {
self.cache.flush()?;
}
self.state.lock().unwrap().flush().unwrap();
Ok(())
}
fn set_len(&mut self, size: u64) -> Result<(), io::Error> {
if self.kind == OpenKind::MainJournal {
let mut buffer = self.cache.clone().into_inner();
buffer.resize(size as usize, 0);
self.cache = Cursor::new(buffer);
}
let mut page_count = size / self.page_size as u64;
if size as usize % self.page_size > 0 {
page_count += 1;
}
let current_page_count = self.page_count() as u64;
if page_count > 0 && page_count < current_page_count {
let mut state = self.state.lock().unwrap();
let current_count = state.count() as u64;
state
.batch_delete((page_count + 1)..(current_count - 1), true)
.unwrap();
}
Ok(())
}
fn lock(&mut self, lock: LockKind) -> Result<bool, io::Error> {
Ok(Self::lock(self, lock))
}
fn reserved(&mut self) -> Result<bool, io::Error> {
Ok(Self::reserved(self))
}
fn current_lock(&self) -> Result<LockKind, io::Error> {
Ok(self.lock)
}
fn set_chunk_size(&self, chunk_size: usize) -> Result<(), io::Error> {
if chunk_size != self.page_size {
Err(io::Error::new(
ErrorKind::Other,
"changing chunk size is not allowed",
))
} else {
Ok(())
}
}
fn wal_index(&self, _readonly: bool) -> Result<Self::WalIndex, io::Error> {
Ok(sqlite_vfs::WalDisabled::default())
}
}
impl<BS> Connection<BS>
where
    BS: Blockstore + Send + Clone,
{
    /// Returns a copy of the bytes stored at AMT index `ix`.
    ///
    /// # Panics
    ///
    /// Panics if the AMT lookup fails or if nothing is stored at `ix`.
    fn get_page(&self, ix: u32) -> Vec<u8> {
        let state = self.state.lock().unwrap();
        state.get(ix.into()).unwrap().unwrap().clone().into_vec()
    }

    /// Stores a copy of `data` at AMT index `ix`.
    ///
    /// # Panics
    ///
    /// Panics if the AMT write fails.
    fn put_page(&self, ix: u32, data: &[u8]) {
        let mut state = self.state.lock().unwrap();
        state.set(ix.into(), tbytes(data)).unwrap();
    }

    /// Number of entries currently stored in the AMT.
    // NOTE(review): this equals the database's page count only if pages are
    // stored contiguously from index 0 — confirm no gaps can occur.
    fn page_count(&self) -> usize {
        let state = self.state.lock().unwrap();
        state.count() as usize
    }

    /// Tries to move this connection to lock level `to`, updating the shared
    /// `LockState`.
    ///
    /// Returns `true` when `to` is now held and `false` when it was refused.
    /// A refused `Exclusive` request may still leave the connection at
    /// `Pending`. The comparisons below assume `LockKind` orders
    /// None < Shared < Reserved < Pending < Exclusive (TODO confirm against
    /// sqlite_vfs).
    fn lock(&mut self, to: LockKind) -> bool {
        // Already at the requested level: nothing to change.
        if self.lock == to {
            return true;
        }
        let mut lock_state = self.lock_state.lock().unwrap();
        // The following locking implementation is probably not sound (wouldn't be surprised if it
        // potentially dead-locks), but suffice for the experiment.
        // See https://github.com/rkusa/wasm-sqlite/blob/main/wasm/src/vfs.rs#L206
        match to {
            LockKind::None => {
                // Release: stop counting as a reader, or free the writer slot
                // if we held anything above Shared.
                if self.lock == LockKind::Shared {
                    lock_state.read -= 1;
                } else if self.lock > LockKind::Shared {
                    lock_state.write = None;
                }
                self.lock = LockKind::None;
                true
            }
            LockKind::Shared => {
                // Refused while another connection has claimed Exclusive
                // (write == Some(true)) and we are not that writer.
                if lock_state.write == Some(true) && self.lock <= LockKind::Shared {
                    return false;
                }
                lock_state.read += 1;
                // Downgrading from a write lock frees the writer slot.
                if self.lock > LockKind::Shared {
                    lock_state.write = None;
                }
                self.lock = LockKind::Shared;
                true
            }
            LockKind::Reserved => {
                // Only reachable from Shared, and only while no writer is recorded.
                if lock_state.write.is_some() || self.lock != LockKind::Shared {
                    return false;
                }
                // The reserving connection is no longer counted as a reader.
                if self.lock == LockKind::Shared {
                    lock_state.read -= 1;
                }
                lock_state.write = Some(false);
                self.lock = LockKind::Reserved;
                true
            }
            LockKind::Pending => {
                // cannot be requested directly
                false
            }
            LockKind::Exclusive => {
                // Refused if another connection holds the writer slot.
                if lock_state.write.is_some() && self.lock <= LockKind::Shared {
                    return false;
                }
                if self.lock == LockKind::Shared {
                    lock_state.read -= 1;
                }
                // Claim the writer slot so no new Shared locks are granted.
                lock_state.write = Some(true);
                // Granted only once every reader has gone; otherwise park in
                // Pending and report failure.
                if lock_state.read == 0 {
                    self.lock = LockKind::Exclusive;
                    true
                } else {
                    self.lock = LockKind::Pending;
                    false
                }
            }
        }
    }

    /// Returns `true` if this connection holds more than a Shared lock, or if
    /// any connection has recorded a writer in the shared `LockState`.
    fn reserved(&self) -> bool {
        if self.lock > LockKind::Shared {
            return true;
        }
        let lock_state = self.lock_state.lock().unwrap();
        lock_state.write.is_some()
    }
}
impl<BS> Drop for Connection<BS>
where
    BS: Blockstore + Send + Clone,
{
    /// Releases any lock still held, so the shared `LockState` does not keep
    /// counting a connection that no longer exists.
    fn drop(&mut self) {
        let already_unlocked = self.lock == LockKind::None;
        if !already_unlocked {
            let _ = self.lock(LockKind::None);
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::PagesVfs;
    use fvm_ipld_amt::{diff, Amt};
    use fvm_ipld_blockstore::MemoryBlockstore;
    use rusqlite::{Connection, OpenFlags};
    use sqlite_vfs::register;

    /// Page size the test expects `file.db` to report via `PRAGMA page_size`.
    pub const SQLITE_PAGE_SIZE: usize = 4096;

    /// End-to-end check. It seeds the VFS from `file.db`, creates a table and
    /// inserts two rows through rusqlite, then inspects the AMT root CID and
    /// the AMT contents.
    #[test]
    fn basic_get_set() {
        let mem = MemoryBlockstore::default();
        // `include_bytes!` resolves `file.db` relative to this source file and
        // embeds it at compile time.
        let initial_bytes: &'static [u8] = include_bytes!("file.db");
        let vfs = PagesVfs::<_>::new(mem, initial_bytes);
        // Clones share the AMT through an `Arc`, so `other` observes writes made
        // through the VFS that is moved into `register` below.
        let other = vfs.clone();
        register("vfs", vfs, true).unwrap();
        let mut conn = Connection::open_with_flags_and_vfs(
            "main.db",
            OpenFlags::SQLITE_OPEN_READ_WRITE
                | OpenFlags::SQLITE_OPEN_CREATE
                | OpenFlags::SQLITE_OPEN_NO_MUTEX,
            "vfs",
        )
        .unwrap();
        // If we set this to memory, then we don't need to "cache" cursor,
        // but then we also don't get the "delete" hook to work with...
        let journal_mode: String = conn
            .query_row("pragma journal_mode = delete", [], |row| row.get(0))
            .unwrap();
        assert_eq!(journal_mode, "delete");
        let page_size: usize = conn
            .query_row("PRAGMA page_size", [], |row| row.get(0))
            .unwrap();
        assert_eq!(page_size, SQLITE_PAGE_SIZE);
        // Root CID before the transaction; printed only for inspection.
        let c = other.root().unwrap();
        println!("{:?}", c.to_string().as_str());
        let tx = conn.transaction().unwrap();
        tx.execute(
            "create table my_table(id integer primary key, msg text);",
            [],
        )
        .unwrap();
        tx.execute("insert into my_table(msg) values('hello');", [])
            .unwrap();
        tx.execute("insert into my_table(msg) values('world');", [])
            .unwrap();
        tx.commit().unwrap();
        // Root CID after the commit; compared against a fixed value below.
        let c = other.root().unwrap();
        let mut stmt = conn.prepare("select * from my_table;").unwrap();
        let mut rows = stmt.query([]).unwrap();
        while let Some(r) = rows.next().unwrap() {
            let id: i32 = r.get(0).unwrap();
            let msg: String = r.get(1).unwrap();
            println!("found in db: {} = {}", id, msg);
        }
        // Diff the populated AMT against an empty one.
        // NOTE(review): presumably expects one change per stored page, i.e.
        // two pages after the commit. Confirm against `diff` semantics.
        let db = MemoryBlockstore::new();
        let new_amt: Amt<Vec<u8>, _> = Amt::new(&db);
        let results = diff(&other.state.lock().unwrap(), &new_amt).unwrap();
        assert_eq!(results.len(), 2);
        // NOTE(review): this CID depends on the exact bytes of `file.db` and on
        // how pages are stored in the AMT. It must be regenerated if either
        // changes.
        assert_eq!(
            c.to_string().as_str(),
            "bafy2bzaced3vtba4kcg4w5q4dnfdstxfom6t4cv72uuq4jgvkeav24ujdcgtg"
        );
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment