@manish
Created December 24, 2025 12:25
Manta: In-Memory Analytics Database - Implementation Plan

Manta: In-Memory Analytics Database

Fast, graceful, glides through data — inspired by Facebook Scuba


EXECUTIVE SUMMARY

What We're Building

A Scuba-inspired, single-node, in-memory analytics database in Rust. Optimized for high-throughput event ingestion and fast aggregation queries.

Key Characteristics

  • Columnar storage with time-partitioned slabs
  • SQL subset (no joins) over HTTP
  • Ingested rows become queryable within 30 seconds
  • Periodic snapshots for durability
  • Single node (distributed can come later)

Technology Stack

  • Language: Rust
  • HTTP: axum + tokio
  • SQL: sqlparser crate
  • Parallelism: rayon

Timeline (Milestones)

#   Milestone    Scope
1   MVP          End-to-end working: ingest JSON → query SQL → get results
2   Production   Time partitioning, TTL, parallel queries, snapshots
3   Optimized    SIMD, compression, 100M+ row support

API Preview

Ingest:

curl -X POST http://localhost:8080/ingest -d '{
  "table": "events",
  "rows": [{"timestamp": 1703424000, "latency_ms": 42, "status": "ok"}]
}'

Query:

curl -X POST http://localhost:8080/query -d '{
  "sql": "SELECT status, AVG(latency_ms) FROM events GROUP BY status"
}'

Success Metrics

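  • Ingest: 10K+ rows/sec
  • Query: aggregate over 1M rows in <100 ms
  • Visibility: ingested rows queryable in <30 seconds
  • Memory: bounded by retention (TTL), with an explicit budget and eviction in Milestone 3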


DETAILED PLAN

1. Architecture Overview

┌─────────────────────────────────────────────────────────────┐
│                      HTTP Layer (axum)                      │
│  POST /ingest    POST /query    GET /tables    GET /health  │
└─────────────────────────────────────────────────────────────┘
                              │
              ┌───────────────┴───────────────┐
              ▼                               ▼
┌─────────────────────────┐     ┌─────────────────────────────┐
│    Ingestion Pipeline   │     │       Query Engine          │
│  ┌───────────────────┐  │     │  ┌───────────────────────┐  │
│  │   Write Buffer    │  │     │  │    SQL Parser         │  │
│  │  (thread-local)   │  │     │  │    (sqlparser)        │  │
│  └─────────┬─────────┘  │     │  └───────────┬───────────┘  │
│            │ flush      │     │              │              │
│            ▼            │     │              ▼              │
│  ┌───────────────────┐  │     │  ┌───────────────────────┐  │
│  │   Columnarizer    │  │     │  │    Query Planner      │  │
│  │  rows → columns   │  │     │  │    (slab selection)   │  │
│  └───────────────────┘  │     │  └───────────┬───────────┘  │
└─────────────┬───────────┘     │              │              │
              │                 │              ▼              │
              │                 │  ┌───────────────────────┐  │
              │                 │  │   Parallel Executor   │  │
              │                 │  │      (rayon)          │  │
              │                 │  └───────────┬───────────┘  │
              │                 │              │              │
              │                 │              ▼              │
              │                 │  ┌───────────────────────┐  │
              │                 │  │   Result Merger       │  │
              │                 │  └───────────────────────┘  │
              │                 └─────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│                     Storage Engine                          │
│  ┌─────────────────────────────────────────────────────┐    │
│  │                      Table                          │    │
│  │  ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌───────────┐  │    │
│  │  │  Slab   │ │  Slab   │ │  Slab   │ │ Hot Slab  │  │    │
│  │  │ (old)   │ │ (warm)  │ │ (warm)  │ │ (current) │  │    │
│  │  │ sealed  │ │ sealed  │ │ sealed  │ │ writable  │  │    │
│  │  └─────────┘ └─────────┘ └─────────┘ └───────────┘  │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │              String Dictionary                       │    │
│  │   "ok" → 0,  "error" → 1,  "timeout" → 2, ...       │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘
              │
              ▼
┌─────────────────────────────────────────────────────────────┐
│                    Snapshot Manager                         │
│         (periodic serialize sealed slabs to disk)           │
└─────────────────────────────────────────────────────────────┘

2. Project Structure

manta/
├── Cargo.toml
├── src/
│   ├── main.rs                 # Entry point, server startup
│   ├── lib.rs                  # Library exports
│   │
│   ├── storage/
│   │   ├── mod.rs
│   │   ├── types.rs            # ColumnType, Value enums
│   │   ├── column.rs           # Column<T> with typed arrays
│   │   ├── dictionary.rs       # String interning/dictionary
│   │   ├── slab.rs             # Time-partitioned chunk
│   │   ├── table.rs            # Collection of slabs + schema
│   │   └── database.rs         # Collection of tables
│   │
│   ├── ingestion/
│   │   ├── mod.rs
│   │   ├── buffer.rs           # Per-thread write buffer
│   │   ├── flusher.rs          # Buffer → columnar → slab
│   │   └── schema_inference.rs # Infer types from JSON
│   │
│   ├── query/
│   │   ├── mod.rs
│   │   ├── parser.rs           # SQL → AST (using sqlparser)
│   │   ├── planner.rs          # AST → execution plan
│   │   ├── executor.rs         # Execute scans + aggregates
│   │   ├── aggregates.rs       # COUNT, SUM, AVG, etc.
│   │   └── filter.rs           # WHERE clause evaluation
│   │
│   ├── api/
│   │   ├── mod.rs
│   │   ├── server.rs           # axum router setup
│   │   ├── ingest.rs           # POST /ingest handler
│   │   ├── query.rs            # POST /query handler
│   │   └── admin.rs            # /tables, /health, /admin/snapshot
│   │
│   └── snapshot/
│       ├── mod.rs
│       ├── writer.rs           # Serialize slabs to disk
│       └── reader.rs           # Restore slabs on startup
│
├── tests/
│   ├── integration/
│   │   ├── ingest_test.rs
│   │   └── query_test.rs
│   └── benchmarks/
│       └── scan_benchmark.rs
│
└── examples/
    └── demo.rs                 # Quick demo script

3. Data Model

3.1 Column Types

#[derive(Debug, Clone, PartialEq)]
pub enum ColumnType {
    Int64,      // Timestamps, counts, measures
    Float64,    // Floating-point measures
    String,     // Dictionary-encoded strings
    Bool,       // Boolean flags
    // Future: StringSet, StringVec
}

#[derive(Debug, Clone)]
pub enum Value {
    Int64(i64),
    Float64(f64),
    String(String),
    Bool(bool),
    Null,
}

3.2 Schema

pub struct Schema {
    /// Column name → (type, position)
    columns: IndexMap<String, (ColumnType, usize)>,
    /// Which column is the timestamp (required)
    timestamp_column: String,
}

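The schema is inferred from the first row ingested into a table. After that, types are enforced: a row whose value does not match the column type is rejected with a schema mismatch error (see 7.2).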
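
A minimal sketch of infer-then-enforce, assuming rows arrive as (name, value) pairs. The helper names (type_of, infer_schema, validate_row) are hypothetical, a std BTreeMap stands in for the IndexMap used in Schema, and skipping Null values during inference is an assumption, not a decision made in this plan.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ColumnType { Int64, Float64, String, Bool }

#[derive(Debug, Clone, PartialEq)]
pub enum Value { Int64(i64), Float64(f64), String(String), Bool(bool), Null }

/// Column type of a non-null value; None for Null.
fn type_of(v: &Value) -> Option<ColumnType> {
    match v {
        Value::Int64(_) => Some(ColumnType::Int64),
        Value::Float64(_) => Some(ColumnType::Float64),
        Value::String(_) => Some(ColumnType::String),
        Value::Bool(_) => Some(ColumnType::Bool),
        Value::Null => None,
    }
}

/// Infer a schema from the first row (Null columns are skipped: an assumption).
pub fn infer_schema(row: &[(&str, Value)]) -> BTreeMap<String, ColumnType> {
    row.iter()
        .filter_map(|(name, v)| type_of(v).map(|t| (name.to_string(), t)))
        .collect()
}

/// Check a later row against the inferred schema. Null matches any type.
pub fn validate_row(
    schema: &BTreeMap<String, ColumnType>,
    row: &[(&str, Value)],
) -> Result<(), String> {
    for (name, v) in row {
        let expected = schema
            .get(*name)
            .ok_or_else(|| format!("Unknown column '{}'", name))?;
        match type_of(v) {
            Some(got) if got != *expected => {
                return Err(format!(
                    "Schema mismatch: expected {:?} for '{}', got {:?}",
                    expected, name, got
                ));
            }
            _ => {}
        }
    }
    Ok(())
}
```

The error text mirrors the example error response in 7.2. Whether unknown columns should be rejected or added to the schema is left open here.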

3.3 Slab Structure

pub struct Slab {
    /// Unique ID
    id: u64,
    /// Time range covered [start, end)
    time_range: (i64, i64),
    /// Is this slab still accepting writes?
    sealed: bool,
    /// Row count
    row_count: usize,
    /// Columns (same order as schema)
    columns: Vec<Column>,
}

pub enum Column {
    Int64(Vec<i64>),
    Float64(Vec<f64>),
    String(Vec<u32>),   // Dictionary IDs
    Bool(Vec<bool>),
}

3.4 Table Structure

pub struct Table {
    name: String,
    schema: Schema,
    /// Slabs ordered by time (oldest first)
    slabs: Vec<Arc<RwLock<Slab>>>,
    /// String dictionary (shared across slabs)
    dictionary: Arc<Dictionary>,
    /// Current hot slab for writes
    hot_slab: Arc<RwLock<Slab>>,
    /// Configuration
    config: TableConfig,
}

pub struct TableConfig {
    slab_duration_secs: u64,    // Default: 300 (5 min)
    retention_secs: u64,         // Default: 86400 (1 day)
    flush_interval_ms: u64,      // Default: 5000 (5 sec)
    flush_threshold_rows: usize, // Default: 10000
}
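
How slab_duration_secs and retention_secs could drive partitioning and expiry can be sketched as below. The function names slab_window and is_expired are hypothetical, and aligning windows to multiples of the slab duration since the Unix epoch is an assumption.

```rust
/// Half-open time window [start, end) of the slab that owns `timestamp`.
/// Windows are aligned to multiples of `slab_duration_secs` (assumption).
pub fn slab_window(timestamp: i64, slab_duration_secs: u64) -> (i64, i64) {
    let d = slab_duration_secs as i64;
    // div_euclid rounds toward negative infinity, so timestamps before the
    // epoch still land in the window that contains them.
    let start = timestamp.div_euclid(d) * d;
    (start, start + d)
}

/// True once every row in a slab ending at `slab_end` is older than the
/// retention period, i.e. the slab can be dropped as a whole.
pub fn is_expired(slab_end: i64, now: i64, retention_secs: u64) -> bool {
    slab_end <= now - retention_secs as i64
}
```

With the defaults above, a row stamped 1703424299 belongs to the slab covering [1703424000, 1703424300), and that slab becomes droppable one day after its end.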

4. Ingestion Pipeline

4.1 Flow

1. HTTP request arrives at POST /ingest
2. Parse JSON: { "table": "...", "rows": [...] }
3. For each row:
   a. If table doesn't exist → create with inferred schema
   b. Validate row matches schema
   c. Add to write buffer
4. If buffer full OR flush timer fires:
   a. Convert buffered rows → columnar format
   b. Dictionary-encode strings
   c. Append to hot slab
   d. If hot slab time window passed → seal it, create new hot slab
5. Return success response

4.2 Write Buffer

pub struct WriteBuffer {
    /// Buffered rows (row-oriented for fast appends)
    rows: Vec<Vec<Value>>,
    /// Last flush time
    last_flush: Instant,
}
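
The "buffer full OR flush timer fires" condition from 4.1 might look like the sketch below. This WriteBuffer is a simplified stand-in (rows hold i64 only), and new, push and should_flush are hypothetical helpers; the threshold and interval would come from flush_threshold_rows and flush_interval_ms.

```rust
use std::time::{Duration, Instant};

/// Simplified stand-in for the plan's WriteBuffer.
pub struct WriteBuffer {
    rows: Vec<Vec<i64>>,
    last_flush: Instant,
}

impl WriteBuffer {
    pub fn new(now: Instant) -> Self {
        Self { rows: Vec::new(), last_flush: now }
    }

    pub fn push(&mut self, row: Vec<i64>) {
        self.rows.push(row);
    }

    /// Flush when the row threshold is reached or the interval has elapsed.
    /// An empty buffer never needs flushing.
    pub fn should_flush(
        &self,
        now: Instant,
        threshold_rows: usize,
        interval: Duration,
    ) -> bool {
        if self.rows.is_empty() {
            return false;
        }
        self.rows.len() >= threshold_rows
            || now.saturating_duration_since(self.last_flush) >= interval
    }
}
```

Passing `now` in as a parameter, rather than calling Instant::now() inside, keeps the check deterministic in tests.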

4.3 Flush Process

impl WriteBuffer {
    pub fn flush(&mut self, table: &Table) {
        // 1. Group rows by column
        let columnar = self.to_columnar(&table.schema);

        // 2. Dictionary-encode strings
        let encoded = self.encode_strings(columnar, &table.dictionary);

        // 3. Append to hot slab
        table.hot_slab.write().append(encoded);

        // 4. Check if slab should be sealed
        if table.hot_slab.read().should_seal() {
            table.seal_hot_slab();
            table.create_new_hot_slab();
        }

        self.rows.clear();
        self.last_flush = Instant::now();
    }
}
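
Step 1 of the flush (rows → columns) is the core transformation. A rough sketch of what to_columnar could do, with the signature simplified to take rows directly, column types taken from the first row rather than from the schema, and strings left out because they are dictionary-encoded in a separate step:

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Value { Int64(i64), Float64(f64), Bool(bool) }

#[derive(Debug, PartialEq)]
pub enum Column { Int64(Vec<i64>), Float64(Vec<f64>), Bool(Vec<bool>) }

/// Transpose row-oriented values into one typed vector per column.
pub fn to_columnar(rows: &[Vec<Value>]) -> Result<Vec<Column>, String> {
    let Some(first) = rows.first() else {
        return Ok(Vec::new());
    };
    // One pre-sized column per field, typed after the first row.
    let mut columns: Vec<Column> = first
        .iter()
        .map(|v| match v {
            Value::Int64(_) => Column::Int64(Vec::with_capacity(rows.len())),
            Value::Float64(_) => Column::Float64(Vec::with_capacity(rows.len())),
            Value::Bool(_) => Column::Bool(Vec::with_capacity(rows.len())),
        })
        .collect();

    for (r, row) in rows.iter().enumerate() {
        if row.len() != columns.len() {
            return Err(format!(
                "row {r}: expected {} values, got {}",
                columns.len(),
                row.len()
            ));
        }
        for (c, (col, v)) in columns.iter_mut().zip(row).enumerate() {
            match (col, v) {
                (Column::Int64(xs), Value::Int64(x)) => xs.push(*x),
                (Column::Float64(xs), Value::Float64(x)) => xs.push(*x),
                (Column::Bool(xs), Value::Bool(x)) => xs.push(*x),
                _ => return Err(format!("row {r}, column {c}: type mismatch")),
            }
        }
    }
    Ok(columns)
}
```

Since rows are validated against the schema before buffering (step 3b in 4.1), the error paths here are a safety net rather than the primary check.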

5. Query Engine

5.1 Supported SQL

-- Full syntax
SELECT
    column1,
    column2,
    COUNT(*),
    SUM(column3),
    AVG(column4),
    MIN(column5),
    MAX(column6)
FROM table_name
WHERE
    timestamp >= 1703424000
    AND timestamp < 1703510400
    AND status = 'error'
    AND latency_ms > 100
GROUP BY column1, column2
ORDER BY COUNT(*) DESC
LIMIT 100

-- Not supported: JOIN, subqueries, HAVING, UNION, window functions

5.2 Query Execution Plan

pub struct QueryPlan {
    /// Table to query
    table: String,
    /// Columns to project
    projections: Vec<Projection>,
    /// Filter predicates
    filters: Vec<Filter>,
    /// Group-by columns
    group_by: Vec<String>,
    /// Aggregates to compute
    aggregates: Vec<Aggregate>,
    /// Order by clause
    order_by: Option<OrderBy>,
    /// Limit
    limit: Option<usize>,
}

pub enum Projection {
    Column(String),
    Aggregate(Aggregate),
}

pub enum Aggregate {
    Count,
    CountColumn(String),
    Sum(String),
    Avg(String),
    Min(String),
    Max(String),
}

pub struct Filter {
    column: String,
    op: FilterOp,
    value: Value,
}

pub enum FilterOp {
    Eq, Ne, Lt, Le, Gt, Ge, In, NotIn, Between,
}
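
Slab pruning needs a time range derived from the plan's filters. One possible sketch, using a reduced FilterOp and an i64-only Filter for brevity; time_range and overlaps are hypothetical free functions here, not the plan's actual methods:

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FilterOp { Eq, Lt, Le, Gt, Ge }

pub struct Filter {
    pub column: String,
    pub op: FilterOp,
    pub value: i64,
}

/// Tightest half-open range [start, end) implied by the filters on `ts_col`.
/// Filters on other columns are ignored; no filter means "all time".
pub fn time_range(filters: &[Filter], ts_col: &str) -> (i64, i64) {
    let (mut start, mut end) = (i64::MIN, i64::MAX);
    for f in filters.iter().filter(|f| f.column == ts_col) {
        match f.op {
            FilterOp::Ge => start = start.max(f.value),
            FilterOp::Gt => start = start.max(f.value.saturating_add(1)),
            FilterOp::Lt => end = end.min(f.value),
            FilterOp::Le => end = end.min(f.value.saturating_add(1)),
            FilterOp::Eq => {
                start = start.max(f.value);
                end = end.min(f.value.saturating_add(1));
            }
        }
    }
    (start, end)
}

/// True if a slab's [start, end) window intersects the query range.
pub fn overlaps(slab: (i64, i64), query: (i64, i64)) -> bool {
    slab.0 < query.1 && query.0 < slab.1
}
```

For the WHERE clause in 5.1, this yields [1703424000, 1703510400), so any slab that ends at or before 1703424000 or starts at or after 1703510400 is skipped without being scanned.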

5.3 Parallel Execution

use rayon::prelude::*;

pub fn execute(plan: &QueryPlan, table: &Table) -> QueryResult {
    // 1. Prune slabs by time range
    let relevant_slabs = table.slabs_in_range(plan.time_range());

    // 2. Execute in parallel across slabs
    let partial_results: Vec<PartialResult> = relevant_slabs
        .par_iter()  // rayon parallel iterator
        // Slabs are stored as Arc<RwLock<Slab>>: hold a read lock for the scan
        .map(|slab| execute_on_slab(plan, &slab.read()))
        .collect();

    // 3. Merge partial results
    let merged = merge_results(partial_results, &plan.aggregates);

    // 4. Apply ORDER BY and LIMIT
    apply_order_and_limit(merged, &plan.order_by, plan.limit)
}
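
The merge step is where AVG needs care: per-slab averages cannot simply be averaged, so each partial result has to carry a sum and a count. A minimal sketch, assuming group keys are plain strings and only AVG is tracked; AvgState and this merge_results signature are illustrative, not the plan's final types:

```rust
use std::collections::HashMap;

/// Mergeable partial state for AVG: keep sum and count, divide at the end.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct AvgState {
    pub sum: f64,
    pub count: u64,
}

impl AvgState {
    pub fn update(&mut self, x: f64) {
        self.sum += x;
        self.count += 1;
    }

    pub fn merge(&mut self, other: &AvgState) {
        self.sum += other.sum;
        self.count += other.count;
    }

    /// None when no rows matched.
    pub fn finish(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Combine per-slab group maps into one map keyed by group.
pub fn merge_results(
    partials: Vec<HashMap<String, AvgState>>,
) -> HashMap<String, AvgState> {
    let mut merged: HashMap<String, AvgState> = HashMap::new();
    for partial in partials {
        for (key, state) in partial {
            merged.entry(key).or_default().merge(&state);
        }
    }
    merged
}
```

For example, one slab with values 10 and 20 and another with a single value 60 merge to an average of 30, whereas averaging the two per-slab averages (15 and 60) would give 37.5.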

5.4 Slab Execution

fn execute_on_slab(plan: &QueryPlan, slab: &Slab) -> PartialResult {
    // 1. Create bitmap for matching rows
    let mut matches = BitVec::repeat(true, slab.row_count);

    // 2. Apply filters (narrow down matches)
    for filter in &plan.filters {
        let column = slab.get_column(&filter.column);
        apply_filter(&mut matches, column, filter);
    }

    // 3. If no GROUP BY, compute aggregates directly
    if plan.group_by.is_empty() {
        return compute_aggregates_ungrouped(slab, &matches, &plan.aggregates);
    }

    // 4. GROUP BY: build hash table
    let mut groups: HashMap<GroupKey, AggregateState> = HashMap::new();

    for row_idx in matches.iter_ones() {
        let key = extract_group_key(slab, row_idx, &plan.group_by);
        let state = groups.entry(key).or_insert_with(AggregateState::new);
        state.update(slab, row_idx, &plan.aggregates);
    }

    PartialResult { groups }
}
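
The apply_filter call in step 2 narrows the match bitmap one predicate at a time. A sketch over a single Int64 column, assuming a plain bool slice in place of the BitVec and a reduced FilterOp; the signature differs from the one used above and is only illustrative:

```rust
#[derive(Debug, Clone, Copy)]
pub enum FilterOp { Eq, Ne, Lt, Le, Gt, Ge }

/// AND the result of `column[i] <op> value` into `matches[i]`.
/// Rows already ruled out by an earlier filter stay ruled out.
pub fn apply_filter(matches: &mut [bool], column: &[i64], op: FilterOp, value: i64) {
    assert_eq!(matches.len(), column.len(), "bitmap and column length differ");
    for (m, &x) in matches.iter_mut().zip(column) {
        let hit = match op {
            FilterOp::Eq => x == value,
            FilterOp::Ne => x != value,
            FilterOp::Lt => x < value,
            FilterOp::Le => x <= value,
            FilterOp::Gt => x > value,
            FilterOp::Ge => x >= value,
        };
        *m &= hit;
    }
}
```

Because every filter only clears bits, the filters can be applied in any order with the same result. Applying the most selective one first would matter only if later passes skip already-cleared rows.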

6. String Dictionary

6.1 Structure

pub struct Dictionary {
    /// String → ID mapping
    str_to_id: DashMap<String, u32>,
    /// ID → String mapping
    id_to_str: RwLock<Vec<String>>,
    /// Next ID to assign
    next_id: AtomicU32,
}

6.2 Operations

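use std::sync::atomic::Ordering;

impl Dictionary {
    /// Get or create ID for a string
    pub fn intern(&self, s: &str) -> u32 {
        // Fast path: already interned
        if let Some(id) = self.str_to_id.get(s) {
            return *id;
        }

        // Slow path: hold the id_to_str write lock so that ID allocation and
        // the push happen together, keeping id_to_str[id] == s.
        let mut id_to_str = self.id_to_str.write();

        // Re-check: another thread may have interned `s` while we waited
        if let Some(id) = self.str_to_id.get(s) {
            return *id;
        }

        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        id_to_str.push(s.to_string());
        self.str_to_id.insert(s.to_string(), id);
        id
    }

    /// Get string for ID (for output)
    pub fn resolve(&self, id: u32) -> Option<String> {
        // Returns an owned String: a &str cannot outlive the read guard
        self.id_to_str.read().get(id as usize).cloned()
    }
}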
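
One payoff of dictionary encoding is that a string predicate such as status = 'error' turns into an integer comparison: the literal is looked up once, then the scan compares u32 IDs. A std-only sketch, assuming a single Mutex in place of DashMap plus RwLock; lookup and count_eq are hypothetical helpers not listed above:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

#[derive(Default)]
struct Inner {
    str_to_id: HashMap<String, u32>,
    id_to_str: Vec<String>,
}

/// Simplified dictionary: one lock guards both directions of the mapping.
#[derive(Default)]
pub struct Dictionary {
    inner: Mutex<Inner>,
}

impl Dictionary {
    /// Get or create the ID for a string.
    pub fn intern(&self, s: &str) -> u32 {
        let mut inner = self.inner.lock().unwrap();
        if let Some(&id) = inner.str_to_id.get(s) {
            return id;
        }
        let id = inner.id_to_str.len() as u32;
        inner.id_to_str.push(s.to_string());
        inner.str_to_id.insert(s.to_string(), id);
        id
    }

    /// Look up an ID without inserting (for query literals).
    pub fn lookup(&self, s: &str) -> Option<u32> {
        self.inner.lock().unwrap().str_to_id.get(s).copied()
    }

    /// Map an ID back to its string (for output).
    pub fn resolve(&self, id: u32) -> Option<String> {
        self.inner.lock().unwrap().id_to_str.get(id as usize).cloned()
    }
}

/// Count rows where the dictionary-encoded column equals `literal`.
pub fn count_eq(dict: &Dictionary, column: &[u32], literal: &str) -> usize {
    match dict.lookup(literal) {
        // Compare integers only; no string comparison per row.
        Some(id) => column.iter().filter(|&&x| x == id).count(),
        // A literal that was never ingested cannot match any row.
        None => 0,
    }
}
```

Using a non-inserting lookup for query literals keeps queries from growing the dictionary with values that were never ingested.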

7. HTTP API

7.1 Endpoints

Method  Path             Description
POST    /ingest          Ingest rows into a table
POST    /query           Execute SQL query
GET     /tables          List all tables
GET     /tables/{name}   Get table info (schema, stats)
POST    /admin/snapshot  Trigger snapshot
GET     /health          Health check

7.2 Ingest Request/Response

// Request
POST /ingest
{
  "table": "events",
  "rows": [
    {"timestamp": 1703424000, "latency_ms": 42, "status": "ok", "endpoint": "/api/users"},
    {"timestamp": 1703424001, "latency_ms": 87, "status": "error", "endpoint": "/api/orders"}
  ]
}

// Response (success)
{
  "success": true,
  "ingested": 2,
  "table": "events"
}

// Response (error)
{
  "success": false,
  "error": "Schema mismatch: expected Int64 for 'latency_ms', got String"
}

7.3 Query Request/Response

// Request
POST /query
{
  "sql": "SELECT status, endpoint, COUNT(*), AVG(latency_ms) FROM events WHERE timestamp >= 1703424000 GROUP BY status, endpoint ORDER BY COUNT(*) DESC LIMIT 10"
}

// Response
{
  "success": true,
  "columns": ["status", "endpoint", "COUNT(*)", "AVG(latency_ms)"],
  "rows": [
    ["ok", "/api/users", 15234, 38.5],
    ["error", "/api/orders", 142, 421.2]
  ],
  "stats": {
    "scanned_rows": 15376,
    "scanned_slabs": 5,
    "elapsed_ms": 12
  }
}

8. Snapshots

8.1 Strategy

  • Snapshot sealed slabs only; the hot slab is ephemeral, so rows that have not yet reached a snapshotted sealed slab are lost on a crash
  • Run periodically in background thread (e.g., every 5 minutes)
  • Non-blocking: snapshot a consistent view while writes continue
  • Format: Custom binary with header + columns

8.2 File Format

snapshot_v1/
├── metadata.json           # Tables, schemas, slab info
├── events/
│   ├── slab_001.bin        # Serialized slab
│   ├── slab_002.bin
│   └── dictionary.bin      # String dictionary
└── metrics/
    ├── slab_001.bin
    └── dictionary.bin
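
The "header + columns" layout of a slab file is not specified yet. As one possible shape, the sketch below writes a single Int64 column with a small header. The magic bytes "MNTA", the version field, the little-endian layout and both function names are assumptions made for illustration only.

```rust
use std::io::{self, Read, Write};

const MAGIC: &[u8; 4] = b"MNTA"; // hypothetical file magic
const VERSION: u32 = 1;          // hypothetical format version

/// Layout: magic (4 bytes) | version (u32 LE) | row count (u64 LE) | values (i64 LE each)
pub fn write_i64_column<W: Write>(out: &mut W, values: &[i64]) -> io::Result<()> {
    out.write_all(MAGIC)?;
    out.write_all(&VERSION.to_le_bytes())?;
    out.write_all(&(values.len() as u64).to_le_bytes())?;
    for v in values {
        out.write_all(&v.to_le_bytes())?;
    }
    Ok(())
}

pub fn read_i64_column<R: Read>(input: &mut R) -> io::Result<Vec<i64>> {
    let mut magic = [0u8; 4];
    input.read_exact(&mut magic)?;
    if &magic != MAGIC {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "bad magic"));
    }

    let mut word = [0u8; 4];
    input.read_exact(&mut word)?;
    if u32::from_le_bytes(word) != VERSION {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported version"));
    }

    let mut buf = [0u8; 8];
    input.read_exact(&mut buf)?;
    let len = u64::from_le_bytes(buf);

    // Grow as values arrive instead of trusting `len` for a large allocation.
    let mut values = Vec::new();
    for _ in 0..len {
        input.read_exact(&mut buf)?;
        values.push(i64::from_le_bytes(buf));
    }
    Ok(values)
}
```

A versioned header lets the reader reject files from an incompatible format instead of misinterpreting them during recovery. The Cargo.toml below lists bincode, which could replace the hand-written encoding.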

8.3 Recovery

On startup:

  1. Check for snapshot directory
  2. Load metadata
  3. Deserialize slabs into memory
  4. Recreate hot slab for each table
  5. Resume accepting writes

9. Milestone Details

Milestone 1: MVP

Goal: Working end-to-end system you can demo

Tasks:

  1. Set up Cargo project with dependencies
  2. Implement basic column types (Int64, Float64, String)
  3. Implement single-slab storage (no time partitioning)
  4. Implement simple write buffer + flush
  5. Implement string dictionary (basic)
  6. Implement SQL parser wrapper (using sqlparser)
  7. Implement query executor (single-threaded)
  8. Implement basic aggregates: COUNT, SUM, AVG
  9. Implement HTTP server with /ingest and /query
  10. Basic error handling and responses

Deliverable: Can ingest JSON rows, query with SQL, get results

Milestone 2: Production-Ready

Goal: Handles sustained load, survives restarts

Tasks:

  1. Time-partitioned slabs (5-minute windows)
  2. Slab sealing logic (time-based + size-based)
  3. TTL-based slab expiration
  4. Parallel query execution with rayon
  5. Improved string dictionary (DashMap for concurrency)
  6. Memory usage tracking
  7. Snapshot writer (serialize sealed slabs)
  8. Snapshot reader (restore on startup)
  9. Background flush thread
  10. Graceful shutdown

Deliverable: Production-viable single-node system

Milestone 3: Optimized

Goal: Handles 100M+ rows efficiently

Tasks:

  1. SIMD filter evaluation (WHERE clauses)
  2. SIMD aggregate computation
  3. Delta encoding for timestamps
  4. Run-length encoding for low-cardinality columns
  5. t-digest for PERCENTILE aggregates
  6. HyperLogLog for COUNT(DISTINCT)
  7. Memory budget enforcement + eviction
  8. Query timeout support
  9. Sampling support (SAMPLE clause)
  10. Comprehensive benchmarks

Deliverable: Optimized, high-performance system
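
Of the Milestone 3 tasks, delta encoding for timestamps is the simplest to illustrate: event timestamps within a slab are close together, so storing differences yields small numbers that compress well. A rough sketch with hypothetical function names; the first delta is taken against 0, so it carries the first timestamp itself.

```rust
/// Replace each value with its difference from the previous one.
pub fn delta_encode(values: &[i64]) -> Vec<i64> {
    let mut prev = 0i64;
    values
        .iter()
        .map(|&v| {
            // wrapping_sub avoids a panic on extreme inputs and stays reversible
            let delta = v.wrapping_sub(prev);
            prev = v;
            delta
        })
        .collect()
}

/// Inverse of delta_encode: running sum of the deltas.
pub fn delta_decode(deltas: &[i64]) -> Vec<i64> {
    let mut acc = 0i64;
    deltas
        .iter()
        .map(|&d| {
            acc = acc.wrapping_add(d);
            acc
        })
        .collect()
}
```

The deltas only save space once they are stored in fewer bits (for example with a variable-length integer encoding), which this sketch does not cover.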


10. Dependencies (Cargo.toml)

[package]
name = "manta"
version = "0.1.0"
edition = "2021"

[dependencies]
# HTTP
axum = "0.7"
tokio = { version = "1", features = ["full"] }
tower = "0.4"
tower-http = { version = "0.5", features = ["cors", "trace"] }

# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
bincode = "1"

# SQL Parsing
sqlparser = "0.40"

# Parallelism
rayon = "1"

# Concurrency
dashmap = "5"
parking_lot = "0.12"

# Utilities
chrono = "0.4"
thiserror = "1"
anyhow = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
indexmap = "2"
bitvec = "1"

# Milestone 3 (optional)
# tdigest = "0.2"
# hyperloglog = "1"

[dev-dependencies]
criterion = "0.5"
tokio-test = "0.4"
reqwest = { version = "0.11", features = ["json"] }

11. Testing Strategy

Unit Tests

  • Column operations (append, filter, aggregate)
  • Dictionary encoding/decoding
  • SQL parser correctness
  • Filter evaluation
  • Aggregate computation

Integration Tests

  • Ingest → Query round-trip
  • Schema inference
  • Time range filtering
  • GROUP BY correctness
  • ORDER BY + LIMIT
  • Snapshot save/restore

Benchmarks

  • Ingestion throughput (rows/sec)
  • Query latency by row count
  • Filter selectivity impact
  • GROUP BY cardinality impact

12. Future Extensions (Not in Scope)

  • Distributed mode (sharding, replication)
  • JOIN support
  • Materialized views
  • Real-time aggregation (push-based)
  • Kafka/Kinesis ingestion
  • Grafana integration
  • Authentication/authorization