maybe kv instead of tables?
This commit is contained in:
parent
9100867f51
commit
820abbc089
14 changed files with 499 additions and 403 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -708,6 +708,7 @@ dependencies = [
|
|||
"dag-resolve",
|
||||
"rusqlite",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
|
|
|
@ -113,8 +113,7 @@ impl<R: Resolver> State<R> {
|
|||
)?;
|
||||
}
|
||||
let db = Rc::new(db);
|
||||
let store = Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
let store = Database::from_conn(db.clone()).init()?;
|
||||
Ok(State {
|
||||
db,
|
||||
room,
|
||||
|
@ -154,8 +153,7 @@ fn open(path: impl AsRef<Path>) -> Result<Opened> {
|
|||
drop(rows);
|
||||
drop(q);
|
||||
let db = Rc::new(db);
|
||||
let store = Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
let store = Database::from_conn(db.clone()).init()?;
|
||||
Ok(Opened::Kv(State {
|
||||
db,
|
||||
room,
|
||||
|
@ -174,8 +172,7 @@ fn open(path: impl AsRef<Path>) -> Result<Opened> {
|
|||
drop(rows);
|
||||
drop(q);
|
||||
let db = Rc::new(db);
|
||||
let store = Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
let store = Database::from_conn(db.clone()).init()?;
|
||||
Ok(Opened::Forum(State {
|
||||
db,
|
||||
room,
|
||||
|
@ -234,26 +231,28 @@ fn send_event<R: Resolver + Debug>(state: &mut State<R>, data: &str) -> Result<(
|
|||
|
||||
fn print_info<R: Resolver + Debug>(state: State<R>) -> Result<()> {
|
||||
let event_count: u64 = state.db.query_row("SELECT count(*) FROM _events", [], |r| r.get(0))?;
|
||||
let row_count: u64 = state.db.query_row("SELECT count(*) FROM data", [], |r| r.get(0))?;
|
||||
println!("room type: {}", state.room.get_resolver().name());
|
||||
println!("room events: {} total", event_count);
|
||||
println!("room state schema:");
|
||||
for (table_name, table_schema) in state.room.get_resolver().get_state_config().tables {
|
||||
print!(" {} (", table_name);
|
||||
for (idx, (column_name, ctype)) in table_schema.columns.iter().enumerate() {
|
||||
if idx != 0 {
|
||||
print!(", ");
|
||||
}
|
||||
print!("{} {:?}", column_name, ctype);
|
||||
}
|
||||
print!(") indexed on (");
|
||||
for (idx, index) in table_schema.indexes.iter().enumerate() {
|
||||
if idx != 0 {
|
||||
print!(", ");
|
||||
}
|
||||
print!("{} {:?}", index.column, index.index_type);
|
||||
}
|
||||
println!(")");
|
||||
}
|
||||
println!("room events: {}", event_count);
|
||||
println!("room db records: {}", row_count);
|
||||
// println!("room state schema:");
|
||||
// for (table_name, table_schema) in state.room.get_resolver().get_state_config().tables {
|
||||
// print!(" {} (", table_name);
|
||||
// for (idx, (column_name, ctype)) in table_schema.columns.iter().enumerate() {
|
||||
// if idx != 0 {
|
||||
// print!(", ");
|
||||
// }
|
||||
// print!("{} {:?}", column_name, ctype);
|
||||
// }
|
||||
// print!(") indexed on (");
|
||||
// for (idx, index) in table_schema.indexes.iter().enumerate() {
|
||||
// if idx != 0 {
|
||||
// print!(", ");
|
||||
// }
|
||||
// print!("{} {:?}", index.column, index.index_type);
|
||||
// }
|
||||
// println!(")");
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
|
|
@ -9,4 +9,5 @@ edition = "2021"
|
|||
dag-resolve = { version = "0.1.0", path = "../proto" }
|
||||
rusqlite = { version = "0.30.0", features = ["bundled"] }
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
serde_json = "1.0.113"
|
||||
thiserror = "1.0.57"
|
||||
|
|
|
@ -7,10 +7,7 @@ use serde::{Deserialize, Serialize};
|
|||
use dag_resolve::{
|
||||
actor::ActorId,
|
||||
event::{CoreContent, Event, EventId},
|
||||
proto::table::{
|
||||
ColumnType, IndexType, State, StateConfig, Table as _, TableConfig, TableRow,
|
||||
Text,
|
||||
},
|
||||
proto::{data::Text, table::Database},
|
||||
resolver::{Command, Resolver, Verification},
|
||||
};
|
||||
|
||||
|
@ -150,42 +147,52 @@ impl ForumRoomAcl {
|
|||
impl Resolver for ForumResolver {
|
||||
type EventType = ForumEventContent;
|
||||
|
||||
fn resolve<S: State>(&self, _state: &S, event: &Event<Self::EventType>) -> Vec<Command> {
|
||||
fn resolve<S: Database>(&self, _state: &S, event: &Event<Self::EventType>) -> Vec<Command> {
|
||||
let row = match event.content() {
|
||||
CoreContent::Create(_) => Command::insert(
|
||||
"members",
|
||||
TableRow::new()
|
||||
.with("actor", event.author().to_owned())
|
||||
.with("acl", ForumMemberAcl::Operator.to_str()),
|
||||
),
|
||||
CoreContent::Custom(ForumEventContent::Post { .. }) => Command::insert(
|
||||
"replies",
|
||||
TableRow::new()
|
||||
.with("event", event.id().to_owned())
|
||||
.with("time", event.timestamp()),
|
||||
),
|
||||
CoreContent::Create(_) => {
|
||||
let author = event.author().to_string().as_bytes().to_vec();
|
||||
let key: Vec<_> = b"member\xff".into_iter().copied().chain(author).collect();
|
||||
let value = ForumMemberAcl::Operator.to_str().as_bytes();
|
||||
Command::put(key, value)
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Post { .. }) => {
|
||||
let key: Vec<_> = b"posts\xff"
|
||||
.into_iter()
|
||||
.copied()
|
||||
.chain(event.id().to_string().as_bytes().to_vec())
|
||||
.collect();
|
||||
Command::put(key, b"")
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Reply {
|
||||
post, reference, ..
|
||||
}) => Command::insert(
|
||||
"replies",
|
||||
TableRow::new()
|
||||
.with("event", event.id().to_owned())
|
||||
.with("post", post.to_owned())
|
||||
.with("reference", reference.to_owned()),
|
||||
),
|
||||
CoreContent::Custom(ForumEventContent::Config { name, topic, acl }) => Command::insert(
|
||||
"config",
|
||||
TableRow::new()
|
||||
.with("name", name.to_owned())
|
||||
.with("topic", topic.to_owned())
|
||||
.with("acl", acl.to_str()),
|
||||
),
|
||||
CoreContent::Custom(ForumEventContent::Member { id, acl }) => Command::insert(
|
||||
"members",
|
||||
TableRow::new()
|
||||
.with("actor", id.to_owned())
|
||||
.with("acl", acl.to_str()),
|
||||
),
|
||||
}) => {
|
||||
let key: Vec<_> = b"replies\xff"
|
||||
.into_iter()
|
||||
.copied()
|
||||
.chain(event.id().to_string().as_bytes().to_vec())
|
||||
.collect();
|
||||
let value: Vec<_> = vec![]
|
||||
.into_iter()
|
||||
.copied()
|
||||
.chain(post.to_string().as_bytes().to_vec())
|
||||
.chain([0xff])
|
||||
.chain(reference.to_string().as_bytes().to_vec())
|
||||
.collect();
|
||||
Command::put(key, value)
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Config { name, topic, acl }) => {
|
||||
return vec![
|
||||
Command::put(b"name", serde_json::to_vec(name).unwrap()),
|
||||
Command::put(b"topic", serde_json::to_vec(topic).unwrap()),
|
||||
Command::put(b"acl", acl.to_str().as_bytes()),
|
||||
]
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Member { id, acl }) => {
|
||||
let author = event.author().to_string().as_bytes().to_vec();
|
||||
let key: Vec<_> = b"member\xff".into_iter().copied().chain(author).collect();
|
||||
let value = acl.to_str().as_bytes();
|
||||
Command::put(key, value)
|
||||
}
|
||||
};
|
||||
vec![row]
|
||||
}
|
||||
|
@ -198,52 +205,21 @@ impl Resolver for ForumResolver {
|
|||
"forum-v0"
|
||||
}
|
||||
|
||||
fn get_state_config(&self) -> StateConfig {
|
||||
let config = TableConfig::new()
|
||||
.with_column("name", ColumnType::Text)
|
||||
.with_column("topic", ColumnType::Text)
|
||||
.with_index("name", IndexType::LookupUnique)
|
||||
.with_index("topic", IndexType::LookupUnique);
|
||||
let members = TableConfig::new()
|
||||
.with_column("actor", ColumnType::Actor)
|
||||
.with_column("acl", ColumnType::String)
|
||||
.with_index("actor", IndexType::LookupUnique);
|
||||
let posts = TableConfig::new()
|
||||
.with_column("event", ColumnType::Event)
|
||||
.with_column("time", ColumnType::Integer)
|
||||
.with_index("time", IndexType::Ordered);
|
||||
let replies = TableConfig::new()
|
||||
.with_column("event", ColumnType::Event)
|
||||
.with_column("post", ColumnType::Event)
|
||||
.with_column("parent", ColumnType::Event)
|
||||
.with_index("post", IndexType::Lookup);
|
||||
StateConfig::new()
|
||||
.with_table("config", config)
|
||||
.with_table("members", members)
|
||||
.with_table("posts", posts)
|
||||
.with_table("replies", replies)
|
||||
.verify()
|
||||
}
|
||||
|
||||
fn verify<S: State>(&self, state: &S, event: &Event<Self::EventType>) -> Verification {
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
let key: Vec<_> = b"members-"
|
||||
.into_iter()
|
||||
.copied()
|
||||
.chain(event.author().to_string().as_bytes().to_vec())
|
||||
.collect();
|
||||
let member_entry = state
|
||||
.table("members")
|
||||
.unwrap()
|
||||
.lookup_optional("actor", event.author().to_owned())
|
||||
.unwrap()
|
||||
.and_then(|r| r.values.get("acl").cloned())
|
||||
.and_then(|d| d.as_string().map(ToOwned::to_owned))
|
||||
.get(&key)
|
||||
.and_then(|b| String::from_utf8(b).ok())
|
||||
.map(|s| ForumMemberAcl::from_str(&s))
|
||||
.unwrap_or_default();
|
||||
let room_entry = state
|
||||
.table("config")
|
||||
.unwrap()
|
||||
.lookup_one("key", "acl")
|
||||
.unwrap()
|
||||
.values
|
||||
.get("acl")
|
||||
.and_then(|d| d.as_string())
|
||||
.map(|s| ForumRoomAcl::from_str(s))
|
||||
.get(b"acl")
|
||||
.and_then(|d| String::from_utf8(d).ok())
|
||||
.map(|s| ForumRoomAcl::from_str(&s))
|
||||
.unwrap_or_default();
|
||||
|
||||
match event.content() {
|
||||
|
|
|
@ -3,9 +3,7 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use dag_resolve::{
|
||||
event::{CoreContent, Event},
|
||||
proto::table::{ColumnType, IndexType, State, StateConfig, Table, TableConfig, TableRow},
|
||||
resolver::{Command, Resolver, Verification},
|
||||
event::{CoreContent, Event}, proto::table::Database, resolver::{Command, Resolver, Verification}
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
|
||||
|
@ -29,19 +27,14 @@ impl KVResolver {
|
|||
impl Resolver for KVResolver {
|
||||
type EventType = KVEventContent;
|
||||
|
||||
fn resolve<S: State>(&self, _state: &S, event: &Event<KVEventContent>) -> Vec<Command> {
|
||||
fn resolve<D: Database>(&self, _state: &D, event: &Event<KVEventContent>) -> Vec<Command> {
|
||||
match &event.content() {
|
||||
CoreContent::Create(_) => {
|
||||
let row = TableRow::new()
|
||||
.with("key", "owner".to_owned())
|
||||
.with("value", event.author().to_owned());
|
||||
vec![Command::insert("meta", row)]
|
||||
vec![Command::Put { key: b"owner".into(), value: event.author().to_string().as_bytes().to_vec() }]
|
||||
},
|
||||
CoreContent::Custom(KVEventContent::Set(k, v)) => {
|
||||
let row = TableRow::new()
|
||||
.with("key", k.to_owned())
|
||||
.with("value", v.to_owned());
|
||||
vec![Command::insert("kv", row)]
|
||||
let key = b"kv\xff".into_iter().chain(k.as_bytes()).cloned().collect();
|
||||
vec![Command::Put { key, value: v.as_bytes().to_owned() }]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -54,23 +47,11 @@ impl Resolver for KVResolver {
|
|||
"kv"
|
||||
}
|
||||
|
||||
fn get_state_config(&self) -> StateConfig {
|
||||
let meta = TableConfig::new()
|
||||
.with_column("key", ColumnType::String)
|
||||
.with_column("value", ColumnType::String)
|
||||
.with_index("key", IndexType::LookupUnique);
|
||||
let table = TableConfig::new()
|
||||
.with_column("key", ColumnType::String)
|
||||
.with_column("value", ColumnType::String)
|
||||
.with_index("key", IndexType::LookupUnique);
|
||||
StateConfig::new().with_table("meta", meta).with_table("kv", table).verify()
|
||||
}
|
||||
|
||||
fn verify<S: State>(&self, state: &S, event: &Event<Self::EventType>) -> Verification {
|
||||
let entry = state.table("meta").unwrap().lookup_optional("owner", event.author().to_owned()).unwrap();
|
||||
match entry {
|
||||
Some(_) => Verification::Valid,
|
||||
None => Verification::Unauthorized,
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
if state.get(b"owner").unwrap() == event.author().to_string().as_bytes() {
|
||||
Verification::Valid
|
||||
} else {
|
||||
Verification::Invalid
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
//! Contains premade stores to store state/data
|
||||
|
||||
pub mod memory;
|
||||
// pub mod memory;
|
||||
pub mod sqlite;
|
||||
|
|
|
@ -2,21 +2,13 @@
|
|||
|
||||
use std::{collections::HashMap, path::Path, rc::Rc};
|
||||
|
||||
use dag_resolve::proto::table::{self, State};
|
||||
use dag_resolve::proto::table::{self, Database as _};
|
||||
use rusqlite::params_from_iter;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Database {
|
||||
connection: Rc<rusqlite::Connection>,
|
||||
config: Option<table::StateConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Table {
|
||||
connection: Rc<rusqlite::Connection>,
|
||||
name: String,
|
||||
config: table::TableConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
|
@ -27,249 +19,252 @@ impl Database {
|
|||
let db = rusqlite::Connection::open(path).unwrap();
|
||||
Ok(Database {
|
||||
connection: Rc::new(db),
|
||||
config: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn from_conn(conn: Rc<rusqlite::Connection>) -> Self {
|
||||
Database {
|
||||
connection: conn,
|
||||
config: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn init(mut self, config: table::StateConfig) -> Result<Box<Self>, Error> {
|
||||
self.config = Some(config);
|
||||
self.reset()?;
|
||||
pub fn init(mut self) -> Result<Box<Self>, Error> {
|
||||
self.reset();
|
||||
Ok(Box::new(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl table::State for Database {
|
||||
type Table = Table;
|
||||
type Err = Error;
|
||||
pub struct Query;
|
||||
|
||||
fn table(&self, name: &str) -> Result<Table, Self::Err> {
|
||||
let Some(table) = self.config.as_ref().unwrap().tables.get(name) else {
|
||||
panic!("table does not exist");
|
||||
};
|
||||
Ok(Table {
|
||||
connection: self.connection.clone(),
|
||||
name: name.to_owned(),
|
||||
config: table.to_owned(),
|
||||
})
|
||||
}
|
||||
|
||||
fn reset(&mut self) -> Result<(), Self::Err> {
|
||||
let mut sql = String::new();
|
||||
for (table_name, table_column) in &self.config.as_ref().unwrap().tables {
|
||||
// FIXME: don't drop and resolve from scratch every time
|
||||
sql.push_str("DROP TABLE IF EXISTS '");
|
||||
sql.push_str(&table_name.replace('\'', "''"));
|
||||
sql.push_str("';\n");
|
||||
sql.push_str("CREATE TABLE '");
|
||||
sql.push_str(&table_name.replace('\'', "''"));
|
||||
sql.push_str("' (");
|
||||
for (idx, (column_name, column_config)) in table_column.columns.iter().enumerate() {
|
||||
if idx != 0 {
|
||||
sql.push_str(", ");
|
||||
}
|
||||
sql.push_str(&column_name.replace('\'', "''"));
|
||||
sql.push_str(match column_config {
|
||||
table::ColumnType::String => " TEXT",
|
||||
table::ColumnType::Text => " TEXT",
|
||||
table::ColumnType::Integer => " INT",
|
||||
table::ColumnType::Float => " REAL",
|
||||
table::ColumnType::Event => " TEXT",
|
||||
table::ColumnType::Actor => " TEXT",
|
||||
});
|
||||
}
|
||||
sql.push_str(");\n");
|
||||
}
|
||||
self.connection.execute_batch(&sql).unwrap();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl table::Table for Table {
|
||||
type Err = Error;
|
||||
|
||||
fn all(&self) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
let mut sql = String::from("SELECT * FROM '");
|
||||
sql.push_str(&self.name.replace('\'', "''"));
|
||||
sql.push('\'');
|
||||
let mut sql_query = self.connection.prepare(&sql).unwrap();
|
||||
let mut sql_rows = sql_query.query([]).unwrap();
|
||||
let mut rows = vec![];
|
||||
while let Some(sql_row) = sql_rows.next().unwrap() {
|
||||
let mut map = HashMap::new();
|
||||
for (column_name, column_type) in &self.config.columns {
|
||||
let val = match column_type {
|
||||
table::ColumnType::String => {
|
||||
table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap())
|
||||
}
|
||||
table::ColumnType::Text => todo!(),
|
||||
table::ColumnType::Integer => {
|
||||
table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap())
|
||||
}
|
||||
table::ColumnType::Float => {
|
||||
table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap())
|
||||
}
|
||||
table::ColumnType::Actor => todo!(),
|
||||
table::ColumnType::Event => todo!(),
|
||||
};
|
||||
map.insert(column_name.to_string(), val);
|
||||
}
|
||||
rows.push(table::TableRow { values: map });
|
||||
}
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn lookup(
|
||||
&self,
|
||||
column: &str,
|
||||
value: impl Into<table::ColumnValue>,
|
||||
) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
let Some(column_config) = self.config.columns.get(column) else {
|
||||
panic!("does not exist");
|
||||
};
|
||||
let has_index = self.config.indexes.iter().any(|idx| {
|
||||
idx.column == column
|
||||
&& matches!(
|
||||
idx.index_type,
|
||||
table::IndexType::Lookup | table::IndexType::LookupUnique
|
||||
)
|
||||
});
|
||||
if !has_index {
|
||||
panic!("no lookup index");
|
||||
}
|
||||
let mut sql = String::from("SELECT * FROM ");
|
||||
sql.push_str(&self.name.replace('\'', "''"));
|
||||
sql.push_str(" WHERE ");
|
||||
sql.push_str(&column.replace('\'', "''"));
|
||||
sql.push_str(" = ");
|
||||
match (column_config, value.into()) {
|
||||
(table::ColumnType::String, table::ColumnValue::String(s)) => {
|
||||
sql.push('\'');
|
||||
sql.push_str(&s.replace('\'', "''"));
|
||||
sql.push('\'');
|
||||
}
|
||||
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => {
|
||||
sql.push_str(&i.to_string());
|
||||
}
|
||||
(_, _) => todo!(),
|
||||
};
|
||||
let mut sql_query = self.connection.prepare(&sql).unwrap();
|
||||
let mut sql_rows = sql_query.query([]).unwrap();
|
||||
let mut rows = vec![];
|
||||
while let Some(sql_row) = sql_rows.next().unwrap() {
|
||||
let mut map = HashMap::new();
|
||||
for (column_name, column_type) in &self.config.columns {
|
||||
let val = match column_type {
|
||||
table::ColumnType::String => {
|
||||
table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap())
|
||||
}
|
||||
table::ColumnType::Text => todo!(),
|
||||
table::ColumnType::Integer => {
|
||||
table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap())
|
||||
}
|
||||
table::ColumnType::Float => {
|
||||
table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap())
|
||||
}
|
||||
table::ColumnType::Event => todo!(),
|
||||
table::ColumnType::Actor => todo!(),
|
||||
};
|
||||
map.insert(column_name.to_string(), val);
|
||||
}
|
||||
rows.push(table::TableRow { values: map });
|
||||
}
|
||||
Ok(rows)
|
||||
}
|
||||
|
||||
fn lookup_one(
|
||||
&self,
|
||||
column: &str,
|
||||
value: impl Into<table::ColumnValue>,
|
||||
) -> Result<table::TableRow, Self::Err> {
|
||||
Ok(self
|
||||
.lookup(column, value)
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.next()
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
fn lookup_optional(
|
||||
&self,
|
||||
column: &str,
|
||||
value: impl Into<table::ColumnValue>,
|
||||
) -> Result<Option<table::TableRow>, Self::Err> {
|
||||
Ok(self.lookup(column, value).unwrap().into_iter().next())
|
||||
}
|
||||
|
||||
fn range(
|
||||
&self,
|
||||
_column: &str,
|
||||
_paginate: table::Paginate,
|
||||
_limit: u64,
|
||||
) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
impl table::Query for Query {
|
||||
fn get_single(self) -> Option<Vec<u8>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn search_text(
|
||||
&self,
|
||||
_column: &str,
|
||||
_search: &str,
|
||||
_limit: u64,
|
||||
_after: u64,
|
||||
) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
fn get_all(self) -> Vec<Vec<u8>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn insert(&mut self, mut row: table::TableRow) -> Result<(), Self::Err> {
|
||||
let mut sql = String::from("INSERT INTO ");
|
||||
sql.push_str(&self.name.replace('\'', "''"));
|
||||
sql.push_str(" (");
|
||||
for (idx, name) in self.config.columns.keys().enumerate() {
|
||||
if idx != 0 {
|
||||
sql.push_str(", ");
|
||||
fn get_iter(self) -> impl Iterator<Item = Vec<u8>> {
|
||||
vec![].into_iter()
|
||||
}
|
||||
sql.push_str(&name.replace('\'', "''"));
|
||||
}
|
||||
sql.push_str(") VALUES (");
|
||||
use rusqlite::types::Value;
|
||||
let mut params: Vec<Value> = vec![];
|
||||
for (idx, (name, column_type)) in self.config.columns.iter().enumerate() {
|
||||
if idx != 0 {
|
||||
sql.push_str(", ");
|
||||
}
|
||||
sql.push('?');
|
||||
match (column_type, row.values.remove(name).unwrap()) {
|
||||
(table::ColumnType::Text, table::ColumnValue::Text(t)) => {
|
||||
params.push(Value::Text(t.0))
|
||||
}
|
||||
(table::ColumnType::String, table::ColumnValue::String(s)) => {
|
||||
params.push(Value::Text(s))
|
||||
}
|
||||
(table::ColumnType::Integer, table::ColumnValue::Integer(i)) => {
|
||||
params.push(Value::Integer(i.try_into().unwrap()))
|
||||
}
|
||||
(table::ColumnType::Float, table::ColumnValue::Float(r)) => {
|
||||
params.push(Value::Real(r))
|
||||
}
|
||||
(table::ColumnType::Event, table::ColumnValue::Event(event_id)) => {
|
||||
params.push(Value::Text(event_id.to_string()))
|
||||
}
|
||||
(table::ColumnType::Actor, table::ColumnValue::Actor(actor_id)) => {
|
||||
params.push(Value::Text(actor_id.to_string()))
|
||||
}
|
||||
(column_type, column_value) => todo!("column_type={column_type:?} with value {column_value:?}"),
|
||||
};
|
||||
}
|
||||
sql.push(')');
|
||||
self.connection
|
||||
.execute(&sql, params_from_iter(params))
|
||||
.unwrap();
|
||||
Ok(())
|
||||
|
||||
fn count(self) -> u64 {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl table::Database for Database {
|
||||
type Query = Query;
|
||||
|
||||
fn reset(&self) {
|
||||
self.connection.execute_batch("
|
||||
DROP TABLE IF EXISTS data;
|
||||
CREATE TABLE data (key BLOB, value BLOB);
|
||||
").unwrap();
|
||||
}
|
||||
|
||||
fn query(&self, selector: table::Selector) -> Self::Query {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn query_reverse(&self, selector: table::Selector) -> Self::Query {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn put(&self, key: &[u8], value: &[u8]) {
|
||||
let mut s = self.connection.prepare("INSERT INTO data (key, value) VALUES (?, ?)").unwrap();
|
||||
s.execute((key, value)).unwrap();
|
||||
}
|
||||
|
||||
fn delete(&self, key: &[u8]) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
// impl table::Table for Table {
|
||||
// type Err = Error;
|
||||
|
||||
// fn all(&self) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
// let mut sql = String::from("SELECT * FROM '");
|
||||
// sql.push_str(&self.name.replace('\'', "''"));
|
||||
// sql.push('\'');
|
||||
// let mut sql_query = self.connection.prepare(&sql).unwrap();
|
||||
// let mut sql_rows = sql_query.query([]).unwrap();
|
||||
// let mut rows = vec![];
|
||||
// while let Some(sql_row) = sql_rows.next().unwrap() {
|
||||
// let mut map = HashMap::new();
|
||||
// for (column_name, column_type) in &self.config.columns {
|
||||
// let val = match column_type {
|
||||
// table::ColumnType::String => {
|
||||
// table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap())
|
||||
// }
|
||||
// table::ColumnType::Text => todo!(),
|
||||
// table::ColumnType::Integer => {
|
||||
// table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap())
|
||||
// }
|
||||
// table::ColumnType::Float => {
|
||||
// table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap())
|
||||
// }
|
||||
// table::ColumnType::Actor => todo!(),
|
||||
// table::ColumnType::Event => todo!(),
|
||||
// };
|
||||
// map.insert(column_name.to_string(), val);
|
||||
// }
|
||||
// rows.push(table::TableRow { values: map });
|
||||
// }
|
||||
// Ok(rows)
|
||||
// }
|
||||
|
||||
// fn lookup(
|
||||
// &self,
|
||||
// column: &str,
|
||||
// value: impl Into<table::ColumnValue>,
|
||||
// ) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
// let Some(column_config) = self.config.columns.get(column) else {
|
||||
// panic!("does not exist");
|
||||
// };
|
||||
// let has_index = self.config.indexes.iter().any(|idx| {
|
||||
// idx.column == column
|
||||
// && matches!(
|
||||
// idx.index_type,
|
||||
// table::IndexType::Lookup | table::IndexType::LookupUnique
|
||||
// )
|
||||
// });
|
||||
// if !has_index {
|
||||
// panic!("no lookup index");
|
||||
// }
|
||||
// let mut sql = String::from("SELECT * FROM ");
|
||||
// sql.push_str(&self.name.replace('\'', "''"));
|
||||
// sql.push_str(" WHERE ");
|
||||
// sql.push_str(&column.replace('\'', "''"));
|
||||
// sql.push_str(" = ");
|
||||
// match (column_config, value.into()) {
|
||||
// (table::ColumnType::String, table::ColumnValue::String(s)) => {
|
||||
// sql.push('\'');
|
||||
// sql.push_str(&s.replace('\'', "''"));
|
||||
// sql.push('\'');
|
||||
// }
|
||||
// (table::ColumnType::Integer, table::ColumnValue::Integer(i)) => {
|
||||
// sql.push_str(&i.to_string());
|
||||
// }
|
||||
// (_, _) => todo!(),
|
||||
// };
|
||||
// let mut sql_query = self.connection.prepare(&sql).unwrap();
|
||||
// let mut sql_rows = sql_query.query([]).unwrap();
|
||||
// let mut rows = vec![];
|
||||
// while let Some(sql_row) = sql_rows.next().unwrap() {
|
||||
// let mut map = HashMap::new();
|
||||
// for (column_name, column_type) in &self.config.columns {
|
||||
// let val = match column_type {
|
||||
// table::ColumnType::String => {
|
||||
// table::ColumnValue::String(sql_row.get(column_name.as_str()).unwrap())
|
||||
// }
|
||||
// table::ColumnType::Text => todo!(),
|
||||
// table::ColumnType::Integer => {
|
||||
// table::ColumnValue::Integer(sql_row.get(column_name.as_str()).unwrap())
|
||||
// }
|
||||
// table::ColumnType::Float => {
|
||||
// table::ColumnValue::Float(sql_row.get(column_name.as_str()).unwrap())
|
||||
// }
|
||||
// table::ColumnType::Event => todo!(),
|
||||
// table::ColumnType::Actor => todo!(),
|
||||
// };
|
||||
// map.insert(column_name.to_string(), val);
|
||||
// }
|
||||
// rows.push(table::TableRow { values: map });
|
||||
// }
|
||||
// Ok(rows)
|
||||
// }
|
||||
|
||||
// fn lookup_one(
|
||||
// &self,
|
||||
// column: &str,
|
||||
// value: impl Into<table::ColumnValue>,
|
||||
// ) -> Result<table::TableRow, Self::Err> {
|
||||
// Ok(self
|
||||
// .lookup(column, value)
|
||||
// .unwrap()
|
||||
// .into_iter()
|
||||
// .next()
|
||||
// .unwrap())
|
||||
// }
|
||||
|
||||
// fn lookup_optional(
|
||||
// &self,
|
||||
// column: &str,
|
||||
// value: impl Into<table::ColumnValue>,
|
||||
// ) -> Result<Option<table::TableRow>, Self::Err> {
|
||||
// Ok(self.lookup(column, value).unwrap().into_iter().next())
|
||||
// }
|
||||
|
||||
// fn range(
|
||||
// &self,
|
||||
// _column: &str,
|
||||
// _paginate: table::Paginate,
|
||||
// _limit: u64,
|
||||
// ) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
// todo!()
|
||||
// }
|
||||
|
||||
// fn search_text(
|
||||
// &self,
|
||||
// _column: &str,
|
||||
// _search: &str,
|
||||
// _limit: u64,
|
||||
// _after: u64,
|
||||
// ) -> Result<Vec<table::TableRow>, Self::Err> {
|
||||
// todo!()
|
||||
// }
|
||||
|
||||
// fn insert(&mut self, mut row: table::TableRow) -> Result<(), Self::Err> {
|
||||
// let mut sql = String::from("INSERT INTO ");
|
||||
// sql.push_str(&self.name.replace('\'', "''"));
|
||||
// sql.push_str(" (");
|
||||
// for (idx, name) in self.config.columns.keys().enumerate() {
|
||||
// if idx != 0 {
|
||||
// sql.push_str(", ");
|
||||
// }
|
||||
// sql.push_str(&name.replace('\'', "''"));
|
||||
// }
|
||||
// sql.push_str(") VALUES (");
|
||||
// use rusqlite::types::Value;
|
||||
// let mut params: Vec<Value> = vec![];
|
||||
// for (idx, (name, column_type)) in self.config.columns.iter().enumerate() {
|
||||
// if idx != 0 {
|
||||
// sql.push_str(", ");
|
||||
// }
|
||||
// sql.push('?');
|
||||
// match (column_type, row.values.remove(name).unwrap()) {
|
||||
// (table::ColumnType::Text, table::ColumnValue::Text(t)) => {
|
||||
// params.push(Value::Text(t.0))
|
||||
// }
|
||||
// (table::ColumnType::String, table::ColumnValue::String(s)) => {
|
||||
// params.push(Value::Text(s))
|
||||
// }
|
||||
// (table::ColumnType::Integer, table::ColumnValue::Integer(i)) => {
|
||||
// params.push(Value::Integer(i.try_into().unwrap()))
|
||||
// }
|
||||
// (table::ColumnType::Float, table::ColumnValue::Float(r)) => {
|
||||
// params.push(Value::Real(r))
|
||||
// }
|
||||
// (table::ColumnType::Event, table::ColumnValue::Event(event_id)) => {
|
||||
// params.push(Value::Text(event_id.to_string()))
|
||||
// }
|
||||
// (table::ColumnType::Actor, table::ColumnValue::Actor(actor_id)) => {
|
||||
// params.push(Value::Text(actor_id.to_string()))
|
||||
// }
|
||||
// (column_type, column_value) => todo!("column_type={column_type:?} with value {column_value:?}"),
|
||||
// };
|
||||
// }
|
||||
// sql.push(')');
|
||||
// self.connection
|
||||
// .execute(&sql, params_from_iter(params))
|
||||
// .unwrap();
|
||||
// Ok(())
|
||||
// }
|
||||
// }
|
||||
|
|
13
crates/proto/src/proto/data.rs
Normal file
13
crates/proto/src/proto/data.rs
Normal file
|
@ -0,0 +1,13 @@
|
|||
//! common data types
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct Text(Vec<TextPart>);
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct TextPart {
|
||||
pub lang: String,
|
||||
pub mime_type: String,
|
||||
pub content: String,
|
||||
}
|
|
@ -2,4 +2,5 @@ pub mod actor;
|
|||
pub mod event;
|
||||
pub mod resolver;
|
||||
pub mod room;
|
||||
pub mod data;
|
||||
pub mod table;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::table::{State, StateConfig, TableRow};
|
||||
// use super::table::{State, StateConfig, TableRow};
|
||||
use super::table::Database;
|
||||
use crate::event::{Event, EventId};
|
||||
use std::{cmp::Ordering, collections::HashSet, fmt::Debug};
|
||||
|
||||
|
@ -9,20 +10,17 @@ pub trait Resolver {
|
|||
type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a>;
|
||||
|
||||
/// Given a set of ordered events, resolve the final state
|
||||
fn resolve<S: State>(&self, state: &S, event: &Event<Self::EventType>) -> Vec<Command>;
|
||||
fn resolve<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Vec<Command>;
|
||||
|
||||
/// Given two events, decide which one comes first
|
||||
/// if Ordering::Equal is returned, the timestamp then event id is used
|
||||
fn tiebreak(&self, a: &Event<Self::EventType>, b: &Event<Self::EventType>) -> Ordering;
|
||||
|
||||
/// Verify if an event can be sent or not
|
||||
fn verify<S: State>(&self, state: &S, event: &Event<Self::EventType>) -> Verification;
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification;
|
||||
|
||||
/// TEMP: Get the name/id of this resolver
|
||||
fn name(&self) -> &str;
|
||||
|
||||
/// Get the schema for state
|
||||
fn get_state_config(&self) -> StateConfig;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -41,17 +39,13 @@ pub enum Verification {
|
|||
/// Effects a resolver can produce
|
||||
pub enum Command {
|
||||
/// Insert a new row into the database
|
||||
Insert {
|
||||
table: String,
|
||||
row: TableRow,
|
||||
Put {
|
||||
key: Vec<u8>,
|
||||
value: Vec<u8>,
|
||||
},
|
||||
|
||||
// // TODO: how does delete/update work?
|
||||
// /// Delete an existing row
|
||||
// Delete { query: StateQuery },
|
||||
|
||||
// /// Update an existing row
|
||||
// Update {},
|
||||
/// Delete an existing row
|
||||
Delete { key: Vec<u8> },
|
||||
|
||||
// /// Notify someone outside of the room that they can join
|
||||
// Invite {},
|
||||
|
@ -62,8 +56,8 @@ pub enum Command {
|
|||
}
|
||||
|
||||
impl Command {
|
||||
pub fn insert(table: impl Into<String>, row: TableRow) -> Self {
|
||||
Self::Insert { table: table.into(), row }
|
||||
pub fn put(key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
|
||||
Command::Put { key: key.into(), value: value.into() }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -8,8 +8,6 @@ use crate::{
|
|||
};
|
||||
use std::fmt::Debug;
|
||||
|
||||
use super::table::Table;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Room<R: Resolver> {
|
||||
pub events: Vec<Event<R::EventType>>,
|
||||
|
@ -65,20 +63,19 @@ impl<R: Resolver> Room<R> {
|
|||
&self.resolver
|
||||
}
|
||||
|
||||
pub fn resolve_state<S>(&self, state: &mut S)
|
||||
pub fn resolve_state<D>(&self, state: &mut D)
|
||||
where
|
||||
S: proto::table::State,
|
||||
D: proto::table::Database,
|
||||
{
|
||||
let resolver = self.get_resolver();
|
||||
let sorted = sort(|a, b| resolver.tiebreak(a, b), &self.events);
|
||||
state.reset().unwrap();
|
||||
state.reset();
|
||||
for event in sorted {
|
||||
let effects = resolver.resolve(state, event);
|
||||
for effect in effects {
|
||||
match effect {
|
||||
Command::Insert { table, row } => {
|
||||
state.table(&table).unwrap().insert(row).unwrap();
|
||||
},
|
||||
Command::Put { key, value } => state.put(&key, &value),
|
||||
Command::Delete { key } => state.delete(&key),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
26
crates/proto/src/proto/table/kv.rs
Normal file
26
crates/proto/src/proto/table/kv.rs
Normal file
|
@ -0,0 +1,26 @@
|
|||
pub trait Database {
|
||||
type Query: Query;
|
||||
|
||||
fn query(&self, selector: Selector) -> Self::Query;
|
||||
fn query_reverse(&self, selector: Selector) -> Self::Query;
|
||||
|
||||
fn reset(&self);
|
||||
fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
|
||||
fn put(&self, key: &[u8], value: &[u8]);
|
||||
fn delete(&self, key: &[u8]);
|
||||
}
|
||||
|
||||
pub struct Selector(Vec<u8>);
|
||||
|
||||
pub trait Query {
|
||||
fn get_single(self) -> Option<Vec<u8>>;
|
||||
fn get_all(self) -> Vec<Vec<u8>>;
|
||||
fn get_iter(self) -> impl Iterator<Item = Vec<u8>>;
|
||||
fn count(self) -> u64;
|
||||
}
|
||||
|
||||
impl Into<Selector> for Vec<u8> {
|
||||
fn into(self) -> Selector {
|
||||
Selector(self)
|
||||
}
|
||||
}
|
13
crates/proto/src/proto/table/mod.rs
Normal file
13
crates/proto/src/proto/table/mod.rs
Normal file
|
@ -0,0 +1,13 @@
|
|||
//! Each room has a small, well defined, domain specific relational
|
||||
//! database associated with it. The database's contents will then be defined
|
||||
//! by the events it receives. This module contains database traits and
|
||||
//! how the database's schema is defined.
|
||||
//!
|
||||
//! This is a work in progress; the goal is trying to find a good
|
||||
//! effort/payoff ratio in the database schema definition.
|
||||
|
||||
mod old;
|
||||
mod kv;
|
||||
|
||||
// pub use old::*;
|
||||
pub use kv::*;
|
|
@ -1,11 +1,3 @@
|
|||
//! Each room has a small, well defined, domain specific relational
|
||||
//! database associated with it. The database's contents will then be defined
|
||||
//! by the events it receives. This module contains database traits and
|
||||
//! how the database's schema is defined.
|
||||
//!
|
||||
//! This is a work in progress; the goal is trying to find a good
|
||||
//! effort/payoff ratio in the database schema definition.
|
||||
|
||||
// TODO: properly type things
|
||||
// maybe replace TableRow with impl Serialize?
|
||||
// and make schema a trait instead of builder?
|
||||
|
@ -13,7 +5,7 @@
|
|||
use std::{collections::HashMap, fmt::Debug};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{actor::ActorId, event::EventId};
|
||||
use crate::{actor::ActorId, event::EventId, resolver::Command};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
// TODO: actually implement Text
|
||||
|
@ -88,7 +80,6 @@ pub struct IndexConfig {
|
|||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct TableRow {
|
||||
pub id: u64,
|
||||
pub values: HashMap<String, ColumnValue>,
|
||||
}
|
||||
|
||||
|
@ -313,3 +304,111 @@ impl TableRow {
|
|||
self
|
||||
}
|
||||
}
|
||||
|
||||
// ==========================
|
||||
// alternative way of querying the db
|
||||
|
||||
/// A way to query the database
|
||||
pub struct Query;
|
||||
|
||||
// trait Queryable {}
|
||||
|
||||
pub enum QuerySelector {
|
||||
/// Select one specific value
|
||||
Lookup(ColumnValue),
|
||||
|
||||
/// Select starting from a specific (inclusive)
|
||||
From(ColumnValue),
|
||||
|
||||
/// Select ending at a specific value (inclusive)
|
||||
Until(ColumnValue),
|
||||
}
|
||||
|
||||
pub enum QuerySorter {
|
||||
/// Lowest to highest.
|
||||
Ascending,
|
||||
|
||||
/// Highest to lowest.
|
||||
Descending,
|
||||
}
|
||||
|
||||
// do i do sql?
|
||||
// how much of this becomes a trait, how much is a query struct?
|
||||
impl Query {
|
||||
pub fn new() -> Self {
|
||||
Self
|
||||
}
|
||||
|
||||
/// sql FROM
|
||||
pub fn from(self, _table: impl Into<String>) -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// sql WHERE
|
||||
pub fn matching(self, _column: impl Into<String>, _matcher: QuerySelector) -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// sql ORDER BY
|
||||
pub fn sorted(self, _column: impl Into<String>, _sorter: QuerySorter) -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// sql JOIN ON _column = other.column
|
||||
pub fn join_to(self, _column: impl Into<String>) -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// sql JOIN ON _other._column = column
|
||||
pub fn join_from(self, _other: Query, _column: impl Into<String>) -> Self {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Select a single item
|
||||
pub fn first(self) -> Option<TableRow> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Select all items
|
||||
// do i really want this? it allows unbounded access
|
||||
pub fn all(self) -> Vec<TableRow> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Iterate over items
|
||||
pub fn iter(self) -> impl Iterator<Item = TableRow> {
|
||||
todo!();
|
||||
#[allow(unused)]
|
||||
vec![].into_iter()
|
||||
}
|
||||
}
|
||||
|
||||
// these would only be available in the resolver
|
||||
impl Query {
|
||||
/// Create a set of commands that will update the selected rows
|
||||
pub fn update<F: FnMut(TableRow) -> ()>(self, _f: F) -> Vec<Command> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Create a set of commands that will replace the selected rows with new rows
|
||||
pub fn put(self, _rows: impl IntoIterator<Item = TableRow>) -> Vec<Command> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
/// Create a command that will delete the selected rows
|
||||
pub fn delete(self) -> Command {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
// impl TableRow {
|
||||
// /// creates a command that deletes the row
|
||||
// pub fn delete(self) -> Command {
|
||||
// todo!()
|
||||
// }
|
||||
|
||||
// /// creates a command that updates the row with new data
|
||||
// pub fn update(&self, _row: TableRow) -> Command {
|
||||
// todo!()
|
||||
// }
|
||||
// }
|
Loading…
Reference in a new issue