beginning to integrate state stores

This commit is contained in:
tezlm 2024-02-10 18:23:32 -08:00
parent 03ff5b7c27
commit 262415b29c
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
8 changed files with 291 additions and 268 deletions

View file

@ -1,5 +1,4 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
@ -9,12 +8,12 @@ use clap::Parser;
use dag_resolve::{
actor::{ActorId, ActorSecret},
event::{CoreContent, Event, HashType, SignatureType},
proto::table::{self, State as _, Table as _},
resolvers::kv::{KVEventContent, KVResolver, KVState},
resolvers::kv::{KVEventContent, KVResolver},
room::Room,
store::sqlite,
Error,
};
use rusqlite::{params_from_iter, Connection, ParamsFromIter};
use rusqlite::Connection;
#[derive(Parser)]
#[clap(version, about)]
@ -34,9 +33,10 @@ enum Cli {
}
struct State {
db: Connection,
room: Room<KVEventContent, KVState>,
db: Arc<Connection>,
room: Room<KVEventContent, ()>,
secret: ActorSecret,
store: Box<dag_resolve::store::sqlite::Database>,
}
impl State {
@ -69,223 +69,6 @@ impl State {
}
}
#[derive(Debug)]
struct Database {
connection: Arc<rusqlite::Connection>,
config: Option<table::StateConfig>,
}
#[derive(Debug)]
struct Table {
connection: Arc<rusqlite::Connection>,
name: String,
config: table::TableConfig,
}
#[test]
fn testing_a_thing() {
let table = table::TableConfig::new()
.with_column("key", table::ColumnType::String)
.with_column("value", table::ColumnType::String)
.with_index("key", table::IndexType::LookupUnique);
let config = table::StateConfig::new().with_table("kv", table);
let db = Database::open("hello.db").unwrap().init(config).unwrap();
let table = db.table("kv").unwrap();
table.insert(table::TableRow { values: HashMap::from_iter([
("key".into(), "foo".into()),
("value".into(), "bar".into()),
]) }).unwrap();
dbg!(table.lookup("key", "hello").unwrap());
dbg!(table.lookup("key", "foo").unwrap());
panic!("(panic so i can see log more easily)");
}
impl Database {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let db = rusqlite::Connection::open(path)?;
Ok(Database {
connection: Arc::new(db),
config: None,
})
}
}
impl table::State<Table> for Database {
type Err = anyhow::Error;
fn init(mut self, config: table::StateConfig) -> Result<Box<Self>> {
let mut sql = String::new();
for (table_name, table_column) in &config.tables {
sql.push_str("CREATE TABLE '");
sql.push_str(&table_name.replace("'", "''"));
sql.push_str("' (");
for (idx, (column_name, column_config)) in table_column.columns.iter().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str(&column_name.replace("'", "''"));
sql.push_str(match column_config {
table::ColumnType::String => " TEXT",
table::ColumnType::Text => " TEXT",
table::ColumnType::Integer => " INT",
table::ColumnType::Float => " REAL",
table::ColumnType::Event => " TEXT",
});
}
sql.push_str(");\n");
}
self.connection.execute_batch(&sql).unwrap();
self.config = Some(config);
Ok(Box::new(self))
}
fn table(&self, name: &str) -> Result<Table> {
let Some(table) = self.config.as_ref().unwrap().tables.get(name) else {
panic!("table does not exist");
};
Ok(Table {
connection: self.connection.clone(),
name: name.to_owned(),
config: table.to_owned(),
})
}
}
impl table::Table for Table {
type Err = anyhow::Error;
fn lookup(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<Vec<table::TableRow>> {
let Some(column_config) = self.config.columns.get(column) else {
panic!("does not exist");
};
let has_index = self.config.indexes.iter().any(|idx| {
idx.column == column
&& match idx.index_type {
table::IndexType::Lookup => true,
table::IndexType::LookupUnique => true,
_ => false,
}
});
if !has_index {
panic!("no lookup index");
}
let mut sql = String::from("SELECT * FROM ");
sql.push_str(&self.name.replace("'", "''"));
sql.push_str(" WHERE ");
sql.push_str(&column.replace("'", "''"));
sql.push_str(" = ");
match (column_config, value.into()) {
(table::ColumnType::String, table::ColumnValue::String(s)) => {
sql.push_str("'");
sql.push_str(&s.replace("'", "''"));
sql.push_str("'");
}
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => {
sql.push_str(&i.to_string());
}
(_, _) => todo!(),
};
let mut sql_query = self.connection.prepare(&sql).unwrap();
let mut sql_rows = sql_query.query([]).unwrap();
let mut rows = vec![];
while let Some(sql_row) = sql_rows.next().unwrap() {
let mut map = HashMap::new();
for (column_name, column_type) in &self.config.columns {
let val = match column_type {
table::ColumnType::String => {
table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Text => todo!(),
table::ColumnType::Integer => {
table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Float => {
table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Event => todo!(),
};
map.insert(column_name.to_string(), val);
}
rows.push(table::TableRow { values: map });
}
Ok(rows)
}
fn lookup_one(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<table::TableRow> {
Ok(self
.lookup(column, value)
.unwrap()
.into_iter()
.next()
.unwrap())
}
fn lookup_optional(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<Option<table::TableRow>> {
Ok(self.lookup(column, value).unwrap().into_iter().next())
}
fn range(
&self,
_column: &str,
_paginate: table::Paginate,
_limit: u64,
) -> Result<Vec<table::TableRow>> {
todo!()
}
fn search_text(
&self,
_column: &str,
_search: &str,
_limit: u64,
_after: u64,
) -> Result<Vec<table::TableRow>> {
todo!()
}
fn insert(&self, mut row: table::TableRow) -> Result<()> {
let mut sql = String::from("INSERT INTO ");
sql.push_str(&self.name.replace("'", "''"));
sql.push_str(" (");
for (idx, name) in self.config.columns.keys().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str(&name.replace("'", "''"));
}
sql.push_str(") VALUES (");
use rusqlite::types::Value;
let mut params: Vec<Value> = vec![];
for (idx, (name, column_type)) in self.config.columns.iter().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str("?");
match (column_type, row.values.remove(name).unwrap()) {
(table::ColumnType::String, table::ColumnValue::String(s)) => params.push(Value::Text(s)),
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => params.push(Value::Integer(i.try_into().unwrap())),
(table::ColumnType::Float, table::ColumnValue::Float(r)) => params.push(Value::Real(r)),
_ => todo!(),
};
}
sql.push_str(")");
self.connection.execute(&sql, params_from_iter(params)).unwrap();
Ok(())
}
}
fn get_repo(path: &Path, create: bool) -> Result<State> {
match (create, path.try_exists()?) {
(true, true) => panic!("repo already exists"),
@ -307,7 +90,7 @@ fn get_repo(path: &Path, create: bool) -> Result<State> {
"#,
)?;
let (actor, secret) = ActorId::new(SignatureType::Ed25519);
let resolver = Box::new(KVResolver);
let resolver = Box::new(KVResolver::<KVEventContent, sqlite::Table>::new());
let room = Room::builder()
.with_resolver(resolver)
.with_hasher(HashType::Sha256)
@ -327,7 +110,15 @@ fn get_repo(path: &Path, create: bool) -> Result<State> {
(event.id().to_string(), serde_json::to_string(&event)?),
)?;
}
Ok(State { db, room, secret })
let db = Arc::new(db);
let store = dag_resolve::store::sqlite::Database::from_conn(db.clone())
.init(room.get_resolver().get_state_config())?;
Ok(State {
db,
room,
secret,
store,
})
}
(false, true) => {
// restore repo
@ -344,7 +135,7 @@ fn get_repo(path: &Path, create: bool) -> Result<State> {
dbg!(&actor, &secret);
let mut q = db.prepare("SELECT json FROM events")?;
let mut rows = q.query([])?;
let mut room: Option<Room<KVEventContent, KVState>> = None;
let mut room: Option<Room<KVEventContent, ()>> = None;
while let Some(row) = rows.next()? {
let s: String = dbg!(row.get(0)?);
let ev = dbg!(serde_json::from_str(&s)?);
@ -353,7 +144,7 @@ fn get_repo(path: &Path, create: bool) -> Result<State> {
r.append_event(ev)?;
}
None => {
let resolver = Box::new(KVResolver);
let resolver = Box::new(KVResolver::<KVEventContent, sqlite::Table>::new());
let r = Room::from_root(resolver, ev)?;
room = Some(r);
}
@ -362,8 +153,16 @@ fn get_repo(path: &Path, create: bool) -> Result<State> {
drop(rows);
drop(q);
let room = room.unwrap();
let db = Arc::new(db);
dbg!(&room);
Ok(State { db, room, secret })
let store = dag_resolve::store::sqlite::Database::from_conn(db.clone())
.init(room.get_resolver().get_state_config())?;
Ok(State {
db,
room,
secret,
store,
})
}
}
}
@ -374,13 +173,12 @@ fn main() -> Result<()> {
Cli::Init { repo } => get_repo(repo, true)?,
Cli::Set { repo, .. } | Cli::Sync { repo, .. } => get_repo(repo, false)?,
};
dbg!(repo.room.get_resolver().get_state_config());
match cli {
Cli::Init { .. } => {}
Cli::Set { key, value, .. } => {
repo.create_event(CoreContent::Custom(KVEventContent::Set(key, value)))?;
dbg!(&repo.room);
dbg!(&repo.room.get_state());
dbg!(&repo.room.resolve_state(*repo.store));
}
Cli::Sync { other, .. } => {
let mut other = get_repo(&other, false)?;
@ -390,7 +188,10 @@ fn main() -> Result<()> {
for event in &other.room.events {
repo.append_event(event.clone())?;
}
dbg!(repo.room.get_state(), other.room.get_state());
dbg!(
repo.room.resolve_state(*repo.store),
other.room.resolve_state(*other.store)
);
dbg!(repo.room);
}
};

View file

@ -1,8 +1,7 @@
#![allow(dead_code)] // TODO: remove this later!
pub mod error;
pub mod proto;
pub mod resolvers;
pub mod store;
pub use error::{Error, Result};
pub use proto::{actor, event, resolver, room};

View file

@ -2,16 +2,13 @@ use serde::Serialize;
use crate::event::{Event, EventId};
use std::{cmp::Ordering, collections::HashSet, fmt::Debug};
use super::table::StateConfig;
use crate::store::sqlite;
use super::table::{State, StateConfig};
/// small shards of code designed to resolve state
pub trait Resolver<Type, State> {
pub trait Resolver<Type, S> {
/// Given a set of ordered events, resolve the final state
fn resolve(&self, state: State, event: &Event<Type>) -> State;
/// TEMP: get initial state with no events
fn iniaial_state(&self) -> State;
fn resolve(&self, state: &mut dyn State<Table = sqlite::Table, Err = sqlite::Error>, event: &Event<Type>);
/// Given two events, decide which one comes first
/// if Ordering::Equal is returned, the event id is used
@ -66,3 +63,4 @@ impl<T, S> Debug for dyn Resolver<T, S> {
)
}
}

View file

@ -1,11 +1,7 @@
use serde::Serialize;
use crate::{
actor::ActorSecret,
event::EventId,
event::{CoreContent, CreateContent, Event, HashType, SignatureType},
resolver::{sort, Resolver},
Error, Result,
actor::ActorSecret, event::EventId, event::{CoreContent, CreateContent, Event, HashType, SignatureType}, proto, resolver::{sort, Resolver}, store::sqlite, Error, Result
};
use std::fmt::Debug;
@ -64,12 +60,14 @@ impl<T: Debug + Serialize + Clone, S> Room<T, S> {
self.resolver.as_ref()
}
pub fn get_state(&mut self) -> S {
pub fn resolve_state<A>(&mut self, initial_state: A) -> A
where A: proto::table::State<Err = sqlite::Error, Table = sqlite::Table>
{
let resolver = self.get_resolver();
let sorted = sort(|a, b| resolver.tiebreak(a, b), &self.events);
let mut state = resolver.iniaial_state();
let mut state = initial_state;
for event in sorted {
state = resolver.resolve(state, event);
resolver.resolve(&mut state, event);
}
state
}

View file

@ -6,7 +6,7 @@
//! This is a work in progress; the goal is trying to find a good
//! effort/payoff ratio in the database schema definition.
use std::collections::HashMap;
use std::{collections::HashMap, fmt::Debug};
use crate::event::EventId;
@ -104,11 +104,11 @@ pub enum Paginate {
Backwards(Option<ColumnValue>),
}
pub trait State<Tb: Table> {
pub trait State: Debug {
type Err;
type Table: Table;
fn init(self, config: StateConfig) -> Result<Box<Self>, Self::Err>;
fn table(&self, name: &str) -> Result<Tb, Self::Err>;
fn table(&self, name: &str) -> Result<Self::Table, Self::Err>;
}
pub trait Table {

View file

@ -4,41 +4,49 @@ use serde::{Deserialize, Serialize};
use crate::{
event::{CoreContent, Event},
proto::table::{ColumnType, IndexType, StateConfig, TableConfig},
proto::table::{ColumnType, IndexType, State, StateConfig, Table, TableConfig, TableRow},
resolver::Resolver,
store::sqlite,
};
use std::{cmp::Ordering, collections::BTreeMap};
use std::{cmp::Ordering, collections::HashMap, fmt::Debug, marker::PhantomData};
#[derive(Debug)]
/// A basic key-value store
pub struct KVResolver;
pub struct KVResolver<A, B> {
_a: PhantomData<A>,
_b: PhantomData<B>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KVEventContent {
Set(String, String),
}
#[derive(Debug)]
pub struct KVState {
entries: BTreeMap<String, String>,
impl<A, B> KVResolver<A, B> {
pub fn new() -> Self {
Self {
_a: PhantomData,
_b: PhantomData,
}
}
}
impl Resolver<KVEventContent, KVState> for KVResolver {
fn resolve(&self, mut state: KVState, event: &Event<KVEventContent>) -> KVState {
impl<A: Debug, B: Table> Resolver<KVEventContent, ()> for KVResolver<A, B> {
fn resolve(&self, state: &mut dyn State<Table = sqlite::Table, Err = sqlite::Error>, event: &Event<KVEventContent>) {
dbg!(event);
let table = state.table("kv").unwrap();
match &event.content() {
CoreContent::Create(_) => {}
CoreContent::Custom(KVEventContent::Set(k, v)) => {
state.entries.insert(k.clone(), v.clone());
let res = table.insert(TableRow { values: HashMap::from_iter([
("key".into(), k.to_owned().into()),
("value".into(), v.to_owned().into()),
]) });
if res.is_err() {
panic!("could not insert");
}
}
}
state
}
fn iniaial_state(&self) -> KVState {
KVState {
entries: BTreeMap::new(),
}
}
fn tiebreak(&self, _a: &Event<KVEventContent>, _b: &Event<KVEventContent>) -> Ordering {

1
src/store/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod sqlite;

218
src/store/sqlite.rs Normal file
View file

@ -0,0 +1,218 @@
use std::{collections::HashMap, path::Path, sync::Arc};
use rusqlite::params_from_iter;
use thiserror::Error;
use crate::proto::table;
#[derive(Debug)]
pub struct Database {
connection: Arc<rusqlite::Connection>,
config: Option<table::StateConfig>,
}
#[derive(Debug)]
pub struct Table {
connection: Arc<rusqlite::Connection>,
name: String,
config: table::TableConfig,
}
#[derive(Debug, Error)]
pub enum Error {
}
impl Database {
pub fn open(path: impl AsRef<Path>) -> Result<Self, Error> {
let db = rusqlite::Connection::open(path).unwrap();
Ok(Database {
connection: Arc::new(db),
config: None,
})
}
pub fn from_conn(conn: Arc<rusqlite::Connection>) -> Self {
Database { connection: conn, config: None }
}
pub fn init(mut self, config: table::StateConfig) -> Result<Box<Self>, Error> {
let mut sql = String::new();
for (table_name, table_column) in &config.tables {
// FIXME: don't drop and resolve from scratch every time
sql.push_str("DROP TABLE IF EXISTS '");
sql.push_str(&table_name.replace("'", "''"));
sql.push_str("';\n");
sql.push_str("CREATE TABLE '");
sql.push_str(&table_name.replace("'", "''"));
sql.push_str("' (");
for (idx, (column_name, column_config)) in table_column.columns.iter().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str(&column_name.replace("'", "''"));
sql.push_str(match column_config {
table::ColumnType::String => " TEXT",
table::ColumnType::Text => " TEXT",
table::ColumnType::Integer => " INT",
table::ColumnType::Float => " REAL",
table::ColumnType::Event => " TEXT",
});
}
sql.push_str(");\n");
}
self.connection.execute_batch(&sql).unwrap();
self.config = Some(config);
Ok(Box::new(self))
}
}
impl table::State for Database {
type Table = Table;
type Err = Error;
fn table(&self, name: &str) -> Result<Table, Self::Err> {
let Some(table) = self.config.as_ref().unwrap().tables.get(name) else {
panic!("table does not exist");
};
Ok(Table {
connection: self.connection.clone(),
name: name.to_owned(),
config: table.to_owned(),
})
}
}
impl table::Table for Table {
type Err = Error;
fn lookup(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<Vec<table::TableRow>, Self::Err> {
let Some(column_config) = self.config.columns.get(column) else {
panic!("does not exist");
};
let has_index = self.config.indexes.iter().any(|idx| {
idx.column == column
&& match idx.index_type {
table::IndexType::Lookup => true,
table::IndexType::LookupUnique => true,
_ => false,
}
});
if !has_index {
panic!("no lookup index");
}
let mut sql = String::from("SELECT * FROM ");
sql.push_str(&self.name.replace("'", "''"));
sql.push_str(" WHERE ");
sql.push_str(&column.replace("'", "''"));
sql.push_str(" = ");
match (column_config, value.into()) {
(table::ColumnType::String, table::ColumnValue::String(s)) => {
sql.push_str("'");
sql.push_str(&s.replace("'", "''"));
sql.push_str("'");
}
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => {
sql.push_str(&i.to_string());
}
(_, _) => todo!(),
};
let mut sql_query = self.connection.prepare(&sql).unwrap();
let mut sql_rows = sql_query.query([]).unwrap();
let mut rows = vec![];
while let Some(sql_row) = sql_rows.next().unwrap() {
let mut map = HashMap::new();
for (column_name, column_type) in &self.config.columns {
let val = match column_type {
table::ColumnType::String => {
table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Text => todo!(),
table::ColumnType::Integer => {
table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Float => {
table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Event => todo!(),
};
map.insert(column_name.to_string(), val);
}
rows.push(table::TableRow { values: map });
}
Ok(rows)
}
fn lookup_one(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<table::TableRow, Self::Err> {
Ok(self
.lookup(column, value)
.unwrap()
.into_iter()
.next()
.unwrap())
}
fn lookup_optional(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<Option<table::TableRow>, Self::Err> {
Ok(self.lookup(column, value).unwrap().into_iter().next())
}
fn range(
&self,
_column: &str,
_paginate: table::Paginate,
_limit: u64,
) -> Result<Vec<table::TableRow>, Self::Err> {
todo!()
}
fn search_text(
&self,
_column: &str,
_search: &str,
_limit: u64,
_after: u64,
) -> Result<Vec<table::TableRow>, Self::Err> {
todo!()
}
fn insert(&self, mut row: table::TableRow) -> Result<(), Self::Err> {
let mut sql = String::from("INSERT INTO ");
sql.push_str(&self.name.replace("'", "''"));
sql.push_str(" (");
for (idx, name) in self.config.columns.keys().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str(&name.replace("'", "''"));
}
sql.push_str(") VALUES (");
use rusqlite::types::Value;
let mut params: Vec<Value> = vec![];
for (idx, (name, column_type)) in self.config.columns.iter().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str("?");
match (column_type, row.values.remove(name).unwrap()) {
(table::ColumnType::String, table::ColumnValue::String(s)) => params.push(Value::Text(s)),
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => params.push(Value::Integer(i.try_into().unwrap())),
(table::ColumnType::Float, table::ColumnValue::Float(r)) => params.push(Value::Real(r)),
_ => todo!(),
};
}
sql.push_str(")");
self.connection.execute(&sql, params_from_iter(params)).unwrap();
Ok(())
}
}