bootleg coroutines

and terrible code all around. i hate this so much.
This commit is contained in:
tezlm 2024-03-01 22:46:29 -08:00
parent 0cad159a3d
commit bd9184d9cc
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
10 changed files with 564 additions and 326 deletions

12
Cargo.lock generated
View file

@ -102,6 +102,17 @@ dependencies = [
"backtrace",
]
[[package]]
name = "async-recursion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "async-trait"
version = "0.1.77"
@ -348,6 +359,7 @@ name = "cli"
version = "0.1.0"
dependencies = [
"anyhow",
"async-recursion",
"axum",
"clap",
"dag-resolve",

View file

@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
anyhow = { version = "1.0.79", features = ["backtrace"] }
async-recursion = "1.0.5"
axum = { version = "0.7.4", features = ["macros"] }
clap = { version = "4.5.0", features = ["derive"] }
dag-resolve = { version = "0.1.0", path = "../proto" }

View file

@ -1,5 +1,7 @@
use std::{
collections::HashSet, fmt::Debug, path::{Path, PathBuf}
collections::HashSet,
fmt::Debug,
path::{Path, PathBuf},
};
mod server;
@ -9,19 +11,18 @@ use clap::Parser;
use dag_resolve::{
actor::{ActorId, ActorSecret},
event::{Event, EventContent, EventId, HashType, SignatureType},
proto::table::{Database as _, Query, Selector, Table},
proto::table::{DatabaseRequest, DatabaseResponse, Selector},
resolver::Resolver,
room::{RoomOld, TABLE_EVENTS},
room::{AppendInput, AppendState, CreateInput, ResolveInput, RoomOld, TABLE_EVENTS, TABLE_HEADS},
};
use dag_resolve_impls::{
resolvers::{
forum::ForumResolver,
kv::KVResolver,
},
resolvers::{forum::ForumResolver, kv::KVResolver},
stores::sqlite_async::Database,
};
use server::{HasRequest, HasResponse, ServerState, SyncRequest, SyncResponse};
use crate::server::exec;
#[derive(Parser)]
#[clap(version, about)]
enum Cli {
@ -57,25 +58,59 @@ impl<R: Resolver> State<R> {
async fn init(resolver: R, path: &Path) -> Result<Self> {
// create new repo
println!("init new repo!");
let mut db = Database::open(path.to_str().unwrap()).await?.init().await?;
let db = Database::open(path.to_str().unwrap()).await?.init().await?;
let (actor, secret) = ActorId::new(SignatureType::Ed25519);
let room = RoomOld::builder()
let mut room = RoomOld::builder()
.with_resolver(resolver)
.with_hasher(HashType::Sha256)
.with_signer(SignatureType::Ed25519)
.with_database(&mut db)
.create(&secret)
.await?;
db
.set_config(&actor, &secret, room.get_root().id())
.await;
room.resolve_state(&mut db).await;
.create(&secret)?;
let room = loop {
room = match room {
dag_resolve::room::RoomBuilderState::Write { resolver, root } => {
let id_bytes = root.id().to_string().as_bytes().to_vec();
db.table(TABLE_EVENTS)
.put(&id_bytes, &serde_json::to_vec(&root).unwrap())
.await;
db.table(TABLE_HEADS).put(&id_bytes, &[]).await;
RoomOld::new(dag_resolve::room::RoomBuilderInput::Written { resolver, root })?
}
dag_resolve::room::RoomBuilderState::Done(room) => break room,
}
};
db.set_config(&actor, &secret, room.get_root().id()).await;
resolve_state(&room, &db).await?;
Ok(State { room, secret, db })
}
}
async fn resolve_state<R: Resolver>(room: &RoomOld<R>, db: &Database) -> Result<()> {
let mut resolve = room.resolve_state(ResolveInput::Init)?;
loop {
resolve = match resolve {
dag_resolve::room::ResolveState::GetAllAndReset => {
db.reset().await;
let events = db
.table(TABLE_EVENTS)
.query(Selector::Any)
.get_all()
.await
.into_iter()
.map(|(_, val)| serde_json::from_slice(&val).unwrap())
.collect();
room.resolve_state(ResolveInput::AllEvents(events))?
}
dag_resolve::room::ResolveState::ProcessEvent(idx, events, (step, req)) => {
room.resolve_state(ResolveInput::ProcessEvent(idx, events, (step, exec(req, &db).await)))?
},
dag_resolve::room::ResolveState::Done => break,
}
}
Ok(())
}
async fn open(path: impl AsRef<Path>) -> Result<Opened> {
let mut db = Database::open(path.as_ref().to_str().unwrap())
let db = Database::open(path.as_ref().to_str().unwrap())
.await?
.init()
.await?;
@ -91,13 +126,12 @@ async fn open(path: impl AsRef<Path>) -> Result<Opened> {
EventContent::Create(c) => match c.resolver.as_str() {
"kv" => {
let room = RoomOld::from_root(KVResolver, serde_json::from_slice(&event_data)?)?;
room.resolve_state(&mut db).await;
resolve_state(&room, &db).await?;
Ok(Opened::Kv(State { room, secret, db }))
}
"forum-v0" => {
let room =
RoomOld::from_root(ForumResolver, serde_json::from_slice(&event_data)?)?;
room.resolve_state(&mut db).await;
let room = RoomOld::from_root(ForumResolver, serde_json::from_slice(&event_data)?)?;
resolve_state(&room, &db).await?;
Ok(Opened::Forum(State { room, secret, db }))
}
_ => unimplemented!("unknown resolver"),
@ -129,16 +163,80 @@ async fn startup(path: impl AsRef<Path>, create: Option<&str>) -> Result<Opened>
}
async fn send_event<R: Resolver + Debug>(state: &mut State<R>, data: &str) -> Result<()> {
state
.room
.create_event(dbg!(serde_json::from_str(data)?), &state.secret, &mut state.db)
.await?;
state.room.resolve_state(&mut state.db).await;
let mut create = state.room.create_event(
dbg!(serde_json::from_str(data)?),
&state.secret,
CreateInput::Init,
)?;
loop {
create = match create {
dag_resolve::room::CreateState::Heads => {
let ids = state
.db
.table(TABLE_HEADS)
.query(Selector::Any)
.get_all()
.await
.into_iter()
.map(|(id, _)| String::from_utf8(id).unwrap().parse().unwrap())
.collect();
state.room.create_event(
dbg!(serde_json::from_str(data)?),
&state.secret,
CreateInput::Heads(ids),
)?
}
dag_resolve::room::CreateState::Append(step) => {
let append = match step {
AppendState::Step1(step, event) => {
AppendInput::Step2(step, DatabaseResponse::Done, event)
},
AppendState::Step2(step, event, req) => {
match req {
DatabaseRequest::Done => {
AppendInput::Step3(event)
},
_ => {
let res = exec(req, &state.db).await;
AppendInput::Step2(step, res, event)
}
}
},
AppendState::Step3(event) => {
let id_bytes = event.id().to_bytes();
// if db.table(TABLE_EVENTS).get(&id_bytes).await.is_none() {
// // already exists
// }
state.db.table(TABLE_EVENTS)
.put(&id_bytes, &serde_json::to_vec(&event).unwrap())
.await;
for head in dbg!(event.references()) {
state.db.table(TABLE_HEADS)
.delete(&head.to_string().into_bytes())
.await;
}
state.db.table(TABLE_HEADS)
.put(&event.id().to_string().into_bytes(), &[])
.await;
AppendInput::Step3(event)
},
AppendState::Done(_) => break,
};
state.room.create_event(
serde_json::from_str(data)?,
&state.secret,
CreateInput::Append(append),
)?
}
dag_resolve::room::CreateState::Done(_) => break,
}
}
resolve_state(&state.room, &state.db).await?;
Ok(())
}
async fn print_info<R: Resolver + Debug>(state: &mut State<R>) -> Result<()> {
state.room.resolve_state(&mut state.db).await;
resolve_state(&state.room, &state.db).await?;
let db = &mut state.db;
let event_count = db
.table(TABLE_EVENTS)
@ -186,7 +284,16 @@ repo.room.resolve_state().await;
async fn sync_state<R: Resolver>(state: &mut State<R>, remote: &str) -> Result<()> {
let http = reqwest::Client::new();
let ids_all = state.room.all_events(&mut state.db).await.iter().rev().map(|ev| ev.id().to_owned()).collect();
let ids_all = state
.db
.table(TABLE_EVENTS)
.query(Selector::Any)
.get_all()
.await
.into_iter()
.rev()
.map(|(id, _)| String::from_utf8(id).unwrap().parse().unwrap())
.collect();
let has: HasResponse = http
.post(format!("{remote}/has"))
.json(&HasRequest {
@ -202,7 +309,16 @@ async fn sync_state<R: Resolver>(state: &mut State<R>, remote: &str) -> Result<(
.post(format!("{remote}/sync"))
.json(&SyncRequest::<R> {
root_id: state.room.get_root().id().to_owned(),
events: state.room.all_events(&mut state.db).await.into_iter().filter(|ev| missing_ids.contains(ev.id())).collect(),
events: state
.db
.table(TABLE_EVENTS)
.query(Selector::Any)
.get_all()
.await
.into_iter()
.map(|(_, event)| serde_json::from_slice(&event).unwrap())
.filter(|ev: &Event<R::EventType>| missing_ids.contains(ev.id()))
.collect(),
})
.send()
.await?

View file

@ -3,11 +3,12 @@ use std::{ops::DerefMut, sync::Arc};
use anyhow::Result;
use axum::extract::{Json, State};
use dag_resolve::{
event::EventId,
proto::table::{Database, Query, Selector, Table},
event::{Event, EventId},
proto::table::{DatabaseRequest, DatabaseResponse, Selector},
resolver::Resolver,
room::TABLE_EVENTS,
room::{AppendInput, AppendState, RoomOld, TABLE_EVENTS, TABLE_HEADS},
};
use dag_resolve_impls::stores::sqlite_async::Database;
use futures::StreamExt as _;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
@ -93,6 +94,78 @@ async fn route_has<R: Resolver>(
Json(HasResponse { disjoint })
}
#[async_recursion::async_recursion]
pub async fn exec(req: DatabaseRequest, db: &Database) -> DatabaseResponse {
match req {
DatabaseRequest::Done => DatabaseResponse::Done,
DatabaseRequest::Reset => {
db.reset().await;
DatabaseResponse::Done
},
DatabaseRequest::QuerySingle(_, _) => todo!(),
DatabaseRequest::QueryAll(_, _) => todo!(),
DatabaseRequest::QueryIter(_, _) => todo!(),
DatabaseRequest::QueryCount(_, _) => todo!(),
DatabaseRequest::QueryIterReverse(_, _) => todo!(),
DatabaseRequest::Get(table, key) => {
let kv = db.table(table).get(&key).await;
DatabaseResponse::Single(kv)
},
DatabaseRequest::Put(table, key, value) => {
db.table(table).put(&key, &value).await;
DatabaseResponse::Done
},
DatabaseRequest::Delete(table, key) => {
db.table(table).delete(&key).await;
DatabaseResponse::Done
},
DatabaseRequest::WithData(data, req) => {
DatabaseResponse::WithData(data, Box::new(exec(*req, db).await))
},
}
}
async fn append<R>(room: &mut RoomOld<R>, db: &Database, event: Event<R::EventType>) where R: Resolver {
let mut step = room.append_event(AppendInput::Step1(event)).unwrap();
loop {
step = match step {
AppendState::Step1(step, event) => {
room.append_event(AppendInput::Step2(step, DatabaseResponse::Done, event)).unwrap()
},
AppendState::Step2(step, event, req) => {
match req {
DatabaseRequest::Done => {
let res = exec(req, &db).await;
room.append_event(AppendInput::Step2(step, res, event)).unwrap()
},
_ => {
room.append_event(AppendInput::Step3(event)).unwrap()
}
}
},
AppendState::Step3(event) => {
let id_bytes = event.id().to_bytes();
// if db.table(TABLE_EVENTS).get(&id_bytes).await.is_none() {
// // already exists
// }
db.table(TABLE_EVENTS)
.put(&id_bytes, &serde_json::to_vec(&event).unwrap())
.await;
for head in dbg!(event.references()) {
db.table(TABLE_HEADS)
.delete(&head.to_string().into_bytes())
.await;
}
db.table(TABLE_HEADS)
.put(&event.id().to_string().into_bytes(), &[])
.await;
room.append_event(AppendInput::Step3(event)).unwrap()
},
AppendState::Done(_) => break,
}
}
}
async fn route_sync<R: Resolver>(
State(state): State<ServerState<R>>,
Json(req): Json<SyncRequest<R>>,
@ -105,8 +178,9 @@ async fn route_sync<R: Resolver>(
}
let repo = repo.deref_mut();
let db = &mut repo.db;
for event in req.events {
repo.room.append_event(event, &mut repo.db).await.unwrap();
append(&mut repo.room, db, event).await;
}
// this doesn't work and i don't know why

View file

@ -7,11 +7,8 @@ use serde::{Deserialize, Serialize};
use dag_resolve::{
actor::ActorId,
event::{Event, EventContent, EventId},
proto::{
data::Text,
table::{Database, Table as _},
},
resolver::{Command, Resolver, Verification},
proto::{data::Text, table::{DatabaseRequest, DatabaseResponse}},
resolver::{Resolver, Verification},
};
#[derive(Debug, Default, Clone, Copy)]
@ -150,46 +147,66 @@ impl ForumRoomAcl {
impl Resolver for ForumResolver {
type EventType = ForumEventContent;
async fn resolve<S: Database>(
fn resolve(
&self,
_state: &S,
state: (u64, DatabaseResponse),
event: &Event<Self::EventType>,
) -> Vec<Command> {
let row = match event.content() {
EventContent::Create(_) => Command::put(
"members",
event.author().to_string().as_bytes(),
ForumMemberAcl::Operator.as_str().as_bytes(),
),
EventContent::Custom(ForumEventContent::Post { .. }) => {
Command::put("posts", event.id().to_string().as_bytes(), b"")
) -> Result<(), DatabaseRequest> {
match state {
(0, DatabaseResponse::Done) => {
match event.content() {
EventContent::Create(_) => {
Err(DatabaseRequest::Put("members".into(), event.author().to_string().as_bytes().into(), ForumMemberAcl::Operator.as_str().as_bytes().into()))
},
EventContent::Custom(ForumEventContent::Post { .. }) => {
Err(DatabaseRequest::Put("posts".into(), event.id().to_string().as_bytes().into(), vec![]))
}
EventContent::Custom(ForumEventContent::Reply {
post, reference, ..
}) => {
let value: Vec<_> = vec![]
.into_iter()
.copied()
.chain(post.to_string().as_bytes().to_vec())
.chain([0xff])
.chain(reference.to_string().as_bytes().to_vec())
.collect();
Err(DatabaseRequest::Put("replies".into(), event.id().to_string().as_bytes().into(), value))
}
EventContent::Custom(ForumEventContent::Config { name, .. }) => {
Err(DatabaseRequest::Put("meta".into(), b"name".into(), serde_json::to_vec(name).unwrap()))
}
EventContent::Custom(ForumEventContent::Member { id, acl }) => {
Err(DatabaseRequest::Put("members".into(), id.to_string().as_bytes().into(), acl.as_str().as_bytes().into()))
},
}
}
EventContent::Custom(ForumEventContent::Reply {
post, reference, ..
}) => {
let value: Vec<_> = vec![]
.into_iter()
.copied()
.chain(post.to_string().as_bytes().to_vec())
.chain([0xff])
.chain(reference.to_string().as_bytes().to_vec())
.collect();
Command::put("replies", event.id().to_string().as_bytes(), value)
(1, DatabaseResponse::Done) => {
match event.content() {
EventContent::Custom(ForumEventContent::Config { topic, .. }) => {
Err(DatabaseRequest::Put("meta".into(), b"topic".into(), serde_json::to_vec(topic).unwrap()))
}
_ => Ok(())
}
}
EventContent::Custom(ForumEventContent::Config { name, topic, acl }) => {
return vec![
Command::put("meta", b"name", serde_json::to_vec(name).unwrap()),
Command::put("meta", b"topic", serde_json::to_vec(topic).unwrap()),
Command::put("meta", b"acl", acl.as_str().as_bytes()),
]
(2, DatabaseResponse::Done) => {
match event.content() {
EventContent::Custom(ForumEventContent::Config { acl, .. }) => {
Err(DatabaseRequest::Put("meta".into(), b"acl".into(), acl.as_str().as_bytes().into()))
}
_ => panic!()
}
}
EventContent::Custom(ForumEventContent::Member { id, acl }) => Command::put(
"members",
id.to_string().as_bytes(),
acl.as_str().as_bytes(),
),
};
vec![row]
(3, DatabaseResponse::Done) => {
match event.content() {
EventContent::Custom(ForumEventContent::Config { .. }) => {
Ok(())
}
_ => panic!()
}
}
_ => todo!()
}
}
fn tiebreak(&self, _a: &Event<Self::EventType>, _b: &Event<Self::EventType>) -> Ordering {
@ -200,25 +217,34 @@ impl Resolver for ForumResolver {
"forum-v0"
}
async fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
let member_entry = state
.table("members")
.get(event.author().to_string().as_bytes())
.await
.and_then(|b| String::from_utf8(b).ok())
.map(|s| ForumMemberAcl::from_str(&s))
.unwrap_or_default();
let room_entry = state
.table("meta")
.get(b"acl")
.await
.and_then(|d| String::from_utf8(d).ok())
.map(|s| ForumRoomAcl::from_str(&s))
.unwrap_or_default();
match event.content() {
EventContent::Create(_) => panic!("create shouldn't be handled by this"),
EventContent::Custom(c) => can_send_event(room_entry, member_entry, c),
fn verify(&self, state: (u64, DatabaseResponse), event: &Event<Self::EventType>) -> Result<Verification, DatabaseRequest> {
match state {
(0, DatabaseResponse::Done) => {
Err(DatabaseRequest::Get("members".into(), event.author().to_string().as_bytes().into()))
}
(1, DatabaseResponse::Single(Some(acl))) => {
Err(DatabaseRequest::WithData(acl.1, Box::new(DatabaseRequest::Get("meta".into(), b"acl".into()))))
}
(2, DatabaseResponse::WithData(member_acl, d)) => match *d {
DatabaseResponse::Single(Some((_, room_acl))) => {
let room_acl = String::from_utf8(room_acl).map(|s| ForumRoomAcl::from_str(&s)).unwrap();
let member_acl = String::from_utf8(member_acl).map(|s| ForumMemberAcl::from_str(&s)).unwrap();
match event.content() {
EventContent::Create(_) => panic!("create shouldn't be handled by this"),
EventContent::Custom(c) => Ok(can_send_event(room_acl, member_acl, c)),
}
}
DatabaseResponse::Single(None) => {
let room_acl = ForumRoomAcl::Voice;
let member_acl = String::from_utf8(member_acl).map(|s| ForumMemberAcl::from_str(&s)).unwrap();
match event.content() {
EventContent::Create(_) => panic!("create shouldn't be handled by this"),
EventContent::Custom(c) => Ok(can_send_event(room_acl, member_acl, c)),
}
}
a => panic!("{:?}", a)
}
_ => panic!()
}
}

View file

@ -3,9 +3,7 @@
use serde::{Deserialize, Serialize};
use dag_resolve::{
event::{Event, EventContent},
proto::table::{Database, Table as _},
resolver::{Command, Resolver, Verification},
event::{Event, EventContent}, proto::table::{DatabaseRequest, DatabaseResponse}, resolver::{Resolver, Verification}
};
use std::cmp::Ordering;
@ -29,22 +27,28 @@ impl KVResolver {
impl Resolver for KVResolver {
type EventType = KVEventContent;
async fn resolve<D: Database>(
fn resolve(
&self,
_state: &D,
state: (u64, DatabaseResponse),
event: &Event<KVEventContent>,
) -> Vec<Command> {
match &event.content() {
EventContent::Create(_) => {
vec![Command::Put {
table: "meta".into(),
key: b"owner".into(),
value: event.author().to_string().as_bytes().to_vec(),
}]
}
EventContent::Custom(KVEventContent::Set(k, v)) => {
vec![Command::put("kv", k.as_bytes(), v.as_bytes())]
}
) -> Result<(), DatabaseRequest> {
match state {
(0, DatabaseResponse::Done) => {
match &event.content() {
EventContent::Create(_) => {
Err(DatabaseRequest::Put(
"meta".into(),
b"owner".into(),
event.author().to_string().as_bytes().to_vec(),
))
}
EventContent::Custom(KVEventContent::Set(k, v)) => {
Err(DatabaseRequest::Put("kv".into(), k.as_bytes().into(), v.as_bytes().into()))
}
}
},
(1, DatabaseResponse::Done) => Ok(()),
_ => panic!(),
}
}
@ -56,12 +60,19 @@ impl Resolver for KVResolver {
"kv"
}
async fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
if state.table("meta").get(b"owner").await.unwrap() == event.author().to_string().as_bytes()
{
Verification::Valid
} else {
Verification::Invalid
fn verify(&self, state: (u64, DatabaseResponse), event: &Event<Self::EventType>) -> Result<Verification, DatabaseRequest> {
match state {
(0, DatabaseResponse::Done) => {
Err(DatabaseRequest::Get("meta".to_string(), b"owner".to_vec()))
}
(1, DatabaseResponse::Single(Some((_, owner)))) => {
if owner == event.author().to_string().as_bytes() {
Ok(Verification::Valid)
} else {
Ok(Verification::Invalid)
}
}
_ => panic!()
}
}

View file

@ -1,7 +1,7 @@
//! Store state in a sqlite database
use dag_resolve::{
actor::{ActorId, ActorSecret}, event::EventId, proto::table::{self, Database as _}, room::{TABLE_EVENTS, TABLE_HEADS}
actor::{ActorId, ActorSecret}, event::EventId, proto::table, room::{TABLE_EVENTS, TABLE_HEADS}
};
use futures::{stream, Stream};
use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool};
@ -101,10 +101,8 @@ pub struct Table {
name: String,
}
impl table::Database for Database {
type Table = Table;
async fn reset(&self) {
impl Database {
pub async fn reset(&self) {
query("
CREATE TABLE IF NOT EXISTS data (tb TEXT, key BLOB, value BLOB NOT NULL, PRIMARY KEY(tb, key));
CREATE TABLE IF NOT EXISTS config (key TEXT PRIMARY KEY, value TEXT);
@ -119,7 +117,7 @@ impl table::Database for Database {
.unwrap();
}
fn table(&self, name: impl Into<String>) -> Self::Table {
pub fn table(&self, name: impl Into<String>) -> Table {
Table {
connection: self.connection.clone(),
name: name.into(),
@ -127,10 +125,8 @@ impl table::Database for Database {
}
}
impl table::Table for Table {
type Query<'a> = Query<'a>;
fn query(&self, selector: table::Selector) -> Self::Query<'_> {
impl Table {
pub fn query(&self, selector: table::Selector) -> Query<'_> {
Query {
selector,
table: &self,
@ -138,7 +134,7 @@ impl table::Table for Table {
}
}
fn query_reverse(&self, selector: table::Selector) -> Self::Query<'_> {
pub fn query_reverse(&self, selector: table::Selector) -> Query<'_> {
Query {
selector,
table: &self,
@ -146,23 +142,23 @@ impl table::Table for Table {
}
}
async fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
pub async fn get(&self, key: &[u8]) -> Option<(Vec<u8>, Vec<u8>)> {
let keyvec = key.to_vec();
let row = query!(
"SELECT value FROM data WHERE tb = ? AND key = ?",
"SELECT key, value FROM data WHERE tb = ? AND key = ?",
self.name,
keyvec
)
.fetch_one(&self.connection)
.await;
match row {
Ok(row) => Some(row.value),
Ok(row) => Some((row.key, row.value)),
Err(sqlx::Error::RowNotFound) => None,
Err(err) => panic!("{:?}", err),
}
}
async fn put(&self, key: &[u8], value: &[u8]) {
pub async fn put(&self, key: &[u8], value: &[u8]) {
query!(
"INSERT OR REPLACE INTO data (tb, key, value) VALUES (?, ?, ?)",
self.name,
@ -174,7 +170,7 @@ impl table::Table for Table {
.unwrap();
}
async fn delete(&self, key: &[u8]) {
pub async fn delete(&self, key: &[u8]) {
query!("DELETE FROM data WHERE tb = ? AND key = ?", self.name, key,)
.execute(&self.connection)
.await
@ -182,8 +178,8 @@ impl table::Table for Table {
}
}
impl<'a> table::Query<'a> for Query<'a> {
async fn get_single(self) -> Option<Vec<u8>> {
impl<'a> Query<'a> {
pub async fn get_single(self) -> Option<Vec<u8>> {
match self.selector {
table::Selector::Exact(e) => query!(
"SELECT value FROM data WHERE tb = ? AND key = ?",
@ -212,7 +208,7 @@ impl<'a> table::Query<'a> for Query<'a> {
}
}
async fn get_all(self) -> Vec<(Vec<u8>, Vec<u8>)> {
pub async fn get_all(self) -> Vec<(Vec<u8>, Vec<u8>)> {
match self.selector {
table::Selector::Exact(_) => todo!(),
table::Selector::Prefix(p) => {
@ -246,7 +242,7 @@ impl<'a> table::Query<'a> for Query<'a> {
}
}
fn get_iter(self) -> impl Stream<Item = (Vec<u8>, Vec<u8>)> + Send {
pub fn get_iter(self) -> impl Stream<Item = (Vec<u8>, Vec<u8>)> + Send {
// this is incredibly inefficient, but i don't care
// rust async is extremely painful and i won't bother making things nice until the situation improves
let items = std::thread::scope(|scope| {
@ -304,7 +300,7 @@ impl<'a> table::Query<'a> for Query<'a> {
stream::iter(items)
}
async fn count(self) -> u64 {
pub async fn count(self) -> u64 {
self.get_all().await.len() as u64
}
}

View file

@ -1,31 +1,32 @@
use serde::{Deserialize, Serialize};
// use super::table::{State, StateConfig, TableRow};
use super::table::Database;
use crate::event::{Event, EventId};
use std::{cmp::Ordering, collections::HashSet, fmt::Debug, future::Future};
use std::{cmp::Ordering, collections::HashSet, fmt::Debug};
use super::table::{DatabaseResponse, DatabaseRequest};
/// small shards of code designed to resolve state
pub trait Resolver: Send + Sync + Clone {
type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a> + Send + Sync;
/// Given a set of ordered events, resolve the final state
fn resolve<D: Database>(
fn resolve(
&self,
state: &D,
state: (u64, DatabaseResponse),
event: &Event<Self::EventType>,
) -> impl Future<Output = Vec<Command>> + Send;
) -> Result<(), DatabaseRequest>;
/// Given two events, decide which one comes first
/// if Ordering::Equal is returned, the timestamp then event id is used
fn tiebreak(&self, a: &Event<Self::EventType>, b: &Event<Self::EventType>) -> Ordering;
/// Verify if an event can be sent or not
fn verify<D: Database>(
fn verify(
&self,
state: &D,
state: (u64, DatabaseResponse),
event: &Event<Self::EventType>,
) -> impl Future<Output = Verification> + Send;
) -> Result<Verification, DatabaseRequest>;
/// TEMP: Get the name/id of this resolver
fn name(&self) -> &str;

View file

@ -1,16 +1,13 @@
use proto::table::Selector;
use crate::{
actor::ActorSecret,
event::EventId,
event::{CreateContent, Event, EventContent, HashType, SignatureType},
proto,
resolver::{sort, Command, Resolver, Verification},
resolver::{sort, Resolver, Verification},
Error, Result,
};
use std::fmt::Debug;
use super::table::{Database, Query, Table};
use super::table::{DatabaseResponse, DatabaseRequest};
pub const TABLE_EVENTS: &str = "_events";
pub const TABLE_HEADS: &str = "_heads";
@ -21,41 +18,109 @@ pub struct RoomOld<R: Resolver> {
resolver: R,
}
pub enum CreateState<R: Resolver> {
Heads,
Append(AppendState<R>),
Done(Event<R::EventType>),
}
pub enum CreateInput<R: Resolver> {
Init,
Append(AppendInput<R>),
Heads(Vec<EventId>),
}
pub enum AppendState<R: Resolver> {
Step1(u64, Event<R::EventType>),
Step2(u64, Event<R::EventType>, DatabaseRequest),
/// if db.table(TABLE_EVENTS).get(id_bytes).await.is_some() {
/// return Err(Error::AlreadyExists);
/// }
/// db.table(TABLE_EVENTS)
/// .put(id_bytes, &serde_json::to_vec(&event).unwrap())
/// .await;
/// for head in dbg!(event.references()) {
/// db.table(TABLE_HEADS)
/// .delete(&head.to_string().into_bytes())
/// .await;
/// }
/// db.table(TABLE_HEADS)
/// .put(&event.id().to_string().into_bytes(), &[])
/// .await;
Step3(Event<R::EventType>),
Done(Event<R::EventType>),
}
pub enum AppendInput<R: Resolver> {
Step1(Event<R::EventType>),
Step2(u64, DatabaseResponse, Event<R::EventType>),
Step3(Event<R::EventType>),
}
pub enum ResolveInput<R: Resolver> {
Init,
AllEvents(Vec<Event<R::EventType>>),
ProcessEvent(usize, Vec<Event<R::EventType>>, (u64, DatabaseResponse)),
}
pub enum ResolveState<R: Resolver> {
/// reset the state and get all events
GetAllAndReset,
ProcessEvent(usize, Vec<Event<R::EventType>>, (u64, DatabaseRequest)),
Done,
}
pub enum RoomBuilderInput<R: Resolver> {
Init {
resolver_name: String,
resolver: R,
hasher: HashType,
signer: SignatureType,
secret: ActorSecret,
},
Written {
resolver: R,
root: Event<R::EventType>,
},
}
pub enum RoomBuilderState<R: Resolver> {
Write {
resolver: R,
root: Event<R::EventType>,
},
Done(RoomOld<R>),
}
impl<R: Resolver> RoomOld<R> {
pub fn builder<D: Database>() -> RoomBuilder<'static, R, D> {
pub fn builder() -> RoomBuilder<R> {
RoomBuilder::new()
}
pub async fn new<D: Database>(
resolver_name: impl Into<String>,
resolver: R,
database: &mut D,
hasher: HashType,
signer: SignatureType,
secret: &ActorSecret,
) -> Result<Self> {
println!("create new room");
let base_event = Event::builder(
EventContent::Create(CreateContent {
resolver: resolver_name.into(),
hasher,
signer,
}),
secret,
)
.then_hash()?
.and_sign()?;
let id_bytes = base_event.id().to_string();
let id_bytes = id_bytes.as_bytes();
database
.table(TABLE_EVENTS)
.put(id_bytes, &serde_json::to_vec(&base_event).unwrap())
.await;
database.table(TABLE_HEADS).put(id_bytes, &[]).await;
Ok(Self {
root: base_event,
resolver,
})
pub fn new(input: RoomBuilderInput<R>) -> Result<RoomBuilderState<R>> {
match input {
RoomBuilderInput::Init { resolver_name, resolver, hasher, signer, secret } => {
println!("create new room");
let base_event = Event::builder(
EventContent::Create(CreateContent {
resolver: resolver_name.into(),
hasher,
signer,
}),
&secret,
)
.then_hash()?
.and_sign()?;
Ok(RoomBuilderState::Write { resolver, root: base_event } )
},
RoomBuilderInput::Written { resolver, root } => {
Ok(RoomBuilderState::Done(Self {
root,
resolver,
}))
},
}
}
pub fn from_root(resolver: R, event: Event<R::EventType>) -> Result<Self> {
@ -73,121 +138,96 @@ impl<R: Resolver> RoomOld<R> {
&self.resolver
}
pub async fn resolve_state<D: Database>(&self, db: &mut D) {
let resolver = self.get_resolver();
// ideally i don't get *all* events up front, and query the db as needed
let events = self.all_events(db).await;
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events);
db.reset().await;
for event in sorted {
let effects = resolver.resolve(db, event).await;
for effect in effects {
match effect {
Command::Put { table, key, value } => {
db.table(table).put(&key, &value).await
}
Command::Delete { table, key } => db.table(table).delete(&key).await,
pub fn resolve_state(&self, input: ResolveInput<R>) -> Result<ResolveState<R>> {
match input {
ResolveInput::Init => Ok(ResolveState::GetAllAndReset),
ResolveInput::AllEvents(events) => {
// ideally i don't get *all* events up front, and query the db as needed
let resolver = self.get_resolver();
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events).into_iter().cloned().collect();
Ok(ResolveState::ProcessEvent(0, sorted, (0, DatabaseRequest::Done)))
},
ResolveInput::ProcessEvent(idx, events, state) => {
let resolver = self.get_resolver();
let step = state.0;
match resolver.resolve(state, &events[idx]) {
Ok(()) => {
if idx + 1 < events.len() {
Ok(ResolveState::ProcessEvent(idx + 1, events, (0, DatabaseRequest::Done)))
} else {
Ok(ResolveState::Done)
}
},
Err(req) => Ok(ResolveState::ProcessEvent(idx, events, (step + 1, req)))
}
},
}
}
pub fn create_event(
&mut self,
event_content: EventContent<R::EventType>,
secret: &ActorSecret,
input: CreateInput<R>,
) -> Result<CreateState<R>> {
match input {
CreateInput::Init => Ok(CreateState::Heads),
CreateInput::Heads(heads) => {
let event = Event::builder(event_content, secret)
.with_references(heads)
.then_hash()?
.and_sign()?;
Ok(CreateState::Append(self.append_event(AppendInput::Step1(event))?))
},
CreateInput::Append(input) => {
match self.append_event(input)? {
AppendState::Done(event) => Ok(CreateState::Done(event)),
other => Ok(CreateState::Append(other)),
}
}
}
}
pub async fn create_event<D: Database>(
&mut self,
event_content: EventContent<R::EventType>,
secret: &ActorSecret,
db: &mut D,
) -> Result<Event<R::EventType>> {
let heads = self.get_heads(db).await;
let event = Event::builder(event_content, secret)
.with_references(heads)
.then_hash()?
.and_sign()?;
self.append_event(event, db).await
}
pub async fn append_event<D: Database>(
&mut self,
event: Event<R::EventType>,
db: &mut D,
) -> Result<Event<R::EventType>> {
event.verify_room(self).expect("event failed verification");
match self.get_resolver().verify(db, &event).await {
Verification::Valid => {}
Verification::Unauthorized => panic!("unauthorized"),
Verification::Invalid => panic!("invalid data"),
pub fn append_event(&mut self, input: AppendInput<R>) -> Result<AppendState<R>> {
match input {
AppendInput::Step1(event) => {
event.verify_room(self).expect("event failed verification");
Ok(AppendState::Step1(0, event))
}
AppendInput::Step2(step, resp, event) => {
match self.get_resolver().verify((step, resp), &event) {
Ok(verification) => match verification {
Verification::Valid => Ok(AppendState::Step3(event)),
Verification::Unauthorized => panic!("unauthorized"),
Verification::Invalid => panic!("invalid data"),
},
Err(req) => Ok(AppendState::Step2(step + 1, event, req)),
}
}
AppendInput::Step3(event) => {
Ok(AppendState::Done(event))
},
}
let id_bytes = event.id().to_string();
let id_bytes = id_bytes.as_bytes();
if db
.table(TABLE_EVENTS)
.get(id_bytes)
.await
.is_some()
{
return Err(Error::AlreadyExists);
}
db
.table(TABLE_EVENTS)
.put(id_bytes, &serde_json::to_vec(&event).unwrap())
.await;
for head in dbg!(event.references()) {
db
.table(TABLE_HEADS)
.delete(&head.to_string().into_bytes())
.await;
}
db
.table(TABLE_HEADS)
.put(&event.id().to_string().into_bytes(), &[])
.await;
Ok(event)
}
pub async fn get_heads<D: Database>(&self, db: &mut D) -> Vec<EventId> {
db
.table(TABLE_HEADS)
.query(Selector::Prefix(vec![]))
.get_all()
.await
.into_iter()
.map(|(key, _)| String::from_utf8(key).unwrap().parse().unwrap())
.collect()
}
// TEMP: get all events
pub async fn all_events<D: Database>(&self, db: &mut D) -> Vec<Event<R::EventType>> {
db
.table(TABLE_EVENTS)
.query(Selector::Any)
.get_all()
.await
.iter()
.map(|(_, bytes)| serde_json::from_slice(bytes).expect("invalid data"))
.collect()
}
}
pub struct RoomBuilder<'a, R: Resolver, D: Database> {
pub struct RoomBuilder<R: Resolver> {
resolver: Option<R>,
hasher: Option<HashType>,
signer: Option<SignatureType>,
database: Option<&'a mut D>,
}
impl<'a, R: Resolver, D: Database> Default for RoomBuilder<'a, R, D> {
impl<R: Resolver> Default for RoomBuilder<R> {
fn default() -> Self {
Self {
resolver: None,
hasher: None,
signer: None,
database: None,
}
}
}
impl<'a, R: Resolver, D: Database> RoomBuilder<'a, R, D> {
impl<R: Resolver> RoomBuilder<R> {
pub fn new() -> Self {
Self::default()
}
@ -207,24 +247,17 @@ impl<'a, R: Resolver, D: Database> RoomBuilder<'a, R, D> {
self
}
pub fn with_database(mut self, database: &'a mut D) -> Self {
self.database = Some(database);
self
}
pub async fn create(self, secret: &ActorSecret) -> Result<RoomOld<R>> {
RoomOld::new(
self.resolver
pub fn create(self, secret: &ActorSecret) -> Result<RoomBuilderState<R>> {
RoomOld::new(RoomBuilderInput::Init {
resolver_name: self.resolver
.as_ref()
.ok_or(Error::MissingBuilderData)?
.name()
.to_owned(),
self.resolver.ok_or(Error::MissingBuilderData)?,
self.database.ok_or(Error::MissingBuilderData)?,
self.hasher.ok_or(Error::MissingBuilderData)?,
self.signer.ok_or(Error::MissingBuilderData)?,
secret,
)
.await
resolver: self.resolver.ok_or(Error::MissingBuilderData)?,
hasher: self.hasher.ok_or(Error::MissingBuilderData)?,
signer: self.signer.ok_or(Error::MissingBuilderData)?,
secret: secret.clone(),
})
}
}

View file

@ -1,10 +1,8 @@
use std::future::Future;
use futures::Stream;
type Bytes = Vec<u8>;
#[derive(Debug)]
pub enum DatabaseRequest {
Done,
Reset,
QuerySingle(String, Selector),
QueryAll(String, Selector),
@ -14,56 +12,26 @@ pub enum DatabaseRequest {
Get(String, Bytes),
Put(String, Bytes, Bytes),
Delete(String, Bytes),
WithData(Vec<u8>, Box<DatabaseRequest>),
}
pub enum DatabaseReponse {
#[derive(Debug)]
pub enum DatabaseResponse {
Done,
Single(Option<(Bytes, Bytes)>),
Many(Vec<(Bytes, Bytes)>),
// Iter(Vec<(Bytes, Bytes)>),
Count(u64),
WithData(Vec<u8>, Box<DatabaseResponse>),
}
pub trait Database: Send + Sync {
type Table: Table;
fn table(&self, name: impl Into<String>) -> Self::Table;
#[must_use("future must be polled")]
fn reset(&self) -> impl Future<Output = ()> + Send;
}
pub trait Table: Send {
// "Table must live as long as any returned Query" (so the returned query lasts as long as the table)
type Query<'a>: Query<'a> where Self: 'a;
fn query(&self, selector: Selector) -> Self::Query<'_>;
fn query_reverse(&self, selector: Selector) -> Self::Query<'_>;
#[must_use = "future must be polled"]
fn get(&self, key: &[u8]) -> impl Future<Output = Option<Bytes>> + Send;
#[must_use = "future must be polled"]
fn put(&self, key: &[u8], value: &[u8]) -> impl Future<Output = ()> + Send;
#[must_use = "future must be polled"]
fn delete(&self, key: &[u8]) -> impl Future<Output = ()> + Send;
}
#[derive(Debug)]
pub enum Selector {
Exact(Bytes),
Prefix(Bytes),
Any,
}
#[must_use = "you have a query that doesn't do anything"]
pub trait Query<'a>: Send {
fn get_single(self) -> impl Future<Output = Option<Bytes>> + Send;
fn get_all(self) -> impl Future<Output = Vec<(Bytes, Bytes)>> + Send;
fn get_iter(self) -> impl Stream<Item = (Bytes, Bytes)> + Send + 'a;
fn count(self) -> impl Future<Output = u64> + Send;
}
impl From<Vec<u8>> for Selector {
fn from(value: Vec<u8>) -> Self {
Selector::Exact(value)