From 2ed5d6a7dfe59f620ad63f0a9cc1626f1121813e Mon Sep 17 00:00:00 2001 From: tezlm Date: Mon, 26 Feb 2024 22:31:11 -0800 Subject: [PATCH] Try to improve server and work around async --- Cargo.lock | 31 ++++++ crates/cli/Cargo.toml | 1 + crates/cli/src/main.rs | 124 +++++++++--------------- crates/cli/src/server.rs | 118 +++++++++++++++++++++- crates/impls/Cargo.toml | 1 + crates/impls/src/resolvers/forum.rs | 2 +- crates/impls/src/resolvers/kv.rs | 2 +- crates/impls/src/stores/sqlite_async.rs | 93 +++++++++++++++--- crates/proto/Cargo.toml | 1 + crates/proto/src/proto/atoms.rs | 25 +++++ crates/proto/src/proto/event.rs | 19 ++++ crates/proto/src/proto/mod.rs | 1 + crates/proto/src/proto/resolver.rs | 8 +- crates/proto/src/proto/room.rs | 3 +- crates/proto/src/proto/table/kv.rs | 41 ++++---- 15 files changed, 352 insertions(+), 118 deletions(-) create mode 100644 crates/proto/src/proto/atoms.rs diff --git a/Cargo.lock b/Cargo.lock index 5ed2e5e..f8cd826 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,6 +352,7 @@ dependencies = [ "clap", "dag-resolve", "dag-resolve-impls", + "futures", "reqwest", "rusqlite", "serde", @@ -472,6 +473,7 @@ dependencies = [ "canonical_json", "ed25519", "ed25519-dalek", + "futures", "rand", "serde", "serde_json", @@ -484,6 +486,7 @@ name = "dag-resolve-impls" version = "0.1.0" dependencies = [ "dag-resolve", + "futures", "rusqlite", "serde", "serde_json", @@ -653,6 +656,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -697,6 +715,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -715,8 +744,10 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 12cfc24..de89ef8 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -12,6 +12,7 @@ axum = { version = "0.7.4", features = ["macros"] } clap = { version = "4.5.0", features = ["derive"] } dag-resolve = { version = "0.1.0", path = "../proto" } dag-resolve-impls = { version = "0.1.0", path = "../impls" } +futures = "0.3.30" reqwest = { version = "0.11.24", features = ["json", "rustls-tls-native-roots"], default-features = false } rusqlite = { version = "0.30.0", features = ["bundled"] } serde = { version = "1.0.196", features = ["derive"] } diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 33a49ca..b2f2ce9 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -1,7 +1,5 @@ use std::{ - fmt::Debug, - path::{Path, PathBuf}, - sync::Arc, + collections::HashSet, fmt::Debug, path::{Path, PathBuf} }; mod server; @@ -17,13 +15,12 @@ use dag_resolve::{ }; use dag_resolve_impls::{ resolvers::{ - forum::{ForumEventContent, ForumResolver}, + forum::ForumResolver, kv::KVResolver, }, stores::sqlite_async::Database, }; -use serde::{Deserialize, Serialize}; -use tokio::sync::Mutex; +use server::{HasRequest, HasResponse, ServerState, SyncRequest, SyncResponse}; #[derive(Parser)] #[clap(version, about)] @@ -154,88 +151,62 @@ async fn print_info(state: State) -> Result<()> { Ok(()) } -#[derive(Debug, Serialize, Deserialize)] -struct SyncRequest { - root_id: EventId, - heads: Vec, - events: Vec>, -} +/* +// a list of event ids would either be cached or easily queryable +let our_ids: Vec<_> = repo + .room + .all_events() + .await + .iter() + .map(|ev| ev.id()) + .cloned() + .collect(); +let mut missing_ids: Vec = req.heads.to_vec(); -#[derive(Debug, Serialize, Deserialize)] -struct SyncResponse {} - -#[derive(Clone)] -struct ServerState { - repo: Arc>>, -} - -async fn sync_http( - axum::extract::State(state): axum::extract::State, - axum::extract::Json(req): axum::extract::Json, -) -> axum::extract::Json { - println!("receive new events"); - let mut repo = state.repo.lock().await; - - if repo.room.get_root().id() != &req.root_id { - panic!("cannot sync events from two different rooms"); +// this would be batched, to find multiple missing events at once +while let Some(next) = missing_ids.pop() { + if our_ids.contains(&next) { + continue; } - // a list of event ids would either be cached or easily queryable - let our_ids: Vec<_> = repo - .room - .all_events() - .await + // getting an event from its id would be cached or easily queryable + let event = req + .events .iter() - .map(|ev| ev.id()) - .cloned() - .collect(); - let mut missing_ids: Vec = req.heads.to_vec(); - - // this would be batched, to find multiple missing events at once - while let Some(next) = missing_ids.pop() { - if our_ids.contains(&next) { - continue; - } - - // getting an event from its id would be cached or easily queryable - let event = req - .events - .iter() - .rev() - .find(|ev| ev.id() == &next) - .expect("`from` sent the id but doesn't have the event?") - .clone(); - repo.room.append_event(event).await.unwrap(); - } - - repo.room.resolve_state().await; - axum::extract::Json(SyncResponse {}) + .rev() + .find(|ev| ev.id() == &next) + .expect("`from` sent the id but doesn't have the event?") + .clone(); + repo.room.append_event(event).await.unwrap(); } -async fn serve(state: ServerState) -> Result<()> { - let tcp = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; - let router = axum::Router::new() - .route("/", axum::routing::get(|| async { "hello world" })) - .route("/sync", axum::routing::post(sync_http)) - .with_state(state); - axum::serve(tcp, router.into_make_service()).await?; - Ok(()) -} +repo.room.resolve_state().await; +*/ -async fn sync_state(state: &mut State, remote: &str) -> Result<()> { +async fn sync_state(state: &mut State, remote: &str) -> Result<()> { let http = reqwest::Client::new(); - let res: SyncResponse = http - .post(remote) - .json(&SyncRequest { + let ids_all = state.room.all_events().await.iter().rev().map(|ev| ev.id().to_owned()).collect(); + let has: HasResponse = http + .post(format!("{remote}/has")) + .json(&HasRequest { root_id: state.room.get_root().id().to_owned(), - heads: state.room.get_heads().await, - events: state.room.all_events().await, + ids: ids_all, + }) + .send() + .await? + .json() + .await?; + let missing_ids: HashSet = has.disjoint.into_iter().collect(); + let _res: SyncResponse = http + .post(format!("{remote}/sync")) + .json(&SyncRequest:: { + root_id: state.room.get_root().id().to_owned(), + events: state.room.all_events().await.into_iter().filter(|ev| missing_ids.contains(ev.id())).collect(), }) .send() .await? .json() .await?; - dbg!(res); Ok(()) } @@ -265,10 +236,7 @@ async fn main() -> Result<()> { } (Cli::Serve { repo: _ }, Opened::Kv(_kv)) => todo!(), (Cli::Serve { repo: _ }, Opened::Forum(forum)) => { - serve(ServerState { - repo: Arc::new(Mutex::new(forum)), - }) - .await? + server::serve(ServerState::new(forum)).await? } } Ok(()) diff --git a/crates/cli/src/server.rs b/crates/cli/src/server.rs index 65b3dba..66ce2e7 100644 --- a/crates/cli/src/server.rs +++ b/crates/cli/src/server.rs @@ -1 +1,117 @@ -// todo +use std::sync::Arc; + +use anyhow::Result; +use axum::extract::{Json, State}; +use dag_resolve::{ + event::EventId, + proto::table::{Database, Query, Selector, Table}, + resolver::Resolver, + room::TABLE_EVENTS, +}; +use futures::StreamExt as _; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; + +use crate::State as Repo; + +#[derive(Debug, Serialize, Deserialize)] +pub struct SyncRequest { + pub root_id: EventId, + pub events: Vec>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SyncResponse {} + +#[derive(Debug, Serialize, Deserialize)] +pub struct HasRequest { + pub root_id: EventId, + pub ids: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct HasResponse { + pub disjoint: Vec, +} + +#[derive(Clone)] +pub struct ServerState { + repo: Arc>>, +} + +impl ServerState { + pub fn new(repo: Repo) -> ServerState { + ServerState { + repo: Arc::new(Mutex::new(repo)), + } + } +} + +pub async fn serve(state: ServerState) -> Result<()> { + let tcp = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; + let router = axum::Router::new() + .route("/", axum::routing::get(|| async { "hello world" })) + .route("/has", axum::routing::post(route_has)) + .route("/sync", axum::routing::post(route_sync)) + .with_state(state); + axum::serve(tcp, router.into_make_service()).await?; + Ok(()) +} + +/// get the last event we have +async fn route_has( + State(state): State>, + Json(req): Json, +) -> Json { + println!("check has new events"); + let repo = state.repo.lock().await; + + if repo.room.get_root().id() != &req.root_id { + panic!("cannot sync events from two different rooms"); + } + + let event_table = repo.room.get_state() + .table(TABLE_EVENTS); + let our_ids: Vec = event_table + .query(Selector::Prefix(vec![])) + .get_iter() + .map(|bytes| String::from_utf8(bytes.0).unwrap()) + .map(|s| s.parse().unwrap()) + .collect() + .await; + let mut missing_ids: Vec = req.ids; + let mut disjoint = vec![]; + + while let Some(next) = missing_ids.pop() { + if our_ids.contains(&next) { + continue; + } + + disjoint.push(next); + } + + // repo.room.resolve_state().await; + Json(HasResponse { disjoint }) +} + +async fn route_sync( + State(state): State>, + Json(req): Json>, +) -> Json { + println!("receive new events"); + let mut repo = state.repo.lock().await; + + if repo.room.get_root().id() != &req.root_id { + panic!("cannot sync events from two different rooms"); + } + + // a list of event ids would either be cached or easily queryable + for event in req.events { + repo.room.append_event(event).await.unwrap(); + } + + // this doesn't work and i don't know why + // repo.room.resolve_state().await; + + Json(SyncResponse {}) +} diff --git a/crates/impls/Cargo.toml b/crates/impls/Cargo.toml index ce0f300..09060b8 100644 --- a/crates/impls/Cargo.toml +++ b/crates/impls/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] dag-resolve = { version = "0.1.0", path = "../proto" } +futures = "0.3.30" rusqlite = { version = "0.30.0", features = ["bundled"] } serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" diff --git a/crates/impls/src/resolvers/forum.rs b/crates/impls/src/resolvers/forum.rs index cdc616e..0fbaea1 100644 --- a/crates/impls/src/resolvers/forum.rs +++ b/crates/impls/src/resolvers/forum.rs @@ -14,7 +14,7 @@ use dag_resolve::{ resolver::{Command, Resolver, Verification}, }; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, Copy)] /// An example of how a basic forum could look like /// /// This is designed to be the "more fully fledged" example and will try to show off a decent number of features diff --git a/crates/impls/src/resolvers/kv.rs b/crates/impls/src/resolvers/kv.rs index 244ebf8..2c3bd97 100644 --- a/crates/impls/src/resolvers/kv.rs +++ b/crates/impls/src/resolvers/kv.rs @@ -9,7 +9,7 @@ use dag_resolve::{ }; use std::cmp::Ordering; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone, Copy)] /// A basic key-value store /// /// This is designed to be the "extremely basic and minimalistic" example diff --git a/crates/impls/src/stores/sqlite_async.rs b/crates/impls/src/stores/sqlite_async.rs index 034dd95..c97e796 100644 --- a/crates/impls/src/stores/sqlite_async.rs +++ b/crates/impls/src/stores/sqlite_async.rs @@ -1,11 +1,9 @@ //! Store state in a sqlite database use dag_resolve::{ - actor::{ActorId, ActorSecret}, - event::EventId, - proto::table::{self, Database as _}, - room::{TABLE_EVENTS, TABLE_HEADS}, + actor::{ActorId, ActorSecret}, event::EventId, proto::table::{self, Database as _}, room::{TABLE_EVENTS, TABLE_HEADS} }; +use futures::{stream, Stream}; use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool}; use thiserror::Error; @@ -91,9 +89,9 @@ impl Database { } } -pub struct Query { +pub struct Query<'a> { selector: table::Selector, - table: Table, + table: &'a Table, _reversed: bool, } @@ -130,20 +128,20 @@ impl table::Database for Database { } impl table::Table for Table { - type Query = Query; + type Query<'a> = Query<'a>; - fn query(&self, selector: table::Selector) -> Self::Query { + fn query(&self, selector: table::Selector) -> Self::Query<'_> { Query { selector, - table: self.clone(), + table: &self, _reversed: false, } } - fn query_reverse(&self, selector: table::Selector) -> Self::Query { + fn query_reverse(&self, selector: table::Selector) -> Self::Query<'_> { Query { selector, - table: self.clone(), + table: &self, _reversed: true, } } @@ -184,7 +182,7 @@ impl table::Table for Table { } } -impl table::Query for Query { +impl<'a> table::Query<'a> for Query<'a> { async fn get_single(self) -> Option> { match self.selector { table::Selector::Exact(e) => query!( @@ -210,6 +208,7 @@ impl table::Query for Query { .unwrap() .map(|r| r.value) } + table::Selector::Any => todo!(), } } @@ -232,11 +231,77 @@ impl table::Query for Query { .map(|r| (r.key, r.value)) .collect() } + table::Selector::Any => { + query!( + "SELECT key, value FROM data WHERE tb = ?", + self.table.name, + ) + .fetch_all(&self.table.connection) + .await + .unwrap() + .into_iter() + .map(|r| (r.key, r.value)) + .collect() + } } } - fn get_iter(self) -> impl Iterator> { - vec![].into_iter() + fn get_iter(self) -> impl Stream, Vec)> + Send { + // this is incredibly inefficient, but i don't care + // rust async is extremely painful and i won't bother making things nice until the situation improves + let items = std::thread::scope(|scope| { + scope.spawn(|| { + let rt = tokio::runtime::Runtime::new().unwrap(); + let items: Vec<(Vec, Vec)> = rt.block_on(async { + match &self.selector { + table::Selector::Exact(exact) => { + let evec = exact.to_vec(); + query!( + "SELECT key, value FROM data WHERE tb = ? AND key == ?", + self.table.name, + evec + ) + .fetch_all(&self.table.connection) + .await + .unwrap() + .into_iter() + .map(|r| (r.key, r.value)) + .collect() + }, + table::Selector::Prefix(p) => { + let plen = p.len() as i64; + let pvec = p.to_vec(); + query!( + "SELECT key, value FROM data WHERE tb = ? AND substr(key, 0, ?) == ?", + self.table.name, + plen, + pvec, + ) + .fetch_all(&self.table.connection) + .await + .unwrap() + .into_iter() + .map(|r| (r.key, r.value)) + .collect() + } + table::Selector::Any => { + query!( + "SELECT key, value FROM data WHERE tb = ?", + self.table.name, + ) + .fetch_all(&self.table.connection) + .await + .unwrap() + .into_iter() + .map(|r| (r.key, r.value)) + .collect() + } + } + }); + items + }).join().unwrap() + }); + stream::iter(items) } async fn count(self) -> u64 { diff --git a/crates/proto/Cargo.toml b/crates/proto/Cargo.toml index fa26a1d..6a71d45 100644 --- a/crates/proto/Cargo.toml +++ b/crates/proto/Cargo.toml @@ -11,6 +11,7 @@ base64 = "0.21.7" canonical_json = "0.5.0" ed25519 = "2.2.3" ed25519-dalek = { version = "2.1.1", features = ["rand_core"] } +futures = "0.3.30" rand = "0.8.5" serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.113" diff --git a/crates/proto/src/proto/atoms.rs b/crates/proto/src/proto/atoms.rs new file mode 100644 index 0000000..863c93a --- /dev/null +++ b/crates/proto/src/proto/atoms.rs @@ -0,0 +1,25 @@ +//! Other random atoms that might be split out into their own modules + +use std::fmt::Display; +use base64::{engine::general_purpose::URL_SAFE_NO_PAD as b64engine, Engine}; + +use crate::event::EventId; + +/// A room identifier. Same as the root event id, but with a `!` instead +/// of `$` to refer to the room itself rather than the root event. +pub struct RoomId(EventId); + +impl RoomId { + pub fn new_from_id(event_id: EventId) -> RoomId { + RoomId(event_id) + } +} + +impl Display for RoomId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "!{}", b64engine.encode(self.0.to_bytes())) + } +} + +/// A media/blob identifier +pub struct MediaId(String); diff --git a/crates/proto/src/proto/event.rs b/crates/proto/src/proto/event.rs index 1e4f47a..13a3ef7 100644 --- a/crates/proto/src/proto/event.rs +++ b/crates/proto/src/proto/event.rs @@ -263,12 +263,31 @@ impl EventId { Self(EventIdData::Sha2(hash.to_vec())) } + pub fn from_bytes(data: &[u8]) -> Self { + let bytes = data[1..].to_vec(); + let data = match data.get(0) { + Some(0x00) => EventIdData::Debug(String::from_utf8(bytes).unwrap()), + Some(0x01) => EventIdData::Sha2(bytes), + Some(_) => todo!(), + None => todo!(), + }; + Self(data) + } + pub fn get_type(&self) -> HashType { match &self.0 { EventIdData::Debug(_) => HashType::Debug, EventIdData::Sha2(_) => HashType::Sha256, } } + + pub fn to_bytes(&self) -> Vec { + let (prefix, bytes) = match &self.0 { + EventIdData::Debug(d) => (0x00, d.as_bytes()), + EventIdData::Sha2(d) => (0x01, d.as_ref()), + }; + [prefix].into_iter().chain(bytes.to_vec()).collect() + } } impl FromStr for EventId { diff --git a/crates/proto/src/proto/mod.rs b/crates/proto/src/proto/mod.rs index b7758a7..d275505 100644 --- a/crates/proto/src/proto/mod.rs +++ b/crates/proto/src/proto/mod.rs @@ -4,3 +4,4 @@ pub mod event; pub mod resolver; pub mod room; pub mod table; +pub mod atoms; diff --git a/crates/proto/src/proto/resolver.rs b/crates/proto/src/proto/resolver.rs index 68c3244..ebf546f 100644 --- a/crates/proto/src/proto/resolver.rs +++ b/crates/proto/src/proto/resolver.rs @@ -6,15 +6,15 @@ use crate::event::{Event, EventId}; use std::{cmp::Ordering, collections::HashSet, fmt::Debug, future::Future}; /// small shards of code designed to resolve state -pub trait Resolver { - type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a>; +pub trait Resolver: Send + Sync + Clone { + type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a> + Send + Sync; /// Given a set of ordered events, resolve the final state fn resolve( &self, state: &D, event: &Event, - ) -> impl Future>; + ) -> impl Future> + Send; /// Given two events, decide which one comes first /// if Ordering::Equal is returned, the timestamp then event id is used @@ -25,7 +25,7 @@ pub trait Resolver { &self, state: &D, event: &Event, - ) -> impl Future; + ) -> impl Future + Send; /// TEMP: Get the name/id of this resolver fn name(&self) -> &str; diff --git a/crates/proto/src/proto/room.rs b/crates/proto/src/proto/room.rs index ceca056..8752f18 100644 --- a/crates/proto/src/proto/room.rs +++ b/crates/proto/src/proto/room.rs @@ -81,7 +81,6 @@ impl Room { // ideally i don't get *all* events up front, and query the db as needed let events = self.all_events().await; let sorted = sort(|a, b| resolver.tiebreak(a, b), &events); - dbg!(&sorted); self.database.reset().await; for event in sorted { let effects = resolver.resolve(&self.database, event).await; @@ -163,7 +162,7 @@ impl Room { pub async fn all_events(&self) -> Vec> { self.database .table(TABLE_EVENTS) - .query(Selector::Prefix(vec![])) + .query(Selector::Any) .get_all() .await .iter() diff --git a/crates/proto/src/proto/table/kv.rs b/crates/proto/src/proto/table/kv.rs index 6665f1c..dd15b6b 100644 --- a/crates/proto/src/proto/table/kv.rs +++ b/crates/proto/src/proto/table/kv.rs @@ -1,40 +1,47 @@ use std::future::Future; +use futures::Stream; + type Bytes = Vec; -pub trait Database { +pub trait Database: Send + Sync { type Table: Table; fn table(&self, name: impl Into) -> Self::Table; - fn reset(&self) -> impl Future; + + #[must_use("future must be polled")] + fn reset(&self) -> impl Future + Send; } -pub trait Table { - type Query: Query; +pub trait Table: Send { + // "Table must live as long as any returned Query" (so the returned query lasts as long as the table) + type Query<'a>: Query<'a> where Self: 'a; - fn query(&self, selector: Selector) -> Self::Query; - fn query_reverse(&self, selector: Selector) -> Self::Query; + fn query(&self, selector: Selector) -> Self::Query<'_>; + fn query_reverse(&self, selector: Selector) -> Self::Query<'_>; - #[must_use("future must be polled")] - fn get(&self, key: &[u8]) -> impl Future>; + #[must_use = "future must be polled"] + fn get(&self, key: &[u8]) -> impl Future> + Send; - #[must_use("future must be polled")] - fn put(&self, key: &[u8], value: &[u8]) -> impl Future; + #[must_use = "future must be polled"] + fn put(&self, key: &[u8], value: &[u8]) -> impl Future + Send; - #[must_use("future must be polled")] - fn delete(&self, key: &[u8]) -> impl Future; + #[must_use = "future must be polled"] + fn delete(&self, key: &[u8]) -> impl Future + Send; } pub enum Selector { Exact(Bytes), Prefix(Bytes), + Any, } -pub trait Query { - fn get_single(self) -> impl Future>; - fn get_all(self) -> impl Future>; - fn get_iter(self) -> impl Iterator; - fn count(self) -> impl Future; +#[must_use = "you have a query that doesn't do anything"] +pub trait Query<'a>: Send { + fn get_single(self) -> impl Future> + Send; + fn get_all(self) -> impl Future> + Send; + fn get_iter(self) -> impl Stream + Send + 'a; + fn count(self) -> impl Future + Send; } impl From> for Selector {