random stuff
This commit is contained in:
parent
bd9184d9cc
commit
3428100fe0
17 changed files with 950 additions and 75 deletions
48
Cargo.lock
generated
48
Cargo.lock
generated
|
@ -244,6 +244,12 @@ version = "1.6.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
|
||||
|
||||
[[package]]
|
||||
name = "beef"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3a8241f3ebb85c056b509d4327ad0358fbbba6ffb340bf388f26350aeda225b1"
|
||||
|
||||
[[package]]
|
||||
name = "bitflags"
|
||||
version = "1.3.2"
|
||||
|
@ -491,12 +497,14 @@ dependencies = [
|
|||
"serde_json",
|
||||
"sha2",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dag-resolve-impls"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-recursion",
|
||||
"dag-resolve",
|
||||
"futures",
|
||||
"rusqlite",
|
||||
|
@ -1145,6 +1153,39 @@ version = "0.4.20"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
|
||||
|
||||
[[package]]
|
||||
name = "logos"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "161971eb88a0da7ae0c333e1063467c5b5727e7fb6b710b8db4814eade3a42e8"
|
||||
dependencies = [
|
||||
"logos-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "logos-codegen"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8e31badd9de5131fdf4921f6473d457e3dd85b11b7f091ceb50e4df7c3eeb12a"
|
||||
dependencies = [
|
||||
"beef",
|
||||
"fnv",
|
||||
"lazy_static",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"regex-syntax",
|
||||
"syn 2.0.48",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "logos-derive"
|
||||
version = "0.14.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1c2a69b3eb68d5bd595107c9ee58d7e07fe2bb5e360cc85b0f084dedac80de0a"
|
||||
dependencies = [
|
||||
"logos-codegen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "matchit"
|
||||
version = "0.7.3"
|
||||
|
@ -1422,6 +1463,13 @@ dependencies = [
|
|||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "query"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"logos",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quote"
|
||||
version = "1.0.35"
|
||||
|
|
|
@ -4,6 +4,7 @@ members = [
|
|||
"crates/proto",
|
||||
"crates/impls",
|
||||
"crates/cli",
|
||||
"crates/query",
|
||||
]
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
|
|
@ -21,8 +21,6 @@ use dag_resolve_impls::{
|
|||
};
|
||||
use server::{HasRequest, HasResponse, ServerState, SyncRequest, SyncResponse};
|
||||
|
||||
use crate::server::exec;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[clap(version, about)]
|
||||
enum Cli {
|
||||
|
@ -101,7 +99,7 @@ async fn resolve_state<R: Resolver>(room: &RoomOld<R>, db: &Database) -> Result<
|
|||
room.resolve_state(ResolveInput::AllEvents(events))?
|
||||
}
|
||||
dag_resolve::room::ResolveState::ProcessEvent(idx, events, (step, req)) => {
|
||||
room.resolve_state(ResolveInput::ProcessEvent(idx, events, (step, exec(req, &db).await)))?
|
||||
room.resolve_state(ResolveInput::ProcessEvent(idx, events, (step, db.exec(req).await)))?
|
||||
},
|
||||
dag_resolve::room::ResolveState::Done => break,
|
||||
}
|
||||
|
@ -193,11 +191,11 @@ async fn send_event<R: Resolver + Debug>(state: &mut State<R>, data: &str) -> Re
|
|||
},
|
||||
AppendState::Step2(step, event, req) => {
|
||||
match req {
|
||||
DatabaseRequest::Done => {
|
||||
DatabaseRequest::Noop => {
|
||||
AppendInput::Step3(event)
|
||||
},
|
||||
_ => {
|
||||
let res = exec(req, &state.db).await;
|
||||
let res = state.db.exec(req).await;
|
||||
AppendInput::Step2(step, res, event)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,37 +94,6 @@ async fn route_has<R: Resolver>(
|
|||
Json(HasResponse { disjoint })
|
||||
}
|
||||
|
||||
#[async_recursion::async_recursion]
|
||||
pub async fn exec(req: DatabaseRequest, db: &Database) -> DatabaseResponse {
|
||||
match req {
|
||||
DatabaseRequest::Done => DatabaseResponse::Done,
|
||||
DatabaseRequest::Reset => {
|
||||
db.reset().await;
|
||||
DatabaseResponse::Done
|
||||
},
|
||||
DatabaseRequest::QuerySingle(_, _) => todo!(),
|
||||
DatabaseRequest::QueryAll(_, _) => todo!(),
|
||||
DatabaseRequest::QueryIter(_, _) => todo!(),
|
||||
DatabaseRequest::QueryCount(_, _) => todo!(),
|
||||
DatabaseRequest::QueryIterReverse(_, _) => todo!(),
|
||||
DatabaseRequest::Get(table, key) => {
|
||||
let kv = db.table(table).get(&key).await;
|
||||
DatabaseResponse::Single(kv)
|
||||
},
|
||||
DatabaseRequest::Put(table, key, value) => {
|
||||
db.table(table).put(&key, &value).await;
|
||||
DatabaseResponse::Done
|
||||
},
|
||||
DatabaseRequest::Delete(table, key) => {
|
||||
db.table(table).delete(&key).await;
|
||||
DatabaseResponse::Done
|
||||
},
|
||||
DatabaseRequest::WithData(data, req) => {
|
||||
DatabaseResponse::WithData(data, Box::new(exec(*req, db).await))
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn append<R>(room: &mut RoomOld<R>, db: &Database, event: Event<R::EventType>) where R: Resolver {
|
||||
let mut step = room.append_event(AppendInput::Step1(event)).unwrap();
|
||||
loop {
|
||||
|
@ -134,8 +103,8 @@ async fn append<R>(room: &mut RoomOld<R>, db: &Database, event: Event<R::EventTy
|
|||
},
|
||||
AppendState::Step2(step, event, req) => {
|
||||
match req {
|
||||
DatabaseRequest::Done => {
|
||||
let res = exec(req, &db).await;
|
||||
DatabaseRequest::Noop => {
|
||||
let res = db.exec(req).await;
|
||||
room.append_event(AppendInput::Step2(step, res, event)).unwrap()
|
||||
},
|
||||
_ => {
|
||||
|
|
|
@ -6,6 +6,7 @@ edition = "2021"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
async-recursion = "1.0.5"
|
||||
dag-resolve = { version = "0.1.0", path = "../proto" }
|
||||
futures = "0.3.30"
|
||||
rusqlite = { version = "0.30.0", features = ["bundled"] }
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//! Store state in a sqlite database
|
||||
|
||||
use dag_resolve::{
|
||||
actor::{ActorId, ActorSecret}, event::EventId, proto::table, room::{TABLE_EVENTS, TABLE_HEADS}
|
||||
actor::{ActorId, ActorSecret}, event::EventId, proto::table::{self, DatabaseRequest, DatabaseResponse}, room::{TABLE_EVENTS, TABLE_HEADS}
|
||||
};
|
||||
use futures::{stream, Stream};
|
||||
use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool};
|
||||
|
@ -304,3 +304,36 @@ impl<'a> Query<'a> {
|
|||
self.get_all().await.len() as u64
|
||||
}
|
||||
}
|
||||
|
||||
impl Database {
|
||||
#[async_recursion::async_recursion]
|
||||
pub async fn exec(&self, req: DatabaseRequest) -> DatabaseResponse {
|
||||
match req {
|
||||
DatabaseRequest::Noop => DatabaseResponse::Done,
|
||||
DatabaseRequest::Reset => {
|
||||
self.reset().await;
|
||||
DatabaseResponse::Done
|
||||
},
|
||||
DatabaseRequest::QuerySingle(_, _) => todo!(),
|
||||
DatabaseRequest::QueryAll(_, _) => todo!(),
|
||||
DatabaseRequest::QueryIter(_, _) => todo!(),
|
||||
DatabaseRequest::QueryCount(_, _) => todo!(),
|
||||
DatabaseRequest::QueryIterReverse(_, _) => todo!(),
|
||||
DatabaseRequest::Get(table, key) => {
|
||||
let kv = self.table(table).get(&key).await;
|
||||
DatabaseResponse::Single(kv)
|
||||
},
|
||||
DatabaseRequest::Put(table, key, value) => {
|
||||
self.table(table).put(&key, &value).await;
|
||||
DatabaseResponse::Done
|
||||
},
|
||||
DatabaseRequest::Delete(table, key) => {
|
||||
self.table(table).delete(&key).await;
|
||||
DatabaseResponse::Done
|
||||
},
|
||||
DatabaseRequest::WithData(data, req) => {
|
||||
DatabaseResponse::WithData(data, Box::new(self.exec(*req).await))
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,3 +17,4 @@ serde = { version = "1.0.196", features = ["derive"] }
|
|||
serde_json = "1.0.113"
|
||||
sha2 = "0.10.8"
|
||||
thiserror = "1.0.56"
|
||||
tokio = { version = "1.36.0", features = ["test-util", "macros"] }
|
||||
|
|
|
@ -12,6 +12,8 @@ use std::{
|
|||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
|
||||
use super::room2::{Room as _, RoomIdle};
|
||||
|
||||
#[derive(Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
#[serde(try_from = "&str", into = "String")]
|
||||
pub struct EventId(EventIdData);
|
||||
|
@ -177,6 +179,36 @@ impl<T: Debug + Serialize + Clone> Event<T> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn verify_room2<R: Resolver<EventType = T>>(
|
||||
&self,
|
||||
room: &RoomIdle<R>,
|
||||
) -> Result<()> {
|
||||
self.verify()?;
|
||||
|
||||
let room_config = match &room.get_root().content {
|
||||
EventContent::Create(c) => c,
|
||||
_ => unreachable!("the root should always be a create event"),
|
||||
};
|
||||
|
||||
if room_config.hasher != self.id.get_type() {
|
||||
return Err(Error::MismatchedHasher {
|
||||
expected: room_config.hasher,
|
||||
got: self.id.get_type(),
|
||||
});
|
||||
}
|
||||
|
||||
if room_config.signer != self.author.get_type()
|
||||
|| room_config.signer != self.signature.get_type()
|
||||
{
|
||||
return Err(Error::MismatchedSigner {
|
||||
expected: room_config.signer,
|
||||
got: self.author.get_type(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn id(&self) -> &EventId {
|
||||
&self.id
|
||||
}
|
||||
|
|
|
@ -5,5 +5,6 @@ pub mod resolver;
|
|||
pub mod room;
|
||||
pub mod table;
|
||||
pub mod atoms;
|
||||
mod room2;
|
||||
pub mod room2;
|
||||
mod room3;
|
||||
mod rpc;
|
||||
|
|
|
@ -145,7 +145,7 @@ impl<R: Resolver> RoomOld<R> {
|
|||
// ideally i don't get *all* events up front, and query the db as needed
|
||||
let resolver = self.get_resolver();
|
||||
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events).into_iter().cloned().collect();
|
||||
Ok(ResolveState::ProcessEvent(0, sorted, (0, DatabaseRequest::Done)))
|
||||
Ok(ResolveState::ProcessEvent(0, sorted, (0, DatabaseRequest::Noop)))
|
||||
},
|
||||
ResolveInput::ProcessEvent(idx, events, state) => {
|
||||
let resolver = self.get_resolver();
|
||||
|
@ -153,7 +153,7 @@ impl<R: Resolver> RoomOld<R> {
|
|||
match resolver.resolve(state, &events[idx]) {
|
||||
Ok(()) => {
|
||||
if idx + 1 < events.len() {
|
||||
Ok(ResolveState::ProcessEvent(idx + 1, events, (0, DatabaseRequest::Done)))
|
||||
Ok(ResolveState::ProcessEvent(idx + 1, events, (0, DatabaseRequest::Noop)))
|
||||
} else {
|
||||
Ok(ResolveState::Done)
|
||||
}
|
||||
|
|
|
@ -1,15 +1,59 @@
|
|||
#![allow(unused)]
|
||||
|
||||
//! Possibly another way rooms could be handled?
|
||||
//! A state machine representing a room
|
||||
|
||||
use super::table::Selector;
|
||||
use std::collections::VecDeque;
|
||||
|
||||
struct RoomIdle;
|
||||
struct RoomRead;
|
||||
struct RoomWrite;
|
||||
use crate::{
|
||||
actor::ActorSecret,
|
||||
event::{Event, EventContent, EventId},
|
||||
proto::table::DatabaseResponse,
|
||||
resolver::{sort, Resolver, Verification},
|
||||
room::{TABLE_EVENTS, TABLE_HEADS},
|
||||
};
|
||||
|
||||
enum RejectReason {
|
||||
use super::table::{DatabaseRequest, Selector};
|
||||
|
||||
pub struct RoomIdle<R: Resolver> {
|
||||
data: RoomData<R>,
|
||||
}
|
||||
|
||||
pub struct RoomResolveReset<R: Resolver> {
|
||||
data: RoomData<R>,
|
||||
}
|
||||
|
||||
pub struct RoomResolveAllEvents<R: Resolver> {
|
||||
data: RoomData<R>,
|
||||
req: DatabaseRequest,
|
||||
}
|
||||
|
||||
pub struct RoomResolveStep<R: Resolver> {
|
||||
data: RoomData<R>,
|
||||
events: Vec<Event<R::EventType>>,
|
||||
event_idx: u64,
|
||||
event_step: u64,
|
||||
event_res: Option<DatabaseResponse>,
|
||||
}
|
||||
|
||||
pub struct RoomCreateHeads<R: Resolver> {
|
||||
data: RoomData<R>,
|
||||
req: DatabaseRequest,
|
||||
event_content: EventContent<R::EventType>,
|
||||
secret: ActorSecret,
|
||||
}
|
||||
|
||||
pub struct RoomAppendVerify<R: Resolver> {
|
||||
data: RoomData<R>,
|
||||
req: DatabaseRequest,
|
||||
step: usize,
|
||||
event: Event<R::EventType>,
|
||||
}
|
||||
|
||||
pub struct RoomAppendWrite<R: Resolver> {
|
||||
data: RoomData<R>,
|
||||
reqs: VecDeque<DatabaseRequest>,
|
||||
}
|
||||
|
||||
pub enum RejectReason {
|
||||
/// The event has invalid data
|
||||
Invalid,
|
||||
|
||||
|
@ -23,38 +67,274 @@ enum RejectReason {
|
|||
Panic,
|
||||
}
|
||||
|
||||
enum RoomOutput {
|
||||
/// The event is valid and has been added
|
||||
Accept(RoomIdle),
|
||||
|
||||
/// The event has been rejected (but shouldn't necessarily be dropped)
|
||||
Reject(RejectReason, RoomIdle),
|
||||
|
||||
/// The room state machine is requesting to read data
|
||||
Read(RoomRead),
|
||||
|
||||
/// The room state machine is requesting to write data
|
||||
Write(RoomWrite),
|
||||
/// the internal data of the room
|
||||
struct RoomData<R: Resolver> {
|
||||
root: Event<R::EventType>,
|
||||
resolver: R,
|
||||
}
|
||||
|
||||
impl RoomIdle {
|
||||
/// Enters into the event processing state. Pretend its a coroutine for processing an event.
|
||||
pub fn process_event(self) -> RoomOutput {
|
||||
pub trait Room<R: Resolver> {
|
||||
fn get_root(&self) -> &Event<R::EventType>;
|
||||
|
||||
fn get_resolver(&self) -> &R;
|
||||
}
|
||||
|
||||
macro_rules! impl_room {
|
||||
($state:ident) => {
|
||||
impl<R: Resolver> Room<R> for $state<R> {
|
||||
fn get_root(&self) -> &Event<<R as Resolver>::EventType> {
|
||||
&self.data.root
|
||||
}
|
||||
|
||||
fn get_resolver(&self) -> &R {
|
||||
&self.data.resolver
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl_room!(RoomIdle);
|
||||
impl_room!(RoomResolveReset);
|
||||
impl_room!(RoomResolveStep);
|
||||
impl_room!(RoomResolveAllEvents);
|
||||
impl_room!(RoomCreateHeads);
|
||||
impl_room!(RoomAppendVerify);
|
||||
impl_room!(RoomAppendWrite);
|
||||
|
||||
impl<R: Resolver> RoomIdle<R> {
|
||||
/// Process a new event from the network or appended locally.
|
||||
pub fn process_event(self, event: Event<R::EventType>) -> RoomAppendVerifyOutput<R> {
|
||||
// try to append event here
|
||||
todo!()
|
||||
event
|
||||
.verify_room2(&self)
|
||||
.expect("event failed verification");
|
||||
|
||||
let a = (0, DatabaseResponse::Done);
|
||||
match self.get_resolver().verify(a, &event) {
|
||||
Ok(verification) => match verification {
|
||||
Verification::Valid => RoomAppendVerifyOutput::Write(RoomAppendWrite {
|
||||
data: self.data,
|
||||
reqs: get_reqs::<R>(&event),
|
||||
}),
|
||||
Verification::Unauthorized => panic!("unauthorized"),
|
||||
Verification::Invalid => panic!("invalid data"),
|
||||
},
|
||||
Err(req) => {
|
||||
if req.is_read() {
|
||||
RoomAppendVerifyOutput::Verify(RoomAppendVerify {
|
||||
data: self.data,
|
||||
req,
|
||||
step: 0,
|
||||
event,
|
||||
})
|
||||
} else {
|
||||
panic!("can't write during verify");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn resolve_state(self) -> RoomResolveReset<R> {
|
||||
RoomResolveReset { data: self.data }
|
||||
}
|
||||
|
||||
/// creates a new event. call `append_event` afterwards
|
||||
pub fn create_event(self, event_content: EventContent<R::EventType>, secret: &ActorSecret) -> RoomCreateHeads<R> {
|
||||
RoomCreateHeads {
|
||||
data: self.data,
|
||||
req: DatabaseRequest::QueryAll(TABLE_HEADS.into(), Selector::Any),
|
||||
event_content,
|
||||
secret: secret.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RoomRead {
|
||||
impl<R: Resolver> RoomCreateHeads<R> {
|
||||
pub fn done(self, res: DatabaseResponse) -> (RoomIdle<R>, Event<R::EventType>) {
|
||||
let heads: Vec<_> = match res {
|
||||
DatabaseResponse::Many(records) => records
|
||||
.into_iter()
|
||||
.map(|(id, _)| EventId::from_bytes(&id))
|
||||
.collect(),
|
||||
_ => panic!(),
|
||||
};
|
||||
let event = Event::builder(self.event_content, &self.secret)
|
||||
.with_references(heads)
|
||||
.then_hash()
|
||||
.unwrap()
|
||||
.and_sign()
|
||||
.unwrap();
|
||||
let idle = RoomIdle { data: self.data };
|
||||
(idle, event)
|
||||
}
|
||||
|
||||
/// what to do
|
||||
pub fn what(&self) -> &DatabaseRequest {
|
||||
&self.req
|
||||
}
|
||||
}
|
||||
|
||||
pub enum RoomAppendVerifyOutput<R: Resolver> {
|
||||
Verify(RoomAppendVerify<R>),
|
||||
Write(RoomAppendWrite<R>),
|
||||
}
|
||||
|
||||
pub enum RoomAppendWriteOutput<R: Resolver> {
|
||||
Write(RoomAppendWrite<R>),
|
||||
Idle(RoomIdle<R>),
|
||||
}
|
||||
|
||||
fn get_reqs<R: Resolver>(event: &Event<R::EventType>) -> VecDeque<DatabaseRequest> {
|
||||
let mut reqs = VecDeque::new();
|
||||
for head in event.references() {
|
||||
reqs.push_back(DatabaseRequest::Delete(TABLE_HEADS.into(), head.to_bytes()));
|
||||
}
|
||||
reqs.push_back(DatabaseRequest::Put(
|
||||
TABLE_EVENTS.into(),
|
||||
event.id().to_bytes(),
|
||||
serde_json::to_vec(&event).unwrap(),
|
||||
));
|
||||
reqs.push_back(DatabaseRequest::Put(
|
||||
TABLE_HEADS.into(),
|
||||
event.id().to_bytes(),
|
||||
vec![],
|
||||
));
|
||||
reqs
|
||||
}
|
||||
|
||||
impl<R: Resolver> RoomAppendVerify<R> {
|
||||
/// Done reading (call with DatabaseResponse::Done first)
|
||||
pub fn done(self, response: DatabaseResponse) -> RoomAppendVerifyOutput<R> {
|
||||
let a = (self.step as u64, response);
|
||||
match self.get_resolver().verify(a, &self.event) {
|
||||
Ok(verification) => match verification {
|
||||
Verification::Valid => RoomAppendVerifyOutput::Write(RoomAppendWrite {
|
||||
data: self.data,
|
||||
reqs: get_reqs::<R>(&self.event),
|
||||
}),
|
||||
Verification::Unauthorized => panic!("unauthorized"),
|
||||
Verification::Invalid => panic!("invalid data"),
|
||||
},
|
||||
Err(req) => {
|
||||
if req.is_read() {
|
||||
RoomAppendVerifyOutput::Verify(RoomAppendVerify {
|
||||
data: self.data,
|
||||
req,
|
||||
step: self.step + 1,
|
||||
event: self.event,
|
||||
})
|
||||
} else {
|
||||
panic!("can't write during verify");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// what to do
|
||||
pub fn what(&self) -> &DatabaseRequest {
|
||||
&self.req
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Resolver> RoomAppendWrite<R> {
|
||||
/// Done reading
|
||||
pub fn done(self, data: Vec<u8>) -> RoomOutput {
|
||||
todo!()
|
||||
pub fn done(mut self, response: DatabaseResponse) -> RoomAppendWriteOutput<R> {
|
||||
match response {
|
||||
DatabaseResponse::Done => {},
|
||||
_ => panic!(),
|
||||
}
|
||||
if self.reqs.pop_front().is_none() {
|
||||
RoomAppendWriteOutput::Idle(RoomIdle { data: self.data })
|
||||
} else {
|
||||
RoomAppendWriteOutput::Write(self)
|
||||
}
|
||||
}
|
||||
|
||||
/// what to do
|
||||
pub fn what(&self) -> &DatabaseRequest {
|
||||
self.reqs.front().as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl RoomWrite {
|
||||
/// Done writing
|
||||
pub fn done(self) -> RoomOutput {
|
||||
todo!()
|
||||
impl<R: Resolver> RoomResolveReset<R> {
|
||||
/// Done reading
|
||||
pub fn done(self, res: DatabaseResponse) -> RoomResolveAllEvents<R> {
|
||||
match res {
|
||||
DatabaseResponse::Done => RoomResolveAllEvents {
|
||||
data: self.data,
|
||||
req: DatabaseRequest::QueryAll(TABLE_EVENTS.into(), Selector::Any),
|
||||
},
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// what to do
|
||||
pub fn what(&self) -> &DatabaseRequest {
|
||||
&DatabaseRequest::Reset
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: Resolver> RoomResolveAllEvents<R> {
|
||||
/// Done reading
|
||||
pub fn done(self, res: DatabaseResponse) -> RoomResolveStep<R> {
|
||||
match res {
|
||||
DatabaseResponse::Many(records) => {
|
||||
// ideally i don't get *all* events up front, and query the db as needed
|
||||
let events: Vec<_> = records.into_iter().map(|(_, data)| serde_json::from_slice(&data).unwrap()).collect();
|
||||
let resolver = self.get_resolver();
|
||||
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events).into_iter().cloned().collect();
|
||||
RoomResolveStep {
|
||||
data: self.data,
|
||||
events: sorted,
|
||||
event_idx: 0,
|
||||
event_step: 0,
|
||||
event_res: Some(DatabaseResponse::Done),
|
||||
}
|
||||
},
|
||||
_ => panic!(),
|
||||
}
|
||||
}
|
||||
|
||||
/// what to do
|
||||
pub fn what(&self) -> &DatabaseRequest {
|
||||
&self.req
|
||||
}
|
||||
}
|
||||
|
||||
pub enum RoomResolveStepOutput<R: Resolver> {
|
||||
Step(RoomResolveStep<R>),
|
||||
Done(RoomIdle<R>),
|
||||
}
|
||||
|
||||
impl<R: Resolver> RoomResolveStep<R> {
|
||||
/// Done with io
|
||||
pub fn done(mut self, res: DatabaseResponse) -> RoomResolveStepOutput<R> {
|
||||
if (self.event_idx as usize) < self.events.len() {
|
||||
assert!(self.event_res.is_none());
|
||||
self.event_res = Some(res);
|
||||
RoomResolveStepOutput::Step(self)
|
||||
} else {
|
||||
RoomResolveStepOutput::Done(RoomIdle { data: self.data })
|
||||
}
|
||||
}
|
||||
|
||||
/// what to do
|
||||
pub fn what(&mut self) -> DatabaseRequest {
|
||||
let res = self.event_res.take().unwrap();
|
||||
match self.get_resolver().resolve((self.event_step, res), &self.events[self.event_idx as usize]) {
|
||||
Ok(()) => {
|
||||
self.event_idx += 1;
|
||||
if (self.event_idx as usize) < self.events.len() {
|
||||
self.event_res = Some(DatabaseResponse::Done);
|
||||
self.what()
|
||||
} else {
|
||||
DatabaseRequest::Noop
|
||||
}
|
||||
},
|
||||
Err(req) => {
|
||||
self.event_step += 1;
|
||||
req
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
146
crates/proto/src/proto/room3.rs
Normal file
146
crates/proto/src/proto/room3.rs
Normal file
|
@ -0,0 +1,146 @@
|
|||
#![allow(unused)]
|
||||
|
||||
use std::{
|
||||
cell::OnceCell,
|
||||
future::IntoFuture,
|
||||
pin::Pin,
|
||||
sync::{Arc, OnceLock},
|
||||
task::{Context, Poll, Wake, Waker},
|
||||
};
|
||||
|
||||
use futures::{Future, FutureExt};
|
||||
|
||||
async fn verify<S: DbSync, A: DbAsync>(db: Db<S, A>) {
|
||||
println!("getting...");
|
||||
assert_eq!(db.get().await, vec![123]);
|
||||
println!("...got!");
|
||||
}
|
||||
|
||||
struct CommandGet<'a, S: DbSync, A: DbAsync> {
|
||||
db: &'a Db<S, A>,
|
||||
data: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
enum DbType<S: DbSync, A: DbAsync> {
|
||||
Sync(S),
|
||||
Async(A),
|
||||
}
|
||||
|
||||
struct FakeSync;
|
||||
impl DbSync for FakeSync {
|
||||
fn get(&self) -> Vec<u8> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
struct FakeAsync;
|
||||
impl DbAsync for FakeAsync {
|
||||
async fn get(&self) -> Vec<u8> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, S: DbSync, A: DbAsync> Future for CommandGet<'a, S, A> {
|
||||
type Output = Vec<u8>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
if let Some(data) = self.data.take() {
|
||||
Poll::Ready(data)
|
||||
} else {
|
||||
match &self.db.inner {
|
||||
DbType::Sync(s) => {
|
||||
self.data = Some(s.get());
|
||||
cx.waker().wake_by_ref();
|
||||
Poll::Pending
|
||||
},
|
||||
DbType::Async(a) => {
|
||||
Box::pin(a.get().into_future()).poll_unpin(cx)
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Db<S: DbSync, A: DbAsync> {
|
||||
inner: DbType<S, A>,
|
||||
}
|
||||
|
||||
impl<S: DbSync, A: DbAsync> Db<S, A> {
|
||||
fn new(inner: DbType<S, A>) -> Self {
|
||||
Db { inner }
|
||||
}
|
||||
|
||||
fn get(&self) -> CommandGet<'_, S, A> {
|
||||
CommandGet {
|
||||
db: self,
|
||||
data: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait DbImpl {
|
||||
fn get(&self, cb: &mut dyn FnMut(Vec<u8>));
|
||||
}
|
||||
|
||||
trait DbSync {
|
||||
fn get(&self) -> Vec<u8>;
|
||||
}
|
||||
|
||||
trait DbAsync {
|
||||
async fn get(&self) -> Vec<u8>;
|
||||
}
|
||||
|
||||
struct DbWaker;
|
||||
|
||||
impl Wake for DbWaker {
|
||||
fn wake(self: Arc<Self>) {
|
||||
// nothing yet...
|
||||
}
|
||||
}
|
||||
|
||||
fn run_sync<D: DbSync + 'static>(d: D) {
|
||||
let db = Db::new(DbType::<_, FakeAsync>::Sync(d));
|
||||
let mut ver = Box::pin(verify(db));
|
||||
let waker = Waker::from(Arc::new(DbWaker));
|
||||
let mut ctx = Context::from_waker(&waker);
|
||||
loop {
|
||||
match ver.as_mut().poll(&mut ctx) {
|
||||
Poll::Ready(_) => break,
|
||||
Poll::Pending => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_async<D: DbAsync + 'static>(d: D) {
|
||||
verify(Db::new(DbType::<FakeSync, _>::Async(d))).await;
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sync() {
|
||||
struct TestDb;
|
||||
impl DbSync for TestDb {
|
||||
fn get(&self) -> Vec<u8> {
|
||||
vec![123]
|
||||
}
|
||||
}
|
||||
|
||||
impl DbImpl for TestDb {
|
||||
fn get(&self, cb: &mut dyn FnMut(Vec<u8>)) {
|
||||
cb(DbSync::get(self))
|
||||
}
|
||||
}
|
||||
|
||||
run_sync(TestDb);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_async() {
|
||||
struct TestDb;
|
||||
impl DbAsync for TestDb {
|
||||
async fn get(&self) -> Vec<u8> {
|
||||
vec![123]
|
||||
}
|
||||
}
|
||||
|
||||
run_async(TestDb).await;
|
||||
}
|
|
@ -3,9 +3,10 @@
|
|||
//! an experiment in making the database a graph db?
|
||||
|
||||
/*
|
||||
each database is a list of facts and indexes matching a schema
|
||||
each database is a list of facts, rules, and indexes matching a schema
|
||||
each fact is an (entity, property, value) tuple
|
||||
each fact can only be looked up by an index or reference
|
||||
each rule generates more facts (like a materialized view)
|
||||
*/
|
||||
|
||||
struct Database {
|
||||
|
|
|
@ -2,7 +2,7 @@ type Bytes = Vec<u8>;
|
|||
|
||||
#[derive(Debug)]
|
||||
pub enum DatabaseRequest {
|
||||
Done,
|
||||
Noop,
|
||||
Reset,
|
||||
QuerySingle(String, Selector),
|
||||
QueryAll(String, Selector),
|
||||
|
@ -15,7 +15,7 @@ pub enum DatabaseRequest {
|
|||
WithData(Vec<u8>, Box<DatabaseRequest>),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DatabaseResponse {
|
||||
Done,
|
||||
Single(Option<(Bytes, Bytes)>),
|
||||
|
@ -37,3 +37,37 @@ impl From<Vec<u8>> for Selector {
|
|||
Selector::Exact(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl DatabaseRequest {
|
||||
pub fn is_read(&self) -> bool {
|
||||
match self {
|
||||
DatabaseRequest::Noop => true,
|
||||
DatabaseRequest::Reset => false,
|
||||
DatabaseRequest::QuerySingle(_, _) => true,
|
||||
DatabaseRequest::QueryAll(_, _) => true,
|
||||
DatabaseRequest::QueryIter(_, _) => true,
|
||||
DatabaseRequest::QueryCount(_, _) => true,
|
||||
DatabaseRequest::QueryIterReverse(_, _) => true,
|
||||
DatabaseRequest::Get(_, _) => true,
|
||||
DatabaseRequest::Put(_, _, _) => false,
|
||||
DatabaseRequest::Delete(_, _) => false,
|
||||
DatabaseRequest::WithData(_, q) => q.is_read(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// trait DatabaseImpl {
|
||||
// fn reset(&self, callback: dyn FnOnce());
|
||||
// fn query(&self, table: &str, selector: Selector, callback: dyn FnOnce(Vec<(Bytes, Bytes)>));
|
||||
// fn get(&self, table: &str, key: Bytes, callback: dyn FnOnce(Vec<(Bytes, Bytes)>));
|
||||
// fn put(&self, table: &str, key: Bytes, value: Bytes, callback: dyn FnOnce());
|
||||
// fn delete(&self, table: &str, key: Bytes, callback: dyn FnOnce());
|
||||
// }
|
||||
|
||||
// trait NetworkImpl {
|
||||
// fn foo();
|
||||
// }
|
||||
|
||||
// trait RoomImpl {
|
||||
// fn handle();
|
||||
// }
|
||||
|
|
9
crates/query/Cargo.toml
Normal file
9
crates/query/Cargo.toml
Normal file
|
@ -0,0 +1,9 @@
|
|||
[package]
|
||||
name = "query"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
logos = "0.14.0"
|
291
crates/query/src/main.rs
Normal file
291
crates/query/src/main.rs
Normal file
|
@ -0,0 +1,291 @@
|
|||
#![allow(unused)]
|
||||
|
||||
use logos::{Lexer, Logos};
|
||||
|
||||
#[derive(Logos, Debug, PartialEq)]
|
||||
#[logos(skip r"[ \t\n\f]+")] // Ignore this regex pattern between tokens
|
||||
enum Token {
|
||||
#[token("[")]
|
||||
OpenBracket,
|
||||
|
||||
#[token("]")]
|
||||
CloseBracket,
|
||||
|
||||
#[token("_")]
|
||||
Blank,
|
||||
|
||||
#[regex(":[a-zA-Z/]+")]
|
||||
Atom,
|
||||
|
||||
#[regex("\\?[a-zA-Z]+")]
|
||||
Thing,
|
||||
|
||||
#[regex("[a-zA-Z]+")]
|
||||
Identifier,
|
||||
|
||||
#[regex("[0-9]+")]
|
||||
Number,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
struct Parsed {
|
||||
root: Data,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum Data {
|
||||
Atom(String),
|
||||
Thing(String),
|
||||
Identifier(String),
|
||||
Number(u64),
|
||||
Tuple(Vec<Data>),
|
||||
Blank,
|
||||
}
|
||||
|
||||
fn parse(lex: &mut Lexer<'_, Token>, token: Token) -> Data {
|
||||
match token {
|
||||
Token::OpenBracket => {
|
||||
let mut values = vec![];
|
||||
loop {
|
||||
let data = match lex.next().expect("missing token").unwrap() {
|
||||
Token::CloseBracket => break,
|
||||
token => parse(lex, token),
|
||||
};
|
||||
values.push(data);
|
||||
}
|
||||
Data::Tuple(values)
|
||||
},
|
||||
Token::CloseBracket => panic!("unexpected closing bracket"),
|
||||
Token::Atom => Data::Atom(lex.slice().to_string()),
|
||||
Token::Thing => Data::Thing(lex.slice().to_string()),
|
||||
Token::Identifier => Data::Identifier(lex.slice().to_string()),
|
||||
Token::Number => Data::Number(lex.slice().parse().unwrap()),
|
||||
Token::Blank => Data::Blank,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn basic() {
|
||||
let mut lex = Token::lexer("
|
||||
[:find ?name :where
|
||||
[?post :post/id 123]
|
||||
[?post :post/author ?author]
|
||||
[?author :user/name ?name]]");
|
||||
let token = lex.next().expect("unexpected eof").unwrap();
|
||||
let parsed = Parsed {
|
||||
root: parse(&mut lex, token),
|
||||
};
|
||||
use Data::*;
|
||||
let target = Parsed {
|
||||
root: Tuple(vec![
|
||||
Atom(":find".into()), Thing("?name".into()), Atom(":where".into()),
|
||||
Tuple(vec![Thing("?post".into()), Atom(":post/id".into()), Number(123)]),
|
||||
Tuple(vec![Thing("?post".into()), Atom(":post/author".into()), Thing("?author".into())]),
|
||||
Tuple(vec![Thing("?author".into()), Atom(":user/name".into()), Thing("?name".into())]),
|
||||
],
|
||||
),
|
||||
};
|
||||
assert_eq!(parsed, target);
|
||||
}
|
||||
|
||||
enum Command {
|
||||
/// define a schema
|
||||
Schema(CommandSchema),
|
||||
|
||||
/// query the database
|
||||
Find(CommandFind),
|
||||
|
||||
/// insert some triples
|
||||
Insert(CommandInsert),
|
||||
|
||||
/// update or delete some triples
|
||||
Update(CommandUpdate),
|
||||
|
||||
/// get info and stats
|
||||
Info,
|
||||
}
|
||||
|
||||
struct CommandFind {
|
||||
/// return these items
|
||||
select: Vec<Selection>,
|
||||
|
||||
/// using these inputs
|
||||
inputs: Vec<usize>,
|
||||
|
||||
/// with these filters
|
||||
filter: Vec<Filter>,
|
||||
|
||||
/// ordered by these keys
|
||||
order: Vec<usize>,
|
||||
}
|
||||
|
||||
struct CommandSchema {
|
||||
item: SchemaItem,
|
||||
}
|
||||
|
||||
struct CommandInsert {
|
||||
triples: Vec<Triple>,
|
||||
}
|
||||
|
||||
struct CommandUpdate {
|
||||
/// using these inputs
|
||||
inputs: Vec<usize>,
|
||||
|
||||
/// with these filters
|
||||
filter: Vec<Filter>,
|
||||
|
||||
/// do this upate
|
||||
update: Vec<Update>,
|
||||
}
|
||||
|
||||
enum Update {
|
||||
/// add a triple to an entity
|
||||
Append {
|
||||
property: String,
|
||||
data: Data,
|
||||
},
|
||||
|
||||
/// set a triple
|
||||
Set {
|
||||
property: String,
|
||||
data: Data,
|
||||
},
|
||||
|
||||
/// update a triple
|
||||
Update {
|
||||
property: String,
|
||||
func: Operator,
|
||||
},
|
||||
|
||||
/// remote a triple
|
||||
Remove {
|
||||
property: String,
|
||||
data: Option<Data>,
|
||||
},
|
||||
|
||||
/// remote the entity a triple points to
|
||||
RemoveEntity,
|
||||
}
|
||||
|
||||
struct Triple {
|
||||
entity: u64,
|
||||
property: String,
|
||||
data: Data,
|
||||
}
|
||||
|
||||
enum SchemaItem {
|
||||
/// a new thing
|
||||
Thing {
|
||||
name: String,
|
||||
documentation: Option<String>,
|
||||
cardinality: Cardinality,
|
||||
},
|
||||
|
||||
/// a value on a thing
|
||||
Value {
|
||||
thing: String,
|
||||
name: String,
|
||||
ty: Type,
|
||||
cardinality: Cardinality,
|
||||
documentation: Option<String>,
|
||||
unique: bool, // implemented as a custom predicate?
|
||||
predicate: Filter,
|
||||
index: Vec<Index>,
|
||||
// at: Option<EventId>,
|
||||
},
|
||||
|
||||
/// materialized view/stored procedure
|
||||
Rule {
|
||||
name: String,
|
||||
find: CommandFind,
|
||||
},
|
||||
|
||||
/// like a rule but asserts data instead of generating it
|
||||
Check {
|
||||
name: String,
|
||||
find: Vec<Filter>,
|
||||
},
|
||||
}
|
||||
|
||||
enum Index {
|
||||
Fulltext,
|
||||
Lookup,
|
||||
Ordered,
|
||||
}
|
||||
|
||||
enum Filter {
|
||||
Equals {
|
||||
left: Data,
|
||||
right: Data,
|
||||
name: Data,
|
||||
},
|
||||
Operator(Operator),
|
||||
}
|
||||
|
||||
enum Operator {
|
||||
Not(Box<Operator>),
|
||||
Any(Vec<Operator>),
|
||||
All(Vec<Operator>),
|
||||
Less(Data, Data),
|
||||
Greater(Data, Data),
|
||||
LessEqual(Data, Data),
|
||||
GreaterEqual(Data, Data),
|
||||
Add(Data, Data),
|
||||
Sub(Data, Data),
|
||||
Mul(Data, Data),
|
||||
Div(Data, Data),
|
||||
FullText(Data, Data),
|
||||
Missing(Data, Data),
|
||||
Rule(Vec<Data>),
|
||||
}
|
||||
|
||||
enum Selection {
|
||||
Thing(usize),
|
||||
Count(usize),
|
||||
Min(usize),
|
||||
Max(usize),
|
||||
Sum(usize),
|
||||
First(usize),
|
||||
Last(usize),
|
||||
Head(usize),
|
||||
Tail(usize),
|
||||
}
|
||||
|
||||
enum Type {
|
||||
UInt,
|
||||
Int,
|
||||
Float,
|
||||
Instant,
|
||||
String,
|
||||
Text,
|
||||
Bytes,
|
||||
Ref,
|
||||
|
||||
/// `[:enum rel/follows rel/blocks]` to only allow one or the other
|
||||
/// `[:enum none [some value]]` where `value` is another column
|
||||
/// overrides that column's cardinality? how does this work?
|
||||
Enum,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Eq)]
|
||||
enum Cardinality {
|
||||
One,
|
||||
ZeroOne,
|
||||
OnePlus,
|
||||
|
||||
#[default]
|
||||
ZeroPlus,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let mut lex = Token::lexer("
|
||||
[:find ?name :where
|
||||
[?post :post/id 123]
|
||||
[?post :post/author ?author]
|
||||
[?author :user/name ?name]]");
|
||||
let token = lex.next().expect("unexpected eof").unwrap();
|
||||
let parsed = Parsed {
|
||||
root: parse(&mut lex, token),
|
||||
};
|
||||
dbg!(parsed);
|
||||
}
|
30
docs/protocol.md
Normal file
30
docs/protocol.md
Normal file
|
@ -0,0 +1,30 @@
|
|||
# protocol
|
||||
|
||||
there's the core protocol. each room contains a directed acyclic graph
|
||||
of events. each event is signed and represents one atomic mutation. each
|
||||
room has some immutable config set by the root event. resolvers contain
|
||||
tiebreaking, a verifier, a reducer. the reducer takes in the events and
|
||||
outputs state/a database. rooms can be synced between any two nodes,
|
||||
whether they're server-server, client-server, or peer-peer.
|
||||
|
||||
the database. not sure if it will be key value, relational, graph,
|
||||
triplestore? it needs to be fairly flexible to support a wide variety of
|
||||
use cases. query peers for database directly without needing to sync room,
|
||||
for peeking or custom feeds/search indexes. most features are implemented
|
||||
as special server-generated databases.
|
||||
|
||||
the frontend. email/forum-like discussion and documents like the web or
|
||||
a git repo are the two main planned things. ideally it would be efficient
|
||||
with your time, bringing back inboxes.
|
||||
|
||||
## database
|
||||
|
||||
key/value sucks in general.
|
||||
|
||||
relational is well established and works pretty well. i think a
|
||||
relational database with datalog-like querying would be pretty good
|
||||
overall, but having relations be tables is still a bit annoying and
|
||||
relational dbs don't seem to work with tagged enums particularily well.
|
||||
|
||||
i think triplestores with a datalog dialect would be interesting. this
|
||||
is the best contender right now, with much simpler joins than sql.
|
Loading…
Reference in a new issue