From 262415b29ccdfc6c493d1bb35813f437b19aec19 Mon Sep 17 00:00:00 2001 From: tezlm Date: Sat, 10 Feb 2024 18:23:32 -0800 Subject: [PATCH] beginning to integrate state stores --- src/bin/dag.rs | 263 +++++------------------------------------- src/lib.rs | 3 +- src/proto/resolver.rs | 12 +- src/proto/room.rs | 14 +-- src/proto/table.rs | 8 +- src/resolvers/kv.rs | 40 ++++--- src/store/mod.rs | 1 + src/store/sqlite.rs | 218 ++++++++++++++++++++++++++++++++++ 8 files changed, 291 insertions(+), 268 deletions(-) create mode 100644 src/store/mod.rs create mode 100644 src/store/sqlite.rs diff --git a/src/bin/dag.rs b/src/bin/dag.rs index 3938c49..0e33a3f 100644 --- a/src/bin/dag.rs +++ b/src/bin/dag.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, path::{Path, PathBuf}, sync::Arc, }; @@ -9,12 +8,12 @@ use clap::Parser; use dag_resolve::{ actor::{ActorId, ActorSecret}, event::{CoreContent, Event, HashType, SignatureType}, - proto::table::{self, State as _, Table as _}, - resolvers::kv::{KVEventContent, KVResolver, KVState}, + resolvers::kv::{KVEventContent, KVResolver}, room::Room, + store::sqlite, Error, }; -use rusqlite::{params_from_iter, Connection, ParamsFromIter}; +use rusqlite::Connection; #[derive(Parser)] #[clap(version, about)] @@ -34,9 +33,10 @@ enum Cli { } struct State { - db: Connection, - room: Room, + db: Arc, + room: Room, secret: ActorSecret, + store: Box, } impl State { @@ -69,223 +69,6 @@ impl State { } } -#[derive(Debug)] -struct Database { - connection: Arc, - config: Option, -} - -#[derive(Debug)] -struct Table { - connection: Arc, - name: String, - config: table::TableConfig, -} - -#[test] -fn testing_a_thing() { - let table = table::TableConfig::new() - .with_column("key", table::ColumnType::String) - .with_column("value", table::ColumnType::String) - .with_index("key", table::IndexType::LookupUnique); - let config = table::StateConfig::new().with_table("kv", table); - let db = Database::open("hello.db").unwrap().init(config).unwrap(); - let table = db.table("kv").unwrap(); - table.insert(table::TableRow { values: HashMap::from_iter([ - ("key".into(), "foo".into()), - ("value".into(), "bar".into()), - ]) }).unwrap(); - dbg!(table.lookup("key", "hello").unwrap()); - dbg!(table.lookup("key", "foo").unwrap()); - panic!("(panic so i can see log more easily)"); -} - -impl Database { - pub fn open(path: impl AsRef) -> Result { - let db = rusqlite::Connection::open(path)?; - Ok(Database { - connection: Arc::new(db), - config: None, - }) - } -} - -impl table::State for Database { - type Err = anyhow::Error; - - fn init(mut self, config: table::StateConfig) -> Result> { - let mut sql = String::new(); - for (table_name, table_column) in &config.tables { - sql.push_str("CREATE TABLE '"); - sql.push_str(&table_name.replace("'", "''")); - sql.push_str("' ("); - for (idx, (column_name, column_config)) in table_column.columns.iter().enumerate() { - if idx != 0 { - sql.push_str(", "); - } - sql.push_str(&column_name.replace("'", "''")); - sql.push_str(match column_config { - table::ColumnType::String => " TEXT", - table::ColumnType::Text => " TEXT", - table::ColumnType::Integer => " INT", - table::ColumnType::Float => " REAL", - table::ColumnType::Event => " TEXT", - }); - } - sql.push_str(");\n"); - } - self.connection.execute_batch(&sql).unwrap(); - self.config = Some(config); - Ok(Box::new(self)) - } - - fn table(&self, name: &str) -> Result
{ - let Some(table) = self.config.as_ref().unwrap().tables.get(name) else { - panic!("table does not exist"); - }; - Ok(Table { - connection: self.connection.clone(), - name: name.to_owned(), - config: table.to_owned(), - }) - } -} - -impl table::Table for Table { - type Err = anyhow::Error; - - fn lookup( - &self, - column: &str, - value: impl Into, - ) -> Result> { - let Some(column_config) = self.config.columns.get(column) else { - panic!("does not exist"); - }; - let has_index = self.config.indexes.iter().any(|idx| { - idx.column == column - && match idx.index_type { - table::IndexType::Lookup => true, - table::IndexType::LookupUnique => true, - _ => false, - } - }); - if !has_index { - panic!("no lookup index"); - } - let mut sql = String::from("SELECT * FROM "); - sql.push_str(&self.name.replace("'", "''")); - sql.push_str(" WHERE "); - sql.push_str(&column.replace("'", "''")); - sql.push_str(" = "); - match (column_config, value.into()) { - (table::ColumnType::String, table::ColumnValue::String(s)) => { - sql.push_str("'"); - sql.push_str(&s.replace("'", "''")); - sql.push_str("'"); - } - (table::ColumnType::Integer, table::ColumnValue::Integer(i)) => { - sql.push_str(&i.to_string()); - } - (_, _) => todo!(), - }; - let mut sql_query = self.connection.prepare(&sql).unwrap(); - let mut sql_rows = sql_query.query([]).unwrap(); - let mut rows = vec![]; - while let Some(sql_row) = sql_rows.next().unwrap() { - let mut map = HashMap::new(); - for (column_name, column_type) in &self.config.columns { - let val = match column_type { - table::ColumnType::String => { - table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap()) - } - table::ColumnType::Text => todo!(), - table::ColumnType::Integer => { - table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap()) - } - table::ColumnType::Float => { - table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap()) - } - table::ColumnType::Event => todo!(), - }; - map.insert(column_name.to_string(), val); - } - rows.push(table::TableRow { values: map }); - } - Ok(rows) - } - - fn lookup_one( - &self, - column: &str, - value: impl Into, - ) -> Result { - Ok(self - .lookup(column, value) - .unwrap() - .into_iter() - .next() - .unwrap()) - } - - fn lookup_optional( - &self, - column: &str, - value: impl Into, - ) -> Result> { - Ok(self.lookup(column, value).unwrap().into_iter().next()) - } - - fn range( - &self, - _column: &str, - _paginate: table::Paginate, - _limit: u64, - ) -> Result> { - todo!() - } - - fn search_text( - &self, - _column: &str, - _search: &str, - _limit: u64, - _after: u64, - ) -> Result> { - todo!() - } - - fn insert(&self, mut row: table::TableRow) -> Result<()> { - let mut sql = String::from("INSERT INTO "); - sql.push_str(&self.name.replace("'", "''")); - sql.push_str(" ("); - for (idx, name) in self.config.columns.keys().enumerate() { - if idx != 0 { - sql.push_str(", "); - } - sql.push_str(&name.replace("'", "''")); - } - sql.push_str(") VALUES ("); - use rusqlite::types::Value; - let mut params: Vec = vec![]; - for (idx, (name, column_type)) in self.config.columns.iter().enumerate() { - if idx != 0 { - sql.push_str(", "); - } - sql.push_str("?"); - match (column_type, row.values.remove(name).unwrap()) { - (table::ColumnType::String, table::ColumnValue::String(s)) => params.push(Value::Text(s)), - (table::ColumnType::Integer, table::ColumnValue::Integer(i)) => params.push(Value::Integer(i.try_into().unwrap())), - (table::ColumnType::Float, table::ColumnValue::Float(r)) => params.push(Value::Real(r)), - _ => todo!(), - }; - } - sql.push_str(")"); - self.connection.execute(&sql, params_from_iter(params)).unwrap(); - Ok(()) - } -} - fn get_repo(path: &Path, create: bool) -> Result { match (create, path.try_exists()?) { (true, true) => panic!("repo already exists"), @@ -307,7 +90,7 @@ fn get_repo(path: &Path, create: bool) -> Result { "#, )?; let (actor, secret) = ActorId::new(SignatureType::Ed25519); - let resolver = Box::new(KVResolver); + let resolver = Box::new(KVResolver::::new()); let room = Room::builder() .with_resolver(resolver) .with_hasher(HashType::Sha256) @@ -327,7 +110,15 @@ fn get_repo(path: &Path, create: bool) -> Result { (event.id().to_string(), serde_json::to_string(&event)?), )?; } - Ok(State { db, room, secret }) + let db = Arc::new(db); + let store = dag_resolve::store::sqlite::Database::from_conn(db.clone()) + .init(room.get_resolver().get_state_config())?; + Ok(State { + db, + room, + secret, + store, + }) } (false, true) => { // restore repo @@ -344,7 +135,7 @@ fn get_repo(path: &Path, create: bool) -> Result { dbg!(&actor, &secret); let mut q = db.prepare("SELECT json FROM events")?; let mut rows = q.query([])?; - let mut room: Option> = None; + let mut room: Option> = None; while let Some(row) = rows.next()? { let s: String = dbg!(row.get(0)?); let ev = dbg!(serde_json::from_str(&s)?); @@ -353,7 +144,7 @@ fn get_repo(path: &Path, create: bool) -> Result { r.append_event(ev)?; } None => { - let resolver = Box::new(KVResolver); + let resolver = Box::new(KVResolver::::new()); let r = Room::from_root(resolver, ev)?; room = Some(r); } @@ -362,8 +153,16 @@ fn get_repo(path: &Path, create: bool) -> Result { drop(rows); drop(q); let room = room.unwrap(); + let db = Arc::new(db); dbg!(&room); - Ok(State { db, room, secret }) + let store = dag_resolve::store::sqlite::Database::from_conn(db.clone()) + .init(room.get_resolver().get_state_config())?; + Ok(State { + db, + room, + secret, + store, + }) } } } @@ -374,13 +173,12 @@ fn main() -> Result<()> { Cli::Init { repo } => get_repo(repo, true)?, Cli::Set { repo, .. } | Cli::Sync { repo, .. } => get_repo(repo, false)?, }; - dbg!(repo.room.get_resolver().get_state_config()); match cli { Cli::Init { .. } => {} Cli::Set { key, value, .. } => { repo.create_event(CoreContent::Custom(KVEventContent::Set(key, value)))?; dbg!(&repo.room); - dbg!(&repo.room.get_state()); + dbg!(&repo.room.resolve_state(*repo.store)); } Cli::Sync { other, .. } => { let mut other = get_repo(&other, false)?; @@ -390,7 +188,10 @@ fn main() -> Result<()> { for event in &other.room.events { repo.append_event(event.clone())?; } - dbg!(repo.room.get_state(), other.room.get_state()); + dbg!( + repo.room.resolve_state(*repo.store), + other.room.resolve_state(*other.store) + ); dbg!(repo.room); } }; diff --git a/src/lib.rs b/src/lib.rs index bd40292..64807bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,7 @@ -#![allow(dead_code)] // TODO: remove this later! - pub mod error; pub mod proto; pub mod resolvers; +pub mod store; pub use error::{Error, Result}; pub use proto::{actor, event, resolver, room}; diff --git a/src/proto/resolver.rs b/src/proto/resolver.rs index 28b1122..821fe8c 100644 --- a/src/proto/resolver.rs +++ b/src/proto/resolver.rs @@ -2,16 +2,13 @@ use serde::Serialize; use crate::event::{Event, EventId}; use std::{cmp::Ordering, collections::HashSet, fmt::Debug}; - -use super::table::StateConfig; +use crate::store::sqlite; +use super::table::{State, StateConfig}; /// small shards of code designed to resolve state -pub trait Resolver { +pub trait Resolver { /// Given a set of ordered events, resolve the final state - fn resolve(&self, state: State, event: &Event) -> State; - - /// TEMP: get initial state with no events - fn iniaial_state(&self) -> State; + fn resolve(&self, state: &mut dyn State
, event: &Event); /// Given two events, decide which one comes first /// if Ordering::Equal is returned, the event id is used @@ -66,3 +63,4 @@ impl Debug for dyn Resolver { ) } } + diff --git a/src/proto/room.rs b/src/proto/room.rs index 2394def..5f573bf 100644 --- a/src/proto/room.rs +++ b/src/proto/room.rs @@ -1,11 +1,7 @@ use serde::Serialize; use crate::{ - actor::ActorSecret, - event::EventId, - event::{CoreContent, CreateContent, Event, HashType, SignatureType}, - resolver::{sort, Resolver}, - Error, Result, + actor::ActorSecret, event::EventId, event::{CoreContent, CreateContent, Event, HashType, SignatureType}, proto, resolver::{sort, Resolver}, store::sqlite, Error, Result }; use std::fmt::Debug; @@ -64,12 +60,14 @@ impl Room { self.resolver.as_ref() } - pub fn get_state(&mut self) -> S { + pub fn resolve_state(&mut self, initial_state: A) -> A + where A: proto::table::State + { let resolver = self.get_resolver(); let sorted = sort(|a, b| resolver.tiebreak(a, b), &self.events); - let mut state = resolver.iniaial_state(); + let mut state = initial_state; for event in sorted { - state = resolver.resolve(state, event); + resolver.resolve(&mut state, event); } state } diff --git a/src/proto/table.rs b/src/proto/table.rs index 019fa81..07576ef 100644 --- a/src/proto/table.rs +++ b/src/proto/table.rs @@ -6,7 +6,7 @@ //! This is a work in progress; the goal is trying to find a good //! effort/payoff ratio in the database schema definition. -use std::collections::HashMap; +use std::{collections::HashMap, fmt::Debug}; use crate::event::EventId; @@ -104,11 +104,11 @@ pub enum Paginate { Backwards(Option), } -pub trait State { +pub trait State: Debug { type Err; + type Table: Table; - fn init(self, config: StateConfig) -> Result, Self::Err>; - fn table(&self, name: &str) -> Result; + fn table(&self, name: &str) -> Result; } pub trait Table { diff --git a/src/resolvers/kv.rs b/src/resolvers/kv.rs index 1d2cb66..5a57f78 100644 --- a/src/resolvers/kv.rs +++ b/src/resolvers/kv.rs @@ -4,41 +4,49 @@ use serde::{Deserialize, Serialize}; use crate::{ event::{CoreContent, Event}, - proto::table::{ColumnType, IndexType, StateConfig, TableConfig}, + proto::table::{ColumnType, IndexType, State, StateConfig, Table, TableConfig, TableRow}, resolver::Resolver, + store::sqlite, }; -use std::{cmp::Ordering, collections::BTreeMap}; +use std::{cmp::Ordering, collections::HashMap, fmt::Debug, marker::PhantomData}; #[derive(Debug)] /// A basic key-value store -pub struct KVResolver; +pub struct KVResolver { + _a: PhantomData, + _b: PhantomData, +} #[derive(Debug, Clone, Serialize, Deserialize)] pub enum KVEventContent { Set(String, String), } -#[derive(Debug)] -pub struct KVState { - entries: BTreeMap, +impl KVResolver { + pub fn new() -> Self { + Self { + _a: PhantomData, + _b: PhantomData, + } + } } -impl Resolver for KVResolver { - fn resolve(&self, mut state: KVState, event: &Event) -> KVState { +impl Resolver for KVResolver { + fn resolve(&self, state: &mut dyn State
, event: &Event) { dbg!(event); + let table = state.table("kv").unwrap(); match &event.content() { CoreContent::Create(_) => {} CoreContent::Custom(KVEventContent::Set(k, v)) => { - state.entries.insert(k.clone(), v.clone()); + let res = table.insert(TableRow { values: HashMap::from_iter([ + ("key".into(), k.to_owned().into()), + ("value".into(), v.to_owned().into()), + ]) }); + if res.is_err() { + panic!("could not insert"); + } } } - state - } - - fn iniaial_state(&self) -> KVState { - KVState { - entries: BTreeMap::new(), - } } fn tiebreak(&self, _a: &Event, _b: &Event) -> Ordering { diff --git a/src/store/mod.rs b/src/store/mod.rs new file mode 100644 index 0000000..6b1c108 --- /dev/null +++ b/src/store/mod.rs @@ -0,0 +1 @@ +pub mod sqlite; diff --git a/src/store/sqlite.rs b/src/store/sqlite.rs new file mode 100644 index 0000000..ff75a17 --- /dev/null +++ b/src/store/sqlite.rs @@ -0,0 +1,218 @@ +use std::{collections::HashMap, path::Path, sync::Arc}; + +use rusqlite::params_from_iter; +use thiserror::Error; +use crate::proto::table; + +#[derive(Debug)] +pub struct Database { + connection: Arc, + config: Option, +} + +#[derive(Debug)] +pub struct Table { + connection: Arc, + name: String, + config: table::TableConfig, +} + +#[derive(Debug, Error)] +pub enum Error { + +} + +impl Database { + pub fn open(path: impl AsRef) -> Result { + let db = rusqlite::Connection::open(path).unwrap(); + Ok(Database { + connection: Arc::new(db), + config: None, + }) + } + + pub fn from_conn(conn: Arc) -> Self { + Database { connection: conn, config: None } + } + + pub fn init(mut self, config: table::StateConfig) -> Result, Error> { + let mut sql = String::new(); + for (table_name, table_column) in &config.tables { + // FIXME: don't drop and resolve from scratch every time + sql.push_str("DROP TABLE IF EXISTS '"); + sql.push_str(&table_name.replace("'", "''")); + sql.push_str("';\n"); + sql.push_str("CREATE TABLE '"); + sql.push_str(&table_name.replace("'", "''")); + sql.push_str("' ("); + for (idx, (column_name, column_config)) in table_column.columns.iter().enumerate() { + if idx != 0 { + sql.push_str(", "); + } + sql.push_str(&column_name.replace("'", "''")); + sql.push_str(match column_config { + table::ColumnType::String => " TEXT", + table::ColumnType::Text => " TEXT", + table::ColumnType::Integer => " INT", + table::ColumnType::Float => " REAL", + table::ColumnType::Event => " TEXT", + }); + } + sql.push_str(");\n"); + } + self.connection.execute_batch(&sql).unwrap(); + self.config = Some(config); + Ok(Box::new(self)) + } +} + +impl table::State for Database { + type Table = Table; + type Err = Error; + + fn table(&self, name: &str) -> Result { + let Some(table) = self.config.as_ref().unwrap().tables.get(name) else { + panic!("table does not exist"); + }; + Ok(Table { + connection: self.connection.clone(), + name: name.to_owned(), + config: table.to_owned(), + }) + } +} + +impl table::Table for Table { + type Err = Error; + + fn lookup( + &self, + column: &str, + value: impl Into, + ) -> Result, Self::Err> { + let Some(column_config) = self.config.columns.get(column) else { + panic!("does not exist"); + }; + let has_index = self.config.indexes.iter().any(|idx| { + idx.column == column + && match idx.index_type { + table::IndexType::Lookup => true, + table::IndexType::LookupUnique => true, + _ => false, + } + }); + if !has_index { + panic!("no lookup index"); + } + let mut sql = String::from("SELECT * FROM "); + sql.push_str(&self.name.replace("'", "''")); + sql.push_str(" WHERE "); + sql.push_str(&column.replace("'", "''")); + sql.push_str(" = "); + match (column_config, value.into()) { + (table::ColumnType::String, table::ColumnValue::String(s)) => { + sql.push_str("'"); + sql.push_str(&s.replace("'", "''")); + sql.push_str("'"); + } + (table::ColumnType::Integer, table::ColumnValue::Integer(i)) => { + sql.push_str(&i.to_string()); + } + (_, _) => todo!(), + }; + let mut sql_query = self.connection.prepare(&sql).unwrap(); + let mut sql_rows = sql_query.query([]).unwrap(); + let mut rows = vec![]; + while let Some(sql_row) = sql_rows.next().unwrap() { + let mut map = HashMap::new(); + for (column_name, column_type) in &self.config.columns { + let val = match column_type { + table::ColumnType::String => { + table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap()) + } + table::ColumnType::Text => todo!(), + table::ColumnType::Integer => { + table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap()) + } + table::ColumnType::Float => { + table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap()) + } + table::ColumnType::Event => todo!(), + }; + map.insert(column_name.to_string(), val); + } + rows.push(table::TableRow { values: map }); + } + Ok(rows) + } + + fn lookup_one( + &self, + column: &str, + value: impl Into, + ) -> Result { + Ok(self + .lookup(column, value) + .unwrap() + .into_iter() + .next() + .unwrap()) + } + + fn lookup_optional( + &self, + column: &str, + value: impl Into, + ) -> Result, Self::Err> { + Ok(self.lookup(column, value).unwrap().into_iter().next()) + } + + fn range( + &self, + _column: &str, + _paginate: table::Paginate, + _limit: u64, + ) -> Result, Self::Err> { + todo!() + } + + fn search_text( + &self, + _column: &str, + _search: &str, + _limit: u64, + _after: u64, + ) -> Result, Self::Err> { + todo!() + } + + fn insert(&self, mut row: table::TableRow) -> Result<(), Self::Err> { + let mut sql = String::from("INSERT INTO "); + sql.push_str(&self.name.replace("'", "''")); + sql.push_str(" ("); + for (idx, name) in self.config.columns.keys().enumerate() { + if idx != 0 { + sql.push_str(", "); + } + sql.push_str(&name.replace("'", "''")); + } + sql.push_str(") VALUES ("); + use rusqlite::types::Value; + let mut params: Vec = vec![]; + for (idx, (name, column_type)) in self.config.columns.iter().enumerate() { + if idx != 0 { + sql.push_str(", "); + } + sql.push_str("?"); + match (column_type, row.values.remove(name).unwrap()) { + (table::ColumnType::String, table::ColumnValue::String(s)) => params.push(Value::Text(s)), + (table::ColumnType::Integer, table::ColumnValue::Integer(i)) => params.push(Value::Integer(i.try_into().unwrap())), + (table::ColumnType::Float, table::ColumnValue::Float(r)) => params.push(Value::Real(r)), + _ => todo!(), + }; + } + sql.push_str(")"); + self.connection.execute(&sql, params_from_iter(params)).unwrap(); + Ok(()) + } +}