implement table for state

This commit is contained in:
tezlm 2024-02-10 15:40:01 -08:00
parent 3e53f9c857
commit 03ff5b7c27
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
9 changed files with 553 additions and 41 deletions

View file

@ -1,11 +1,20 @@
use std::path::{Path, PathBuf};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Result;
use clap::Parser;
use dag_resolve::{
actor::{ActorId, ActorSecret}, event::{CoreContent, Event, HashType, SignatureType}, resolvers::kv::{KVEventContent, KVResolver, KVState}, room::Room, Error
actor::{ActorId, ActorSecret},
event::{CoreContent, Event, HashType, SignatureType},
proto::table::{self, State as _, Table as _},
resolvers::kv::{KVEventContent, KVResolver, KVState},
room::Room,
Error,
};
use rusqlite::Connection;
use rusqlite::{params_from_iter, Connection, ParamsFromIter};
#[derive(Parser)]
#[clap(version, about)]
@ -21,10 +30,7 @@ enum Cli {
},
/// Synchronize two repositories
Sync {
repo: PathBuf,
other: PathBuf,
},
Sync { repo: PathBuf, other: PathBuf },
}
struct State {
@ -42,7 +48,7 @@ impl State {
(event.id().to_string(), serde_json::to_string(&event)?),
)?;
Ok(())
},
}
Err(Error::AlreadyExists) => Ok(()),
Err(err) => Err(err.into()),
}
@ -56,13 +62,230 @@ impl State {
(event.id().to_string(), serde_json::to_string(&event)?),
)?;
Ok(())
},
}
Err(Error::AlreadyExists) => Ok(()),
Err(err) => Err(err.into()),
}
}
}
#[derive(Debug)]
struct Database {
connection: Arc<rusqlite::Connection>,
config: Option<table::StateConfig>,
}
#[derive(Debug)]
struct Table {
connection: Arc<rusqlite::Connection>,
name: String,
config: table::TableConfig,
}
#[test]
fn testing_a_thing() {
let table = table::TableConfig::new()
.with_column("key", table::ColumnType::String)
.with_column("value", table::ColumnType::String)
.with_index("key", table::IndexType::LookupUnique);
let config = table::StateConfig::new().with_table("kv", table);
let db = Database::open("hello.db").unwrap().init(config).unwrap();
let table = db.table("kv").unwrap();
table.insert(table::TableRow { values: HashMap::from_iter([
("key".into(), "foo".into()),
("value".into(), "bar".into()),
]) }).unwrap();
dbg!(table.lookup("key", "hello").unwrap());
dbg!(table.lookup("key", "foo").unwrap());
panic!("(panic so i can see log more easily)");
}
impl Database {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let db = rusqlite::Connection::open(path)?;
Ok(Database {
connection: Arc::new(db),
config: None,
})
}
}
impl table::State<Table> for Database {
type Err = anyhow::Error;
fn init(mut self, config: table::StateConfig) -> Result<Box<Self>> {
let mut sql = String::new();
for (table_name, table_column) in &config.tables {
sql.push_str("CREATE TABLE '");
sql.push_str(&table_name.replace("'", "''"));
sql.push_str("' (");
for (idx, (column_name, column_config)) in table_column.columns.iter().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str(&column_name.replace("'", "''"));
sql.push_str(match column_config {
table::ColumnType::String => " TEXT",
table::ColumnType::Text => " TEXT",
table::ColumnType::Integer => " INT",
table::ColumnType::Float => " REAL",
table::ColumnType::Event => " TEXT",
});
}
sql.push_str(");\n");
}
self.connection.execute_batch(&sql).unwrap();
self.config = Some(config);
Ok(Box::new(self))
}
fn table(&self, name: &str) -> Result<Table> {
let Some(table) = self.config.as_ref().unwrap().tables.get(name) else {
panic!("table does not exist");
};
Ok(Table {
connection: self.connection.clone(),
name: name.to_owned(),
config: table.to_owned(),
})
}
}
impl table::Table for Table {
type Err = anyhow::Error;
fn lookup(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<Vec<table::TableRow>> {
let Some(column_config) = self.config.columns.get(column) else {
panic!("does not exist");
};
let has_index = self.config.indexes.iter().any(|idx| {
idx.column == column
&& match idx.index_type {
table::IndexType::Lookup => true,
table::IndexType::LookupUnique => true,
_ => false,
}
});
if !has_index {
panic!("no lookup index");
}
let mut sql = String::from("SELECT * FROM ");
sql.push_str(&self.name.replace("'", "''"));
sql.push_str(" WHERE ");
sql.push_str(&column.replace("'", "''"));
sql.push_str(" = ");
match (column_config, value.into()) {
(table::ColumnType::String, table::ColumnValue::String(s)) => {
sql.push_str("'");
sql.push_str(&s.replace("'", "''"));
sql.push_str("'");
}
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => {
sql.push_str(&i.to_string());
}
(_, _) => todo!(),
};
let mut sql_query = self.connection.prepare(&sql).unwrap();
let mut sql_rows = sql_query.query([]).unwrap();
let mut rows = vec![];
while let Some(sql_row) = sql_rows.next().unwrap() {
let mut map = HashMap::new();
for (column_name, column_type) in &self.config.columns {
let val = match column_type {
table::ColumnType::String => {
table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Text => todo!(),
table::ColumnType::Integer => {
table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Float => {
table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap())
}
table::ColumnType::Event => todo!(),
};
map.insert(column_name.to_string(), val);
}
rows.push(table::TableRow { values: map });
}
Ok(rows)
}
fn lookup_one(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<table::TableRow> {
Ok(self
.lookup(column, value)
.unwrap()
.into_iter()
.next()
.unwrap())
}
fn lookup_optional(
&self,
column: &str,
value: impl Into<table::ColumnValue>,
) -> Result<Option<table::TableRow>> {
Ok(self.lookup(column, value).unwrap().into_iter().next())
}
fn range(
&self,
_column: &str,
_paginate: table::Paginate,
_limit: u64,
) -> Result<Vec<table::TableRow>> {
todo!()
}
fn search_text(
&self,
_column: &str,
_search: &str,
_limit: u64,
_after: u64,
) -> Result<Vec<table::TableRow>> {
todo!()
}
fn insert(&self, mut row: table::TableRow) -> Result<()> {
let mut sql = String::from("INSERT INTO ");
sql.push_str(&self.name.replace("'", "''"));
sql.push_str(" (");
for (idx, name) in self.config.columns.keys().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str(&name.replace("'", "''"));
}
sql.push_str(") VALUES (");
use rusqlite::types::Value;
let mut params: Vec<Value> = vec![];
for (idx, (name, column_type)) in self.config.columns.iter().enumerate() {
if idx != 0 {
sql.push_str(", ");
}
sql.push_str("?");
match (column_type, row.values.remove(name).unwrap()) {
(table::ColumnType::String, table::ColumnValue::String(s)) => params.push(Value::Text(s)),
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => params.push(Value::Integer(i.try_into().unwrap())),
(table::ColumnType::Float, table::ColumnValue::Float(r)) => params.push(Value::Real(r)),
_ => todo!(),
};
}
sql.push_str(")");
self.connection.execute(&sql, params_from_iter(params)).unwrap();
Ok(())
}
}
fn get_repo(path: &Path, create: bool) -> Result<State> {
match (create, path.try_exists()?) {
(true, true) => panic!("repo already exists"),
@ -128,12 +351,12 @@ fn get_repo(path: &Path, create: bool) -> Result<State> {
match &mut room {
Some(r) => {
r.append_event(ev)?;
},
}
None => {
let resolver = Box::new(KVResolver);
let r = Room::from_root(resolver, ev)?;
room = Some(r);
},
}
}
}
drop(rows);
@ -149,9 +372,9 @@ fn main() -> Result<()> {
let cli = Cli::parse();
let mut repo = match &cli {
Cli::Init { repo } => get_repo(repo, true)?,
Cli::Set { repo, .. } |
Cli::Sync { repo, .. } => get_repo(repo, false)?,
Cli::Set { repo, .. } | Cli::Sync { repo, .. } => get_repo(repo, false)?,
};
dbg!(repo.room.get_resolver().get_state_config());
match cli {
Cli::Init { .. } => {}
Cli::Set { key, value, .. } => {

View file

@ -7,6 +7,7 @@ use crate::event::{EventId, HashType, SignatureType};
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Error)]
/// Every way the library can fail.
pub enum Error {
#[error("{0}")]
Serde(#[from] serde_json::Error),

View file

@ -1,8 +1,8 @@
#![allow(dead_code)] // TODO: remove this later!
pub mod error;
pub mod resolvers;
pub mod proto;
pub mod resolvers;
pub use error::{Error, Result};
pub use proto::{actor, event, room, resolver};
pub use proto::{actor, event, resolver, room};

View file

@ -1,4 +1,5 @@
pub mod actor;
pub mod event;
pub mod room;
pub mod resolver;
pub mod room;
pub mod table;

View file

@ -1,12 +1,17 @@
use serde::Serialize;
use crate::event::{EventId, Event};
use crate::event::{Event, EventId};
use std::{cmp::Ordering, collections::HashSet, fmt::Debug};
use super::table::StateConfig;
/// small shards of code designed to resolve state
pub trait Resolver<Type, State> {
/// Given a set of ordered events, resolve the final state
fn resolve(&self, events: &[&Event<Type>]) -> State;
fn resolve(&self, state: State, event: &Event<Type>) -> State;
/// TEMP: get initial state with no events
fn iniaial_state(&self) -> State;
/// Given two events, decide which one comes first
/// if Ordering::Equal is returned, the event id is used
@ -14,6 +19,8 @@ pub trait Resolver<Type, State> {
/// TEMP: Get the name/id of this resolver
fn name(&self) -> &str;
fn get_state_config(&self) -> StateConfig;
}
/// topologically sort a list of events
@ -31,8 +38,9 @@ pub fn sort<T: Debug + Serialize + Clone>(
})
.collect();
let mut sorted = vec![];
let (mut heads, mut unsorted): (Vec<_>, Vec<_>) =
events.iter().partition(|event| event.references().is_empty());
let (mut heads, mut unsorted): (Vec<_>, Vec<_>) = events
.iter()
.partition(|event| event.references().is_empty());
assert!(heads.len() == 1);
while let Some(ev) = heads.pop() {
references.retain(|(parent_id, _)| parent_id != ev.id());

View file

@ -1,7 +1,11 @@
use serde::Serialize;
use crate::{
actor::ActorSecret, event::EventId, event::{CoreContent, CreateContent, Event, HashType, SignatureType}, resolver::{sort, Resolver}, Error, Result
actor::ActorSecret,
event::EventId,
event::{CoreContent, CreateContent, Event, HashType, SignatureType},
resolver::{sort, Resolver},
Error, Result,
};
use std::fmt::Debug;
@ -41,10 +45,7 @@ impl<T: Debug + Serialize + Clone, S> Room<T, S> {
})
}
pub fn from_root(
resolver: Box<dyn Resolver<T, S>>,
event: Event<T>,
) -> Result<Self> {
pub fn from_root(resolver: Box<dyn Resolver<T, S>>, event: Event<T>) -> Result<Self> {
Ok(Self {
heads: vec![event.id().clone()],
events: vec![event],
@ -66,10 +67,18 @@ impl<T: Debug + Serialize + Clone, S> Room<T, S> {
pub fn get_state(&mut self) -> S {
let resolver = self.get_resolver();
let sorted = sort(|a, b| resolver.tiebreak(a, b), &self.events);
resolver.resolve(&sorted)
let mut state = resolver.iniaial_state();
for event in sorted {
state = resolver.resolve(state, event);
}
state
}
pub fn create_event(&mut self, event_content: CoreContent<T>, secret: &ActorSecret) -> Result<&Event<T>> {
pub fn create_event(
&mut self,
event_content: CoreContent<T>,
secret: &ActorSecret,
) -> Result<&Event<T>> {
let event = Event::builder(event_content, secret)
.with_references(std::mem::take(&mut self.heads))
.then_hash()?
@ -128,7 +137,11 @@ impl<T: Clone + Debug + Serialize, S> RoomBuilder<T, S> {
pub fn create(self, secret: &ActorSecret) -> Result<Room<T, S>> {
Room::<T, S>::new(
self.resolver.as_ref().ok_or(Error::MissingBuilderData)?.name().to_owned(),
self.resolver
.as_ref()
.ok_or(Error::MissingBuilderData)?
.name()
.to_owned(),
self.resolver.ok_or(Error::MissingBuilderData)?,
self.hasher.ok_or(Error::MissingBuilderData)?,
self.signer.ok_or(Error::MissingBuilderData)?,

251
src/proto/table.rs Normal file
View file

@ -0,0 +1,251 @@
//! Each room has a small, well defined, domain specific relational
//! database associated with it. The database's contents will then be defined
//! by the events it receives. This module contains database traits and
//! how the database's schema is defined.
//!
//! This is a work in progress; the goal is trying to find a good
//! effort/payoff ratio in the database schema definition.
use std::collections::HashMap;
use crate::event::EventId;
#[derive(Debug)]
// TODO: actually implement Text
pub struct Text(pub String);
#[derive(Debug, Clone, PartialEq, Eq)]
/// Defines an index on a column. Unlike most databases, an index is
/// required to retreive values from a database.
pub enum IndexType {
/// This index is for retrieving the row from a value
Lookup,
/// Like `IndexType::Lookup`, but the keys are unique
LookupUnique,
/// This column references another table's column
Reference(String, String),
/// Like `IndexType::Reference`, but the keys are unique
ReferenceUnique(String, String),
/// This index is for ordering items
Ordered,
/// Full text search, only supports ColumnType::{String, Text}
FullText,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ColumnType {
/// An arbitrary utf8 string
String,
/// human-readable text
Text,
/// u64 integer
// TODO: signed integer (i64)
// NOTE: does it really need to be u64? what if it can be less? maybe
// always use x64, but let the database downcast to save space?
Integer,
/// f64 float
Float,
/// event reference
Event,
}
#[derive(Debug, Default)]
pub struct StateConfig {
pub tables: HashMap<String, TableConfig>,
}
#[derive(Debug, Default, Clone)]
pub struct TableConfig {
pub columns: HashMap<String, ColumnType>,
pub indexes: Vec<IndexConfig>,
}
#[derive(Debug, Clone)]
pub struct IndexConfig {
pub index_type: IndexType,
pub column: String,
}
#[derive(Debug)]
pub struct TableRow {
pub values: HashMap<String, ColumnValue>,
}
#[derive(Debug)]
pub enum ColumnValue {
/// An arbitrary utf8 string
String(String),
/// human-readable text
Text(Text),
/// u64 integer
Integer(u64),
/// f64 float
Float(f64),
/// event reference
Event(EventId),
}
#[derive(Debug)]
pub enum Paginate {
Forwards(Option<ColumnValue>),
Backwards(Option<ColumnValue>),
}
pub trait State<Tb: Table> {
type Err;
fn init(self, config: StateConfig) -> Result<Box<Self>, Self::Err>;
fn table(&self, name: &str) -> Result<Tb, Self::Err>;
}
pub trait Table {
type Err;
/// Retreive a row from a Index::Lookup column
fn lookup(
&self,
column: &str,
value: impl Into<ColumnValue>,
) -> Result<Vec<TableRow>, Self::Err>;
/// Retreive a row from a Index::LookupUnique column, failing if it does not exist
fn lookup_one(
&self,
column: &str,
value: impl Into<ColumnValue>,
) -> Result<TableRow, Self::Err>;
/// Retreive a row from a Index::LookupUnique column
fn lookup_optional(
&self,
column: &str,
value: impl Into<ColumnValue>,
) -> Result<Option<TableRow>, Self::Err>;
/// Retreive a set of rows from an Index::Ordered column
fn range(
&self,
column: &str,
paginate: Paginate,
limit: u64,
) -> Result<Vec<TableRow>, Self::Err>;
/// Do a full text search over an Index::FullText column
fn search_text(
&self,
column: &str,
search: &str,
limit: u64,
after: u64,
) -> Result<Vec<TableRow>, Self::Err>;
/// Insert a row. Should only be called by the resolver
// TODO: enforce by types/traits
fn insert(&self, row: TableRow) -> Result<(), Self::Err>;
// fn insert_bulk(&self, rows: Vec<TableRow>) -> Result<(), Self::Err>;
// TODO: add a way to interact with Reference and ReferenceUnique columns
}
impl From<&str> for ColumnValue {
fn from(value: &str) -> Self {
ColumnValue::String(value.to_string())
}
}
impl From<String> for ColumnValue {
fn from(value: String) -> Self {
ColumnValue::String(value)
}
}
impl From<u64> for ColumnValue {
fn from(value: u64) -> Self {
ColumnValue::Integer(value)
}
}
impl From<f64> for ColumnValue {
fn from(value: f64) -> Self {
ColumnValue::Float(value)
}
}
impl StateConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_table(mut self, name: impl Into<String>, table: TableConfig) -> Self {
self.tables.insert(name.into(), table);
self
}
pub fn verify(self) -> Self {
for table_config in self.tables.values() {
for index_config in &table_config.indexes {
let Some(column) = table_config.columns.get(&index_config.column) else {
panic!("column doesn't exist");
};
match &index_config.index_type {
IndexType::Lookup | IndexType::LookupUnique => {}
IndexType::Reference(other_table, other_column)
| IndexType::ReferenceUnique(other_table, other_column) => {
let other_type = self
.tables
.get(other_table)
.and_then(|t| t.columns.get(other_column))
.expect("Tried to reference column that doesn't exist");
assert_eq!(column, other_type);
}
IndexType::Ordered => match column {
ColumnType::String | ColumnType::Integer | ColumnType::Float => {}
ColumnType::Event => {} // does topological sorting
ColumnType::Text => {
panic!("IndexType::Ordered can not be aplied to ColumnType::Text")
}
},
IndexType::FullText => match column {
ColumnType::String | ColumnType::Text => {}
_ => panic!(
"IndexType::FullText can only be put on ColumnType::{{String, Text}}"
),
},
};
}
}
self
}
}
impl TableConfig {
pub fn new() -> Self {
TableConfig::default()
}
pub fn with_column(mut self, name: impl Into<String>, data_type: ColumnType) -> Self {
self.columns.insert(name.into(), data_type);
self
}
pub fn with_index(mut self, column: impl Into<String>, index_type: IndexType) -> Self {
self.indexes.push(IndexConfig {
column: column.into(),
index_type,
});
self
}
}

View file

@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
use crate::{
event::{CoreContent, Event},
proto::table::{ColumnType, IndexType, StateConfig, TableConfig},
resolver::Resolver,
};
use std::{cmp::Ordering, collections::BTreeMap};
@ -23,18 +24,21 @@ pub struct KVState {
}
impl Resolver<KVEventContent, KVState> for KVResolver {
fn resolve(&self, events: &[&Event<KVEventContent>]) -> KVState {
dbg!(events);
let mut kv = BTreeMap::new();
for event in events {
match &event.content() {
CoreContent::Create(_) => {}
CoreContent::Custom(KVEventContent::Set(k, v)) => {
kv.insert(k.clone(), v.clone());
}
fn resolve(&self, mut state: KVState, event: &Event<KVEventContent>) -> KVState {
dbg!(event);
match &event.content() {
CoreContent::Create(_) => {}
CoreContent::Custom(KVEventContent::Set(k, v)) => {
state.entries.insert(k.clone(), v.clone());
}
}
KVState { entries: kv }
state
}
fn iniaial_state(&self) -> KVState {
KVState {
entries: BTreeMap::new(),
}
}
fn tiebreak(&self, _a: &Event<KVEventContent>, _b: &Event<KVEventContent>) -> Ordering {
@ -44,4 +48,12 @@ impl Resolver<KVEventContent, KVState> for KVResolver {
fn name(&self) -> &str {
"kv"
}
fn get_state_config(&self) -> StateConfig {
let table = TableConfig::new()
.with_column("key", ColumnType::String)
.with_column("value", ColumnType::String)
.with_index("key", IndexType::LookupUnique);
StateConfig::new().with_table("kv", table).verify()
}
}

View file

@ -18,13 +18,16 @@ fn test_example() {
room.create_event(
CoreContent::Custom(KVEventContent::Set("foo".into(), "apple".into())),
&secret,
).unwrap();
)
.unwrap();
room.create_event(
CoreContent::Custom(KVEventContent::Set("bar".into(), "banana".into())),
&secret,
).unwrap();
)
.unwrap();
room.create_event(
CoreContent::Custom(KVEventContent::Set("baz".into(), "carrot".into())),
&secret,
).unwrap();
)
.unwrap();
}