Try to improve the server and work around async limitations

This commit is contained in:
tezlm 2024-02-26 22:31:11 -08:00
parent aaf91271c5
commit 2ed5d6a7df
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
15 changed files with 352 additions and 118 deletions

31
Cargo.lock generated
View file

@ -352,6 +352,7 @@ dependencies = [
"clap",
"dag-resolve",
"dag-resolve-impls",
"futures",
"reqwest",
"rusqlite",
"serde",
@ -472,6 +473,7 @@ dependencies = [
"canonical_json",
"ed25519",
"ed25519-dalek",
"futures",
"rand",
"serde",
"serde_json",
@ -484,6 +486,7 @@ name = "dag-resolve-impls"
version = "0.1.0"
dependencies = [
"dag-resolve",
"futures",
"rusqlite",
"serde",
"serde_json",
@ -653,6 +656,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@ -697,6 +715,17 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@ -715,8 +744,10 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",

View file

@ -12,6 +12,7 @@ axum = { version = "0.7.4", features = ["macros"] }
clap = { version = "4.5.0", features = ["derive"] }
dag-resolve = { version = "0.1.0", path = "../proto" }
dag-resolve-impls = { version = "0.1.0", path = "../impls" }
futures = "0.3.30"
reqwest = { version = "0.11.24", features = ["json", "rustls-tls-native-roots"], default-features = false }
rusqlite = { version = "0.30.0", features = ["bundled"] }
serde = { version = "1.0.196", features = ["derive"] }

View file

@ -1,7 +1,5 @@
use std::{
fmt::Debug,
path::{Path, PathBuf},
sync::Arc,
collections::HashSet, fmt::Debug, path::{Path, PathBuf}
};
mod server;
@ -17,13 +15,12 @@ use dag_resolve::{
};
use dag_resolve_impls::{
resolvers::{
forum::{ForumEventContent, ForumResolver},
forum::ForumResolver,
kv::KVResolver,
},
stores::sqlite_async::Database,
};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use server::{HasRequest, HasResponse, ServerState, SyncRequest, SyncResponse};
#[derive(Parser)]
#[clap(version, about)]
@ -154,32 +151,7 @@ async fn print_info<R: Resolver + Debug>(state: State<R>) -> Result<()> {
Ok(())
}
#[derive(Debug, Serialize, Deserialize)]
struct SyncRequest {
root_id: EventId,
heads: Vec<EventId>,
events: Vec<Event<ForumEventContent>>,
}
#[derive(Debug, Serialize, Deserialize)]
struct SyncResponse {}
#[derive(Clone)]
struct ServerState {
repo: Arc<Mutex<State<ForumResolver>>>,
}
async fn sync_http(
axum::extract::State(state): axum::extract::State<ServerState>,
axum::extract::Json(req): axum::extract::Json<SyncRequest>,
) -> axum::extract::Json<SyncResponse> {
println!("receive new events");
let mut repo = state.repo.lock().await;
if repo.room.get_root().id() != &req.root_id {
panic!("cannot sync events from two different rooms");
}
/*
// a list of event ids would either be cached or easily queryable
let our_ids: Vec<_> = repo
.room
@ -209,33 +181,32 @@ async fn sync_http(
}
repo.room.resolve_state().await;
axum::extract::Json(SyncResponse {})
}
*/
async fn serve(state: ServerState) -> Result<()> {
let tcp = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
let router = axum::Router::new()
.route("/", axum::routing::get(|| async { "hello world" }))
.route("/sync", axum::routing::post(sync_http))
.with_state(state);
axum::serve(tcp, router.into_make_service()).await?;
Ok(())
}
async fn sync_state(state: &mut State<ForumResolver>, remote: &str) -> Result<()> {
async fn sync_state<R: Resolver>(state: &mut State<R>, remote: &str) -> Result<()> {
let http = reqwest::Client::new();
let res: SyncResponse = http
.post(remote)
.json(&SyncRequest {
let ids_all = state.room.all_events().await.iter().rev().map(|ev| ev.id().to_owned()).collect();
let has: HasResponse = http
.post(format!("{remote}/has"))
.json(&HasRequest {
root_id: state.room.get_root().id().to_owned(),
heads: state.room.get_heads().await,
events: state.room.all_events().await,
ids: ids_all,
})
.send()
.await?
.json()
.await?;
let missing_ids: HashSet<EventId> = has.disjoint.into_iter().collect();
let _res: SyncResponse = http
.post(format!("{remote}/sync"))
.json(&SyncRequest::<R> {
root_id: state.room.get_root().id().to_owned(),
events: state.room.all_events().await.into_iter().filter(|ev| missing_ids.contains(ev.id())).collect(),
})
.send()
.await?
.json()
.await?;
dbg!(res);
Ok(())
}
@ -265,10 +236,7 @@ async fn main() -> Result<()> {
}
(Cli::Serve { repo: _ }, Opened::Kv(_kv)) => todo!(),
(Cli::Serve { repo: _ }, Opened::Forum(forum)) => {
serve(ServerState {
repo: Arc::new(Mutex::new(forum)),
})
.await?
server::serve(ServerState::new(forum)).await?
}
}
Ok(())

View file

@ -1 +1,117 @@
// todo
use std::{collections::HashSet, sync::Arc};

use anyhow::Result;
use axum::extract::{Json, State};
use dag_resolve::{
    event::EventId,
    proto::table::{Database, Query, Selector, Table},
    resolver::Resolver,
    room::TABLE_EVENTS,
};
use futures::StreamExt as _;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

use crate::State as Repo;
/// `POST /sync` request body: events the sender believes the receiver
/// is missing, scoped to a single room identified by `root_id`.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncRequest<R: Resolver> {
    /// Id of the room's root event; both sides must agree on it.
    pub root_id: EventId,
    /// The events for the receiving side to append.
    pub events: Vec<dag_resolve::event::Event<R::EventType>>,
}

/// `POST /sync` response body; currently carries no data.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncResponse {}

/// `POST /has` request body: every event id the sender knows about.
#[derive(Debug, Serialize, Deserialize)]
pub struct HasRequest {
    /// Id of the room's root event; both sides must agree on it.
    pub root_id: EventId,
    /// All event ids the sender holds for this room.
    pub ids: Vec<EventId>,
}

/// `POST /has` response body.
#[derive(Debug, Serialize, Deserialize)]
pub struct HasResponse {
    /// Ids from the request that the receiver does NOT have yet.
    pub disjoint: Vec<EventId>,
}
/// Shared axum application state: the repository behind an async mutex
/// so concurrent request handlers serialize their access to it.
#[derive(Clone)]
pub struct ServerState<R: Resolver> {
    // Arc makes per-request clones cheap; the tokio Mutex (not std) is
    // used because handlers hold the guard across `.await` points.
    repo: Arc<Mutex<Repo<R>>>,
}

impl<R: Resolver> ServerState<R> {
    /// Wrap a repository in shared, lockable server state.
    pub fn new(repo: Repo<R>) -> ServerState<R> {
        ServerState {
            repo: Arc::new(Mutex::new(repo)),
        }
    }
}
/// Start the HTTP sync server on `0.0.0.0:3000` and run until serving fails.
///
/// Routes:
/// - `GET  /`     — trivial liveness check
/// - `POST /has`  — report which of a peer's event ids we lack
/// - `POST /sync` — accept events pushed by a peer
///
/// # Errors
/// Returns an error if binding the listener or serving requests fails.
pub async fn serve<R: Resolver + 'static>(state: ServerState<R>) -> Result<()> {
    // NOTE(review): bind address is hard-coded; consider making it a
    // parameter. The `'static` bound is needed because axum spawns
    // handler futures onto the runtime.
    let tcp = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
    let router = axum::Router::new()
        .route("/", axum::routing::get(|| async { "hello world" }))
        .route("/has", axum::routing::post(route_has))
        .route("/sync", axum::routing::post(route_sync))
        .with_state(state);
    axum::serve(tcp, router.into_make_service()).await?;
    Ok(())
}
/// get the last event we have
async fn route_has<R: Resolver>(
State(state): State<ServerState<R>>,
Json(req): Json<HasRequest>,
) -> Json<HasResponse> {
println!("check has new events");
let repo = state.repo.lock().await;
if repo.room.get_root().id() != &req.root_id {
panic!("cannot sync events from two different rooms");
}
let event_table = repo.room.get_state()
.table(TABLE_EVENTS);
let our_ids: Vec<EventId> = event_table
.query(Selector::Prefix(vec![]))
.get_iter()
.map(|bytes| String::from_utf8(bytes.0).unwrap())
.map(|s| s.parse().unwrap())
.collect()
.await;
let mut missing_ids: Vec<EventId> = req.ids;
let mut disjoint = vec![];
while let Some(next) = missing_ids.pop() {
if our_ids.contains(&next) {
continue;
}
disjoint.push(next);
}
// repo.room.resolve_state().await;
Json(HasResponse { disjoint })
}
/// `POST /sync`: append events pushed by a peer to the local room.
async fn route_sync<R: Resolver>(
    State(state): State<ServerState<R>>,
    Json(req): Json<SyncRequest<R>>,
) -> Json<SyncResponse> {
    println!("receive new events");
    let mut repo = state.repo.lock().await;
    // NOTE(review): a root-id mismatch panics the handler task; consider
    // returning an error response instead.
    if repo.room.get_root().id() != &req.root_id {
        panic!("cannot sync events from two different rooms");
    }
    // a list of event ids would either be cached or easily queryable
    for event in req.events {
        // NOTE(review): unwrap panics on a bad event from the peer;
        // rejected events probably deserve a 4xx response instead.
        repo.room.append_event(event).await.unwrap();
    }
    // TODO(upstream): resolving state here was observed to fail for
    // unknown reasons; left disabled until the async issue is understood.
    // repo.room.resolve_state().await;
    Json(SyncResponse {})
}

View file

@ -7,6 +7,7 @@ edition = "2021"
[dependencies]
dag-resolve = { version = "0.1.0", path = "../proto" }
futures = "0.3.30"
rusqlite = { version = "0.30.0", features = ["bundled"] }
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"

View file

@ -14,7 +14,7 @@ use dag_resolve::{
resolver::{Command, Resolver, Verification},
};
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, Copy)]
/// An example of how a basic forum could look like
///
/// This is designed to be the "more fully fledged" example and will try to show off a decent number of features

View file

@ -9,7 +9,7 @@ use dag_resolve::{
};
use std::cmp::Ordering;
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone, Copy)]
/// A basic key-value store
///
/// This is designed to be the "extremely basic and minimalistic" example

View file

@ -1,11 +1,9 @@
//! Store state in a sqlite database
use dag_resolve::{
actor::{ActorId, ActorSecret},
event::EventId,
proto::table::{self, Database as _},
room::{TABLE_EVENTS, TABLE_HEADS},
actor::{ActorId, ActorSecret}, event::EventId, proto::table::{self, Database as _}, room::{TABLE_EVENTS, TABLE_HEADS}
};
use futures::{stream, Stream};
use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool};
use thiserror::Error;
@ -91,9 +89,9 @@ impl Database {
}
}
pub struct Query {
pub struct Query<'a> {
selector: table::Selector,
table: Table,
table: &'a Table,
_reversed: bool,
}
@ -130,20 +128,20 @@ impl table::Database for Database {
}
impl table::Table for Table {
type Query = Query;
type Query<'a> = Query<'a>;
fn query(&self, selector: table::Selector) -> Self::Query {
fn query(&self, selector: table::Selector) -> Self::Query<'_> {
Query {
selector,
table: self.clone(),
table: &self,
_reversed: false,
}
}
fn query_reverse(&self, selector: table::Selector) -> Self::Query {
fn query_reverse(&self, selector: table::Selector) -> Self::Query<'_> {
Query {
selector,
table: self.clone(),
table: &self,
_reversed: true,
}
}
@ -184,7 +182,7 @@ impl table::Table for Table {
}
}
impl table::Query for Query {
impl<'a> table::Query<'a> for Query<'a> {
async fn get_single(self) -> Option<Vec<u8>> {
match self.selector {
table::Selector::Exact(e) => query!(
@ -210,6 +208,7 @@ impl table::Query for Query {
.unwrap()
.map(|r| r.value)
}
table::Selector::Any => todo!(),
}
}
@ -232,11 +231,77 @@ impl table::Query for Query {
.map(|r| (r.key, r.value))
.collect()
}
table::Selector::Any => {
query!(
"SELECT key, value FROM data WHERE tb = ?",
self.table.name,
)
.fetch_all(&self.table.connection)
.await
.unwrap()
.into_iter()
.map(|r| (r.key, r.value))
.collect()
}
}
}
fn get_iter(self) -> impl Iterator<Item = Vec<u8>> {
vec![].into_iter()
/// Run the query eagerly and expose the resulting rows as a `Stream`.
///
/// Workaround: the sqlx queries are async, but this method must return a
/// stream synchronously, so the query is executed to completion on a
/// dedicated OS thread with its own throwaway tokio runtime and the
/// collected rows are wrapped in `stream::iter`. This is eager (no
/// backpressure) and spins up a runtime per call.
/// NOTE(review): driving the shared pool's futures on a second runtime
/// may misbehave with runtime-bound pools — confirm against sqlx docs.
fn get_iter(self) -> impl Stream<Item = (Vec<u8>, Vec<u8>)> + Send {
    // A scoped thread lets the closure borrow `self` without a 'static bound.
    let items = std::thread::scope(|scope| {
        scope.spawn(|| {
            // Fresh runtime so block_on is legal even if the caller is async.
            let rt = tokio::runtime::Runtime::new().unwrap();
            let items: Vec<(Vec<u8>, Vec<u8>)> = rt.block_on(async {
                match &self.selector {
                    // Single row whose key equals the given bytes.
                    table::Selector::Exact(exact) => {
                        let evec = exact.to_vec();
                        query!(
                            "SELECT key, value FROM data WHERE tb = ? AND key == ?",
                            self.table.name,
                            evec
                        )
                        .fetch_all(&self.table.connection)
                        .await
                        .unwrap()
                        .into_iter()
                        .map(|r| (r.key, r.value))
                        .collect()
                    },
                    // Rows whose key starts with the given bytes.
                    // NOTE(review): SQLite substr() is 1-based, so
                    // substr(key, 0, plen) yields only plen-1 bytes and a
                    // non-empty prefix likely never matches — verify; only
                    // the empty prefix is used by current callers.
                    table::Selector::Prefix(p) => {
                        let plen = p.len() as i64;
                        let pvec = p.to_vec();
                        query!(
                            "SELECT key, value FROM data WHERE tb = ? AND substr(key, 0, ?) == ?",
                            self.table.name,
                            plen,
                            pvec,
                        )
                        .fetch_all(&self.table.connection)
                        .await
                        .unwrap()
                        .into_iter()
                        .map(|r| (r.key, r.value))
                        .collect()
                    }
                    // Every row in the table.
                    table::Selector::Any => {
                        query!(
                            "SELECT key, value FROM data WHERE tb = ?",
                            self.table.name,
                        )
                        .fetch_all(&self.table.connection)
                        .await
                        .unwrap()
                        .into_iter()
                        .map(|r| (r.key, r.value))
                        .collect()
                    }
                }
            });
            items
        }).join().unwrap()
    });
    stream::iter(items)
}
async fn count(self) -> u64 {

View file

@ -11,6 +11,7 @@ base64 = "0.21.7"
canonical_json = "0.5.0"
ed25519 = "2.2.3"
ed25519-dalek = { version = "2.1.1", features = ["rand_core"] }
futures = "0.3.30"
rand = "0.8.5"
serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.113"

View file

@ -0,0 +1,25 @@
//! Other random atoms that might be split out into their own modules
use std::fmt::Display;
use base64::{engine::general_purpose::URL_SAFE_NO_PAD as b64engine, Engine};
use crate::event::EventId;
/// A room identifier. Same as the root event id, but with a `!` instead
/// of `$` to refer to the room itself rather than the root event.
pub struct RoomId(EventId);

impl RoomId {
    /// Wrap a root event id as a room id (no validation is performed).
    pub fn new_from_id(event_id: EventId) -> RoomId {
        RoomId(event_id)
    }
}

impl Display for RoomId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `!` sigil + url-safe, unpadded base64 of the id's byte form.
        write!(f, "!{}", b64engine.encode(self.0.to_bytes()))
    }
}

/// A media/blob identifier
// NOTE(review): currently an unused newtype over the raw string id.
pub struct MediaId(String);

View file

@ -263,12 +263,31 @@ impl EventId {
Self(EventIdData::Sha2(hash.to_vec()))
}
/// Parse an id from its serialized form: a 1-byte hash-type prefix
/// (`0x00` = debug, `0x01` = sha-256) followed by the payload.
/// Inverse of [`EventId::to_bytes`].
///
/// # Panics
/// Panics on a non-utf8 debug payload; unknown prefixes and empty
/// input are still `todo!()`.
pub fn from_bytes(data: &[u8]) -> Self {
    // Split off the prefix before touching the payload: the previous
    // `data[1..]` indexed first, which panicked on empty input and
    // made the `None` arm below unreachable.
    let data = match data.split_first() {
        Some((&0x00, rest)) => EventIdData::Debug(String::from_utf8(rest.to_vec()).unwrap()),
        Some((&0x01, rest)) => EventIdData::Sha2(rest.to_vec()),
        Some(_) => todo!(),
        None => todo!(),
    };
    Self(data)
}
/// Report which hash algorithm produced this id.
pub fn get_type(&self) -> HashType {
    match &self.0 {
        EventIdData::Debug(_) => HashType::Debug,
        EventIdData::Sha2(_) => HashType::Sha256,
    }
}
/// Serialize the id as a 1-byte hash-type prefix followed by the raw
/// id payload. Inverse of [`EventId::from_bytes`].
pub fn to_bytes(&self) -> Vec<u8> {
    // 0x00 = debug/plain-text id, 0x01 = sha-256 digest.
    let (prefix, bytes) = match &self.0 {
        EventIdData::Debug(d) => (0x00, d.as_bytes()),
        EventIdData::Sha2(d) => (0x01, d.as_ref()),
    };
    // One exact-size allocation instead of the former
    // `chain(bytes.to_vec()).collect()` double allocation.
    let mut out = Vec::with_capacity(1 + bytes.len());
    out.push(prefix);
    out.extend_from_slice(bytes);
    out
}
}
impl FromStr for EventId {

View file

@ -4,3 +4,4 @@ pub mod event;
pub mod resolver;
pub mod room;
pub mod table;
pub mod atoms;

View file

@ -6,15 +6,15 @@ use crate::event::{Event, EventId};
use std::{cmp::Ordering, collections::HashSet, fmt::Debug, future::Future};
/// small shards of code designed to resolve state
pub trait Resolver {
type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a>;
pub trait Resolver: Send + Sync + Clone {
type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a> + Send + Sync;
/// Given a set of ordered events, resolve the final state
fn resolve<D: Database>(
&self,
state: &D,
event: &Event<Self::EventType>,
) -> impl Future<Output = Vec<Command>>;
) -> impl Future<Output = Vec<Command>> + Send;
/// Given two events, decide which one comes first
/// if Ordering::Equal is returned, the timestamp then event id is used
@ -25,7 +25,7 @@ pub trait Resolver {
&self,
state: &D,
event: &Event<Self::EventType>,
) -> impl Future<Output = Verification>;
) -> impl Future<Output = Verification> + Send;
/// TEMP: Get the name/id of this resolver
fn name(&self) -> &str;

View file

@ -81,7 +81,6 @@ impl<R: Resolver, D: Database> Room<R, D> {
// ideally i don't get *all* events up front, and query the db as needed
let events = self.all_events().await;
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events);
dbg!(&sorted);
self.database.reset().await;
for event in sorted {
let effects = resolver.resolve(&self.database, event).await;
@ -163,7 +162,7 @@ impl<R: Resolver, D: Database> Room<R, D> {
pub async fn all_events(&self) -> Vec<Event<R::EventType>> {
self.database
.table(TABLE_EVENTS)
.query(Selector::Prefix(vec![]))
.query(Selector::Any)
.get_all()
.await
.iter()

View file

@ -1,40 +1,47 @@
use std::future::Future;
use futures::Stream;
type Bytes = Vec<u8>;
pub trait Database {
pub trait Database: Send + Sync {
type Table: Table;
fn table(&self, name: impl Into<String>) -> Self::Table;
fn reset(&self) -> impl Future<Output = ()>;
#[must_use("future must be polled")]
fn reset(&self) -> impl Future<Output = ()> + Send;
}
pub trait Table {
type Query: Query;
pub trait Table: Send {
// "Table must live as long as any returned Query" (so the returned query lasts as long as the table)
type Query<'a>: Query<'a> where Self: 'a;
fn query(&self, selector: Selector) -> Self::Query;
fn query_reverse(&self, selector: Selector) -> Self::Query;
fn query(&self, selector: Selector) -> Self::Query<'_>;
fn query_reverse(&self, selector: Selector) -> Self::Query<'_>;
#[must_use("future must be polled")]
fn get(&self, key: &[u8]) -> impl Future<Output = Option<Bytes>>;
#[must_use = "future must be polled"]
fn get(&self, key: &[u8]) -> impl Future<Output = Option<Bytes>> + Send;
#[must_use("future must be polled")]
fn put(&self, key: &[u8], value: &[u8]) -> impl Future<Output = ()>;
#[must_use = "future must be polled"]
fn put(&self, key: &[u8], value: &[u8]) -> impl Future<Output = ()> + Send;
#[must_use("future must be polled")]
fn delete(&self, key: &[u8]) -> impl Future<Output = ()>;
#[must_use = "future must be polled"]
fn delete(&self, key: &[u8]) -> impl Future<Output = ()> + Send;
}
/// Key-matching strategies for table queries.
pub enum Selector {
    /// Match the single row whose key equals these bytes.
    Exact(Bytes),
    /// Match every row whose key starts with these bytes.
    Prefix(Bytes),
    /// Match every row in the table.
    Any,
}
pub trait Query {
fn get_single(self) -> impl Future<Output = Option<Bytes>>;
fn get_all(self) -> impl Future<Output = Vec<(Bytes, Bytes)>>;
fn get_iter(self) -> impl Iterator<Item = Bytes>;
fn count(self) -> impl Future<Output = u64>;
#[must_use = "you have a query that doesn't do anything"]
pub trait Query<'a>: Send {
fn get_single(self) -> impl Future<Output = Option<Bytes>> + Send;
fn get_all(self) -> impl Future<Output = Vec<(Bytes, Bytes)>> + Send;
fn get_iter(self) -> impl Stream<Item = (Bytes, Bytes)> + Send + 'a;
fn count(self) -> impl Future<Output = u64> + Send;
}
impl From<Vec<u8>> for Selector {