Fast, graceful, glides through data — inspired by Facebook Scuba
A Scuba-inspired, single-node, in-memory analytics database in Rust. Optimized for high-throughput event ingestion and fast aggregation queries.
- Columnar storage with time-partitioned slabs
- SQL subset (no joins) over HTTP
- 30-second visibility from ingest to query
- Periodic snapshots for durability
- Single node (distributed can come later)
- Language: Rust
- HTTP: axum + tokio
- SQL: sqlparser crate
- Parallelism: rayon
| # | Milestone | Scope |
|---|---|---|
| 1 | MVP | End-to-end working: ingest JSON → query SQL → get results |
| 2 | Production | Time partitioning, TTL, parallel queries, snapshots |
| 3 | Optimized | SIMD, compression, 100M+ row support |
Ingest:
curl -X POST http://localhost:8080/ingest -d '{
"table": "events",
"rows": [{"timestamp": 1703424000, "latency_ms": 42, "status": "ok"}]
}'
Query:
curl -X POST http://localhost:8080/query -d '{
"sql": "SELECT status, AVG(latency_ms) FROM events GROUP BY status"
}'
- Ingest: 10K+ rows/sec
- Query: 1M rows in <100ms
- Visibility: <30 seconds
- Memory: Predictable, bounded
┌─────────────────────────────────────────────────────────────┐
│ HTTP Layer (axum) │
│ POST /ingest POST /query GET /tables GET /health │
└─────────────────────────────────────────────────────────────┘
│
┌───────────────┴───────────────┐
▼ ▼
┌─────────────────────────┐ ┌─────────────────────────────┐
│ Ingestion Pipeline │ │ Query Engine │
│ ┌───────────────────┐ │ │ ┌───────────────────────┐ │
│ │ Write Buffer │ │ │ │ SQL Parser │ │
│ │ (thread-local) │ │ │ │ (sqlparser) │ │
│ └─────────┬─────────┘ │ │ └───────────┬───────────┘ │
│ │ flush │ │ │ │
│ ▼ │ │ ▼ │
│ ┌───────────────────┐ │ │ ┌───────────────────────┐ │
│ │ Columnarizer │ │ │ │ Query Planner │ │
│ │ rows → columns │ │ │ │ (slab selection) │ │
│ └───────────────────┘ │ │ └───────────┬───────────┘ │
└─────────────┬───────────┘ │ │ │
│ │ ▼ │
│ │ ┌───────────────────────┐ │
│ │ │ Parallel Executor │ │
│ │ │ (rayon) │ │
│ │ └───────────┬───────────┘ │
│ │ │ │
│ │ ▼ │
│ │ ┌───────────────────────┐ │
│ │ │ Result Merger │ │
│ │ └───────────────────────┘ │
│ └─────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Storage Engine │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Table │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌───────────┐ │ │
│ │ │ Slab │ │ Slab │ │ Slab │ │ Hot Slab │ │ │
│ │ │ (old) │ │ (warm) │ │ (warm) │ │ (current) │ │ │
│ │ │ sealed │ │ sealed │ │ sealed │ │ writable │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ └───────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ String Dictionary │ │
│ │ "ok" → 0, "error" → 1, "timeout" → 2, ... │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Snapshot Manager │
│ (periodic serialize sealed slabs to disk) │
└─────────────────────────────────────────────────────────────┘
manta/
├── Cargo.toml
├── src/
│ ├── main.rs # Entry point, server startup
│ ├── lib.rs # Library exports
│ │
│ ├── storage/
│ │ ├── mod.rs
│ │ ├── types.rs # ColumnType, Value enums
│ │ ├── column.rs # Column<T> with typed arrays
│ │ ├── dictionary.rs # String interning/dictionary
│ │ ├── slab.rs # Time-partitioned chunk
│ │ ├── table.rs # Collection of slabs + schema
│ │ └── database.rs # Collection of tables
│ │
│ ├── ingestion/
│ │ ├── mod.rs
│ │ ├── buffer.rs # Per-thread write buffer
│ │ ├── flusher.rs # Buffer → columnar → slab
│ │ └── schema_inference.rs # Infer types from JSON
│ │
│ ├── query/
│ │ ├── mod.rs
│ │ ├── parser.rs # SQL → AST (using sqlparser)
│ │ ├── planner.rs # AST → execution plan
│ │ ├── executor.rs # Execute scans + aggregates
│ │ ├── aggregates.rs # COUNT, SUM, AVG, etc.
│ │ └── filter.rs # WHERE clause evaluation
│ │
│ ├── api/
│ │ ├── mod.rs
│ │ ├── server.rs # axum router setup
│ │ ├── ingest.rs # POST /ingest handler
│ │ ├── query.rs # POST /query handler
│ │ └── admin.rs # /tables, /health, /admin/snapshot
│ │
│ └── snapshot/
│ ├── mod.rs
│ ├── writer.rs # Serialize slabs to disk
│ └── reader.rs # Restore slabs on startup
│
├── tests/
│ ├── integration/
│ │ ├── ingest_test.rs
│ │ └── query_test.rs
│ └── benchmarks/
│ └── scan_benchmark.rs
│
└── examples/
└── demo.rs # Quick demo script
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
Int64, // Timestamps, counts, measures
Float64, // Floating-point measures
String, // Dictionary-encoded strings
Bool, // Boolean flags
// Future: StringSet, StringVec
}
#[derive(Debug, Clone)]
pub enum Value {
Int64(i64),
Float64(f64),
String(String),
Bool(bool),
Null,
}
pub struct Schema {
/// Column name → (type, position)
columns: IndexMap<String, (ColumnType, usize)>,
/// Which column is the timestamp (required)
timestamp_column: String,
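}
The schema is inferred from the first row ingested into a table. After that, types are enforced: a row whose value does not match the inferred column type is rejected with a schema-mismatch error.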
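As a rough illustration of inference followed by enforcement, the sketch below infers column types from a first row and checks later rows against them. The function names (`infer_schema`, `validate_row`), the treatment of nulls, and the choice to ignore unknown columns are assumptions made for this example, not decisions stated in the design.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType { Int64, Float64, String, Bool }

#[derive(Debug, Clone, PartialEq)]
pub enum Value { Int64(i64), Float64(f64), String(String), Bool(bool), Null }

/// Type of a value, or None for Null (a null carries no type information).
fn type_of(v: &Value) -> Option<ColumnType> {
    match v {
        Value::Int64(_) => Some(ColumnType::Int64),
        Value::Float64(_) => Some(ColumnType::Float64),
        Value::String(_) => Some(ColumnType::String),
        Value::Bool(_) => Some(ColumnType::Bool),
        Value::Null => None,
    }
}

/// Infer (name, type) pairs from the first row.
/// Assumption: a null in the first row is an error, since no type can be inferred.
pub fn infer_schema(first_row: &[(String, Value)]) -> Result<Vec<(String, ColumnType)>, String> {
    first_row
        .iter()
        .map(|(name, v)| {
            type_of(v)
                .map(|t| (name.clone(), t))
                .ok_or_else(|| format!("cannot infer type for '{}' from null", name))
        })
        .collect()
}

/// Check a later row against the schema.
/// Assumptions: missing or null values are accepted, extra columns are ignored.
pub fn validate_row(schema: &[(String, ColumnType)], row: &[(String, Value)]) -> Result<(), String> {
    for (name, expected) in schema {
        let value = row.iter().find(|(n, _)| n == name).map(|(_, v)| v);
        if let Some(got) = value.and_then(type_of) {
            if &got != expected {
                return Err(format!(
                    "Schema mismatch: expected {:?} for '{}', got {:?}",
                    expected, name, got
                ));
            }
        }
    }
    Ok(())
}
```

Calling `validate_row` with a string in `latency_ms` after an integer was seen in the first row yields the same style of message as the error response shown in the API examples.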
pub struct Slab {
/// Unique ID
id: u64,
/// Time range covered [start, end)
time_range: (i64, i64),
/// Is this slab still accepting writes?
sealed: bool,
/// Row count
row_count: usize,
/// Columns (same order as schema)
columns: Vec<Column>,
}
pub enum Column {
Int64(Vec<i64>),
Float64(Vec<f64>),
String(Vec<u32>), // Dictionary IDs
Bool(Vec<bool>),
}
pub struct Table {
name: String,
schema: Schema,
/// Slabs ordered by time (oldest first)
slabs: Vec<Arc<RwLock<Slab>>>,
/// String dictionary (shared across slabs)
dictionary: Arc<Dictionary>,
/// Current hot slab for writes
hot_slab: Arc<RwLock<Slab>>,
/// Configuration
config: TableConfig,
}
pub struct TableConfig {
slab_duration_secs: u64, // Default: 300 (5 min)
retention_secs: u64, // Default: 86400 (1 day)
flush_interval_ms: u64, // Default: 5000 (5 sec)
flush_threshold_rows: usize, // Default: 10000
}
1. HTTP request arrives at POST /ingest
2. Parse JSON: { "table": "...", "rows": [...] }
3. For each row:
   a. If table doesn't exist → create with inferred schema
   b. Validate row matches schema
   c. Add to write buffer
4. If buffer full OR flush timer fires:
   a. Convert buffered rows → columnar format
   b. Dictionary-encode strings
   c. Append to hot slab
   d. If hot slab time window passed → seal it, create new hot slab
5. Return success response
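Steps 4a and 4b of the flow above can be sketched roughly as follows. This is a simplified, single-threaded illustration: it handles only integer and string columns, uses a plain `HashMap` in place of the shared concurrent dictionary, and the `columnarize` signature (including the `is_string` flags standing in for the schema) is hypothetical.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Value { Int64(i64), String(String) }

#[derive(Debug, PartialEq)]
pub enum Column {
    Int64(Vec<i64>),
    String(Vec<u32>), // dictionary IDs
}

/// Single-threaded stand-in for the shared string dictionary.
#[derive(Default)]
pub struct Dictionary {
    ids: HashMap<String, u32>,
    strings: Vec<String>,
}

impl Dictionary {
    /// Return the existing ID for `s`, or assign the next one.
    pub fn intern(&mut self, s: &str) -> u32 {
        if let Some(&id) = self.ids.get(s) {
            return id;
        }
        let id = self.strings.len() as u32;
        self.ids.insert(s.to_string(), id);
        self.strings.push(s.to_string());
        id
    }
}

/// Convert buffered rows into one column per schema position.
/// `is_string[i]` marks column i as a string column (assumed known from the schema).
pub fn columnarize(
    rows: &[Vec<Value>],
    is_string: &[bool],
    dict: &mut Dictionary,
) -> Result<Vec<Column>, String> {
    let mut columns: Vec<Column> = is_string
        .iter()
        .map(|&s| {
            if s {
                Column::String(Vec::with_capacity(rows.len()))
            } else {
                Column::Int64(Vec::with_capacity(rows.len()))
            }
        })
        .collect();
    for row in rows {
        if row.len() != columns.len() {
            return Err("row width does not match schema".to_string());
        }
        for (col, value) in columns.iter_mut().zip(row) {
            match (col, value) {
                (Column::Int64(v), Value::Int64(x)) => v.push(*x),
                // Strings are stored as dictionary IDs, not as the strings themselves.
                (Column::String(v), Value::String(s)) => v.push(dict.intern(s)),
                _ => return Err("value does not match column type".to_string()),
            }
        }
    }
    Ok(columns)
}
```

Keeping the buffer row-oriented makes appends cheap, and the conversion cost is paid once per flush rather than once per row.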
pub struct WriteBuffer {
/// Buffered rows (row-oriented for fast appends)
rows: Vec<Vec<Value>>,
/// Last flush time
last_flush: Instant,
}
impl WriteBuffer {
pub fn flush(&mut self, table: &Table) {
// 1. Group rows by column
let columnar = self.to_columnar(&table.schema);
// 2. Dictionary-encode strings
let encoded = self.encode_strings(columnar, &table.dictionary);
// 3. Append to hot slab
table.hot_slab.write().append(encoded);
// 4. Check if slab should be sealed
if table.hot_slab.read().should_seal() {
table.seal_hot_slab();
table.create_new_hot_slab();
}
self.rows.clear();
self.last_flush = Instant::now();
}
}
-- Full syntax
SELECT
column1,
column2,
COUNT(*),
SUM(column3),
AVG(column4),
MIN(column5),
MAX(column6)
FROM table_name
WHERE
timestamp >= 1703424000
AND timestamp < 1703510400
AND status = 'error'
AND latency_ms > 100
GROUP BY column1, column2
ORDER BY COUNT(*) DESC
LIMIT 100
-- Not supported: JOIN, subqueries, HAVING, UNION, window functions
pub struct QueryPlan {
/// Table to query
table: String,
/// Columns to project
projections: Vec<Projection>,
/// Filter predicates
filters: Vec<Filter>,
/// Group-by columns
group_by: Vec<String>,
/// Aggregates to compute
aggregates: Vec<Aggregate>,
/// Order by clause
order_by: Option<OrderBy>,
/// Limit
limit: Option<usize>,
}
pub enum Projection {
Column(String),
Aggregate(Aggregate),
}
pub enum Aggregate {
Count,
CountColumn(String),
Sum(String),
Avg(String),
Min(String),
Max(String),
}
pub struct Filter {
column: String,
op: FilterOp,
value: Value,
}
pub enum FilterOp {
Eq, Ne, Lt, Le, Gt, Ge, In, NotIn, Between,
}
use rayon::prelude::*; // brings par_iter() into scope

pub fn execute(plan: &QueryPlan, table: &Table) -> QueryResult {
    // 1. Prune slabs by time range
    let relevant_slabs = table.slabs_in_range(plan.time_range());
    // 2. Execute in parallel across slabs
    let partial_results: Vec<PartialResult> = relevant_slabs
        .par_iter() // rayon parallel iterator
        // Slabs are stored as Arc<RwLock<Slab>>, so take a read lock for the scan
        .map(|slab| execute_on_slab(plan, &slab.read()))
        .collect();
    // 3. Merge partial results
    let merged = merge_results(partial_results, &plan.aggregates);
    // 4. Apply ORDER BY and LIMIT
    apply_order_and_limit(merged, &plan.order_by, plan.limit)
}
fn execute_on_slab(plan: &QueryPlan, slab: &Slab) -> PartialResult {
// 1. Create bitmap for matching rows
let mut matches = BitVec::repeat(true, slab.row_count);
// 2. Apply filters (narrow down matches)
for filter in &plan.filters {
let column = slab.get_column(&filter.column);
apply_filter(&mut matches, column, filter);
}
// 3. If no GROUP BY, compute aggregates directly
if plan.group_by.is_empty() {
return compute_aggregates_ungrouped(slab, &matches, &plan.aggregates);
}
// 4. GROUP BY: build hash table
let mut groups: HashMap<GroupKey, AggregateState> = HashMap::new();
for row_idx in matches.iter_ones() {
let key = extract_group_key(slab, row_idx, &plan.group_by);
let state = groups.entry(key).or_insert_with(AggregateState::new);
state.update(slab, row_idx, &plan.aggregates);
}
PartialResult { groups }
}
pub struct Dictionary {
/// String → ID mapping
str_to_id: DashMap<String, u32>,
/// ID → String mapping
id_to_str: RwLock<Vec<String>>,
/// Next ID to assign
next_id: AtomicU32,
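}
impl Dictionary {
    /// Get or create ID for a string
    pub fn intern(&self, s: &str) -> u32 {
        // Fast path: the string is already interned
        if let Some(id) = self.str_to_id.get(s) {
            return *id;
        }
        // Slow path: the entry API locks the map shard, so two threads interning
        // the same string cannot both assign an ID
        *self.str_to_id.entry(s.to_string()).or_insert_with(|| {
            // Allocate the ID and push the string under one write lock so that
            // id_to_str[id] always holds the string for that ID
            let mut strings = self.id_to_str.write();
            let id = self.next_id.fetch_add(1, Ordering::Relaxed);
            strings.push(s.to_string());
            id
        })
    }
    /// Get string for ID (for output)
    /// Returns an owned String: a &str cannot outlive the read guard
    pub fn resolve(&self, id: u32) -> Option<String> {
        self.id_to_str.read().get(id as usize).cloned()
    }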
}
| Method | Path | Description |
|---|---|---|
| POST | /ingest | Ingest rows into a table |
| POST | /query | Execute SQL query |
| GET | /tables | List all tables |
| GET | /tables/{name} | Get table info (schema, stats) |
| POST | /admin/snapshot | Trigger snapshot |
| GET | /health | Health check |
// Request
POST /ingest
{
"table": "events",
"rows": [
{"timestamp": 1703424000, "latency_ms": 42, "status": "ok", "endpoint": "/api/users"},
{"timestamp": 1703424001, "latency_ms": 87, "status": "error", "endpoint": "/api/orders"}
]
}
// Response (success)
{
"success": true,
"ingested": 2,
"table": "events"
}
// Response (error)
{
"success": false,
"error": "Schema mismatch: expected Int64 for 'latency_ms', got String"
}
// Request
POST /query
{
"sql": "SELECT status, endpoint, COUNT(*), AVG(latency_ms) FROM events WHERE timestamp >= 1703424000 GROUP BY status, endpoint ORDER BY COUNT(*) DESC LIMIT 10"
}
// Response
{
"success": true,
"columns": ["status", "endpoint", "COUNT(*)", "AVG(latency_ms)"],
"rows": [
["ok", "/api/users", 15234, 38.5],
["error", "/api/orders", 142, 421.2]
],
"stats": {
"scanned_rows": 15376,
"scanned_slabs": 5,
"elapsed_ms": 12
}
}
- Snapshot sealed slabs only (hot slab is ephemeral)
- Run periodically in background thread (e.g., every 5 minutes)
- Non-blocking: snapshot a consistent view while writes continue
- Format: Custom binary with header + columns
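The binary format is specified only as "header + columns". As one possible shape for a single Int64 column, the sketch below writes a small header followed by little-endian values. The magic bytes `MNTA`, the exact header layout, and the function names are all hypothetical and chosen purely for illustration.

```rust
/// Hypothetical 4-byte magic identifying a column block.
const MAGIC: [u8; 4] = *b"MNTA";
/// Header = magic (4 bytes) + row count as u64 little-endian (8 bytes).
const HEADER_LEN: usize = 12;

/// Serialize an Int64 column as header + little-endian values.
pub fn encode_i64_column(values: &[i64]) -> Vec<u8> {
    let mut out = Vec::with_capacity(HEADER_LEN + values.len() * 8);
    out.extend_from_slice(&MAGIC);
    out.extend_from_slice(&(values.len() as u64).to_le_bytes());
    for v in values {
        out.extend_from_slice(&v.to_le_bytes());
    }
    out
}

/// Copy exactly 8 bytes out of a slice (caller guarantees the length).
fn eight(bytes: &[u8]) -> [u8; 8] {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(bytes);
    buf
}

/// Parse a block written by `encode_i64_column`, rejecting bad or truncated input.
pub fn decode_i64_column(bytes: &[u8]) -> Result<Vec<i64>, String> {
    if bytes.len() < HEADER_LEN || bytes[..4] != MAGIC[..] {
        return Err("bad header".to_string());
    }
    let count = u64::from_le_bytes(eight(&bytes[4..12])) as usize;
    let body = &bytes[HEADER_LEN..];
    // The row count in the header must match the payload size exactly.
    if count.checked_mul(8) != Some(body.len()) {
        return Err("length mismatch".to_string());
    }
    Ok(body.chunks_exact(8).map(|c| i64::from_le_bytes(eight(c))).collect())
}
```

Storing the row count in the header lets the reader detect a truncated file before it loads a partial slab into memory.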
snapshot_v1/
├── metadata.json # Tables, schemas, slab info
├── events/
│ ├── slab_001.bin # Serialized slab
│ ├── slab_002.bin
│ └── dictionary.bin # String dictionary
└── metrics/
├── slab_001.bin
└── dictionary.bin
On startup:
- Check for snapshot directory
- Load metadata
- Deserialize slabs into memory
- Recreate hot slab for each table
- Resume accepting writes
Goal: Working end-to-end system you can demo
Tasks:
- Set up Cargo project with dependencies
- Implement basic column types (Int64, Float64, String)
- Implement single-slab storage (no time partitioning)
- Implement simple write buffer + flush
- Implement string dictionary (basic)
- Implement SQL parser wrapper (using sqlparser)
- Implement query executor (single-threaded)
- Implement basic aggregates: COUNT, SUM, AVG
- Implement HTTP server with /ingest and /query
- Basic error handling and responses
Deliverable: Can ingest JSON rows, query with SQL, get results
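For the basic aggregates in this phase, one option is to track a running count and sum per group and derive AVG from them at the end. The sketch below assumes that approach; the `AggState` type and its methods are hypothetical names. Keeping count and sum separate also means partial states from different slabs can be combined later, which averaging the per-slab averages would get wrong.

```rust
/// Running state for COUNT, SUM and AVG over one group.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct AggState {
    pub count: u64,
    pub sum: f64,
}

impl AggState {
    /// Fold one value into the state.
    pub fn update(&mut self, x: f64) {
        self.count += 1;
        self.sum += x;
    }

    /// Combine with a partial state computed elsewhere (e.g. on another slab).
    pub fn merge(&mut self, other: &AggState) {
        self.count += other.count;
        self.sum += other.sum;
    }

    /// AVG is derived only at the end; an empty group has no average.
    pub fn avg(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Aggregate a slice of values into a single state.
pub fn aggregate(values: &[f64]) -> AggState {
    let mut state = AggState::default();
    for &v in values {
        state.update(v);
    }
    state
}
```

For example, merging the state for `[10.0, 20.0]` with the state for `[60.0]` gives an average of 30.0, whereas averaging the two per-slab averages (15.0 and 60.0) would give 37.5.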
Goal: Handles sustained load, survives restarts
Tasks:
- Time-partitioned slabs (5-minute windows)
- Slab sealing logic (time-based + size-based)
- TTL-based slab expiration
- Parallel query execution with rayon
- Improved string dictionary (DashMap for concurrency)
- Memory usage tracking
- Snapshot writer (serialize sealed slabs)
- Snapshot reader (restore on startup)
- Background flush thread
- Graceful shutdown
Deliverable: Production-viable single-node system
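Time partitioning, TTL expiration, and slab pruning all reduce to arithmetic on half-open `[start, end)` windows. The helpers below are a minimal sketch of that arithmetic; the function names are hypothetical, and durations are taken as `i64` here to match the timestamp type, although `TableConfig` stores them as `u64`.

```rust
/// Half-open window [start, end) that a timestamp falls into.
/// `rem_euclid` keeps the result correct for negative timestamps as well.
pub fn slab_window(timestamp: i64, slab_duration_secs: i64) -> (i64, i64) {
    let start = timestamp - timestamp.rem_euclid(slab_duration_secs);
    (start, start + slab_duration_secs)
}

/// A slab is expired once its whole window is older than the retention period.
pub fn is_expired(window: (i64, i64), now: i64, retention_secs: i64) -> bool {
    window.1 <= now - retention_secs
}

/// Slab pruning: does the slab window intersect the query range [from, to)?
pub fn overlaps(window: (i64, i64), query: (i64, i64)) -> bool {
    window.0 < query.1 && query.0 < window.1
}
```

With the default 300-second duration, timestamps 1703424000 and 1703424299 land in the same slab, and 1703424300 starts the next one.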
Goal: Handles 100M+ rows efficiently
Tasks:
- SIMD filter evaluation (WHERE clauses)
- SIMD aggregate computation
- Delta encoding for timestamps
- Run-length encoding for low-cardinality columns
- t-digest for PERCENTILE aggregates
- HyperLogLog for COUNT(DISTINCT)
- Memory budget enforcement + eviction
- Query timeout support
- Sampling support (SAMPLE clause)
- Comprehensive benchmarks
Deliverable: Optimized, high-performance system
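Two of the encodings listed for this phase are simple enough to sketch directly. The following is an illustrative, unoptimized version of delta encoding for timestamps and run-length encoding for dictionary IDs; the function names and the `(value, run_length)` pair representation are assumptions, and a real implementation would likely also bit-pack the small deltas.

```rust
/// Delta-encode: the first element is kept as-is, the rest are differences
/// from the previous value. Near-sorted timestamps become small numbers.
pub fn delta_encode(values: &[i64]) -> Vec<i64> {
    let mut prev = 0i64;
    values
        .iter()
        .map(|&v| {
            let delta = v.wrapping_sub(prev);
            prev = v;
            delta
        })
        .collect()
}

/// Inverse of `delta_encode`: a running sum restores the original values.
pub fn delta_decode(deltas: &[i64]) -> Vec<i64> {
    let mut acc = 0i64;
    deltas
        .iter()
        .map(|&d| {
            acc = acc.wrapping_add(d);
            acc
        })
        .collect()
}

/// Run-length encode dictionary IDs as (value, run_length) pairs.
pub fn rle_encode(ids: &[u32]) -> Vec<(u32, u32)> {
    let mut runs: Vec<(u32, u32)> = Vec::new();
    for &id in ids {
        if let Some(last) = runs.last_mut() {
            if last.0 == id {
                last.1 += 1; // extend the current run
                continue;
            }
        }
        runs.push((id, 1)); // start a new run
    }
    runs
}

/// Expand (value, run_length) pairs back into the original ID sequence.
pub fn rle_decode(runs: &[(u32, u32)]) -> Vec<u32> {
    let mut out = Vec::new();
    for &(value, len) in runs {
        for _ in 0..len {
            out.push(value);
        }
    }
    out
}
```

Run-length encoding pays off only when equal values are adjacent, so its benefit depends on how rows are ordered within a slab.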
[package]
name = "manta"
version = "0.1.0"
edition = "2021"
[dependencies]
# HTTP
axum = "0.7"
tokio = { version = "1", features = ["full"] }
tower = "0.4"
tower-http = { version = "0.5", features = ["cors", "trace"] }
# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bincode = "1"
# SQL Parsing
sqlparser = "0.40"
# Parallelism
rayon = "1"
# Concurrency
dashmap = "5"
parking_lot = "0.12"
# Utilities
chrono = "0.4"
thiserror = "1"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
indexmap = "2"
bitvec = "1"
# Phase 3 (optional)
# tdigest = "0.2"
# hyperloglog = "1"
[dev-dependencies]
criterion = "0.5"
tokio-test = "0.4"
reqwest = { version = "0.11", features = ["json"] }
- Column operations (append, filter, aggregate)
- Dictionary encoding/decoding
- SQL parser correctness
- Filter evaluation
- Aggregate computation
- Ingest → Query round-trip
- Schema inference
- Time range filtering
- GROUP BY correctness
- ORDER BY + LIMIT
- Snapshot save/restore
- Ingestion throughput (rows/sec)
- Query latency by row count
- Filter selectivity impact
- GROUP BY cardinality impact
- Distributed mode (sharding, replication)
- JOIN support
- Materialized views
- Real-time aggregation (push-based)
- Kafka/Kinesis ingestion
- Grafana integration
- Authentication/authorization