From 3428100fe0982abc678bc32b662c7f5639ffa33e Mon Sep 17 00:00:00 2001 From: tezlm Date: Mon, 4 Mar 2024 16:31:16 -0800 Subject: [PATCH] random stuff --- Cargo.lock | 48 ++++ Cargo.toml | 1 + crates/cli/src/main.rs | 8 +- crates/cli/src/server.rs | 35 +-- crates/impls/Cargo.toml | 1 + crates/impls/src/stores/sqlite_async.rs | 35 ++- crates/proto/Cargo.toml | 1 + crates/proto/src/proto/event.rs | 32 +++ crates/proto/src/proto/mod.rs | 3 +- crates/proto/src/proto/room.rs | 4 +- crates/proto/src/proto/room2.rs | 340 +++++++++++++++++++++--- crates/proto/src/proto/room3.rs | 146 ++++++++++ crates/proto/src/proto/table/graph.rs | 3 +- crates/proto/src/proto/table/kv.rs | 38 ++- crates/query/Cargo.toml | 9 + crates/query/src/main.rs | 291 ++++++++++++++++++++ docs/protocol.md | 30 +++ 17 files changed, 950 insertions(+), 75 deletions(-) create mode 100644 crates/proto/src/proto/room3.rs create mode 100644 crates/query/Cargo.toml create mode 100644 crates/query/src/main.rs create mode 100644 docs/protocol.md diff --git a/Cargo.lock b/Cargo.lock index 7d00183..e208c9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,6 +244,12 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" +[[package]] +name = "beef" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1" + [[package]] name = "bitflags" version = "1.3.2" @@ -491,12 +497,14 @@ dependencies = [ "serde_json", "sha2", "thiserror", + "tokio", ] [[package]] name = "dag-resolve-impls" version = "0.1.0" dependencies = [ + "async-recursion", "dag-resolve", "futures", "rusqlite", @@ -1145,6 +1153,39 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +[[package]] +name = "logos" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "161971eb88a0da7ae0c333e1063467c5b5727e7fb6b710b8db4814eade3a42e8" +dependencies = [ + "logos-derive", +] + +[[package]] +name = "logos-codegen" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e31badd9de5131fdf4921f6473d457e3dd85b11b7f091ceb50e4df7c3eeb12a" +dependencies = [ + "beef", + "fnv", + "lazy_static", + "proc-macro2", + "quote", + "regex-syntax", + "syn 2.0.48", +] + +[[package]] +name = "logos-derive" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c2a69b3eb68d5bd595107c9ee58d7e07fe2bb5e360cc85b0f084dedac80de0a" +dependencies = [ + "logos-codegen", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1422,6 +1463,13 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "query" +version = "0.1.0" +dependencies = [ + "logos", +] + [[package]] name = "quote" version = "1.0.35" diff --git a/Cargo.toml b/Cargo.toml index f0583dd..709f298 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "crates/proto", "crates/impls", "crates/cli", + "crates/query", ] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index b4501b1..8bea5c9 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -21,8 +21,6 @@ use dag_resolve_impls::{ }; use server::{HasRequest, HasResponse, ServerState, SyncRequest, SyncResponse}; -use crate::server::exec; - #[derive(Parser)] #[clap(version, about)] enum Cli { @@ -101,7 +99,7 @@ async fn resolve_state(room: &RoomOld, db: &Database) -> Result< room.resolve_state(ResolveInput::AllEvents(events))? } dag_resolve::room::ResolveState::ProcessEvent(idx, events, (step, req)) => { - room.resolve_state(ResolveInput::ProcessEvent(idx, events, (step, exec(req, &db).await)))? + room.resolve_state(ResolveInput::ProcessEvent(idx, events, (step, db.exec(req).await)))? }, dag_resolve::room::ResolveState::Done => break, } @@ -193,11 +191,11 @@ async fn send_event(state: &mut State, data: &str) -> Re }, AppendState::Step2(step, event, req) => { match req { - DatabaseRequest::Done => { + DatabaseRequest::Noop => { AppendInput::Step3(event) }, _ => { - let res = exec(req, &state.db).await; + let res = state.db.exec(req).await; AppendInput::Step2(step, res, event) } } diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 15ada95..0ecc500 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -94,37 +94,6 @@ async fn route_has( Json(HasResponse { disjoint }) } -#[async_recursion::async_recursion] -pub async fn exec(req: DatabaseRequest, db: &Database) -> DatabaseResponse { - match req { - DatabaseRequest::Done => DatabaseResponse::Done, - DatabaseRequest::Reset => { - db.reset().await; - DatabaseResponse::Done - }, - DatabaseRequest::QuerySingle(_, _) => todo!(), - DatabaseRequest::QueryAll(_, _) => todo!(), - DatabaseRequest::QueryIter(_, _) => todo!(), - DatabaseRequest::QueryCount(_, _) => todo!(), - DatabaseRequest::QueryIterReverse(_, _) => todo!(), - DatabaseRequest::Get(table, key) => { - let kv = db.table(table).get(&key).await; - DatabaseResponse::Single(kv) - }, - DatabaseRequest::Put(table, key, value) => { - db.table(table).put(&key, &value).await; - DatabaseResponse::Done - }, - DatabaseRequest::Delete(table, key) => { - db.table(table).delete(&key).await; - DatabaseResponse::Done - }, - DatabaseRequest::WithData(data, req) => { - DatabaseResponse::WithData(data, Box::new(exec(*req, db).await)) - }, - } -} - async fn append(room: &mut RoomOld, db: &Database, event: Event) where R: Resolver { let mut step = room.append_event(AppendInput::Step1(event)).unwrap(); loop { @@ -134,8 +103,8 @@ async fn append(room: &mut RoomOld, db: &Database, event: Event { match req { - DatabaseRequest::Done => { - let res = exec(req, &db).await; + DatabaseRequest::Noop => { + let res = db.exec(req).await; room.append_event(AppendInput::Step2(step, res, event)).unwrap() }, _ => { diff --git a/crates/impls/Cargo.toml b/crates/impls/Cargo.toml index 09060b8..225025a 100644 --- a/crates/impls/Cargo.toml +++ b/crates/impls/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-recursion = "1.0.5" dag-resolve = { version = "0.1.0", path = "../proto" } futures = "0.3.30" rusqlite = { version = "0.30.0", features = ["bundled"] } diff --git a/crates/impls/src/stores/sqlite_async.rs b/crates/impls/src/stores/sqlite_async.rs index fa9436a..2414e95 100644 --- a/crates/impls/src/stores/sqlite_async.rs +++ b/crates/impls/src/stores/sqlite_async.rs @@ -1,7 +1,7 @@ //! Store state in a sqlite database use dag_resolve::{ - actor::{ActorId, ActorSecret}, event::EventId, proto::table, room::{TABLE_EVENTS, TABLE_HEADS} + actor::{ActorId, ActorSecret}, event::EventId, proto::table::{self, DatabaseRequest, DatabaseResponse}, room::{TABLE_EVENTS, TABLE_HEADS} }; use futures::{stream, Stream}; use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool}; @@ -304,3 +304,36 @@ impl<'a> Query<'a> { self.get_all().await.len() as u64 } } + +impl Database { + #[async_recursion::async_recursion] + pub async fn exec(&self, req: DatabaseRequest) -> DatabaseResponse { + match req { + DatabaseRequest::Noop => DatabaseResponse::Done, + DatabaseRequest::Reset => { + self.reset().await; + DatabaseResponse::Done + }, + DatabaseRequest::QuerySingle(_, _) => todo!(), + DatabaseRequest::QueryAll(_, _) => todo!(), + DatabaseRequest::QueryIter(_, _) => todo!(), + DatabaseRequest::QueryCount(_, _) => todo!(), + DatabaseRequest::QueryIterReverse(_, _) => todo!(), + DatabaseRequest::Get(table, key) => { + let kv = self.table(table).get(&key).await; + DatabaseResponse::Single(kv) + }, + DatabaseRequest::Put(table, key, value) => { + self.table(table).put(&key, &value).await; + DatabaseResponse::Done + }, + DatabaseRequest::Delete(table, key) => { + self.table(table).delete(&key).await; + DatabaseResponse::Done + }, + DatabaseRequest::WithData(data, req) => { + DatabaseResponse::WithData(data, Box::new(self.exec(*req).await)) + }, + } + } +} diff --git a/crates/proto/Cargo.toml b/crates/proto/Cargo.toml index 6a71d45..3b36886 100644 --- a/crates/proto/Cargo.toml +++ b/crates/proto/Cargo.toml @@ -17,3 +17,4 @@ serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" sha2 = "0.10.8" thiserror = "1.0.56" +tokio = { version = "1.36.0", features = ["test-util", "macros"] } diff --git a/crates/proto/src/proto/event.rs b/crates/proto/src/proto/event.rs index 383e851..d87ee33 100644 --- a/crates/proto/src/proto/event.rs +++ b/crates/proto/src/proto/event.rs @@ -12,6 +12,8 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; +use super::room2::{Room as _, RoomIdle}; + #[derive(Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(try_from = "&str", into = "String")] pub struct EventId(EventIdData); @@ -176,6 +178,36 @@ impl Event { Ok(()) } + + pub fn verify_room2>( + &self, + room: &RoomIdle, + ) -> Result<()> { + self.verify()?; + + let room_config = match &room.get_root().content { + EventContent::Create(c) => c, + _ => unreachable!("the root should always be a create event"), + }; + + if room_config.hasher != self.id.get_type() { + return Err(Error::MismatchedHasher { + expected: room_config.hasher, + got: self.id.get_type(), + }); + } + + if room_config.signer != self.author.get_type() + || room_config.signer != self.signature.get_type() + { + return Err(Error::MismatchedSigner { + expected: room_config.signer, + got: self.author.get_type(), + }); + } + + Ok(()) + } pub fn id(&self) -> &EventId { &self.id diff --git a/crates/proto/src/proto/mod.rs b/crates/proto/src/proto/mod.rs index 2fa373f..ab0ea47 100644 --- a/crates/proto/src/proto/mod.rs +++ b/crates/proto/src/proto/mod.rs @@ -5,5 +5,6 @@ pub mod resolver; pub mod room; pub mod table; pub mod atoms; -mod room2; +pub mod room2; +mod room3; mod rpc; diff --git a/crates/proto/src/proto/room.rs b/crates/proto/src/proto/room.rs index 601df82..ea3489d 100644 --- a/crates/proto/src/proto/room.rs +++ b/crates/proto/src/proto/room.rs @@ -145,7 +145,7 @@ impl RoomOld { // ideally i don't get *all* events up front, and query the db as needed let resolver = self.get_resolver(); let sorted = sort(|a, b| resolver.tiebreak(a, b), &events).into_iter().cloned().collect(); - Ok(ResolveState::ProcessEvent(0, sorted, (0, DatabaseRequest::Done))) + Ok(ResolveState::ProcessEvent(0, sorted, (0, DatabaseRequest::Noop))) }, ResolveInput::ProcessEvent(idx, events, state) => { let resolver = self.get_resolver(); @@ -153,7 +153,7 @@ impl RoomOld { match resolver.resolve(state, &events[idx]) { Ok(()) => { if idx + 1 < events.len() { - Ok(ResolveState::ProcessEvent(idx + 1, events, (0, DatabaseRequest::Done))) + Ok(ResolveState::ProcessEvent(idx + 1, events, (0, DatabaseRequest::Noop))) } else { Ok(ResolveState::Done) } diff --git a/crates/proto/src/proto/room2.rs b/crates/proto/src/proto/room2.rs index 565409e..ec62604 100644 --- a/crates/proto/src/proto/room2.rs +++ b/crates/proto/src/proto/room2.rs @@ -1,15 +1,59 @@ -#![allow(unused)] - //! Possibly another way rooms could be handled? //! A state machine representing a room -use super::table::Selector; +use std::collections::VecDeque; -struct RoomIdle; -struct RoomRead; -struct RoomWrite; +use crate::{ + actor::ActorSecret, + event::{Event, EventContent, EventId}, + proto::table::DatabaseResponse, + resolver::{sort, Resolver, Verification}, + room::{TABLE_EVENTS, TABLE_HEADS}, +}; -enum RejectReason { +use super::table::{DatabaseRequest, Selector}; + +pub struct RoomIdle { + data: RoomData, +} + +pub struct RoomResolveReset { + data: RoomData, +} + +pub struct RoomResolveAllEvents { + data: RoomData, + req: DatabaseRequest, +} + +pub struct RoomResolveStep { + data: RoomData, + events: Vec>, + event_idx: u64, + event_step: u64, + event_res: Option, +} + +pub struct RoomCreateHeads { + data: RoomData, + req: DatabaseRequest, + event_content: EventContent, + secret: ActorSecret, +} + +pub struct RoomAppendVerify { + data: RoomData, + req: DatabaseRequest, + step: usize, + event: Event, +} + +pub struct RoomAppendWrite { + data: RoomData, + reqs: VecDeque, +} + +pub enum RejectReason { /// The event has invalid data Invalid, @@ -23,38 +67,274 @@ enum RejectReason { Panic, } -enum RoomOutput { - /// The event is valid and has been added - Accept(RoomIdle), - - /// The event has been rejected (but shouldn't necessarily be dropped) - Reject(RejectReason, RoomIdle), - - /// The room state machine is requesting to read data - Read(RoomRead), - - /// The room state machine is requesting to write data - Write(RoomWrite), +/// the internal data of the room +struct RoomData { + root: Event, + resolver: R, } -impl RoomIdle { - /// Enters into the event processing state. Pretend its a coroutine for processing an event. - pub fn process_event(self) -> RoomOutput { +pub trait Room { + fn get_root(&self) -> &Event; + + fn get_resolver(&self) -> &R; +} + +macro_rules! impl_room { + ($state:ident) => { + impl Room for $state { + fn get_root(&self) -> &Event<::EventType> { + &self.data.root + } + + fn get_resolver(&self) -> &R { + &self.data.resolver + } + } + }; +} + +impl_room!(RoomIdle); +impl_room!(RoomResolveReset); +impl_room!(RoomResolveStep); +impl_room!(RoomResolveAllEvents); +impl_room!(RoomCreateHeads); +impl_room!(RoomAppendVerify); +impl_room!(RoomAppendWrite); + +impl RoomIdle { + /// Process a new event from the network or appended locally. + pub fn process_event(self, event: Event) -> RoomAppendVerifyOutput { // try to append event here - todo!() + event + .verify_room2(&self) + .expect("event failed verification"); + + let a = (0, DatabaseResponse::Done); + match self.get_resolver().verify(a, &event) { + Ok(verification) => match verification { + Verification::Valid => RoomAppendVerifyOutput::Write(RoomAppendWrite { + data: self.data, + reqs: get_reqs::(&event), + }), + Verification::Unauthorized => panic!("unauthorized"), + Verification::Invalid => panic!("invalid data"), + }, + Err(req) => { + if req.is_read() { + RoomAppendVerifyOutput::Verify(RoomAppendVerify { + data: self.data, + req, + step: 0, + event, + }) + } else { + panic!("can't write during verify"); + } + } + } + } + + pub fn resolve_state(self) -> RoomResolveReset { + RoomResolveReset { data: self.data } + } + + /// creates a new event. call `append_event` afterwards + pub fn create_event(self, event_content: EventContent, secret: &ActorSecret) -> RoomCreateHeads { + RoomCreateHeads { + data: self.data, + req: DatabaseRequest::QueryAll(TABLE_HEADS.into(), Selector::Any), + event_content, + secret: secret.clone(), + } } } -impl RoomRead { +impl RoomCreateHeads { + pub fn done(self, res: DatabaseResponse) -> (RoomIdle, Event) { + let heads: Vec<_> = match res { + DatabaseResponse::Many(records) => records + .into_iter() + .map(|(id, _)| EventId::from_bytes(&id)) + .collect(), + _ => panic!(), + }; + let event = Event::builder(self.event_content, &self.secret) + .with_references(heads) + .then_hash() + .unwrap() + .and_sign() + .unwrap(); + let idle = RoomIdle { data: self.data }; + (idle, event) + } + + /// what to do + pub fn what(&self) -> &DatabaseRequest { + &self.req + } +} + +pub enum RoomAppendVerifyOutput { + Verify(RoomAppendVerify), + Write(RoomAppendWrite), +} + +pub enum RoomAppendWriteOutput { + Write(RoomAppendWrite), + Idle(RoomIdle), +} + +fn get_reqs(event: &Event) -> VecDeque { + let mut reqs = VecDeque::new(); + for head in event.references() { + reqs.push_back(DatabaseRequest::Delete(TABLE_HEADS.into(), head.to_bytes())); + } + reqs.push_back(DatabaseRequest::Put( + TABLE_EVENTS.into(), + event.id().to_bytes(), + serde_json::to_vec(&event).unwrap(), + )); + reqs.push_back(DatabaseRequest::Put( + TABLE_HEADS.into(), + event.id().to_bytes(), + vec![], + )); + reqs +} + +impl RoomAppendVerify { + /// Done reading (call with DatabaseResponse::Done first) + pub fn done(self, response: DatabaseResponse) -> RoomAppendVerifyOutput { + let a = (self.step as u64, response); + match self.get_resolver().verify(a, &self.event) { + Ok(verification) => match verification { + Verification::Valid => RoomAppendVerifyOutput::Write(RoomAppendWrite { + data: self.data, + reqs: get_reqs::(&self.event), + }), + Verification::Unauthorized => panic!("unauthorized"), + Verification::Invalid => panic!("invalid data"), + }, + Err(req) => { + if req.is_read() { + RoomAppendVerifyOutput::Verify(RoomAppendVerify { + data: self.data, + req, + step: self.step + 1, + event: self.event, + }) + } else { + panic!("can't write during verify"); + } + } + } + } + + /// what to do + pub fn what(&self) -> &DatabaseRequest { + &self.req + } +} + +impl RoomAppendWrite { /// Done reading - pub fn done(self, data: Vec) -> RoomOutput { - todo!() + pub fn done(mut self, response: DatabaseResponse) -> RoomAppendWriteOutput { + match response { + DatabaseResponse::Done => {}, + _ => panic!(), + } + if self.reqs.pop_front().is_none() { + RoomAppendWriteOutput::Idle(RoomIdle { data: self.data }) + } else { + RoomAppendWriteOutput::Write(self) + } + } + + /// what to do + pub fn what(&self) -> &DatabaseRequest { + self.reqs.front().as_ref().unwrap() } } -impl RoomWrite { - /// Done writing - pub fn done(self) -> RoomOutput { - todo!() +impl RoomResolveReset { + /// Done reading + pub fn done(self, res: DatabaseResponse) -> RoomResolveAllEvents { + match res { + DatabaseResponse::Done => RoomResolveAllEvents { + data: self.data, + req: DatabaseRequest::QueryAll(TABLE_EVENTS.into(), Selector::Any), + }, + _ => panic!(), + } + } + + /// what to do + pub fn what(&self) -> &DatabaseRequest { + &DatabaseRequest::Reset + } +} + +impl RoomResolveAllEvents { + /// Done reading + pub fn done(self, res: DatabaseResponse) -> RoomResolveStep { + match res { + DatabaseResponse::Many(records) => { + // ideally i don't get *all* events up front, and query the db as needed + let events: Vec<_> = records.into_iter().map(|(_, data)| serde_json::from_slice(&data).unwrap()).collect(); + let resolver = self.get_resolver(); + let sorted = sort(|a, b| resolver.tiebreak(a, b), &events).into_iter().cloned().collect(); + RoomResolveStep { + data: self.data, + events: sorted, + event_idx: 0, + event_step: 0, + event_res: Some(DatabaseResponse::Done), + } + }, + _ => panic!(), + } + } + + /// what to do + pub fn what(&self) -> &DatabaseRequest { + &self.req + } +} + +pub enum RoomResolveStepOutput { + Step(RoomResolveStep), + Done(RoomIdle), +} + +impl RoomResolveStep { + /// Done with io + pub fn done(mut self, res: DatabaseResponse) -> RoomResolveStepOutput { + if (self.event_idx as usize) < self.events.len() { + assert!(self.event_res.is_none()); + self.event_res = Some(res); + RoomResolveStepOutput::Step(self) + } else { + RoomResolveStepOutput::Done(RoomIdle { data: self.data }) + } + } + + /// what to do + pub fn what(&mut self) -> DatabaseRequest { + let res = self.event_res.take().unwrap(); + match self.get_resolver().resolve((self.event_step, res), &self.events[self.event_idx as usize]) { + Ok(()) => { + self.event_idx += 1; + if (self.event_idx as usize) < self.events.len() { + self.event_res = Some(DatabaseResponse::Done); + self.what() + } else { + DatabaseRequest::Noop + } + }, + Err(req) => { + self.event_step += 1; + req + }, + } } } diff --git a/crates/proto/src/proto/room3.rs b/crates/proto/src/proto/room3.rs new file mode 100644 index 0000000..ede526c --- /dev/null +++ b/crates/proto/src/proto/room3.rs @@ -0,0 +1,146 @@ +#![allow(unused)] + +use std::{ + cell::OnceCell, + future::IntoFuture, + pin::Pin, + sync::{Arc, OnceLock}, + task::{Context, Poll, Wake, Waker}, +}; + +use futures::{Future, FutureExt}; + +async fn verify(db: Db) { + println!("getting..."); + assert_eq!(db.get().await, vec![123]); + println!("...got!"); +} + +struct CommandGet<'a, S: DbSync, A: DbAsync> { + db: &'a Db, + data: Option>, +} + +enum DbType { + Sync(S), + Async(A), +} + +struct FakeSync; +impl DbSync for FakeSync { + fn get(&self) -> Vec { + todo!() + } +} + +struct FakeAsync; +impl DbAsync for FakeAsync { + async fn get(&self) -> Vec { + todo!() + } +} + +impl<'a, S: DbSync, A: DbAsync> Future for CommandGet<'a, S, A> { + type Output = Vec; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some(data) = self.data.take() { + Poll::Ready(data) + } else { + match &self.db.inner { + DbType::Sync(s) => { + self.data = Some(s.get()); + cx.waker().wake_by_ref(); + Poll::Pending + }, + DbType::Async(a) => { + Box::pin(a.get().into_future()).poll_unpin(cx) + }, + } + } + } +} + +struct Db { + inner: DbType, +} + +impl Db { + fn new(inner: DbType) -> Self { + Db { inner } + } + + fn get(&self) -> CommandGet<'_, S, A> { + CommandGet { + db: self, + data: None, + } + } +} + +trait DbImpl { + fn get(&self, cb: &mut dyn FnMut(Vec)); +} + +trait DbSync { + fn get(&self) -> Vec; +} + +trait DbAsync { + async fn get(&self) -> Vec; +} + +struct DbWaker; + +impl Wake for DbWaker { + fn wake(self: Arc) { + // nothing yet... + } +} + +fn run_sync(d: D) { + let db = Db::new(DbType::<_, FakeAsync>::Sync(d)); + let mut ver = Box::pin(verify(db)); + let waker = Waker::from(Arc::new(DbWaker)); + let mut ctx = Context::from_waker(&waker); + loop { + match ver.as_mut().poll(&mut ctx) { + Poll::Ready(_) => break, + Poll::Pending => {} + } + } +} + +async fn run_async(d: D) { + verify(Db::new(DbType::::Async(d))).await; +} + +#[test] +fn test_sync() { + struct TestDb; + impl DbSync for TestDb { + fn get(&self) -> Vec { + vec![123] + } + } + + impl DbImpl for TestDb { + fn get(&self, cb: &mut dyn FnMut(Vec)) { + cb(DbSync::get(self)) + } + } + + run_sync(TestDb); +} + +#[tokio::test] +async fn test_async() { + struct TestDb; + impl DbAsync for TestDb { + async fn get(&self) -> Vec { + vec![123] + } + } + + run_async(TestDb).await; +} diff --git a/crates/proto/src/proto/table/graph.rs b/crates/proto/src/proto/table/graph.rs index 0758884..6105949 100644 --- a/crates/proto/src/proto/table/graph.rs +++ b/crates/proto/src/proto/table/graph.rs @@ -3,9 +3,10 @@ //! an experiment in making the database a graph db? /* -each database is a list of facts and indexes matching a schema +each database is a list of facts, rules, and indexes matching a schema each fact is an (entity, property, value) tuple each fact can only be looked up by an index or reference +each rule generates more facts (like a materialized view) */ struct Database { diff --git a/crates/proto/src/proto/table/kv.rs b/crates/proto/src/proto/table/kv.rs index 2cf5510..b18a898 100644 --- a/crates/proto/src/proto/table/kv.rs +++ b/crates/proto/src/proto/table/kv.rs @@ -2,7 +2,7 @@ type Bytes = Vec; #[derive(Debug)] pub enum DatabaseRequest { - Done, + Noop, Reset, QuerySingle(String, Selector), QueryAll(String, Selector), @@ -15,7 +15,7 @@ pub enum DatabaseRequest { WithData(Vec, Box), } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum DatabaseResponse { Done, Single(Option<(Bytes, Bytes)>), @@ -37,3 +37,37 @@ impl From> for Selector { Selector::Exact(value) } } + +impl DatabaseRequest { + pub fn is_read(&self) -> bool { + match self { + DatabaseRequest::Noop => true, + DatabaseRequest::Reset => false, + DatabaseRequest::QuerySingle(_, _) => true, + DatabaseRequest::QueryAll(_, _) => true, + DatabaseRequest::QueryIter(_, _) => true, + DatabaseRequest::QueryCount(_, _) => true, + DatabaseRequest::QueryIterReverse(_, _) => true, + DatabaseRequest::Get(_, _) => true, + DatabaseRequest::Put(_, _, _) => false, + DatabaseRequest::Delete(_, _) => false, + DatabaseRequest::WithData(_, q) => q.is_read(), + } + } +} + +// trait DatabaseImpl { +// fn reset(&self, callback: dyn FnOnce()); +// fn query(&self, table: &str, selector: Selector, callback: dyn FnOnce(Vec<(Bytes, Bytes)>)); +// fn get(&self, table: &str, key: Bytes, callback: dyn FnOnce(Vec<(Bytes, Bytes)>)); +// fn put(&self, table: &str, key: Bytes, value: Bytes, callback: dyn FnOnce()); +// fn delete(&self, table: &str, key: Bytes, callback: dyn FnOnce()); +// } + +// trait NetworkImpl { +// fn foo(); +// } + +// trait RoomImpl { +// fn handle(); +// } diff --git a/crates/query/Cargo.toml b/crates/query/Cargo.toml new file mode 100644 index 0000000..a0c6775 --- /dev/null +++ b/crates/query/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "query" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +logos = "0.14.0" diff --git a/crates/query/src/main.rs b/crates/query/src/main.rs new file mode 100644 index 0000000..478a79f --- /dev/null +++ b/crates/query/src/main.rs @@ -0,0 +1,291 @@ +#![allow(unused)] + +use logos::{Lexer, Logos}; + +#[derive(Logos, Debug, PartialEq)] +#[logos(skip r"[ \t\n\f]+")] // Ignore this regex pattern between tokens +enum Token { + #[token("[")] + OpenBracket, + + #[token("]")] + CloseBracket, + + #[token("_")] + Blank, + + #[regex(":[a-zA-Z/]+")] + Atom, + + #[regex("\\?[a-zA-Z]+")] + Thing, + + #[regex("[a-zA-Z]+")] + Identifier, + + #[regex("[0-9]+")] + Number, +} + +#[derive(Debug, PartialEq)] +struct Parsed { + root: Data, +} + +#[derive(Debug, PartialEq)] +enum Data { + Atom(String), + Thing(String), + Identifier(String), + Number(u64), + Tuple(Vec), + Blank, +} + +fn parse(lex: &mut Lexer<'_, Token>, token: Token) -> Data { + match token { + Token::OpenBracket => { + let mut values = vec![]; + loop { + let data = match lex.next().expect("missing token").unwrap() { + Token::CloseBracket => break, + token => parse(lex, token), + }; + values.push(data); + } + Data::Tuple(values) + }, + Token::CloseBracket => panic!("unexpected closing bracket"), + Token::Atom => Data::Atom(lex.slice().to_string()), + Token::Thing => Data::Thing(lex.slice().to_string()), + Token::Identifier => Data::Identifier(lex.slice().to_string()), + Token::Number => Data::Number(lex.slice().parse().unwrap()), + Token::Blank => Data::Blank, + } +} + +#[test] +fn basic() { + let mut lex = Token::lexer(" + [:find ?name :where + [?post :post/id 123] + [?post :post/author ?author] + [?author :user/name ?name]]"); + let token = lex.next().expect("unexpected eof").unwrap(); + let parsed = Parsed { + root: parse(&mut lex, token), + }; + use Data::*; + let target = Parsed { + root: Tuple(vec![ + Atom(":find".into()), Thing("?name".into()), Atom(":where".into()), + Tuple(vec![Thing("?post".into()), Atom(":post/id".into()), Number(123)]), + Tuple(vec![Thing("?post".into()), Atom(":post/author".into()), Thing("?author".into())]), + Tuple(vec![Thing("?author".into()), Atom(":user/name".into()), Thing("?name".into())]), + ], + ), + }; + assert_eq!(parsed, target); +} + +enum Command { + /// define a schema + Schema(CommandSchema), + + /// query the database + Find(CommandFind), + + /// insert some triples + Insert(CommandInsert), + + /// update or delete some triples + Update(CommandUpdate), + + /// get info and stats + Info, +} + +struct CommandFind { + /// return these items + select: Vec, + + /// using these inputs + inputs: Vec, + + /// with these filters + filter: Vec, + + /// ordered by these keys + order: Vec, +} + +struct CommandSchema { + item: SchemaItem, +} + +struct CommandInsert { + triples: Vec, +} + +struct CommandUpdate { + /// using these inputs + inputs: Vec, + + /// with these filters + filter: Vec, + + /// do this upate + update: Vec, +} + +enum Update { + /// add a triple to an entity + Append { + property: String, + data: Data, + }, + + /// set a triple + Set { + property: String, + data: Data, + }, + + /// update a triple + Update { + property: String, + func: Operator, + }, + + /// remote a triple + Remove { + property: String, + data: Option, + }, + + /// remote the entity a triple points to + RemoveEntity, +} + +struct Triple { + entity: u64, + property: String, + data: Data, +} + +enum SchemaItem { + /// a new thing + Thing { + name: String, + documentation: Option, + cardinality: Cardinality, + }, + + /// a value on a thing + Value { + thing: String, + name: String, + ty: Type, + cardinality: Cardinality, + documentation: Option, + unique: bool, // implemented as a custom predicate? + predicate: Filter, + index: Vec, + // at: Option, + }, + + /// materialized view/stored procedure + Rule { + name: String, + find: CommandFind, + }, + + /// like a rule but asserts data instead of generating it + Check { + name: String, + find: Vec, + }, +} + +enum Index { + Fulltext, + Lookup, + Ordered, +} + +enum Filter { + Equals { + left: Data, + right: Data, + name: Data, + }, + Operator(Operator), +} + +enum Operator { + Not(Box), + Any(Vec), + All(Vec), + Less(Data, Data), + Greater(Data, Data), + LessEqual(Data, Data), + GreaterEqual(Data, Data), + Add(Data, Data), + Sub(Data, Data), + Mul(Data, Data), + Div(Data, Data), + FullText(Data, Data), + Missing(Data, Data), + Rule(Vec), +} + +enum Selection { + Thing(usize), + Count(usize), + Min(usize), + Max(usize), + Sum(usize), + First(usize), + Last(usize), + Head(usize), + Tail(usize), +} + +enum Type { + UInt, + Int, + Float, + Instant, + String, + Text, + Bytes, + Ref, + + /// `[:enum rel/follows rel/blocks]` to only allow one or the other + /// `[:enum none [some value]]` where `value` is another column + /// overrides that column's cardinality? how does this work? + Enum, +} + +#[derive(Debug, Default, PartialEq, Eq)] +enum Cardinality { + One, + ZeroOne, + OnePlus, + + #[default] + ZeroPlus, +} + +fn main() { + let mut lex = Token::lexer(" + [:find ?name :where + [?post :post/id 123] + [?post :post/author ?author] + [?author :user/name ?name]]"); + let token = lex.next().expect("unexpected eof").unwrap(); + let parsed = Parsed { + root: parse(&mut lex, token), + }; + dbg!(parsed); +} diff --git a/docs/protocol.md b/docs/protocol.md new file mode 100644 index 0000000..7200f34 --- /dev/null +++ b/docs/protocol.md @@ -0,0 +1,30 @@ +# protocol + +there's the core protocol. each room contains a directed acyclic graph +of events. each event is signed and represents one atomic mutation. each +room has some immutable config set by the root event. resolvers contain +tiebreaking, a verifier, a reducer. the reducer takes in the events and +outputs state/a database. rooms can be synced between any two nodes, +whether they're server-server, client-server, or peer-peer. + +the database. not sure if it will be key value, relational, graph, +triplestore? it needs to be fairly flexible to support a wide variety of +use cases. query peers for database directly without needing to sync room, +for peeking or custom feeds/search indexes. most features are implemented +as special server-generated databases. + +the frontend. email/forum-like discussion and documents like the web or +a git repo are the two main planned things. ideally it would be efficient +with your time, bringing back inboxes. + +## database + +key/value sucks in general. + +relational is well established and works pretty well. i think a +relational database with datalog-like querying would be pretty good +overall, but having relations be tables is still a bit annoying and +relational dbs don't seem to work with tagged enums particularily well. + +i think triplestores with a datalog dialect would be interesting. this +is the best contender right now, with much simpler joins than sql.