testing and playing around with ideas

This commit is contained in:
tezlm 2024-03-01 19:43:25 -08:00
parent 2ed5d6a7df
commit 0cad159a3d
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
15 changed files with 420 additions and 79 deletions

4
.env Normal file
View file

@ -0,0 +1,4 @@
# comment
DATABASE_URL=sqlite://repo.db
DATABASE_URL2=sqlite://repo.db

1
.envrc Normal file
View file

@ -0,0 +1 @@
FOO=123

View file

@ -11,7 +11,7 @@ use dag_resolve::{
event::{Event, EventContent, EventId, HashType, SignatureType},
proto::table::{Database as _, Query, Selector, Table},
resolver::Resolver,
room::{Room, TABLE_EVENTS},
room::{RoomOld, TABLE_EVENTS},
};
use dag_resolve_impls::{
resolvers::{
@ -48,33 +48,34 @@ enum Cli {
#[derive(Debug)]
struct State<R: Resolver> {
room: Room<R, Database>,
room: RoomOld<R>,
secret: ActorSecret,
db: Database,
}
impl<R: Resolver> State<R> {
async fn init(resolver: R, path: &Path) -> Result<Self> {
// create new repo
println!("init new repo!");
let db = Database::open(path.to_str().unwrap()).await?.init().await?;
let mut db = Database::open(path.to_str().unwrap()).await?.init().await?;
let (actor, secret) = ActorId::new(SignatureType::Ed25519);
let room = Room::builder()
let room = RoomOld::builder()
.with_resolver(resolver)
.with_hasher(HashType::Sha256)
.with_signer(SignatureType::Ed25519)
.with_database(db)
.with_database(&mut db)
.create(&secret)
.await?;
room.get_state()
db
.set_config(&actor, &secret, room.get_root().id())
.await;
room.resolve_state().await;
Ok(State { room, secret })
room.resolve_state(&mut db).await;
Ok(State { room, secret, db })
}
}
async fn open(path: impl AsRef<Path>) -> Result<Opened> {
let db = Database::open(path.as_ref().to_str().unwrap())
let mut db = Database::open(path.as_ref().to_str().unwrap())
.await?
.init()
.await?;
@ -89,15 +90,15 @@ async fn open(path: impl AsRef<Path>) -> Result<Opened> {
match event.content() {
EventContent::Create(c) => match c.resolver.as_str() {
"kv" => {
let room = Room::from_root(KVResolver, db, serde_json::from_slice(&event_data)?)?;
room.resolve_state().await;
Ok(Opened::Kv(State { room, secret }))
let room = RoomOld::from_root(KVResolver, serde_json::from_slice(&event_data)?)?;
room.resolve_state(&mut db).await;
Ok(Opened::Kv(State { room, secret, db }))
}
"forum-v0" => {
let room =
Room::from_root(ForumResolver, db, serde_json::from_slice(&event_data)?)?;
room.resolve_state().await;
Ok(Opened::Forum(State { room, secret }))
RoomOld::from_root(ForumResolver, serde_json::from_slice(&event_data)?)?;
room.resolve_state(&mut db).await;
Ok(Opened::Forum(State { room, secret, db }))
}
_ => unimplemented!("unknown resolver"),
},
@ -130,15 +131,15 @@ async fn startup(path: impl AsRef<Path>, create: Option<&str>) -> Result<Opened>
async fn send_event<R: Resolver + Debug>(state: &mut State<R>, data: &str) -> Result<()> {
state
.room
.create_event(dbg!(serde_json::from_str(data)?), &state.secret)
.create_event(dbg!(serde_json::from_str(data)?), &state.secret, &mut state.db)
.await?;
state.room.resolve_state().await;
state.room.resolve_state(&mut state.db).await;
Ok(())
}
async fn print_info<R: Resolver + Debug>(state: State<R>) -> Result<()> {
state.room.resolve_state().await;
let db = state.room.get_state();
async fn print_info<R: Resolver + Debug>(state: &mut State<R>) -> Result<()> {
state.room.resolve_state(&mut state.db).await;
let db = &mut state.db;
let event_count = db
.table(TABLE_EVENTS)
.query(Selector::Prefix(vec![]))
@ -185,7 +186,7 @@ repo.room.resolve_state().await;
async fn sync_state<R: Resolver>(state: &mut State<R>, remote: &str) -> Result<()> {
let http = reqwest::Client::new();
let ids_all = state.room.all_events().await.iter().rev().map(|ev| ev.id().to_owned()).collect();
let ids_all = state.room.all_events(&mut state.db).await.iter().rev().map(|ev| ev.id().to_owned()).collect();
let has: HasResponse = http
.post(format!("{remote}/has"))
.json(&HasRequest {
@ -201,7 +202,7 @@ async fn sync_state<R: Resolver>(state: &mut State<R>, remote: &str) -> Result<(
.post(format!("{remote}/sync"))
.json(&SyncRequest::<R> {
root_id: state.room.get_root().id().to_owned(),
events: state.room.all_events().await.into_iter().filter(|ev| missing_ids.contains(ev.id())).collect(),
events: state.room.all_events(&mut state.db).await.into_iter().filter(|ev| missing_ids.contains(ev.id())).collect(),
})
.send()
.await?
@ -228,8 +229,8 @@ async fn main() -> Result<()> {
(Cli::Send { repo: _, data }, Opened::Forum(mut state)) => {
send_event(&mut state, &data).await?
}
(Cli::Info { repo: _ }, Opened::Kv(state)) => print_info(state).await?,
(Cli::Info { repo: _ }, Opened::Forum(state)) => print_info(state).await?,
(Cli::Info { repo: _ }, Opened::Kv(mut state)) => print_info(&mut state).await?,
(Cli::Info { repo: _ }, Opened::Forum(mut state)) => print_info(&mut state).await?,
(Cli::Sync { repo: _, remote: _ }, Opened::Kv(_kv)) => todo!(),
(Cli::Sync { repo: _, remote }, Opened::Forum(mut state)) => {
sync_state(&mut state, &remote).await?

View file

@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{ops::DerefMut, sync::Arc};
use anyhow::Result;
use axum::extract::{Json, State};
@ -70,8 +70,7 @@ async fn route_has<R: Resolver>(
panic!("cannot sync events from two different rooms");
}
let event_table = repo.room.get_state()
.table(TABLE_EVENTS);
let event_table = repo.db.table(TABLE_EVENTS);
let our_ids: Vec<EventId> = event_table
.query(Selector::Prefix(vec![]))
.get_iter()
@ -105,13 +104,13 @@ async fn route_sync<R: Resolver>(
panic!("cannot sync events from two different rooms");
}
// a list of event ids would either be cached or easily queryable
let repo = repo.deref_mut();
for event in req.events {
repo.room.append_event(event).await.unwrap();
repo.room.append_event(event, &mut repo.db).await.unwrap();
}
// this doesn't work and i don't know why
// repo.room.resolve_state().await;
// repo.room.resolve_state(&mut repo.db).await;
Json(SyncResponse {})
}

View file

@ -2,4 +2,4 @@ pub mod error;
pub mod proto;
pub use error::{Error, Result};
pub use proto::{actor, event, resolver, room};
pub use proto::{actor, event, resolver, room, atoms};

View file

@ -21,5 +21,6 @@ impl Display for RoomId {
}
}
/// A media/blob identifier
pub struct MediaId(String);
// TODO: how will this be implemented? content based addressing per-room?
// /// A media/blob identifier
// pub struct MediaId(String);

View file

@ -1,7 +1,7 @@
use crate::{
actor::{ActorId, ActorSecret, ActorSignature},
resolver::Resolver,
room::Room,
room::RoomOld,
Error, Result,
};
use base64::{engine::general_purpose::URL_SAFE_NO_PAD as b64engine, Engine};
@ -12,8 +12,6 @@ use std::{
time::{SystemTime, UNIX_EPOCH},
};
use super::table::Database;
#[derive(Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(try_from = "&str", into = "String")]
pub struct EventId(EventIdData);
@ -149,9 +147,9 @@ impl<T: Debug + Serialize + Clone> Event<T> {
Ok(())
}
pub fn verify_room<R: Resolver<EventType = T>, D: Database>(
pub fn verify_room<R: Resolver<EventType = T>>(
&self,
room: &Room<R, D>,
room: &RoomOld<R>,
) -> Result<()> {
self.verify()?;

View file

@ -5,3 +5,5 @@ pub mod resolver;
pub mod room;
pub mod table;
pub mod atoms;
mod room2;
mod rpc;

View file

@ -16,21 +16,20 @@ pub const TABLE_EVENTS: &str = "_events";
pub const TABLE_HEADS: &str = "_heads";
#[derive(Debug)]
pub struct Room<R: Resolver, D: Database> {
pub struct RoomOld<R: Resolver> {
root: Event<R::EventType>,
resolver: R,
database: D,
}
impl<R: Resolver, D: Database> Room<R, D> {
pub fn builder() -> RoomBuilder<R, D> {
impl<R: Resolver> RoomOld<R> {
pub fn builder<D: Database>() -> RoomBuilder<'static, R, D> {
RoomBuilder::new()
}
pub async fn new(
pub async fn new<D: Database>(
resolver_name: impl Into<String>,
resolver: R,
database: D,
database: &mut D,
hasher: HashType,
signer: SignatureType,
secret: &ActorSecret,
@ -56,15 +55,13 @@ impl<R: Resolver, D: Database> Room<R, D> {
Ok(Self {
root: base_event,
resolver,
database,
})
}
pub fn from_root(resolver: R, database: D, event: Event<R::EventType>) -> Result<Self> {
pub fn from_root(resolver: R, event: Event<R::EventType>) -> Result<Self> {
Ok(Self {
root: event,
resolver,
database,
})
}
@ -76,52 +73,53 @@ impl<R: Resolver, D: Database> Room<R, D> {
&self.resolver
}
pub async fn resolve_state(&self) {
pub async fn resolve_state<D: Database>(&self, db: &mut D) {
let resolver = self.get_resolver();
// ideally i don't get *all* events up front, and query the db as needed
let events = self.all_events().await;
let events = self.all_events(db).await;
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events);
self.database.reset().await;
db.reset().await;
for event in sorted {
let effects = resolver.resolve(&self.database, event).await;
let effects = resolver.resolve(db, event).await;
for effect in effects {
match effect {
Command::Put { table, key, value } => {
self.database.table(table).put(&key, &value).await
db.table(table).put(&key, &value).await
}
Command::Delete { table, key } => self.database.table(table).delete(&key).await,
Command::Delete { table, key } => db.table(table).delete(&key).await,
}
}
}
}
pub async fn create_event(
pub async fn create_event<D: Database>(
&mut self,
event_content: EventContent<R::EventType>,
secret: &ActorSecret,
db: &mut D,
) -> Result<Event<R::EventType>> {
let heads = self.get_heads().await;
let heads = self.get_heads(db).await;
let event = Event::builder(event_content, secret)
.with_references(heads)
.then_hash()?
.and_sign()?;
self.append_event(event).await
self.append_event(event, db).await
}
pub async fn append_event(
pub async fn append_event<D: Database>(
&mut self,
event: Event<R::EventType>,
db: &mut D,
) -> Result<Event<R::EventType>> {
event.verify_room(self).expect("event failed verification");
match self.get_resolver().verify(&self.database, &event).await {
match self.get_resolver().verify(db, &event).await {
Verification::Valid => {}
Verification::Unauthorized => panic!("unauthorized"),
Verification::Invalid => panic!("invalid data"),
}
let id_bytes = event.id().to_string();
let id_bytes = id_bytes.as_bytes();
if self
.database
if db
.table(TABLE_EVENTS)
.get(id_bytes)
.await
@ -129,26 +127,26 @@ impl<R: Resolver, D: Database> Room<R, D> {
{
return Err(Error::AlreadyExists);
}
self.database
db
.table(TABLE_EVENTS)
.put(id_bytes, &serde_json::to_vec(&event).unwrap())
.await;
for head in dbg!(event.references()) {
self.database
db
.table(TABLE_HEADS)
.delete(&head.to_string().into_bytes())
.await;
}
self.database
db
.table(TABLE_HEADS)
.put(&event.id().to_string().into_bytes(), &[])
.await;
Ok(event)
}
pub async fn get_heads(&self) -> Vec<EventId> {
self.database
pub async fn get_heads<D: Database>(&self, db: &mut D) -> Vec<EventId> {
db
.table(TABLE_HEADS)
.query(Selector::Prefix(vec![]))
.get_all()
@ -159,8 +157,8 @@ impl<R: Resolver, D: Database> Room<R, D> {
}
// TEMP: get all events
pub async fn all_events(&self) -> Vec<Event<R::EventType>> {
self.database
pub async fn all_events<D: Database>(&self, db: &mut D) -> Vec<Event<R::EventType>> {
db
.table(TABLE_EVENTS)
.query(Selector::Any)
.get_all()
@ -169,20 +167,16 @@ impl<R: Resolver, D: Database> Room<R, D> {
.map(|(_, bytes)| serde_json::from_slice(bytes).expect("invalid data"))
.collect()
}
pub fn get_state(&self) -> &D {
&self.database
}
}
pub struct RoomBuilder<R: Resolver, D: Database> {
pub struct RoomBuilder<'a, R: Resolver, D: Database> {
resolver: Option<R>,
hasher: Option<HashType>,
signer: Option<SignatureType>,
database: Option<D>,
database: Option<&'a mut D>,
}
impl<R: Resolver, D: Database> Default for RoomBuilder<R, D> {
impl<'a, R: Resolver, D: Database> Default for RoomBuilder<'a, R, D> {
fn default() -> Self {
Self {
resolver: None,
@ -193,7 +187,7 @@ impl<R: Resolver, D: Database> Default for RoomBuilder<R, D> {
}
}
impl<R: Resolver, D: Database> RoomBuilder<R, D> {
impl<'a, R: Resolver, D: Database> RoomBuilder<'a, R, D> {
pub fn new() -> Self {
Self::default()
}
@ -213,13 +207,13 @@ impl<R: Resolver, D: Database> RoomBuilder<R, D> {
self
}
pub fn with_database(mut self, database: D) -> Self {
pub fn with_database(mut self, database: &'a mut D) -> Self {
self.database = Some(database);
self
}
pub async fn create(self, secret: &ActorSecret) -> Result<Room<R, D>> {
Room::new(
pub async fn create(self, secret: &ActorSecret) -> Result<RoomOld<R>> {
RoomOld::new(
self.resolver
.as_ref()
.ok_or(Error::MissingBuilderData)?

View file

@ -0,0 +1,60 @@
#![allow(unused)]
//! Possibly another way rooms could be handled?
//! A state machine representing a room
use super::table::Selector;
/// State of the room state machine from which event processing is started
/// (see `process_event`); handed back inside `RoomOutput::Accept`/`Reject`.
struct RoomIdle;
/// State carried by `RoomOutput::Read` while the machine is requesting data;
/// consumed by `done`, which is given the bytes that were read.
struct RoomRead;
/// State carried by `RoomOutput::Write` while the machine is requesting a
/// write; consumed by `done`.
struct RoomWrite;
/// Why the room state machine refused an event.
enum RejectReason {
    /// The event carries invalid data.
    Invalid,

    /// The event goes over some sort of limit.
    Limited,

    /// The sender lacks the permission needed to send this event.
    Unauthorized,

    /// The resolver itself is buggy!
    Panic,
}
/// Result of driving the room state machine one step.
///
/// Every variant carries the state value the caller continues from.
enum RoomOutput {
    /// The event was valid and has been added.
    Accept(RoomIdle),

    /// The event was rejected (which does not necessarily mean it should be
    /// dropped).
    Reject(RejectReason, RoomIdle),

    /// The state machine asks the caller to read data.
    Read(RoomRead),

    /// The state machine asks the caller to write data.
    Write(RoomWrite),
}
impl RoomIdle {
    /// Moves into the event processing state; think of it as a coroutine
    /// that processes one event.
    ///
    /// Consumes the idle state. Not implemented yet: calling it panics via
    /// `todo!()`.
    pub fn process_event(self) -> RoomOutput {
        // try to append event here
        todo!()
    }
}
impl RoomRead {
    /// Signals that reading has finished, handing over the bytes in `data`.
    ///
    /// Consumes the read state. Not implemented yet: calling it panics via
    /// `todo!()`.
    pub fn done(self, data: Vec<u8>) -> RoomOutput {
        todo!()
    }
}
impl RoomWrite {
    /// Signals that writing has finished.
    ///
    /// Consumes the write state. Not implemented yet: calling it panics via
    /// `todo!()`.
    pub fn done(self) -> RoomOutput {
        todo!()
    }
}

View file

@ -0,0 +1,119 @@
#![allow(unused)]
//! ideas on how rpc could work
use crate::{event::EventId, atoms::RoomId};
/// Messages two peers can send each other.
///
/// NOTE(review): several variants start with a `u64`; it looks like a
/// transaction id that `Ack` refers back to — confirm before relying on it.
enum Rpc {
    /// Hello message each peer sends to the other before starting anything else.
    Hello(Hello),

    /// Query the peer's database.
    QueryRequest(String),

    /// Response to a `QueryRequest`.
    QueryResponse(Vec<u8>),

    /// Get a root event.
    Root(RoomId),

    /// Here is an event.
    Event(u64, Vec<u8>),

    /// Whoops.
    Error(Vec<u8>),

    /// I want these events.
    Want(RoomId, Vec<EventId>),

    /// I have these events.
    Have(RoomId, Vec<EventId>),

    /// The heads this peer wants.
    WantHeads(Vec<RoomId>),

    /// The heads the other peer has, listed per room. Heads are events, so
    /// they are identified by `EventId` rather than `RoomId`.
    HaveHeads(Vec<(RoomId, Vec<EventId>)>),

    /// Receive new events/updates about these rooms.
    Subscribe(u64, Vec<RoomId>),

    /// Stop receiving updates about these rooms.
    Unsubscribe(u64, Vec<RoomId>),

    /// Extra info about a room, unrelated to the core protocol.
    OutOfBand(u64, RoomId, Vec<u8>),

    /// Acknowledge transactions.
    Ack(Vec<u64>),

    /// Cleanly close the connection.
    Bye,
}
/// Lifecycle of a connection to a peer.
enum PeerState {
    /// The connection has started.
    Connect,

    /// The connection is ready: Hellos have been sent.
    Ready,

    /// The connection has been closed.
    Closed,
}
/// Payload of the hello message.
struct Hello {
    /// The versions supported by this peer.
    versions: Vec<u8>,
}
/*
# syncing
peers want to eagerly sync; if you have heads that the other peer doesn't, they should try to sync
1. start by sending Want(head) for each head
2. send Have(last_id) messages in a loop (batch for performance)
3. once they start sending you events (found the lowest common denominator), break and handle events
4. if there are multiple heads and you don't have all of them yet, go to 2
step 2 is the tricky bit: how do i send Have messages efficiently?
*/
// what a peer would look like if everything were fully synchronous
trait Peer {
    /// Connect and wait on Hellos.
    fn connect(); // -> Peer;

    /// Disconnect, consuming the peer.
    fn close(self);

    /// Run the query `q` on the peer and return the raw response bytes.
    fn query(&self, q: &str) -> Vec<u8>;

    /// Sync the given rooms a single time.
    fn sync(&self, rooms: &[RoomId]);

    /// Keep syncing the given rooms continually.
    fn subscribe(&self, rooms: &[RoomId]);

    /// Stop syncing the given rooms.
    fn unsubscribe(&self, rooms: &[RoomId]);
}
/*
one off requests:
subscribe(ids)
unsubscribe(ids)
query(ids) -> response
heads(ids) -> response
root(ids) -> response
streams:
event
outofband
syncing requests:
have
want
*/

View file

@ -0,0 +1,139 @@
#![allow(unused)]
//! an experiment in making the database a graph db?
/*
each database is a list of facts and indexes matching a schema
each fact is an (entity, property, value) tuple
each fact can only be looked up by an index or reference
*/
/// A graph database: facts and indexes that match a schema.
struct Database {
    /// The schema this database's contents match.
    schema: Schema,
}
/// A database schema, expressed as a list of field definitions.
struct Schema {
    /// Every field the schema declares.
    fields: Vec<SchemaField>,
}
/// Definition of a single field in a [`Schema`].
struct SchemaField {
    // NOTE(review): presumably the kind of thing this field belongs to
    // (e.g. "post") — confirm.
    thing: String,

    /// Type of the values stored in this field.
    ty: SchemaType,

    /// How many values the field may hold.
    cardinality: SchemaCardinality,

    /// Index and uniqueness settings applied to this field.
    index: Vec<SchemaConfig>,
}
/// Per-field configuration flags: which indexes to keep and whether values
/// are unique.
enum SchemaConfig {
    /// Keep an index for lookups.
    IndexLookup,

    /// Keep an index for ordering.
    IndexOrdering,

    // IndexFullText,
    /// Values of this field are unique.
    Unique,
}
/// How many values a field may hold.
enum SchemaCardinality {
    /// The field may be absent.
    Optional,

    /// Exactly one value.
    One,

    /// Any number of values.
    Many,
}
/// The type a schema field is declared with.
enum SchemaType {
    // NOTE(review): the string presumably names the thing being referenced — confirm.
    /// A reference.
    Reference(String),

    /// An entity.
    Entity,

    /// An integer.
    Integer,

    /// A string.
    String,
    // Text, EventId, RoomId, ActorId
}
/// A concrete value; the variants mirror those of `SchemaType`.
enum SchemaValue {
    /// A reference, stored as a number.
    Reference(u64),

    /// An entity, stored as a number.
    Entity(u64),

    /// An integer.
    Integer(u64),

    /// A string.
    String(String),
}
/// An operation that can be run against the graph database.
enum DatabaseOperation {
    /// Insert a new `thing` made of `(field name, value)` pairs.
    Insert {
        thing: String,
        fields: Vec<(String, SchemaValue)>,
    },

    /// Select whatever `query` matches.
    Select { query: DatabaseQuery },

    /// Delete whatever `query` matches.
    Delete { query: DatabaseQuery },
    // Update {
    // query: Query,
    // },
}
/// A query: a list of conditions plus a description of what to output.
struct DatabaseQuery {
    /// Conditions of the query.
    rules: Vec<DatabaseCondition>,

    /// What the query outputs.
    output: Vec<DatabaseOutput>,
}
/// A named rule bundling tokens with a list of conditions.
struct DatabaseRule {
    /// Name of the rule.
    name: String,

    // NOTE(review): presumably the tokens (query variables) this rule
    // exposes — confirm.
    tokens: Vec<u8>,

    /// Conditions that make up the rule.
    conditions: Vec<DatabaseCondition>,
}
/// One output column of a query.
///
/// NOTE(review): the `u64` is presumably the token (query variable) the
/// output is computed from — confirm.
enum DatabaseOutput {
    /// The value itself.
    Value(u64),

    /// The minimum.
    Min(u64),

    /// The maximum.
    Max(u64),

    /// The average.
    Average(u64),
}
/// A single condition of a query. `token` numbers identify the query's
/// variables (the `?name` placeholders of a datalog-style query).
enum DatabaseCondition {
    /// `token` is a `thing` whose `field` matches `value`.
    ThingFieldMatches {
        token: u64,
        thing: String,
        field: String,
        value: SchemaValue,
    },

    /// The `field` of entity `token` matches `value`.
    EntityFieldMatches {
        token: u64,
        field: String,
        value: SchemaValue,
    },

    /// The `field` of entity `token` is the token `other`.
    EntityFieldEntity {
        token: u64,
        field: String,
        other: u64,
    },

    /// `token` is less than `value`.
    ConstraintLessThan { token: u64, value: u64 },

    /// `token` is greater than `value`.
    ConstraintGreaterThan { token: u64, value: u64 },
}
/*
[:find ?name where
    [?post :post/id 123]
    [?post :post/author ?author]
    [?author :user/name ?name]]

vec![
    DatabaseCondition::ThingFieldMatches {
        token: 0,
        thing: "post".into(),
        field: "id".into(),
        value: SchemaValue::Integer(123),
    },
    DatabaseCondition::EntityFieldEntity {
        token: 0,
        field: "author".into(),
        other: 1,
    },
    DatabaseCondition::EntityFieldEntity {
        token: 1,
        field: "name".into(),
        other: 2,
    },
]
*/

View file

@ -4,6 +4,26 @@ use futures::Stream;
type Bytes = Vec<u8>;
/// A database operation expressed as a plain value.
///
/// NOTE(review): the leading `String` of each variant looks like the table
/// name, and the `Bytes` arguments like the key (and, for `Put`, the value) —
/// confirm against the `Database`/`Table` traits.
pub enum DatabaseRequest {
    /// Reset the database.
    Reset,

    /// Query with a selector, expecting a single result.
    QuerySingle(String, Selector),

    /// Query with a selector, expecting all results.
    QueryAll(String, Selector),

    /// Query with a selector, iterating over the results.
    QueryIter(String, Selector),

    /// Query with a selector, counting the results.
    QueryCount(String, Selector),

    /// Query with a selector, iterating over the results in reverse.
    QueryIterReverse(String, Selector),

    /// Get an entry.
    Get(String, Bytes),

    /// Put an entry.
    Put(String, Bytes, Bytes),

    /// Delete an entry.
    Delete(String, Bytes),
}
/// The result of a `DatabaseRequest`.
///
/// NOTE(review): each `(Bytes, Bytes)` pair is presumably a `(key, value)`
/// entry — confirm.
pub enum DatabaseResponse {
    /// The request finished; there is no data to return.
    Done,

    /// Zero or one entry.
    Single(Option<(Bytes, Bytes)>),

    /// Any number of entries.
    Many(Vec<(Bytes, Bytes)>),

    // Iter(Vec<(Bytes, Bytes)>),
    /// A number of entries.
    Count(u64),
}

/// Former, misspelled name of `DatabaseResponse`, kept so that existing
/// code referring to it continues to compile.
pub type DatabaseReponse = DatabaseResponse;
pub trait Database: Send + Sync {
type Table: Table;

View file

@ -1,4 +1,4 @@
//! Each room has a small, well defined, domain specific relational
//! Each room has a small, well defined, domain specific
//! database associated with it. The database's contents will then be defined
//! by the events it receives. This module contains database traits and
//! how the database's schema is defined.
@ -7,6 +7,7 @@
//! effort/payoff ratio in the database schema definition.
// mod old;
mod graph;
mod kv;
// pub use old::*;

View file

@ -1,3 +1,5 @@
//! Making the database as a relational db
// TODO: properly type things
// maybe replace TableRow with impl Serialize?
// and make schema a trait instead of builder?