Make rooms manage state
This commit is contained in:
parent
05faa56297
commit
6101fc6d70
9 changed files with 270 additions and 212 deletions
|
@ -9,11 +9,7 @@ mod server;
|
|||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use dag_resolve::{
|
||||
actor::{ActorId, ActorSecret},
|
||||
event::{CoreContent, Event, EventId, HashType, SignatureType},
|
||||
resolver::Resolver,
|
||||
room::Room,
|
||||
Error,
|
||||
actor::{ActorId, ActorSecret}, event::{CoreContent, Event, EventId, HashType, SignatureType}, resolver::Resolver, room::Room
|
||||
};
|
||||
use dag_resolve_impls::{
|
||||
resolvers::{forum::ForumResolver, kv::KVResolver},
|
||||
|
@ -48,41 +44,21 @@ enum Cli {
|
|||
#[derive(Debug)]
|
||||
struct State<R: Resolver> {
|
||||
db: Rc<Connection>,
|
||||
room: Room<R>,
|
||||
room: Room<R, Database>,
|
||||
secret: ActorSecret,
|
||||
store: Box<Database>,
|
||||
}
|
||||
|
||||
impl<R: Resolver> State<R> {
|
||||
fn create_event(&mut self, content: CoreContent<R::EventType>) -> Result<()> {
|
||||
match self
|
||||
self
|
||||
.room
|
||||
.create_event(&mut *self.store, content, &self.secret)
|
||||
{
|
||||
Ok(event) => {
|
||||
self.db.execute(
|
||||
"INSERT INTO _events (id, json) VALUES (?, ?)",
|
||||
(event.id().to_string(), serde_json::to_string(&event)?),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
Err(Error::AlreadyExists) => Ok(()),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
.create_event(content, &self.secret)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn append_event(&mut self, event: Event<R::EventType>) -> Result<()> {
|
||||
match self.room.append_event(&mut *self.store, event) {
|
||||
Ok(event) => {
|
||||
self.db.execute(
|
||||
"INSERT INTO _events (id, json) VALUES (?, ?)",
|
||||
(event.id().to_string(), serde_json::to_string(&event)?),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
Err(Error::AlreadyExists) => Ok(()),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
self.room.append_event(event)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init(resolver: R, path: &Path) -> Result<Self> {
|
||||
|
@ -91,120 +67,79 @@ impl<R: Resolver> State<R> {
|
|||
let db = rusqlite::Connection::open(path)?;
|
||||
db.execute_batch(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS _config (
|
||||
CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value ANY
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS _events (
|
||||
id TEXT PRIMARY KEY,
|
||||
json TEXT
|
||||
);
|
||||
"#,
|
||||
)?;
|
||||
let (actor, secret) = ActorId::new(SignatureType::Ed25519);
|
||||
let db = Rc::new(db);
|
||||
let room = Room::builder()
|
||||
.with_resolver(resolver)
|
||||
.with_hasher(HashType::Sha256)
|
||||
.with_signer(SignatureType::Ed25519)
|
||||
.with_database(Database::from_conn(db.clone()).init().unwrap())
|
||||
.create(&secret)?;
|
||||
db.execute(
|
||||
"INSERT INTO _config VALUES (?, ?)",
|
||||
"INSERT INTO config VALUES (?, ?)",
|
||||
("actor_id", actor.to_string()),
|
||||
)?;
|
||||
db.execute(
|
||||
"INSERT INTO _config VALUES (?, ?)",
|
||||
"INSERT INTO config VALUES (?, ?)",
|
||||
("actor_secret", serde_json::to_string(&secret)?),
|
||||
)?;
|
||||
for event in &room.events {
|
||||
db.execute(
|
||||
"INSERT INTO _events (id, json) VALUES (?, ?)",
|
||||
(event.id().to_string(), serde_json::to_string(&event)?),
|
||||
)?;
|
||||
}
|
||||
let db = Rc::new(db);
|
||||
let mut store = Database::from_conn(db.clone()).init()?;
|
||||
room.resolve_state(&mut *store);
|
||||
room.resolve_state();
|
||||
Ok(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn open(path: impl AsRef<Path>) -> Result<Opened> {
|
||||
// restore repo
|
||||
let db = rusqlite::Connection::open(path)?;
|
||||
let db = Rc::new(rusqlite::Connection::open(path)?);
|
||||
let _actor: ActorId = db.query_row(
|
||||
"SELECT value FROM _config WHERE key='actor_id'",
|
||||
"SELECT value FROM config WHERE key='actor_id'",
|
||||
[],
|
||||
|row| row.get(0).map(|s: String| s.parse()),
|
||||
)??;
|
||||
let secret: ActorSecret = db.query_row(
|
||||
"SELECT value FROM _config WHERE key='actor_secret'",
|
||||
"SELECT value FROM config WHERE key='actor_secret'",
|
||||
[],
|
||||
|row| row.get(0).map(|s: String| serde_json::from_str(&s)),
|
||||
)??;
|
||||
let mut q = db.prepare("SELECT json FROM _events")?;
|
||||
let mut q = db.prepare("SELECT event FROM events")?;
|
||||
let mut rows = q.query([])?;
|
||||
|
||||
let event_json: String = rows
|
||||
let event_json: Vec<u8> = rows
|
||||
.next()?
|
||||
.expect("there's always one root event")
|
||||
.get(0)?;
|
||||
let event: Event<()> = serde_json::from_str(&event_json)?;
|
||||
drop(rows);
|
||||
drop(q);
|
||||
let event: Event<()> = serde_json::from_slice(&event_json)?;
|
||||
let store = Database::from_conn(db.clone()).init()?;
|
||||
match event.content() {
|
||||
CoreContent::Create(c) => match c.resolver.as_str() {
|
||||
"kv" => {
|
||||
let mut room = Room::from_root(KVResolver, serde_json::from_str(&event_json)?)?;
|
||||
let mut events = vec![];
|
||||
while let Some(row) = rows.next()? {
|
||||
let s: String = row.get(0)?;
|
||||
let ev = serde_json::from_str(&s)?;
|
||||
events.push(ev);
|
||||
}
|
||||
drop(event);
|
||||
drop(rows);
|
||||
drop(q);
|
||||
let db = Rc::new(db);
|
||||
let mut store = Database::from_conn(db.clone()).init()?;
|
||||
room.resolve_state(&mut *store);
|
||||
for event in events {
|
||||
room.append_event(&mut *store, event)?;
|
||||
room.resolve_state(&mut *store);
|
||||
}
|
||||
let room = Room::from_root(KVResolver, store, serde_json::from_slice(&event_json)?)?;
|
||||
room.resolve_state();
|
||||
Ok(Opened::Kv(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
}))
|
||||
}
|
||||
"forum-v0" => {
|
||||
let mut room = Room::from_root(ForumResolver, serde_json::from_str(&event_json)?)?;
|
||||
let mut events = vec![];
|
||||
while let Some(row) = rows.next()? {
|
||||
let s: String = row.get(0)?;
|
||||
let ev = serde_json::from_str(&s)?;
|
||||
events.push(ev);
|
||||
}
|
||||
drop(event);
|
||||
drop(rows);
|
||||
drop(q);
|
||||
let db = Rc::new(db);
|
||||
let mut store = Database::from_conn(db.clone()).init()?;
|
||||
room.resolve_state(&mut *store);
|
||||
for event in events {
|
||||
room.append_event(&mut *store, event)?;
|
||||
room.resolve_state(&mut *store);
|
||||
}
|
||||
let room = Room::from_root(ForumResolver, store, serde_json::from_slice(&event_json)?)?;
|
||||
room.resolve_state();
|
||||
Ok(Opened::Forum(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
}))
|
||||
}
|
||||
_ => unimplemented!("unknown resolver"),
|
||||
|
@ -255,8 +190,11 @@ fn sync_state_v2<R: Resolver>(from: &mut State<R>, to: &mut State<R>) -> Result<
|
|||
panic!("cannot sync events from two different rooms");
|
||||
}
|
||||
|
||||
let from_events = from.room.all_events();
|
||||
let to_events = from.room.all_events();
|
||||
|
||||
// a list of event ids would either be cached or easily queryable
|
||||
let our_ids: Vec<_> = to.room.events.iter().map(|ev| ev.id()).cloned().collect();
|
||||
let our_ids: Vec<_> = to_events.iter().map(|ev| ev.id()).cloned().collect();
|
||||
let mut missing_ids: Vec<EventId> = from.room.get_heads().into_iter().cloned().collect();
|
||||
|
||||
// this would be batched, to find multiple missing events at once
|
||||
|
@ -266,9 +204,7 @@ fn sync_state_v2<R: Resolver>(from: &mut State<R>, to: &mut State<R>) -> Result<
|
|||
}
|
||||
|
||||
// getting an event from its id would be cached or easily queryable
|
||||
let event = from
|
||||
.room
|
||||
.events
|
||||
let event = from_events
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|ev| ev.id() == &next)
|
||||
|
@ -277,21 +213,21 @@ fn sync_state_v2<R: Resolver>(from: &mut State<R>, to: &mut State<R>) -> Result<
|
|||
to.append_event(event)?;
|
||||
}
|
||||
|
||||
from.room.resolve_state(&mut *from.store);
|
||||
to.room.resolve_state(&mut *to.store);
|
||||
from.room.resolve_state();
|
||||
to.room.resolve_state();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_event<R: Resolver + Debug>(state: &mut State<R>, data: &str) -> Result<()> {
|
||||
state.create_event(dbg!(serde_json::from_str(dbg!(data))?))?;
|
||||
state.room.resolve_state(&mut *state.store);
|
||||
state.room.resolve_state();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_info<R: Resolver + Debug>(state: State<R>) -> Result<()> {
|
||||
let event_count: u64 = state
|
||||
.db
|
||||
.query_row("SELECT count(*) FROM _events", [], |r| r.get(0))?;
|
||||
.query_row("SELECT count(*) FROM events", [], |r| r.get(0))?;
|
||||
let row_count: u64 = state
|
||||
.db
|
||||
.query_row("SELECT count(*) FROM data", [], |r| r.get(0))?;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
pub mod node;
|
||||
// pub mod node;
|
||||
pub mod resolvers;
|
||||
pub mod stores;
|
||||
pub use node::Node;
|
||||
// pub use node::Node;
|
||||
|
|
|
@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
|
|||
use dag_resolve::{
|
||||
actor::ActorId,
|
||||
event::{CoreContent, Event, EventId},
|
||||
proto::{data::Text, table::Database},
|
||||
proto::{data::Text, table::{Database, Table as _}},
|
||||
resolver::{Command, Resolver, Verification},
|
||||
};
|
||||
|
||||
|
@ -153,7 +153,7 @@ impl Resolver for ForumResolver {
|
|||
let author = event.author().to_string().as_bytes().to_vec();
|
||||
let key: Vec<_> = b"member\xff".into_iter().copied().chain(author).collect();
|
||||
let value = ForumMemberAcl::Operator.to_str().as_bytes();
|
||||
Command::put(key, value)
|
||||
Command::put("members", key, value)
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Post { .. }) => {
|
||||
let key: Vec<_> = b"posts\xff"
|
||||
|
@ -161,7 +161,7 @@ impl Resolver for ForumResolver {
|
|||
.copied()
|
||||
.chain(event.id().to_string().as_bytes().to_vec())
|
||||
.collect();
|
||||
Command::put(key, b"")
|
||||
Command::put("posts", key, b"")
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Reply {
|
||||
post, reference, ..
|
||||
|
@ -178,20 +178,20 @@ impl Resolver for ForumResolver {
|
|||
.chain([0xff])
|
||||
.chain(reference.to_string().as_bytes().to_vec())
|
||||
.collect();
|
||||
Command::put(key, value)
|
||||
Command::put("replies", key, value)
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Config { name, topic, acl }) => {
|
||||
return vec![
|
||||
Command::put(b"name", serde_json::to_vec(name).unwrap()),
|
||||
Command::put(b"topic", serde_json::to_vec(topic).unwrap()),
|
||||
Command::put(b"acl", acl.to_str().as_bytes()),
|
||||
Command::put("meta", b"name", serde_json::to_vec(name).unwrap()),
|
||||
Command::put("meta", b"topic", serde_json::to_vec(topic).unwrap()),
|
||||
Command::put("meta", b"acl", acl.to_str().as_bytes()),
|
||||
]
|
||||
}
|
||||
CoreContent::Custom(ForumEventContent::Member { id, acl }) => {
|
||||
let author = id.to_string().as_bytes().to_vec();
|
||||
let key: Vec<_> = b"member\xff".into_iter().copied().chain(author).collect();
|
||||
let value = acl.to_str().as_bytes();
|
||||
Command::put(key, value)
|
||||
Command::put("members", key, value)
|
||||
}
|
||||
};
|
||||
vec![row]
|
||||
|
@ -206,17 +206,14 @@ impl Resolver for ForumResolver {
|
|||
}
|
||||
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
let key: Vec<_> = b"members-"
|
||||
.into_iter()
|
||||
.copied()
|
||||
.chain(event.author().to_string().as_bytes().to_vec())
|
||||
.collect();
|
||||
let member_entry = state
|
||||
.get(&key)
|
||||
.table("members")
|
||||
.get(&event.author().to_string().as_bytes())
|
||||
.and_then(|b| String::from_utf8(b).ok())
|
||||
.map(|s| ForumMemberAcl::from_str(&s))
|
||||
.unwrap_or_default();
|
||||
let room_entry = state
|
||||
.table("meta")
|
||||
.get(b"acl")
|
||||
.and_then(|d| String::from_utf8(d).ok())
|
||||
.map(|s| ForumRoomAcl::from_str(&s))
|
||||
|
@ -227,4 +224,13 @@ impl Resolver for ForumResolver {
|
|||
CoreContent::Custom(c) => can_send_event(room_entry, member_entry, c),
|
||||
}
|
||||
}
|
||||
|
||||
fn schema(&self) -> Vec<String> {
|
||||
vec![
|
||||
"meta".into(),
|
||||
"members".into(),
|
||||
"posts".into(),
|
||||
"replies".into(),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,13 +3,15 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use dag_resolve::{
|
||||
event::{CoreContent, Event}, proto::table::Database, resolver::{Command, Resolver, Verification}
|
||||
event::{CoreContent, Event},
|
||||
proto::table::{Database, Table as _},
|
||||
resolver::{Command, Resolver, Verification},
|
||||
};
|
||||
use std::cmp::Ordering;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
/// A basic key-value store
|
||||
///
|
||||
///
|
||||
/// This is designed to be the "extremely basic and minimalistic" example
|
||||
pub struct KVResolver;
|
||||
|
||||
|
@ -30,11 +32,14 @@ impl Resolver for KVResolver {
|
|||
fn resolve<D: Database>(&self, _state: &D, event: &Event<KVEventContent>) -> Vec<Command> {
|
||||
match &event.content() {
|
||||
CoreContent::Create(_) => {
|
||||
vec![Command::Put { key: b"owner".into(), value: event.author().to_string().as_bytes().to_vec() }]
|
||||
},
|
||||
vec![Command::Put {
|
||||
table: "meta".into(),
|
||||
key: b"owner".into(),
|
||||
value: event.author().to_string().as_bytes().to_vec(),
|
||||
}]
|
||||
}
|
||||
CoreContent::Custom(KVEventContent::Set(k, v)) => {
|
||||
let key = b"kv\xff".into_iter().chain(k.as_bytes()).cloned().collect();
|
||||
vec![Command::Put { key, value: v.as_bytes().to_owned() }]
|
||||
vec![Command::put("kv", k.as_bytes(), v.as_bytes())]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -48,10 +53,14 @@ impl Resolver for KVResolver {
|
|||
}
|
||||
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
if state.get(b"owner").unwrap() == event.author().to_string().as_bytes() {
|
||||
if state.table("meta").get(b"owner").unwrap() == event.author().to_string().as_bytes() {
|
||||
Verification::Valid
|
||||
} else {
|
||||
Verification::Invalid
|
||||
}
|
||||
}
|
||||
|
||||
fn schema(&self) -> Vec<String> {
|
||||
vec!["meta".into(), "kv".into()]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
use std::{path::Path, rc::Rc};
|
||||
|
||||
use dag_resolve::proto::table::{self, Database as _};
|
||||
use dag_resolve::{proto::table::{self, Database as _}, room::TABLE_EVENTS};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -27,13 +27,87 @@ impl Database {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn init(self) -> Result<Box<Self>, Error> {
|
||||
pub fn init(self) -> Result<Self, Error> {
|
||||
self.reset();
|
||||
Ok(Box::new(self))
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Query;
|
||||
pub struct Query {
|
||||
selector: table::Selector,
|
||||
table: Table,
|
||||
_reversed: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Table {
|
||||
connection: Rc<rusqlite::Connection>,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl table::Database for Database {
|
||||
type Table = Table;
|
||||
|
||||
fn reset(&self) {
|
||||
self.connection.execute_batch("
|
||||
DROP TABLE IF EXISTS data;
|
||||
CREATE TABLE data (tb TEXT, key BLOB, value BLOB);
|
||||
CREATE TABLE IF NOT EXISTS events (id BLOB, event BLOB);
|
||||
").unwrap();
|
||||
self.connection.execute("
|
||||
INSERT INTO data SELECT ? as tb, id as key, event as value FROM events;
|
||||
", (TABLE_EVENTS, )).unwrap();
|
||||
}
|
||||
|
||||
fn table(&self, name: impl Into<String>) -> Self::Table {
|
||||
Table {
|
||||
connection: self.connection.clone(),
|
||||
name: name.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl table::Table for Table {
|
||||
type Query = Query;
|
||||
|
||||
fn query(&self, selector: table::Selector) -> Self::Query {
|
||||
Query {
|
||||
selector,
|
||||
table: self.clone(),
|
||||
_reversed: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn query_reverse(&self, selector: table::Selector) -> Self::Query {
|
||||
Query {
|
||||
selector,
|
||||
table: self.clone(),
|
||||
_reversed: true,
|
||||
}
|
||||
}
|
||||
|
||||
fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
|
||||
let mut s = self.connection.prepare("SELECT value FROM data WHERE tb = ? AND key = ?").unwrap();
|
||||
match s.query_row((&self.name, key.to_vec()), |f| f.get(0)) {
|
||||
Ok(row) => row,
|
||||
Err(rusqlite::Error::QueryReturnedNoRows) => None,
|
||||
Err(err) => Err(err).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
fn put(&self, key: &[u8], value: &[u8]) {
|
||||
if self.name == TABLE_EVENTS {
|
||||
let mut s = self.connection.prepare("INSERT INTO events VALUES (?, ?)").unwrap();
|
||||
s.execute((key, value)).unwrap();
|
||||
}
|
||||
let mut s = self.connection.prepare("INSERT INTO data (tb, key, value) VALUES (?, ?, ?)").unwrap();
|
||||
s.execute((&self.name, key, value)).unwrap();
|
||||
}
|
||||
|
||||
fn delete(&self, _key: &[u8]) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl table::Query for Query {
|
||||
fn get_single(self) -> Option<Vec<u8>> {
|
||||
|
@ -41,7 +115,13 @@ impl table::Query for Query {
|
|||
}
|
||||
|
||||
fn get_all(self) -> Vec<Vec<u8>> {
|
||||
todo!()
|
||||
match self.selector {
|
||||
table::Selector::Exact(_) => todo!(),
|
||||
table::Selector::Prefix(p) => {
|
||||
let mut s = self.table.connection.prepare("SELECT value FROM data WHERE tb = ? AND substr(key, 0, ?) == ?").unwrap();
|
||||
s.query_map((&self.table.name, p.len(), p.to_vec()), |f| f.get(0)).unwrap().map(|r| r.unwrap()).collect()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn get_iter(self) -> impl Iterator<Item = Vec<u8>> {
|
||||
|
@ -52,36 +132,3 @@ impl table::Query for Query {
|
|||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl table::Database for Database {
|
||||
type Query = Query;
|
||||
|
||||
fn reset(&self) {
|
||||
self.connection.execute_batch("
|
||||
DROP TABLE IF EXISTS data;
|
||||
CREATE TABLE data (key BLOB, value BLOB);
|
||||
").unwrap();
|
||||
}
|
||||
|
||||
fn query(&self, _selector: table::Selector) -> Self::Query {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn query_reverse(&self, _selector: table::Selector) -> Self::Query {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
|
||||
let mut s = self.connection.prepare("SELECT value FROM data WHERE key = ?").unwrap();
|
||||
s.query_row([key.to_vec()], |f| f.get(0)).ok()
|
||||
}
|
||||
|
||||
fn put(&self, key: &[u8], value: &[u8]) {
|
||||
let mut s = self.connection.prepare("INSERT INTO data (key, value) VALUES (?, ?)").unwrap();
|
||||
s.execute((key, value)).unwrap();
|
||||
}
|
||||
|
||||
fn delete(&self, _key: &[u8]) {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,6 +12,8 @@ use std::{
|
|||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use super::table::Database;
|
||||
|
||||
#[derive(Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
#[serde(try_from = "&str", into = "String")]
|
||||
pub struct EventId(EventIdData);
|
||||
|
@ -152,7 +154,7 @@ impl<T: Debug + Serialize + Clone> Event<T> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn verify_room<R: Resolver<EventType = T>>(&self, room: &Room<R>) -> Result<()> {
|
||||
pub fn verify_room<R: Resolver<EventType = T>, D: Database>(&self, room: &Room<R, D>) -> Result<()> {
|
||||
self.verify()?;
|
||||
|
||||
let room_config = match &room.get_root().content {
|
||||
|
|
|
@ -21,16 +21,19 @@ pub trait Resolver {
|
|||
|
||||
/// TEMP: Get the name/id of this resolver
|
||||
fn name(&self) -> &str;
|
||||
|
||||
/// Get the schema of this resolver
|
||||
fn schema(&self) -> Vec<String>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Verification {
|
||||
/// This event is valid
|
||||
Valid,
|
||||
|
||||
|
||||
/// This event's data makes sense, but the sender doesn't have permission to send it
|
||||
Unauthorized,
|
||||
|
||||
|
||||
/// This event contains invalid data
|
||||
Invalid,
|
||||
}
|
||||
|
@ -40,24 +43,39 @@ pub enum Verification {
|
|||
pub enum Command {
|
||||
/// Insert a new row into the database
|
||||
Put {
|
||||
table: String,
|
||||
key: Vec<u8>,
|
||||
value: Vec<u8>,
|
||||
},
|
||||
|
||||
/// Delete an existing row
|
||||
Delete { key: Vec<u8> },
|
||||
|
||||
Delete { table: String, key: Vec<u8> },
|
||||
// /// Notify someone outside of the room that they can join
|
||||
// Invite {},
|
||||
|
||||
|
||||
// /// Add to notification counters (for mentions)
|
||||
// // NOTE: maybe instead of this, there could be a special notifications table?
|
||||
// Notify {},
|
||||
}
|
||||
|
||||
impl Command {
|
||||
pub fn put(key: impl Into<Vec<u8>>, value: impl Into<Vec<u8>>) -> Self {
|
||||
Command::Put { key: key.into(), value: value.into() }
|
||||
pub fn put(
|
||||
table: impl Into<String>,
|
||||
key: impl Into<Vec<u8>>,
|
||||
value: impl Into<Vec<u8>>,
|
||||
) -> Self {
|
||||
Command::Put {
|
||||
table: table.into(),
|
||||
key: key.into(),
|
||||
value: value.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn delete(table: impl Into<String>, key: impl Into<Vec<u8>>) -> Self {
|
||||
Command::Delete {
|
||||
table: table.into(),
|
||||
key: key.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,7 +97,6 @@ pub fn sort<T: Debug + Serialize + Clone>(
|
|||
let (mut heads, mut unsorted): (Vec<_>, Vec<_>) = events
|
||||
.iter()
|
||||
.partition(|event| event.references().is_empty());
|
||||
assert!(heads.len() == 1);
|
||||
while let Some(ev) = heads.pop() {
|
||||
references.retain(|(parent_id, _)| parent_id != ev.id());
|
||||
sorted.push(ev);
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use proto::table::Selector;
|
||||
|
||||
use crate::{
|
||||
actor::ActorSecret,
|
||||
event::EventId,
|
||||
|
@ -8,23 +10,27 @@ use crate::{
|
|||
};
|
||||
use std::fmt::Debug;
|
||||
|
||||
use super::table::Database;
|
||||
use super::table::{Database, Query, Table};
|
||||
|
||||
pub const TABLE_EVENTS: &str = "_events";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Room<R: Resolver> {
|
||||
pub events: Vec<Event<R::EventType>>,
|
||||
pub heads: Vec<EventId>,
|
||||
pub resolver: R,
|
||||
pub struct Room<R: Resolver, D: Database> {
|
||||
root: Event<R::EventType>,
|
||||
heads: Vec<EventId>,
|
||||
resolver: R,
|
||||
database: D,
|
||||
}
|
||||
|
||||
impl<R: Resolver> Room<R> {
|
||||
pub fn builder() -> RoomBuilder<R> {
|
||||
impl<R: Resolver, D: Database> Room<R, D> {
|
||||
pub fn builder() -> RoomBuilder<R, D> {
|
||||
RoomBuilder::new()
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
resolver_name: impl Into<String>,
|
||||
resolver: R,
|
||||
database: D,
|
||||
hasher: HashType,
|
||||
signer: SignatureType,
|
||||
secret: &ActorSecret,
|
||||
|
@ -39,102 +45,122 @@ impl<R: Resolver> Room<R> {
|
|||
)
|
||||
.then_hash()?
|
||||
.and_sign()?;
|
||||
let id_bytes = base_event.id().to_string();
|
||||
let id_bytes = id_bytes.as_bytes();
|
||||
database
|
||||
.table(TABLE_EVENTS)
|
||||
.put(id_bytes, &serde_json::to_vec(&base_event).unwrap());
|
||||
Ok(Self {
|
||||
heads: vec![base_event.id().clone()],
|
||||
events: vec![base_event],
|
||||
root: base_event,
|
||||
resolver,
|
||||
database,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn from_root(resolver: R, event: Event<R::EventType>) -> Result<Self> {
|
||||
pub fn from_root(resolver: R, database: D, event: Event<R::EventType>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
heads: vec![event.id().clone()],
|
||||
events: vec![event],
|
||||
root: event,
|
||||
resolver,
|
||||
database,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_root(&self) -> &Event<R::EventType> {
|
||||
self.events
|
||||
.first()
|
||||
.as_ref()
|
||||
.expect("room always has a root event")
|
||||
&self.root
|
||||
}
|
||||
|
||||
pub fn get_resolver(&self) -> &R {
|
||||
&self.resolver
|
||||
}
|
||||
|
||||
pub fn resolve_state<D>(&self, state: &mut D)
|
||||
where
|
||||
D: proto::table::Database,
|
||||
{
|
||||
pub fn resolve_state(&self) {
|
||||
let resolver = self.get_resolver();
|
||||
let sorted = sort(|a, b| resolver.tiebreak(a, b), &self.events);
|
||||
state.reset();
|
||||
// ideally i don't get *all* events up front, and query the db as needed
|
||||
let events = self.all_events();
|
||||
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events);
|
||||
self.database.reset();
|
||||
for event in sorted {
|
||||
let effects = resolver.resolve(state, event);
|
||||
let effects = resolver.resolve(&self.database, event);
|
||||
for effect in effects {
|
||||
match effect {
|
||||
Command::Put { key, value } => state.put(&key, &value),
|
||||
Command::Delete { key } => state.delete(&key),
|
||||
Command::Put { table, key, value } => {
|
||||
self.database.table(table).put(&key, &value)
|
||||
}
|
||||
Command::Delete { table, key } => self.database.table(table).delete(&key),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_event<D: Database>(
|
||||
pub fn create_event(
|
||||
&mut self,
|
||||
state: &mut D,
|
||||
event_content: CoreContent<R::EventType>,
|
||||
secret: &ActorSecret,
|
||||
) -> Result<&Event<R::EventType>> {
|
||||
) -> Result<Event<R::EventType>> {
|
||||
let event = Event::builder(event_content, secret)
|
||||
.with_references(std::mem::take(&mut self.heads))
|
||||
.then_hash()?
|
||||
.and_sign()?;
|
||||
self.append_event(state, event)
|
||||
self.append_event(event)
|
||||
}
|
||||
|
||||
pub fn append_event<D: Database>(&mut self, state: &mut D, event: Event<R::EventType>) -> Result<&Event<R::EventType>> {
|
||||
pub fn append_event(&mut self, event: Event<R::EventType>) -> Result<Event<R::EventType>> {
|
||||
event.verify_room(self).expect("event failed verification");
|
||||
match self.get_resolver().verify(state, &event) {
|
||||
Verification::Valid => {},
|
||||
match self.get_resolver().verify(&self.database, &event) {
|
||||
Verification::Valid => {}
|
||||
Verification::Unauthorized => panic!("unauthorized"),
|
||||
Verification::Invalid => panic!("invalid data"),
|
||||
}
|
||||
if self.events.iter().any(|p| p == &event) {
|
||||
let id_bytes = event.id().to_string();
|
||||
let id_bytes = id_bytes.as_bytes();
|
||||
if self.database.table(TABLE_EVENTS).get(id_bytes).is_some() {
|
||||
return Err(Error::AlreadyExists);
|
||||
}
|
||||
self.events.push(event);
|
||||
let event_ref = self.events.last().unwrap();
|
||||
self.heads.retain(|id| !event_ref.references().contains(id));
|
||||
self.heads.push(event_ref.id().clone());
|
||||
Ok(event_ref)
|
||||
self.database
|
||||
.table(TABLE_EVENTS)
|
||||
.put(id_bytes, &serde_json::to_vec(&event).unwrap());
|
||||
self.heads.retain(|id| !event.references().contains(id));
|
||||
self.heads.push(event.id().clone());
|
||||
Ok(event)
|
||||
}
|
||||
|
||||
pub fn get_heads(&self) -> &[EventId] {
|
||||
&self.heads
|
||||
}
|
||||
|
||||
// TEMP: get all events
|
||||
pub fn all_events(&self) -> Vec<Event<R::EventType>> {
|
||||
self.database
|
||||
.table(TABLE_EVENTS)
|
||||
.query(Selector::Prefix(vec![]))
|
||||
.get_all()
|
||||
.iter()
|
||||
.map(|bytes| serde_json::from_slice(bytes).expect("invalid data"))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RoomBuilder<R: Resolver> {
|
||||
pub struct RoomBuilder<R: Resolver, D: Database> {
|
||||
resolver: Option<R>,
|
||||
hasher: Option<HashType>,
|
||||
signer: Option<SignatureType>,
|
||||
database: Option<D>,
|
||||
}
|
||||
|
||||
impl<R: Resolver> Default for RoomBuilder<R> {
|
||||
impl<R: Resolver, D: Database> Default for RoomBuilder<R, D> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
resolver: None,
|
||||
hasher: None,
|
||||
signer: None,
|
||||
database: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Resolver> RoomBuilder<R> {
|
||||
impl<R: Resolver, D: Database> RoomBuilder<R, D> {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
@ -154,7 +180,12 @@ impl<R: Resolver> RoomBuilder<R> {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn create(self, secret: &ActorSecret) -> Result<Room<R>> {
|
||||
pub fn with_database(mut self, database: D) -> Self {
|
||||
self.database = Some(database);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn create(self, secret: &ActorSecret) -> Result<Room<R, D>> {
|
||||
Room::new(
|
||||
self.resolver
|
||||
.as_ref()
|
||||
|
@ -162,6 +193,7 @@ impl<R: Resolver> RoomBuilder<R> {
|
|||
.name()
|
||||
.to_owned(),
|
||||
self.resolver.ok_or(Error::MissingBuilderData)?,
|
||||
self.database.ok_or(Error::MissingBuilderData)?,
|
||||
self.hasher.ok_or(Error::MissingBuilderData)?,
|
||||
self.signer.ok_or(Error::MissingBuilderData)?,
|
||||
secret,
|
||||
|
|
|
@ -1,16 +1,25 @@
|
|||
pub trait Database {
|
||||
type Table: Table;
|
||||
|
||||
fn table(&self, name: impl Into<String>) -> Self::Table;
|
||||
fn reset(&self);
|
||||
}
|
||||
|
||||
pub trait Table {
|
||||
type Query: Query;
|
||||
|
||||
fn query(&self, selector: Selector) -> Self::Query;
|
||||
fn query_reverse(&self, selector: Selector) -> Self::Query;
|
||||
|
||||
fn reset(&self);
|
||||
fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
|
||||
fn put(&self, key: &[u8], value: &[u8]);
|
||||
fn delete(&self, key: &[u8]);
|
||||
}
|
||||
|
||||
pub struct Selector(Vec<u8>);
|
||||
pub enum Selector {
|
||||
Exact(Vec<u8>),
|
||||
Prefix(Vec<u8>),
|
||||
}
|
||||
|
||||
pub trait Query {
|
||||
fn get_single(self) -> Option<Vec<u8>>;
|
||||
|
@ -21,6 +30,6 @@ pub trait Query {
|
|||
|
||||
impl Into<Selector> for Vec<u8> {
|
||||
fn into(self) -> Selector {
|
||||
Selector(self)
|
||||
Selector::Exact(self)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue