bootleg coroutines

and terrible code all around. i hate this so much.
This commit is contained in:
tezlm 2024-03-01 22:46:29 -08:00
parent 0cad159a3d
commit 2be6299ab4
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
3 changed files with 192 additions and 190 deletions

View file

@ -1,31 +1,32 @@
use serde::{Deserialize, Serialize};
// use super::table::{State, StateConfig, TableRow};
use super::table::Database;
use crate::event::{Event, EventId};
use std::{cmp::Ordering, collections::HashSet, fmt::Debug, future::Future};
use std::{cmp::Ordering, collections::HashSet, fmt::Debug};
use super::table::{DatabaseResponse, DatabaseRequest};
/// small shards of code designed to resolve state
pub trait Resolver: Send + Sync + Clone {
type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a> + Send + Sync;
/// Given a set of ordered events, resolve the final state
fn resolve<D: Database>(
fn resolve(
&self,
state: &D,
state: (u64, DatabaseResponse),
event: &Event<Self::EventType>,
) -> impl Future<Output = Vec<Command>> + Send;
) -> Result<(), DatabaseRequest>;
/// Given two events, decide which one comes first
/// if Ordering::Equal is returned, the timestamp then event id is used
fn tiebreak(&self, a: &Event<Self::EventType>, b: &Event<Self::EventType>) -> Ordering;
/// Verify if an event can be sent or not
fn verify<D: Database>(
fn verify(
&self,
state: &D,
state: (u64, DatabaseResponse),
event: &Event<Self::EventType>,
) -> impl Future<Output = Verification> + Send;
) -> Result<Verification, DatabaseRequest>;
/// TEMP: Get the name/id of this resolver
fn name(&self) -> &str;

View file

@ -1,16 +1,13 @@
use proto::table::Selector;
use crate::{
actor::ActorSecret,
event::EventId,
event::{CreateContent, Event, EventContent, HashType, SignatureType},
proto,
resolver::{sort, Command, Resolver, Verification},
resolver::{sort, Resolver, Verification},
Error, Result,
};
use std::fmt::Debug;
use super::table::{Database, Query, Table};
use super::table::{DatabaseResponse, DatabaseRequest};
pub const TABLE_EVENTS: &str = "_events";
pub const TABLE_HEADS: &str = "_heads";
@ -21,41 +18,109 @@ pub struct RoomOld<R: Resolver> {
resolver: R,
}
pub enum CreateState<R: Resolver> {
Heads,
Append(AppendState<R>),
Done(Event<R::EventType>),
}
pub enum CreateInput<R: Resolver> {
Init,
Append(AppendInput<R>),
Heads(Vec<EventId>),
}
pub enum AppendState<R: Resolver> {
Step1(u64, Event<R::EventType>),
Step2(u64, Event<R::EventType>, DatabaseRequest),
/// if db.table(TABLE_EVENTS).get(id_bytes).await.is_some() {
/// return Err(Error::AlreadyExists);
/// }
/// db.table(TABLE_EVENTS)
/// .put(id_bytes, &serde_json::to_vec(&event).unwrap())
/// .await;
/// for head in dbg!(event.references()) {
/// db.table(TABLE_HEADS)
/// .delete(&head.to_string().into_bytes())
/// .await;
/// }
/// db.table(TABLE_HEADS)
/// .put(&event.id().to_string().into_bytes(), &[])
/// .await;
Step3(Event<R::EventType>),
Done(Event<R::EventType>),
}
pub enum AppendInput<R: Resolver> {
Step1(Event<R::EventType>),
Step2(u64, DatabaseResponse, Event<R::EventType>),
Step3(Event<R::EventType>),
}
pub enum ResolveInput<R: Resolver> {
Init,
AllEvents(Vec<Event<R::EventType>>),
ProcessEvent(usize, Vec<Event<R::EventType>>, (u64, DatabaseResponse)),
}
pub enum ResolveState<R: Resolver> {
/// reset the state and get all events
GetAllAndReset,
ProcessEvent(usize, Vec<Event<R::EventType>>, (u64, DatabaseRequest)),
Done,
}
pub enum RoomBuilderInput<R: Resolver> {
Init {
resolver_name: String,
resolver: R,
hasher: HashType,
signer: SignatureType,
secret: ActorSecret,
},
Written {
resolver: R,
root: Event<R::EventType>,
},
}
pub enum RoomBuilderState<R: Resolver> {
Write {
resolver: R,
root: Event<R::EventType>,
},
Done(RoomOld<R>),
}
impl<R: Resolver> RoomOld<R> {
pub fn builder<D: Database>() -> RoomBuilder<'static, R, D> {
pub fn builder() -> RoomBuilder<R> {
RoomBuilder::new()
}
pub async fn new<D: Database>(
resolver_name: impl Into<String>,
resolver: R,
database: &mut D,
hasher: HashType,
signer: SignatureType,
secret: &ActorSecret,
) -> Result<Self> {
println!("create new room");
let base_event = Event::builder(
EventContent::Create(CreateContent {
resolver: resolver_name.into(),
hasher,
signer,
}),
secret,
)
.then_hash()?
.and_sign()?;
let id_bytes = base_event.id().to_string();
let id_bytes = id_bytes.as_bytes();
database
.table(TABLE_EVENTS)
.put(id_bytes, &serde_json::to_vec(&base_event).unwrap())
.await;
database.table(TABLE_HEADS).put(id_bytes, &[]).await;
Ok(Self {
root: base_event,
resolver,
})
pub fn new(input: RoomBuilderInput<R>) -> Result<RoomBuilderState<R>> {
match input {
RoomBuilderInput::Init { resolver_name, resolver, hasher, signer, secret } => {
println!("create new room");
let base_event = Event::builder(
EventContent::Create(CreateContent {
resolver: resolver_name.into(),
hasher,
signer,
}),
&secret,
)
.then_hash()?
.and_sign()?;
Ok(RoomBuilderState::Write { resolver, root: base_event } )
},
RoomBuilderInput::Written { resolver, root } => {
Ok(RoomBuilderState::Done(Self {
root,
resolver,
}))
},
}
}
pub fn from_root(resolver: R, event: Event<R::EventType>) -> Result<Self> {
@ -73,121 +138,96 @@ impl<R: Resolver> RoomOld<R> {
&self.resolver
}
pub async fn resolve_state<D: Database>(&self, db: &mut D) {
let resolver = self.get_resolver();
// ideally i don't get *all* events up front, and query the db as needed
let events = self.all_events(db).await;
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events);
db.reset().await;
for event in sorted {
let effects = resolver.resolve(db, event).await;
for effect in effects {
match effect {
Command::Put { table, key, value } => {
db.table(table).put(&key, &value).await
}
Command::Delete { table, key } => db.table(table).delete(&key).await,
pub fn resolve_state(&self, input: ResolveInput<R>) -> Result<ResolveState<R>> {
match input {
ResolveInput::Init => Ok(ResolveState::GetAllAndReset),
ResolveInput::AllEvents(events) => {
// ideally i don't get *all* events up front, and query the db as needed
let resolver = self.get_resolver();
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events).into_iter().cloned().collect();
Ok(ResolveState::ProcessEvent(0, sorted, (0, DatabaseRequest::Done)))
},
ResolveInput::ProcessEvent(idx, events, state) => {
let resolver = self.get_resolver();
let step = state.0;
match resolver.resolve(state, &events[idx]) {
Ok(()) => {
if idx + 1 < events.len() {
Ok(ResolveState::ProcessEvent(idx + 1, events, (0, DatabaseRequest::Done)))
} else {
Ok(ResolveState::Done)
}
},
Err(req) => Ok(ResolveState::ProcessEvent(idx, events, (step + 1, req)))
}
},
}
}
pub fn create_event(
&mut self,
event_content: EventContent<R::EventType>,
secret: &ActorSecret,
input: CreateInput<R>,
) -> Result<CreateState<R>> {
match input {
CreateInput::Init => Ok(CreateState::Heads),
CreateInput::Heads(heads) => {
let event = Event::builder(event_content, secret)
.with_references(heads)
.then_hash()?
.and_sign()?;
Ok(CreateState::Append(self.append_event(AppendInput::Step1(event))?))
},
CreateInput::Append(input) => {
match self.append_event(input)? {
AppendState::Done(event) => Ok(CreateState::Done(event)),
other => Ok(CreateState::Append(other)),
}
}
}
}
pub async fn create_event<D: Database>(
&mut self,
event_content: EventContent<R::EventType>,
secret: &ActorSecret,
db: &mut D,
) -> Result<Event<R::EventType>> {
let heads = self.get_heads(db).await;
let event = Event::builder(event_content, secret)
.with_references(heads)
.then_hash()?
.and_sign()?;
self.append_event(event, db).await
}
pub async fn append_event<D: Database>(
&mut self,
event: Event<R::EventType>,
db: &mut D,
) -> Result<Event<R::EventType>> {
event.verify_room(self).expect("event failed verification");
match self.get_resolver().verify(db, &event).await {
Verification::Valid => {}
Verification::Unauthorized => panic!("unauthorized"),
Verification::Invalid => panic!("invalid data"),
pub fn append_event(&mut self, input: AppendInput<R>) -> Result<AppendState<R>> {
match input {
AppendInput::Step1(event) => {
event.verify_room(self).expect("event failed verification");
Ok(AppendState::Step1(0, event))
}
AppendInput::Step2(step, resp, event) => {
match self.get_resolver().verify((step, resp), &event) {
Ok(verification) => match verification {
Verification::Valid => Ok(AppendState::Step3(event)),
Verification::Unauthorized => panic!("unauthorized"),
Verification::Invalid => panic!("invalid data"),
},
Err(req) => Ok(AppendState::Step2(step + 1, event, req)),
}
}
AppendInput::Step3(event) => {
Ok(AppendState::Done(event))
},
}
let id_bytes = event.id().to_string();
let id_bytes = id_bytes.as_bytes();
if db
.table(TABLE_EVENTS)
.get(id_bytes)
.await
.is_some()
{
return Err(Error::AlreadyExists);
}
db
.table(TABLE_EVENTS)
.put(id_bytes, &serde_json::to_vec(&event).unwrap())
.await;
for head in dbg!(event.references()) {
db
.table(TABLE_HEADS)
.delete(&head.to_string().into_bytes())
.await;
}
db
.table(TABLE_HEADS)
.put(&event.id().to_string().into_bytes(), &[])
.await;
Ok(event)
}
pub async fn get_heads<D: Database>(&self, db: &mut D) -> Vec<EventId> {
db
.table(TABLE_HEADS)
.query(Selector::Prefix(vec![]))
.get_all()
.await
.into_iter()
.map(|(key, _)| String::from_utf8(key).unwrap().parse().unwrap())
.collect()
}
// TEMP: get all events
pub async fn all_events<D: Database>(&self, db: &mut D) -> Vec<Event<R::EventType>> {
db
.table(TABLE_EVENTS)
.query(Selector::Any)
.get_all()
.await
.iter()
.map(|(_, bytes)| serde_json::from_slice(bytes).expect("invalid data"))
.collect()
}
}
pub struct RoomBuilder<'a, R: Resolver, D: Database> {
pub struct RoomBuilder<R: Resolver> {
resolver: Option<R>,
hasher: Option<HashType>,
signer: Option<SignatureType>,
database: Option<&'a mut D>,
}
impl<'a, R: Resolver, D: Database> Default for RoomBuilder<'a, R, D> {
impl<R: Resolver> Default for RoomBuilder<R> {
fn default() -> Self {
Self {
resolver: None,
hasher: None,
signer: None,
database: None,
}
}
}
impl<'a, R: Resolver, D: Database> RoomBuilder<'a, R, D> {
impl<R: Resolver> RoomBuilder<R> {
pub fn new() -> Self {
Self::default()
}
@ -207,24 +247,17 @@ impl<'a, R: Resolver, D: Database> RoomBuilder<'a, R, D> {
self
}
pub fn with_database(mut self, database: &'a mut D) -> Self {
self.database = Some(database);
self
}
pub async fn create(self, secret: &ActorSecret) -> Result<RoomOld<R>> {
RoomOld::new(
self.resolver
pub fn create(self, secret: &ActorSecret) -> Result<RoomBuilderState<R>> {
RoomOld::new(RoomBuilderInput::Init {
resolver_name: self.resolver
.as_ref()
.ok_or(Error::MissingBuilderData)?
.name()
.to_owned(),
self.resolver.ok_or(Error::MissingBuilderData)?,
self.database.ok_or(Error::MissingBuilderData)?,
self.hasher.ok_or(Error::MissingBuilderData)?,
self.signer.ok_or(Error::MissingBuilderData)?,
secret,
)
.await
resolver: self.resolver.ok_or(Error::MissingBuilderData)?,
hasher: self.hasher.ok_or(Error::MissingBuilderData)?,
signer: self.signer.ok_or(Error::MissingBuilderData)?,
secret: secret.clone(),
})
}
}

View file

@ -1,10 +1,8 @@
use std::future::Future;
use futures::Stream;
type Bytes = Vec<u8>;
#[derive(Debug)]
pub enum DatabaseRequest {
Done,
Reset,
QuerySingle(String, Selector),
QueryAll(String, Selector),
@ -14,56 +12,26 @@ pub enum DatabaseRequest {
Get(String, Bytes),
Put(String, Bytes, Bytes),
Delete(String, Bytes),
WithData(Vec<u8>, Box<DatabaseRequest>),
}
pub enum DatabaseReponse {
#[derive(Debug)]
pub enum DatabaseResponse {
Done,
Single(Option<(Bytes, Bytes)>),
Many(Vec<(Bytes, Bytes)>),
// Iter(Vec<(Bytes, Bytes)>),
Count(u64),
WithData(Vec<u8>, Box<DatabaseResponse>),
}
pub trait Database: Send + Sync {
type Table: Table;
fn table(&self, name: impl Into<String>) -> Self::Table;
#[must_use("future must be polled")]
fn reset(&self) -> impl Future<Output = ()> + Send;
}
pub trait Table: Send {
// "Table must live as long as any returned Query" (so the returned query lasts as long as the table)
type Query<'a>: Query<'a> where Self: 'a;
fn query(&self, selector: Selector) -> Self::Query<'_>;
fn query_reverse(&self, selector: Selector) -> Self::Query<'_>;
#[must_use = "future must be polled"]
fn get(&self, key: &[u8]) -> impl Future<Output = Option<Bytes>> + Send;
#[must_use = "future must be polled"]
fn put(&self, key: &[u8], value: &[u8]) -> impl Future<Output = ()> + Send;
#[must_use = "future must be polled"]
fn delete(&self, key: &[u8]) -> impl Future<Output = ()> + Send;
}
#[derive(Debug)]
pub enum Selector {
Exact(Bytes),
Prefix(Bytes),
Any,
}
#[must_use = "you have a query that doesn't do anything"]
pub trait Query<'a>: Send {
fn get_single(self) -> impl Future<Output = Option<Bytes>> + Send;
fn get_all(self) -> impl Future<Output = Vec<(Bytes, Bytes)>> + Send;
fn get_iter(self) -> impl Stream<Item = (Bytes, Bytes)> + Send + 'a;
fn count(self) -> impl Future<Output = u64> + Send;
}
impl From<Vec<u8>> for Selector {
fn from(value: Vec<u8>) -> Self {
Selector::Exact(value)