diff --git a/.env b/.env new file mode 100644 index 0000000..01e26bf --- /dev/null +++ b/.env @@ -0,0 +1,4 @@ +# comment + +DATABASE_URL=sqlite://repo.db +DATABASE_URL2=sqlite://repo.db diff --git a/.envrc b/.envrc new file mode 100644 index 0000000..3563ad1 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +FOO=123 diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index b2f2ce9..4e5c3e5 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -11,7 +11,7 @@ use dag_resolve::{ event::{Event, EventContent, EventId, HashType, SignatureType}, proto::table::{Database as _, Query, Selector, Table}, resolver::Resolver, - room::{Room, TABLE_EVENTS}, + room::{RoomOld, TABLE_EVENTS}, }; use dag_resolve_impls::{ resolvers::{ @@ -48,33 +48,34 @@ enum Cli { #[derive(Debug)] struct State { - room: Room, + room: RoomOld, secret: ActorSecret, + db: Database, } impl State { async fn init(resolver: R, path: &Path) -> Result { // create new repo println!("init new repo!"); - let db = Database::open(path.to_str().unwrap()).await?.init().await?; + let mut db = Database::open(path.to_str().unwrap()).await?.init().await?; let (actor, secret) = ActorId::new(SignatureType::Ed25519); - let room = Room::builder() + let room = RoomOld::builder() .with_resolver(resolver) .with_hasher(HashType::Sha256) .with_signer(SignatureType::Ed25519) - .with_database(db) + .with_database(&mut db) .create(&secret) .await?; - room.get_state() + db .set_config(&actor, &secret, room.get_root().id()) .await; - room.resolve_state().await; - Ok(State { room, secret }) + room.resolve_state(&mut db).await; + Ok(State { room, secret, db }) } } async fn open(path: impl AsRef) -> Result { - let db = Database::open(path.as_ref().to_str().unwrap()) + let mut db = Database::open(path.as_ref().to_str().unwrap()) .await? .init() .await?; @@ -89,15 +90,15 @@ async fn open(path: impl AsRef) -> Result { match event.content() { EventContent::Create(c) => match c.resolver.as_str() { "kv" => { - let room = Room::from_root(KVResolver, db, serde_json::from_slice(&event_data)?)?; - room.resolve_state().await; - Ok(Opened::Kv(State { room, secret })) + let room = RoomOld::from_root(KVResolver, serde_json::from_slice(&event_data)?)?; + room.resolve_state(&mut db).await; + Ok(Opened::Kv(State { room, secret, db })) } "forum-v0" => { let room = - Room::from_root(ForumResolver, db, serde_json::from_slice(&event_data)?)?; - room.resolve_state().await; - Ok(Opened::Forum(State { room, secret })) + RoomOld::from_root(ForumResolver, serde_json::from_slice(&event_data)?)?; + room.resolve_state(&mut db).await; + Ok(Opened::Forum(State { room, secret, db })) } _ => unimplemented!("unknown resolver"), }, @@ -130,15 +131,15 @@ async fn startup(path: impl AsRef, create: Option<&str>) -> Result async fn send_event(state: &mut State, data: &str) -> Result<()> { state .room - .create_event(dbg!(serde_json::from_str(data)?), &state.secret) + .create_event(dbg!(serde_json::from_str(data)?), &state.secret, &mut state.db) .await?; - state.room.resolve_state().await; + state.room.resolve_state(&mut state.db).await; Ok(()) } -async fn print_info(state: State) -> Result<()> { - state.room.resolve_state().await; - let db = state.room.get_state(); +async fn print_info(state: &mut State) -> Result<()> { + state.room.resolve_state(&mut state.db).await; + let db = &mut state.db; let event_count = db .table(TABLE_EVENTS) .query(Selector::Prefix(vec![])) @@ -185,7 +186,7 @@ repo.room.resolve_state().await; async fn sync_state(state: &mut State, remote: &str) -> Result<()> { let http = reqwest::Client::new(); - let ids_all = state.room.all_events().await.iter().rev().map(|ev| ev.id().to_owned()).collect(); + let ids_all = state.room.all_events(&mut state.db).await.iter().rev().map(|ev| ev.id().to_owned()).collect(); let has: HasResponse = http .post(format!("{remote}/has")) .json(&HasRequest { @@ -201,7 +202,7 @@ async fn sync_state(state: &mut State, remote: &str) -> Result<( .post(format!("{remote}/sync")) .json(&SyncRequest:: { root_id: state.room.get_root().id().to_owned(), - events: state.room.all_events().await.into_iter().filter(|ev| missing_ids.contains(ev.id())).collect(), + events: state.room.all_events(&mut state.db).await.into_iter().filter(|ev| missing_ids.contains(ev.id())).collect(), }) .send() .await? @@ -228,8 +229,8 @@ async fn main() -> Result<()> { (Cli::Send { repo: _, data }, Opened::Forum(mut state)) => { send_event(&mut state, &data).await? } - (Cli::Info { repo: _ }, Opened::Kv(state)) => print_info(state).await?, - (Cli::Info { repo: _ }, Opened::Forum(state)) => print_info(state).await?, + (Cli::Info { repo: _ }, Opened::Kv(mut state)) => print_info(&mut state).await?, + (Cli::Info { repo: _ }, Opened::Forum(mut state)) => print_info(&mut state).await?, (Cli::Sync { repo: _, remote: _ }, Opened::Kv(_kv)) => todo!(), (Cli::Sync { repo: _, remote }, Opened::Forum(mut state)) => { sync_state(&mut state, &remote).await? diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 66ce2e7..6c8a94c 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{ops::DerefMut, sync::Arc}; use anyhow::Result; use axum::extract::{Json, State}; @@ -70,8 +70,7 @@ async fn route_has( panic!("cannot sync events from two different rooms"); } - let event_table = repo.room.get_state() - .table(TABLE_EVENTS); + let event_table = repo.db.table(TABLE_EVENTS); let our_ids: Vec = event_table .query(Selector::Prefix(vec![])) .get_iter() @@ -105,13 +104,13 @@ async fn route_sync( panic!("cannot sync events from two different rooms"); } - // a list of event ids would either be cached or easily queryable + let repo = repo.deref_mut(); for event in req.events { - repo.room.append_event(event).await.unwrap(); + repo.room.append_event(event, &mut repo.db).await.unwrap(); } // this doesn't work and i don't know why - // repo.room.resolve_state().await; + // repo.room.resolve_state(&mut repo.db).await; Json(SyncResponse {}) } diff --git a/crates/proto/src/lib.rs b/crates/proto/src/lib.rs index 13a7680..fbdb774 100644 --- a/crates/proto/src/lib.rs +++ b/crates/proto/src/lib.rs @@ -2,4 +2,4 @@ pub mod error; pub mod proto; pub use error::{Error, Result}; -pub use proto::{actor, event, resolver, room}; +pub use proto::{actor, event, resolver, room, atoms}; diff --git a/crates/proto/src/proto/atoms.rs b/crates/proto/src/proto/atoms.rs index 863c93a..807dc12 100644 --- a/crates/proto/src/proto/atoms.rs +++ b/crates/proto/src/proto/atoms.rs @@ -21,5 +21,6 @@ impl Display for RoomId { } } -/// A media/blob identifier -pub struct MediaId(String); +// TODO: how will this be implemented? content based addressing per-room? +// /// A media/blob identifier +// pub struct MediaId(String); diff --git a/crates/proto/src/proto/event.rs b/crates/proto/src/proto/event.rs index 13a3ef7..383e851 100644 --- a/crates/proto/src/proto/event.rs +++ b/crates/proto/src/proto/event.rs @@ -1,7 +1,7 @@ use crate::{ actor::{ActorId, ActorSecret, ActorSignature}, resolver::Resolver, - room::Room, + room::RoomOld, Error, Result, }; use base64::{engine::general_purpose::URL_SAFE_NO_PAD as b64engine, Engine}; @@ -12,8 +12,6 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; -use super::table::Database; - #[derive(Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(try_from = "&str", into = "String")] pub struct EventId(EventIdData); @@ -149,9 +147,9 @@ impl Event { Ok(()) } - pub fn verify_room, D: Database>( + pub fn verify_room>( &self, - room: &Room, + room: &RoomOld, ) -> Result<()> { self.verify()?; diff --git a/crates/proto/src/proto/mod.rs b/crates/proto/src/proto/mod.rs index d275505..2fa373f 100644 --- a/crates/proto/src/proto/mod.rs +++ b/crates/proto/src/proto/mod.rs @@ -5,3 +5,5 @@ pub mod resolver; pub mod room; pub mod table; pub mod atoms; +mod room2; +mod rpc; diff --git a/crates/proto/src/proto/room.rs b/crates/proto/src/proto/room.rs index 8752f18..3673b28 100644 --- a/crates/proto/src/proto/room.rs +++ b/crates/proto/src/proto/room.rs @@ -16,21 +16,20 @@ pub const TABLE_EVENTS: &str = "_events"; pub const TABLE_HEADS: &str = "_heads"; #[derive(Debug)] -pub struct Room { +pub struct RoomOld { root: Event, resolver: R, - database: D, } -impl Room { - pub fn builder() -> RoomBuilder { +impl RoomOld { + pub fn builder() -> RoomBuilder<'static, R, D> { RoomBuilder::new() } - pub async fn new( + pub async fn new( resolver_name: impl Into, resolver: R, - database: D, + database: &mut D, hasher: HashType, signer: SignatureType, secret: &ActorSecret, @@ -56,15 +55,13 @@ impl Room { Ok(Self { root: base_event, resolver, - database, }) } - pub fn from_root(resolver: R, database: D, event: Event) -> Result { + pub fn from_root(resolver: R, event: Event) -> Result { Ok(Self { root: event, resolver, - database, }) } @@ -76,52 +73,53 @@ impl Room { &self.resolver } - pub async fn resolve_state(&self) { + pub async fn resolve_state(&self, db: &mut D) { let resolver = self.get_resolver(); // ideally i don't get *all* events up front, and query the db as needed - let events = self.all_events().await; + let events = self.all_events(db).await; let sorted = sort(|a, b| resolver.tiebreak(a, b), &events); - self.database.reset().await; + db.reset().await; for event in sorted { - let effects = resolver.resolve(&self.database, event).await; + let effects = resolver.resolve(db, event).await; for effect in effects { match effect { Command::Put { table, key, value } => { - self.database.table(table).put(&key, &value).await + db.table(table).put(&key, &value).await } - Command::Delete { table, key } => self.database.table(table).delete(&key).await, + Command::Delete { table, key } => db.table(table).delete(&key).await, } } } } - pub async fn create_event( + pub async fn create_event( &mut self, event_content: EventContent, secret: &ActorSecret, + db: &mut D, ) -> Result> { - let heads = self.get_heads().await; + let heads = self.get_heads(db).await; let event = Event::builder(event_content, secret) .with_references(heads) .then_hash()? .and_sign()?; - self.append_event(event).await + self.append_event(event, db).await } - pub async fn append_event( + pub async fn append_event( &mut self, event: Event, + db: &mut D, ) -> Result> { event.verify_room(self).expect("event failed verification"); - match self.get_resolver().verify(&self.database, &event).await { + match self.get_resolver().verify(db, &event).await { Verification::Valid => {} Verification::Unauthorized => panic!("unauthorized"), Verification::Invalid => panic!("invalid data"), } let id_bytes = event.id().to_string(); let id_bytes = id_bytes.as_bytes(); - if self - .database + if db .table(TABLE_EVENTS) .get(id_bytes) .await @@ -129,26 +127,26 @@ impl Room { { return Err(Error::AlreadyExists); } - self.database + db .table(TABLE_EVENTS) .put(id_bytes, &serde_json::to_vec(&event).unwrap()) .await; for head in dbg!(event.references()) { - self.database + db .table(TABLE_HEADS) .delete(&head.to_string().into_bytes()) .await; } - self.database + db .table(TABLE_HEADS) .put(&event.id().to_string().into_bytes(), &[]) .await; Ok(event) } - pub async fn get_heads(&self) -> Vec { - self.database + pub async fn get_heads(&self, db: &mut D) -> Vec { + db .table(TABLE_HEADS) .query(Selector::Prefix(vec![])) .get_all() @@ -159,8 +157,8 @@ impl Room { } // TEMP: get all events - pub async fn all_events(&self) -> Vec> { - self.database + pub async fn all_events(&self, db: &mut D) -> Vec> { + db .table(TABLE_EVENTS) .query(Selector::Any) .get_all() @@ -169,20 +167,16 @@ impl Room { .map(|(_, bytes)| serde_json::from_slice(bytes).expect("invalid data")) .collect() } - - pub fn get_state(&self) -> &D { - &self.database - } } -pub struct RoomBuilder { +pub struct RoomBuilder<'a, R: Resolver, D: Database> { resolver: Option, hasher: Option, signer: Option, - database: Option, + database: Option<&'a mut D>, } -impl Default for RoomBuilder { +impl<'a, R: Resolver, D: Database> Default for RoomBuilder<'a, R, D> { fn default() -> Self { Self { resolver: None, @@ -193,7 +187,7 @@ impl Default for RoomBuilder { } } -impl RoomBuilder { +impl<'a, R: Resolver, D: Database> RoomBuilder<'a, R, D> { pub fn new() -> Self { Self::default() } @@ -213,13 +207,13 @@ impl RoomBuilder { self } - pub fn with_database(mut self, database: D) -> Self { + pub fn with_database(mut self, database: &'a mut D) -> Self { self.database = Some(database); self } - pub async fn create(self, secret: &ActorSecret) -> Result> { - Room::new( + pub async fn create(self, secret: &ActorSecret) -> Result> { + RoomOld::new( self.resolver .as_ref() .ok_or(Error::MissingBuilderData)? diff --git a/crates/proto/src/proto/room2.rs b/crates/proto/src/proto/room2.rs new file mode 100644 index 0000000..565409e --- /dev/null +++ b/crates/proto/src/proto/room2.rs @@ -0,0 +1,60 @@ +#![allow(unused)] + +//! Possibly another way rooms could be handled? +//! A state machine representing a room + +use super::table::Selector; + +struct RoomIdle; +struct RoomRead; +struct RoomWrite; + +enum RejectReason { + /// The event has invalid data + Invalid, + + /// The event exceeds some sort of limit + Limited, + + /// The sender doesn't have permission to send this event + Unauthorized, + + /// The resolver has a bug! + Panic, +} + +enum RoomOutput { + /// The event is valid and has been added + Accept(RoomIdle), + + /// The event has been rejected (but shouldn't necessarily be dropped) + Reject(RejectReason, RoomIdle), + + /// The room state machine is requesting to read data + Read(RoomRead), + + /// The room state machine is requesting to write data + Write(RoomWrite), +} + +impl RoomIdle { + /// Enters into the event processing state. Pretend its a coroutine for processing an event. + pub fn process_event(self) -> RoomOutput { + // try to append event here + todo!() + } +} + +impl RoomRead { + /// Done reading + pub fn done(self, data: Vec) -> RoomOutput { + todo!() + } +} + +impl RoomWrite { + /// Done writing + pub fn done(self) -> RoomOutput { + todo!() + } +} diff --git a/crates/proto/src/proto/rpc.rs b/crates/proto/src/proto/rpc.rs new file mode 100644 index 0000000..0d17e87 --- /dev/null +++ b/crates/proto/src/proto/rpc.rs @@ -0,0 +1,119 @@ +#![allow(unused)] + +//! ideas on how rpc could work + +use crate::{event::EventId, atoms::RoomId}; + +enum Rpc { + /// hello message each peer sends to each other before starting anything else + Hello(Hello), + + /// querying the peer's database + QueryRequest(String), + + /// response for a queryresponse + QueryResponse(Vec), + + /// get a root event + Root(RoomId), + + /// here is an event + Event(u64, Vec), + + /// whoops + Error(Vec), + + /// i want these events + Want(RoomId, Vec), + + /// i have these events + Have(RoomId, Vec), + + /// the heads this peer wants + WantHeads(Vec), + + /// the heads the other peer has + HaveHeads(Vec<(RoomId, Vec)>), + + /// receive new events/updates about these rooms + Subscribe(u64, Vec), + + /// stop receiving updates about this room + Unsubscribe(u64, Vec), + + /// extra info about a room, unrelated to the core protocol + OutOfBand(u64, RoomId, Vec), + + /// acknowledge a transaction + Ack(Vec), + + /// cleanly close the connection + Bye, +} + +enum PeerState { + /// The connection started + Connect, + + /// The connection is ready (Hellos sent) + Ready, + + /// The connection is closed + Closed, +} + +struct Hello { + /// the versions this peer supports + versions: Vec, +} + +/* +# syncing + +peers want to eagerly sync; if you have heads that the other peer doesn't they should try to sync + +1. start by sending Want(head) for each head +2. send Have(last_id) messages in a loop (batch for performance) +3. once they start sending you event (found the lowest common denominator), break and handle events +4. if there are multiple heads and you don't have all of them yet, go to 2 + +step 2 is the tricky bit: how do i send Have messages efficiently? +*/ + +// how a peer looks if it was fully synchronous +trait Peer { + /// connect and wait on Hellos + fn connect(); // -> Peer; + + /// disconnect + fn close(self); + + /// run a query on the peer + fn query(&self, q: &str) -> Vec; + + /// sync some rooms once + fn sync(&self, rooms: &[RoomId]); + + /// continually sync some rooms + fn subscribe(&self, rooms: &[RoomId]); + + /// stop syncing some rooms + fn unsubscribe(&self, rooms: &[RoomId]); +} + +/* +one off requests: + subscribe(ids) + unsubscribe(ids) + query(ids) -> response + heads(ids) -> response + root(ids) -> response + +streams: + event + outofband + +syncing requests: + have + want +*/ diff --git a/crates/proto/src/proto/table/graph.rs b/crates/proto/src/proto/table/graph.rs new file mode 100644 index 0000000..0758884 --- /dev/null +++ b/crates/proto/src/proto/table/graph.rs @@ -0,0 +1,139 @@ +#![allow(unused)] + +//! an experiment in making the database a graph db? + +/* +each database is a list of facts and indexes matching a schema +each fact is an (entity, property, value) tuple +each fact can only be looked up by an index or reference +*/ + +struct Database { + schema: Schema, +} + +struct Schema { + fields: Vec, +} + +struct SchemaField { + thing: String, + ty: SchemaType, + cardinality: SchemaCardinality, + index: Vec, +} + +enum SchemaConfig { + IndexLookup, + IndexOrdering, + // IndexFullText, + Unique, +} + +enum SchemaCardinality { + Optional, + One, + Many, +} + +enum SchemaType { + Reference(String), + Entity, + Integer, + String, + // Text, EventId, RoomId, ActorId +} + +enum SchemaValue { + Reference(u64), + Entity(u64), + Integer(u64), + String(String), +} + +enum DatabaseOperation { + Insert { + thing: String, + fields: Vec<(String, SchemaValue)>, + }, + Select { + query: DatabaseQuery, + }, + Delete { + query: DatabaseQuery, + }, + // Update { + // query: Query, + // }, +} + +struct DatabaseQuery { + rules: Vec, + output: Vec, +} + +struct DatabaseRule { + name: String, + tokens: Vec, + conditions: Vec, +} + +enum DatabaseOutput { + Value(u64), + Min(u64), + Max(u64), + Average(u64), +} + +enum DatabaseCondition { + ThingFieldMatches { + token: u64, + thing: String, + field: String, + value: SchemaValue, + }, + EntityFieldMatches { + token: u64, + field: String, + value: SchemaValue, + }, + EntityFieldEntity { + token: u64, + field: String, + other: u64, + }, + ConstraintLessThan { + token: u64, + value: u64, + }, + ConstraintGreaterThan { + token: u64, + value: u64, + }, +} + +/* +[:find ?name where + [?post :post/id 123] + [?post :post/author ?author] + [?author :user/name ?name]] + +vec![ + DatabaseRule::ThingFieldMatches { + token: 0, + thing: "post".into(), + field: "id".into(), + value: SchemaValue::Integer(123), + }, + DatabaseRule::EntityFieldEntity { + token: 0, + field: "author".into(), + other: 1, + }, + DatabaseRule::EntityFieldEntity { + token: 1, + field: "name".into(), + other: 2, + }, +] +*/ diff --git a/crates/proto/src/proto/table/kv.rs b/crates/proto/src/proto/table/kv.rs index dd15b6b..cc34162 100644 --- a/crates/proto/src/proto/table/kv.rs +++ b/crates/proto/src/proto/table/kv.rs @@ -4,6 +4,26 @@ use futures::Stream; type Bytes = Vec; +pub enum DatabaseRequest { + Reset, + QuerySingle(String, Selector), + QueryAll(String, Selector), + QueryIter(String, Selector), + QueryCount(String, Selector), + QueryIterReverse(String, Selector), + Get(String, Bytes), + Put(String, Bytes, Bytes), + Delete(String, Bytes), +} + +pub enum DatabaseReponse { + Done, + Single(Option<(Bytes, Bytes)>), + Many(Vec<(Bytes, Bytes)>), + // Iter(Vec<(Bytes, Bytes)>), + Count(u64), +} + pub trait Database: Send + Sync { type Table: Table; diff --git a/crates/proto/src/proto/table/mod.rs b/crates/proto/src/proto/table/mod.rs index d14ed20..aef60d8 100644 --- a/crates/proto/src/proto/table/mod.rs +++ b/crates/proto/src/proto/table/mod.rs @@ -1,4 +1,4 @@ -//! Each room has a small, well defined, domain specific relational +//! Each room has a small, well defined, domain specific //! database associated with it. The database's contents will then be defined //! by the events it receives. This module contains database traits and //! how the database's schema is defined. @@ -7,6 +7,7 @@ //! effort/payoff ratio in the database schema definition. // mod old; +mod graph; mod kv; // pub use old::*; diff --git a/crates/proto/src/proto/table/old.rs b/crates/proto/src/proto/table/old.rs index 9016aa0..c841ad1 100644 --- a/crates/proto/src/proto/table/old.rs +++ b/crates/proto/src/proto/table/old.rs @@ -1,3 +1,5 @@ +//! Making the database as a relational db + // TODO: properly type things // maybe replace TableRow with impl Serialize? // and make schema a trait instead of builder?