store data in db, make library async, add http sync
This commit is contained in:
parent
9c6a8ef52e
commit
bc5f0b4d49
12 changed files with 1747 additions and 237 deletions
1286
Cargo.lock
generated
1286
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -8,9 +8,12 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
anyhow = { version = "1.0.79", features = ["backtrace"] }
|
||||
axum = "0.7.4"
|
||||
axum = { version = "0.7.4", features = ["macros"] }
|
||||
clap = { version = "4.5.0", features = ["derive"] }
|
||||
dag-resolve = { version = "0.1.0", path = "../proto" }
|
||||
dag-resolve-impls = { version = "0.1.0", path = "../impls" }
|
||||
reqwest = { version = "0.11.24", features = ["json", "rustls-tls-native-roots"], default-features = false }
|
||||
rusqlite = { version = "0.30.0", features = ["bundled"] }
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
serde_json = "1.0.113"
|
||||
tokio = { version = "1.36.0", features = ["full"] }
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{
|
||||
fmt::Debug,
|
||||
path::{Path, PathBuf},
|
||||
rc::Rc,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
mod server;
|
||||
|
@ -9,13 +9,21 @@ mod server;
|
|||
use anyhow::Result;
|
||||
use clap::Parser;
|
||||
use dag_resolve::{
|
||||
actor::{ActorId, ActorSecret}, event::{EventContent, Event, EventId, HashType, SignatureType}, resolver::Resolver, room::Room
|
||||
actor::{ActorId, ActorSecret},
|
||||
event::{Event, EventContent, EventId, HashType, SignatureType},
|
||||
proto::table::{Database as _, Query, Selector, Table},
|
||||
resolver::Resolver,
|
||||
room::{Room, TABLE_EVENTS},
|
||||
};
|
||||
use dag_resolve_impls::{
|
||||
resolvers::{forum::ForumResolver, kv::KVResolver},
|
||||
stores::sqlite::Database,
|
||||
resolvers::{
|
||||
forum::{ForumEventContent, ForumResolver},
|
||||
kv::KVResolver,
|
||||
},
|
||||
stores::sqlite_async::Database,
|
||||
};
|
||||
use rusqlite::Connection;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
#[derive(Parser)]
|
||||
#[clap(version, about)]
|
||||
|
@ -34,8 +42,8 @@ enum Cli {
|
|||
/// Get info about a repository
|
||||
Info { repo: PathBuf },
|
||||
|
||||
/// Synchronize two repositories
|
||||
Sync { repo: PathBuf, other: PathBuf },
|
||||
/// Synchronize the repository with a remote
|
||||
Sync { repo: PathBuf, remote: String },
|
||||
|
||||
/// Serve a repository over http
|
||||
Serve { repo: PathBuf },
|
||||
|
@ -43,104 +51,54 @@ enum Cli {
|
|||
|
||||
#[derive(Debug)]
|
||||
struct State<R: Resolver> {
|
||||
db: Rc<Connection>,
|
||||
room: Room<R, Database>,
|
||||
secret: ActorSecret,
|
||||
}
|
||||
|
||||
impl<R: Resolver> State<R> {
|
||||
fn create_event(&mut self, content: EventContent<R::EventType>) -> Result<()> {
|
||||
self
|
||||
.room
|
||||
.create_event(content, &self.secret)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn append_event(&mut self, event: Event<R::EventType>) -> Result<()> {
|
||||
self.room.append_event(event)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn init(resolver: R, path: &Path) -> Result<Self> {
|
||||
async fn init(resolver: R, path: &Path) -> Result<Self> {
|
||||
// create new repo
|
||||
println!("init new repo!");
|
||||
let db = rusqlite::Connection::open(path)?;
|
||||
db.execute_batch(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value ANY
|
||||
);
|
||||
"#,
|
||||
)?;
|
||||
let db = Database::open(path.to_str().unwrap()).await?.init().await?;
|
||||
let (actor, secret) = ActorId::new(SignatureType::Ed25519);
|
||||
let db = Rc::new(db);
|
||||
let room = Room::builder()
|
||||
.with_resolver(resolver)
|
||||
.with_hasher(HashType::Sha256)
|
||||
.with_signer(SignatureType::Ed25519)
|
||||
.with_database(Database::from_conn(db.clone()).init().unwrap())
|
||||
.create(&secret)?;
|
||||
db.execute(
|
||||
"INSERT INTO config VALUES (?, ?)",
|
||||
("actor_id", actor.to_string()),
|
||||
)?;
|
||||
db.execute(
|
||||
"INSERT INTO config VALUES (?, ?)",
|
||||
("actor_secret", serde_json::to_string(&secret)?),
|
||||
)?;
|
||||
room.resolve_state();
|
||||
Ok(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
})
|
||||
.with_database(db)
|
||||
.create(&secret)
|
||||
.await?;
|
||||
room.get_state().set_config(&actor, &secret, room.get_root().id()).await;
|
||||
room.resolve_state().await;
|
||||
Ok(State { room, secret })
|
||||
}
|
||||
}
|
||||
|
||||
fn open(path: impl AsRef<Path>) -> Result<Opened> {
|
||||
// restore repo
|
||||
let db = Rc::new(rusqlite::Connection::open(path)?);
|
||||
let _actor: ActorId = db.query_row(
|
||||
"SELECT value FROM config WHERE key='actor_id'",
|
||||
[],
|
||||
|row| row.get(0).map(|s: String| s.parse()),
|
||||
)??;
|
||||
let secret: ActorSecret = db.query_row(
|
||||
"SELECT value FROM config WHERE key='actor_secret'",
|
||||
[],
|
||||
|row| row.get(0).map(|s: String| serde_json::from_str(&s)),
|
||||
)??;
|
||||
let mut q = db.prepare("SELECT event FROM events")?;
|
||||
let mut rows = q.query([])?;
|
||||
|
||||
let event_json: Vec<u8> = rows
|
||||
.next()?
|
||||
.expect("there's always one root event")
|
||||
.get(0)?;
|
||||
drop(rows);
|
||||
drop(q);
|
||||
let event: Event<()> = serde_json::from_slice(&event_json)?;
|
||||
let store = Database::from_conn(db.clone()).init()?;
|
||||
async fn open(path: impl AsRef<Path>) -> Result<Opened> {
|
||||
let db = Database::open(path.as_ref().to_str().unwrap())
|
||||
.await?
|
||||
.init()
|
||||
.await?;
|
||||
let (_actor, secret, root_id) = db.get_config().await;
|
||||
let event_data = db
|
||||
.table(TABLE_EVENTS)
|
||||
.query(Selector::Exact(root_id.to_string().into_bytes()))
|
||||
.get_single()
|
||||
.await
|
||||
.expect("there's always one root event");
|
||||
let event: Event<()> = serde_json::from_slice(&event_data)?;
|
||||
match event.content() {
|
||||
EventContent::Create(c) => match c.resolver.as_str() {
|
||||
"kv" => {
|
||||
let room = Room::from_root(KVResolver, store, serde_json::from_slice(&event_json)?)?;
|
||||
room.resolve_state();
|
||||
Ok(Opened::Kv(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
}))
|
||||
let room = Room::from_root(KVResolver, db, serde_json::from_slice(&event_data)?)?;
|
||||
room.resolve_state().await;
|
||||
Ok(Opened::Kv(State { room, secret }))
|
||||
}
|
||||
"forum-v0" => {
|
||||
let room = Room::from_root(ForumResolver, store, serde_json::from_slice(&event_json)?)?;
|
||||
room.resolve_state();
|
||||
Ok(Opened::Forum(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
}))
|
||||
let room =
|
||||
Room::from_root(ForumResolver, db, serde_json::from_slice(&event_data)?)?;
|
||||
room.resolve_state().await;
|
||||
Ok(Opened::Forum(State { room, secret }))
|
||||
}
|
||||
_ => unimplemented!("unknown resolver"),
|
||||
},
|
||||
|
@ -153,49 +111,83 @@ enum Opened {
|
|||
Forum(State<ForumResolver>),
|
||||
}
|
||||
|
||||
fn startup(path: impl AsRef<Path>, create: Option<&str>) -> Result<Opened> {
|
||||
async fn startup(path: impl AsRef<Path>, create: Option<&str>) -> Result<Opened> {
|
||||
let path = path.as_ref();
|
||||
match (create, path.try_exists()?) {
|
||||
(Some(_), true) => panic!("repo already exists"),
|
||||
(None, false) => panic!("repo does not exist"),
|
||||
(Some(resolver_name), false) => {
|
||||
let state = match resolver_name {
|
||||
"kv" => Opened::Kv(State::init(KVResolver, path)?),
|
||||
"forum" => Opened::Forum(State::init(ForumResolver, path)?),
|
||||
"kv" => Opened::Kv(State::init(KVResolver, path).await?),
|
||||
"forum" => Opened::Forum(State::init(ForumResolver, path).await?),
|
||||
_ => unimplemented!("unknown resolver"),
|
||||
};
|
||||
Ok(state)
|
||||
}
|
||||
(None, true) => open(path),
|
||||
(None, true) => open(path).await,
|
||||
}
|
||||
}
|
||||
|
||||
// fn sync_state_v1<R: Resolver + Debug>(from: &mut State<R>, to: &mut State<R>) -> Result<()> {
|
||||
// if from.room.get_root() != to.room.get_root() {
|
||||
// panic!("cannot sync events from two different rooms");
|
||||
// }
|
||||
// for event in &from.room.events {
|
||||
// to.append_event(event.clone())?;
|
||||
// }
|
||||
// for event in &to.room.events {
|
||||
// from.append_event(event.clone())?;
|
||||
// }
|
||||
// from.room.resolve_state(&mut *from.store);
|
||||
// to.room.resolve_state(&mut *to.store);
|
||||
// Ok(())
|
||||
// }
|
||||
async fn send_event<R: Resolver + Debug>(state: &mut State<R>, data: &str) -> Result<()> {
|
||||
state
|
||||
.room
|
||||
.create_event(dbg!(serde_json::from_str(data)?), &state.secret)
|
||||
.await?;
|
||||
state.room.resolve_state().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn sync_state_v2<R: Resolver>(from: &mut State<R>, to: &mut State<R>) -> Result<()> {
|
||||
if from.room.get_root() != to.room.get_root() {
|
||||
async fn print_info<R: Resolver + Debug>(state: State<R>) -> Result<()> {
|
||||
state.room.resolve_state().await;
|
||||
let db = state.room.get_state();
|
||||
let event_count = db
|
||||
.table(TABLE_EVENTS)
|
||||
.query(Selector::Prefix(vec![]))
|
||||
.count()
|
||||
.await;
|
||||
let row_count = db.data_count().await;
|
||||
println!("room type: {}", state.room.get_resolver().name());
|
||||
println!("room events: {}", event_count);
|
||||
println!("room db records: {}", row_count);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct SyncRequest {
|
||||
root_id: EventId,
|
||||
heads: Vec<EventId>,
|
||||
events: Vec<Event<ForumEventContent>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct SyncResponse {}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ServerState {
|
||||
repo: Arc<Mutex<State<ForumResolver>>>,
|
||||
}
|
||||
|
||||
async fn sync_http(
|
||||
axum::extract::State(state): axum::extract::State<ServerState>,
|
||||
axum::extract::Json(req): axum::extract::Json<SyncRequest>,
|
||||
) -> axum::extract::Json<SyncResponse> {
|
||||
println!("receive new events");
|
||||
let mut repo = state.repo.lock().await;
|
||||
|
||||
if repo.room.get_root().id() != &req.root_id {
|
||||
panic!("cannot sync events from two different rooms");
|
||||
}
|
||||
|
||||
let from_events = from.room.all_events();
|
||||
let to_events = from.room.all_events();
|
||||
|
||||
// a list of event ids would either be cached or easily queryable
|
||||
let our_ids: Vec<_> = to_events.iter().map(|ev| ev.id()).cloned().collect();
|
||||
let mut missing_ids: Vec<EventId> = from.room.get_heads().into_iter().cloned().collect();
|
||||
let our_ids: Vec<_> = repo
|
||||
.room
|
||||
.all_events()
|
||||
.await
|
||||
.iter()
|
||||
.map(|ev| ev.id())
|
||||
.cloned()
|
||||
.collect();
|
||||
let mut missing_ids: Vec<EventId> = req.heads.iter().cloned().collect();
|
||||
|
||||
// this would be batched, to find multiple missing events at once
|
||||
while let Some(next) = missing_ids.pop() {
|
||||
|
@ -204,84 +196,77 @@ fn sync_state_v2<R: Resolver>(from: &mut State<R>, to: &mut State<R>) -> Result<
|
|||
}
|
||||
|
||||
// getting an event from its id would be cached or easily queryable
|
||||
let event = from_events
|
||||
let event = req
|
||||
.events
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|ev| ev.id() == &next)
|
||||
.expect("`from` sent the id but doesn't have the event?")
|
||||
.clone();
|
||||
to.append_event(event)?;
|
||||
repo.room.append_event(event).await.unwrap();
|
||||
}
|
||||
|
||||
from.room.resolve_state();
|
||||
to.room.resolve_state();
|
||||
repo.room.resolve_state().await;
|
||||
axum::extract::Json(SyncResponse {})
|
||||
}
|
||||
|
||||
async fn serve(state: ServerState) -> Result<()> {
|
||||
let tcp = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
|
||||
let router = axum::Router::new()
|
||||
.route("/", axum::routing::get(|| async { "hello world" }))
|
||||
.route("/sync", axum::routing::post(sync_http))
|
||||
.with_state(state);
|
||||
axum::serve(tcp, router.into_make_service()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_event<R: Resolver + Debug>(state: &mut State<R>, data: &str) -> Result<()> {
|
||||
state.create_event(dbg!(serde_json::from_str(dbg!(data))?))?;
|
||||
state.room.resolve_state();
|
||||
async fn sync_state(state: &mut State<ForumResolver>, remote: &str) -> Result<()> {
|
||||
let http = reqwest::Client::new();
|
||||
let res: SyncResponse = http.post(remote)
|
||||
.json(&SyncRequest {
|
||||
root_id: state.room.get_root().id().to_owned(),
|
||||
heads: state.room.get_heads().await,
|
||||
events: state.room.all_events().await,
|
||||
})
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
dbg!(res);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_info<R: Resolver + Debug>(state: State<R>) -> Result<()> {
|
||||
let event_count: u64 = state
|
||||
.db
|
||||
.query_row("SELECT count(*) FROM events", [], |r| r.get(0))?;
|
||||
let row_count: u64 = state
|
||||
.db
|
||||
.query_row("SELECT count(*) FROM data", [], |r| r.get(0))?;
|
||||
println!("room type: {}", state.room.get_resolver().name());
|
||||
println!("room events: {}", event_count);
|
||||
println!("room db records: {}", row_count);
|
||||
// println!("room state schema:");
|
||||
// for (table_name, table_schema) in state.room.get_resolver().get_state_config().tables {
|
||||
// print!(" {} (", table_name);
|
||||
// for (idx, (column_name, ctype)) in table_schema.columns.iter().enumerate() {
|
||||
// if idx != 0 {
|
||||
// print!(", ");
|
||||
// }
|
||||
// print!("{} {:?}", column_name, ctype);
|
||||
// }
|
||||
// print!(") indexed on (");
|
||||
// for (idx, index) in table_schema.indexes.iter().enumerate() {
|
||||
// if idx != 0 {
|
||||
// print!(", ");
|
||||
// }
|
||||
// print!("{} {:?}", index.column, index.index_type);
|
||||
// }
|
||||
// println!(")");
|
||||
// }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
let opened = match &cli {
|
||||
Cli::Init { repo, resolver } => startup(repo, Some(resolver))?,
|
||||
Cli::Init { repo, resolver } => startup(repo, Some(resolver)).await?,
|
||||
Cli::Send { repo, .. }
|
||||
| Cli::Info { repo, .. }
|
||||
| Cli::Sync { repo, .. }
|
||||
| Cli::Serve { repo } => startup(repo, None)?,
|
||||
| Cli::Serve { repo } => startup(repo, None).await?,
|
||||
};
|
||||
match (cli, opened) {
|
||||
(Cli::Init { .. }, _) => {}
|
||||
(Cli::Send { repo: _, data }, Opened::Kv(mut state)) => send_event(&mut state, &data)?,
|
||||
(Cli::Send { repo: _, data }, Opened::Forum(mut state)) => send_event(&mut state, &data)?,
|
||||
(Cli::Info { repo: _ }, Opened::Kv(state)) => print_info(state)?,
|
||||
(Cli::Info { repo: _ }, Opened::Forum(state)) => print_info(state)?,
|
||||
(Cli::Sync { repo: _, other }, Opened::Kv(mut our_state)) => match startup(other, None)? {
|
||||
Opened::Kv(mut their_state) => sync_state_v2(&mut our_state, &mut their_state)?,
|
||||
_ => panic!("cannot sync different room types"),
|
||||
},
|
||||
(Cli::Sync { repo: _, other }, Opened::Forum(mut our_state)) => {
|
||||
match startup(other, None)? {
|
||||
Opened::Forum(mut their_state) => sync_state_v2(&mut our_state, &mut their_state)?,
|
||||
_ => panic!("cannot sync different room types"),
|
||||
}
|
||||
(Cli::Send { repo: _, data }, Opened::Kv(mut state)) => {
|
||||
send_event(&mut state, &data).await?
|
||||
}
|
||||
(Cli::Send { repo: _, data }, Opened::Forum(mut state)) => {
|
||||
send_event(&mut state, &data).await?
|
||||
}
|
||||
(Cli::Info { repo: _ }, Opened::Kv(state)) => print_info(state).await?,
|
||||
(Cli::Info { repo: _ }, Opened::Forum(state)) => print_info(state).await?,
|
||||
(Cli::Sync { repo: _, remote: _ }, Opened::Kv(_kv)) => todo!(),
|
||||
(Cli::Sync { repo: _, remote }, Opened::Forum(mut state)) => {
|
||||
sync_state(&mut state, &remote).await?
|
||||
}
|
||||
(Cli::Serve { repo: _ }, Opened::Kv(_kv)) => todo!(),
|
||||
(Cli::Serve { repo: _ }, Opened::Forum(forum)) => {
|
||||
serve(ServerState {
|
||||
repo: Arc::new(Mutex::new(forum)),
|
||||
})
|
||||
.await?
|
||||
}
|
||||
(Cli::Serve { repo: _ }, Opened::Kv(_)) => todo!(),
|
||||
(Cli::Serve { repo: _ }, Opened::Forum(_)) => todo!(),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -10,4 +10,6 @@ dag-resolve = { version = "0.1.0", path = "../proto" }
|
|||
rusqlite = { version = "0.30.0", features = ["bundled"] }
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
serde_json = "1.0.113"
|
||||
sqlx = { version = "0.7.3", features = ["runtime-tokio", "sqlite"] }
|
||||
thiserror = "1.0.57"
|
||||
tokio = "1.36.0"
|
||||
|
|
|
@ -147,7 +147,7 @@ impl ForumRoomAcl {
|
|||
impl Resolver for ForumResolver {
|
||||
type EventType = ForumEventContent;
|
||||
|
||||
fn resolve<S: Database>(&self, _state: &S, event: &Event<Self::EventType>) -> Vec<Command> {
|
||||
async fn resolve<S: Database>(&self, _state: &S, event: &Event<Self::EventType>) -> Vec<Command> {
|
||||
let row = match event.content() {
|
||||
EventContent::Create(_) => {
|
||||
Command::put("members", event.author().to_string().as_bytes(), ForumMemberAcl::Operator.to_str().as_bytes())
|
||||
|
@ -189,16 +189,18 @@ impl Resolver for ForumResolver {
|
|||
"forum-v0"
|
||||
}
|
||||
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
async fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
let member_entry = state
|
||||
.table("members")
|
||||
.get(&event.author().to_string().as_bytes())
|
||||
.await
|
||||
.and_then(|b| String::from_utf8(b).ok())
|
||||
.map(|s| ForumMemberAcl::from_str(&s))
|
||||
.unwrap_or_default();
|
||||
let room_entry = state
|
||||
.table("meta")
|
||||
.get(b"acl")
|
||||
.await
|
||||
.and_then(|d| String::from_utf8(d).ok())
|
||||
.map(|s| ForumRoomAcl::from_str(&s))
|
||||
.unwrap_or_default();
|
||||
|
|
|
@ -29,7 +29,7 @@ impl KVResolver {
|
|||
impl Resolver for KVResolver {
|
||||
type EventType = KVEventContent;
|
||||
|
||||
fn resolve<D: Database>(&self, _state: &D, event: &Event<KVEventContent>) -> Vec<Command> {
|
||||
async fn resolve<D: Database>(&self, _state: &D, event: &Event<KVEventContent>) -> Vec<Command> {
|
||||
match &event.content() {
|
||||
EventContent::Create(_) => {
|
||||
vec![Command::Put {
|
||||
|
@ -52,8 +52,8 @@ impl Resolver for KVResolver {
|
|||
"kv"
|
||||
}
|
||||
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
if state.table("meta").get(b"owner").unwrap() == event.author().to_string().as_bytes() {
|
||||
async fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification {
|
||||
if state.table("meta").get(b"owner").await.unwrap() == event.author().to_string().as_bytes() {
|
||||
Verification::Valid
|
||||
} else {
|
||||
Verification::Invalid
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
//! Contains premade stores to store state/data
|
||||
|
||||
// pub mod memory;
|
||||
pub mod sqlite;
|
||||
// pub mod sqlite;
|
||||
pub mod sqlite_async;
|
||||
|
|
238
crates/impls/src/stores/sqlite_async.rs
Normal file
238
crates/impls/src/stores/sqlite_async.rs
Normal file
|
@ -0,0 +1,238 @@
|
|||
//! Store state in a sqlite database
|
||||
|
||||
use dag_resolve::{
|
||||
actor::{ActorId, ActorSecret}, event::EventId, proto::table::{self, Database as _}, room::{TABLE_EVENTS, TABLE_HEADS}
|
||||
};
|
||||
use sqlx::{query, sqlite::SqliteConnectOptions, SqlitePool};
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Database {
|
||||
connection: SqlitePool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum Error {}
|
||||
|
||||
impl Database {
|
||||
pub async fn open(path: &str) -> Result<Self, Error> {
|
||||
let opts = SqliteConnectOptions::new()
|
||||
.filename(path)
|
||||
.create_if_missing(true);
|
||||
let db = SqlitePool::connect_with(opts).await.unwrap();
|
||||
Ok(Database { connection: db })
|
||||
}
|
||||
|
||||
pub async fn init(self) -> Result<Self, Error> {
|
||||
self.reset().await;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub async fn set_config(&self, actor_id: &ActorId, secret: &ActorSecret, root_id: &EventId) {
|
||||
let actoridstr = actor_id.to_string();
|
||||
let rootidstr = root_id.to_string();
|
||||
let secretstr = serde_json::to_string(&secret).unwrap();
|
||||
query!(
|
||||
"INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
|
||||
"actor_id",
|
||||
actoridstr
|
||||
)
|
||||
.execute(&self.connection)
|
||||
.await
|
||||
.unwrap();
|
||||
query!(
|
||||
"INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
|
||||
"actor_secret",
|
||||
secretstr
|
||||
)
|
||||
.execute(&self.connection)
|
||||
.await
|
||||
.unwrap();
|
||||
query!(
|
||||
"INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
|
||||
"root_id",
|
||||
rootidstr
|
||||
)
|
||||
.execute(&self.connection)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub async fn get_config(&self) -> (ActorId, ActorSecret, EventId) {
|
||||
let rows = query!("SELECT * FROM config").fetch_all(&self.connection).await.unwrap();
|
||||
let mut actor_id = None;
|
||||
let mut secret = None;
|
||||
let mut root_id = None;
|
||||
for row in rows {
|
||||
match row.key.as_deref() {
|
||||
Some("actor_id") => actor_id = row.value.map(|s| s.parse().unwrap()),
|
||||
Some("actor_secret") => secret = row.value.map(|s| serde_json::from_str(&s).unwrap()),
|
||||
Some("root_id") => root_id = row.value.map(|s| s.parse().unwrap()),
|
||||
_ => {},
|
||||
}
|
||||
}
|
||||
(actor_id.unwrap(), secret.unwrap(), root_id.unwrap())
|
||||
}
|
||||
|
||||
pub async fn data_count(&self) -> u64 {
|
||||
let rows = query!("SELECT count(*) as count FROM data").fetch_one(&self.connection).await.unwrap();
|
||||
rows.count as u64
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Query {
|
||||
selector: table::Selector,
|
||||
table: Table,
|
||||
_reversed: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Table {
|
||||
connection: SqlitePool,
|
||||
name: String,
|
||||
}
|
||||
|
||||
impl table::Database for Database {
|
||||
type Table = Table;
|
||||
|
||||
async fn reset(&self) {
|
||||
query("
|
||||
CREATE TABLE IF NOT EXISTS data (tb TEXT, key BLOB, value BLOB NOT NULL, PRIMARY KEY(tb, key));
|
||||
CREATE TABLE IF NOT EXISTS config (key TEXT PRIMARY KEY, value TEXT);
|
||||
").execute(&self.connection).await.unwrap();
|
||||
query!("DELETE FROM data WHERE tb NOT IN (?, ?)", TABLE_EVENTS, TABLE_HEADS)
|
||||
.execute(&self.connection)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn table(&self, name: impl Into<String>) -> Self::Table {
|
||||
Table {
|
||||
connection: self.connection.clone(),
|
||||
name: name.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl table::Table for Table {
|
||||
type Query = Query;
|
||||
|
||||
fn query(&self, selector: table::Selector) -> Self::Query {
|
||||
Query {
|
||||
selector,
|
||||
table: self.clone(),
|
||||
_reversed: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn query_reverse(&self, selector: table::Selector) -> Self::Query {
|
||||
Query {
|
||||
selector,
|
||||
table: self.clone(),
|
||||
_reversed: true,
|
||||
}
|
||||
}
|
||||
|
||||
async fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
|
||||
let keyvec = key.to_vec();
|
||||
let row =
|
||||
query!(
|
||||
"SELECT value FROM data WHERE tb = ? AND key = ?",
|
||||
self.name,
|
||||
keyvec
|
||||
)
|
||||
.fetch_one(&self.connection)
|
||||
.await;
|
||||
match row {
|
||||
Ok(row) => Some(row.value),
|
||||
Err(sqlx::Error::RowNotFound) => None,
|
||||
Err(err) => Err(err).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn put(&self, key: &[u8], value: &[u8]) {
|
||||
query!(
|
||||
"INSERT OR REPLACE INTO data (tb, key, value) VALUES (?, ?, ?)",
|
||||
self.name,
|
||||
key,
|
||||
value
|
||||
)
|
||||
.execute(&self.connection)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn delete(&self, key: &[u8]) {
|
||||
query!(
|
||||
"DELETE FROM data WHERE tb = ? AND key = ?",
|
||||
self.name,
|
||||
key,
|
||||
)
|
||||
.execute(&self.connection)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl table::Query for Query {
|
||||
async fn get_single(self) -> Option<Vec<u8>> {
|
||||
match self.selector {
|
||||
table::Selector::Exact(e) => {
|
||||
query!(
|
||||
"SELECT value FROM data WHERE tb = ? AND key = ?",
|
||||
self.table.name,
|
||||
e
|
||||
)
|
||||
.fetch_optional(&self.table.connection)
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|r| r.value)
|
||||
},
|
||||
table::Selector::Prefix(p) => {
|
||||
let pvec = p.to_vec();
|
||||
let plen = p.len() as i64;
|
||||
query!(
|
||||
"SELECT value FROM data WHERE tb = ? AND substr(key, 0, ?) == ?",
|
||||
self.table.name,
|
||||
plen,
|
||||
pvec
|
||||
)
|
||||
.fetch_optional(&self.table.connection)
|
||||
.await
|
||||
.unwrap()
|
||||
.map(|r| r.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_all(self) -> Vec<(Vec<u8>, Vec<u8>)> {
|
||||
match self.selector {
|
||||
table::Selector::Exact(_) => todo!(),
|
||||
table::Selector::Prefix(p) => {
|
||||
let pvec = p.to_vec();
|
||||
let plen = p.len() as i64;
|
||||
query!(
|
||||
"SELECT key, value FROM data WHERE tb = ? AND substr(key, 0, ?) == ?",
|
||||
self.table.name,
|
||||
plen,
|
||||
pvec
|
||||
)
|
||||
.fetch_all(&self.table.connection)
|
||||
.await
|
||||
.unwrap()
|
||||
.into_iter()
|
||||
.map(|r| (r.key, r.value))
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_iter(self) -> impl Iterator<Item = Vec<u8>> {
|
||||
vec![].into_iter()
|
||||
}
|
||||
|
||||
async fn count(self) -> u64 {
|
||||
self.get_all().await.len() as u64
|
||||
}
|
||||
}
|
||||
|
|
@ -189,7 +189,7 @@ impl<T: Debug + Serialize + Clone> Event<T> {
|
|||
&self.content
|
||||
}
|
||||
|
||||
pub fn references(&self) -> &Vec<EventId> {
|
||||
pub fn references(&self) -> &[EventId] {
|
||||
&self.references
|
||||
}
|
||||
|
||||
|
|
|
@ -3,21 +3,21 @@ use serde::{Deserialize, Serialize};
|
|||
// use super::table::{State, StateConfig, TableRow};
|
||||
use super::table::Database;
|
||||
use crate::event::{Event, EventId};
|
||||
use std::{cmp::Ordering, collections::HashSet, fmt::Debug};
|
||||
use std::{cmp::Ordering, collections::HashSet, fmt::Debug, future::Future};
|
||||
|
||||
/// small shards of code designed to resolve state
|
||||
pub trait Resolver {
|
||||
type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a>;
|
||||
|
||||
/// Given a set of ordered events, resolve the final state
|
||||
fn resolve<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Vec<Command>;
|
||||
fn resolve<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> impl Future<Output = Vec<Command>>;
|
||||
|
||||
/// Given two events, decide which one comes first
|
||||
/// if Ordering::Equal is returned, the timestamp then event id is used
|
||||
fn tiebreak(&self, a: &Event<Self::EventType>, b: &Event<Self::EventType>) -> Ordering;
|
||||
|
||||
/// Verify if an event can be sent or not
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> Verification;
|
||||
fn verify<D: Database>(&self, state: &D, event: &Event<Self::EventType>) -> impl Future<Output = Verification>;
|
||||
|
||||
/// TEMP: Get the name/id of this resolver
|
||||
fn name(&self) -> &str;
|
||||
|
|
|
@ -3,7 +3,7 @@ use proto::table::Selector;
|
|||
use crate::{
|
||||
actor::ActorSecret,
|
||||
event::EventId,
|
||||
event::{EventContent, CreateContent, Event, HashType, SignatureType},
|
||||
event::{CreateContent, Event, EventContent, HashType, SignatureType},
|
||||
proto,
|
||||
resolver::{sort, Command, Resolver, Verification},
|
||||
Error, Result,
|
||||
|
@ -13,11 +13,11 @@ use std::fmt::Debug;
|
|||
use super::table::{Database, Query, Table};
|
||||
|
||||
pub const TABLE_EVENTS: &str = "_events";
|
||||
pub const TABLE_HEADS: &str = "_heads";
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Room<R: Resolver, D: Database> {
|
||||
root: Event<R::EventType>,
|
||||
heads: Vec<EventId>,
|
||||
resolver: R,
|
||||
database: D,
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ impl<R: Resolver, D: Database> Room<R, D> {
|
|||
RoomBuilder::new()
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
resolver_name: impl Into<String>,
|
||||
resolver: R,
|
||||
database: D,
|
||||
|
@ -35,6 +35,7 @@ impl<R: Resolver, D: Database> Room<R, D> {
|
|||
signer: SignatureType,
|
||||
secret: &ActorSecret,
|
||||
) -> Result<Self> {
|
||||
println!("create new room");
|
||||
let base_event = Event::builder(
|
||||
EventContent::Create(CreateContent {
|
||||
resolver: resolver_name.into(),
|
||||
|
@ -49,9 +50,13 @@ impl<R: Resolver, D: Database> Room<R, D> {
|
|||
let id_bytes = id_bytes.as_bytes();
|
||||
database
|
||||
.table(TABLE_EVENTS)
|
||||
.put(id_bytes, &serde_json::to_vec(&base_event).unwrap());
|
||||
.put(id_bytes, &serde_json::to_vec(&base_event).unwrap())
|
||||
.await;
|
||||
database
|
||||
.table(TABLE_HEADS)
|
||||
.put(id_bytes, &[])
|
||||
.await;
|
||||
Ok(Self {
|
||||
heads: vec![base_event.id().clone()],
|
||||
root: base_event,
|
||||
resolver,
|
||||
database,
|
||||
|
@ -60,7 +65,6 @@ impl<R: Resolver, D: Database> Room<R, D> {
|
|||
|
||||
pub fn from_root(resolver: R, database: D, event: Event<R::EventType>) -> Result<Self> {
|
||||
Ok(Self {
|
||||
heads: vec![event.id().clone()],
|
||||
root: event,
|
||||
resolver,
|
||||
database,
|
||||
|
@ -75,71 +79,98 @@ impl<R: Resolver, D: Database> Room<R, D> {
|
|||
&self.resolver
|
||||
}
|
||||
|
||||
pub fn resolve_state(&self) {
|
||||
pub async fn resolve_state(&self) {
|
||||
let resolver = self.get_resolver();
|
||||
// ideally i don't get *all* events up front, and query the db as needed
|
||||
let events = self.all_events();
|
||||
let events = self.all_events().await;
|
||||
let sorted = sort(|a, b| resolver.tiebreak(a, b), &events);
|
||||
self.database.reset();
|
||||
dbg!(&sorted);
|
||||
self.database.reset().await;
|
||||
for event in sorted {
|
||||
let effects = resolver.resolve(&self.database, event);
|
||||
let effects = resolver.resolve(&self.database, event).await;
|
||||
for effect in effects {
|
||||
match effect {
|
||||
Command::Put { table, key, value } => {
|
||||
self.database.table(table).put(&key, &value)
|
||||
self.database.table(table).put(&key, &value).await
|
||||
}
|
||||
Command::Delete { table, key } => self.database.table(table).delete(&key),
|
||||
Command::Delete { table, key } => self.database.table(table).delete(&key).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn create_event(
|
||||
pub async fn create_event(
|
||||
&mut self,
|
||||
event_content: EventContent<R::EventType>,
|
||||
secret: &ActorSecret,
|
||||
) -> Result<Event<R::EventType>> {
|
||||
let heads = self.get_heads().await;
|
||||
let event = Event::builder(event_content, secret)
|
||||
.with_references(std::mem::take(&mut self.heads))
|
||||
.with_references(heads)
|
||||
.then_hash()?
|
||||
.and_sign()?;
|
||||
self.append_event(event)
|
||||
self.append_event(event).await
|
||||
}
|
||||
|
||||
pub fn append_event(&mut self, event: Event<R::EventType>) -> Result<Event<R::EventType>> {
|
||||
pub async fn append_event(
|
||||
&mut self,
|
||||
event: Event<R::EventType>,
|
||||
) -> Result<Event<R::EventType>> {
|
||||
event.verify_room(self).expect("event failed verification");
|
||||
match self.get_resolver().verify(&self.database, &event) {
|
||||
match self.get_resolver().verify(&self.database, &event).await {
|
||||
Verification::Valid => {}
|
||||
Verification::Unauthorized => panic!("unauthorized"),
|
||||
Verification::Invalid => panic!("invalid data"),
|
||||
}
|
||||
let id_bytes = event.id().to_string();
|
||||
let id_bytes = id_bytes.as_bytes();
|
||||
if self.database.table(TABLE_EVENTS).get(id_bytes).is_some() {
|
||||
if self
|
||||
.database
|
||||
.table(TABLE_EVENTS)
|
||||
.get(id_bytes)
|
||||
.await
|
||||
.is_some()
|
||||
{
|
||||
return Err(Error::AlreadyExists);
|
||||
}
|
||||
self.database
|
||||
.table(TABLE_EVENTS)
|
||||
.put(id_bytes, &serde_json::to_vec(&event).unwrap());
|
||||
self.heads.retain(|id| !event.references().contains(id));
|
||||
self.heads.push(event.id().clone());
|
||||
.put(id_bytes, &serde_json::to_vec(&event).unwrap())
|
||||
.await;
|
||||
|
||||
for head in dbg!(event.references()) {
|
||||
self.database.table(TABLE_HEADS).delete(&head.to_string().into_bytes()).await;
|
||||
}
|
||||
self.database.table(TABLE_HEADS).put(&event.id().to_string().into_bytes(), &[]).await;
|
||||
Ok(event)
|
||||
}
|
||||
|
||||
pub fn get_heads(&self) -> &[EventId] {
|
||||
&self.heads
|
||||
pub async fn get_heads(&self) -> Vec<EventId> {
|
||||
self.database
|
||||
.table(TABLE_HEADS)
|
||||
.query(Selector::Prefix(vec![]))
|
||||
.get_all()
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|(key, _)| String::from_utf8(key).unwrap().parse().unwrap())
|
||||
.collect()
|
||||
}
|
||||
|
||||
// TEMP: get all events
|
||||
pub fn all_events(&self) -> Vec<Event<R::EventType>> {
|
||||
pub async fn all_events(&self) -> Vec<Event<R::EventType>> {
|
||||
self.database
|
||||
.table(TABLE_EVENTS)
|
||||
.query(Selector::Prefix(vec![]))
|
||||
.get_all()
|
||||
.await
|
||||
.iter()
|
||||
.map(|bytes| serde_json::from_slice(bytes).expect("invalid data"))
|
||||
.map(|(_, bytes)| serde_json::from_slice(bytes).expect("invalid data"))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn get_state(&self) -> &D {
|
||||
&self.database
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RoomBuilder<R: Resolver, D: Database> {
|
||||
|
@ -185,7 +216,7 @@ impl<R: Resolver, D: Database> RoomBuilder<R, D> {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn create(self, secret: &ActorSecret) -> Result<Room<R, D>> {
|
||||
pub async fn create(self, secret: &ActorSecret) -> Result<Room<R, D>> {
|
||||
Room::new(
|
||||
self.resolver
|
||||
.as_ref()
|
||||
|
@ -198,5 +229,6 @@ impl<R: Resolver, D: Database> RoomBuilder<R, D> {
|
|||
self.signer.ok_or(Error::MissingBuilderData)?,
|
||||
secret,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,12 @@
|
|||
use std::future::Future;
|
||||
|
||||
type Bytes = Vec<u8>;
|
||||
|
||||
pub trait Database {
|
||||
type Table: Table;
|
||||
|
||||
fn table(&self, name: impl Into<String>) -> Self::Table;
|
||||
fn reset(&self);
|
||||
fn reset(&self) -> impl Future<Output = ()>;
|
||||
}
|
||||
|
||||
pub trait Table {
|
||||
|
@ -11,21 +15,26 @@ pub trait Table {
|
|||
fn query(&self, selector: Selector) -> Self::Query;
|
||||
fn query_reverse(&self, selector: Selector) -> Self::Query;
|
||||
|
||||
fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
|
||||
fn put(&self, key: &[u8], value: &[u8]);
|
||||
fn delete(&self, key: &[u8]);
|
||||
#[must_use("future must be polled")]
|
||||
fn get(&self, key: &[u8]) -> impl Future<Output = Option<Bytes>>;
|
||||
|
||||
#[must_use("future must be polled")]
|
||||
fn put(&self, key: &[u8], value: &[u8]) -> impl Future<Output = ()>;
|
||||
|
||||
#[must_use("future must be polled")]
|
||||
fn delete(&self, key: &[u8]) -> impl Future<Output = ()>;
|
||||
}
|
||||
|
||||
pub enum Selector {
|
||||
Exact(Vec<u8>),
|
||||
Prefix(Vec<u8>),
|
||||
Exact(Bytes),
|
||||
Prefix(Bytes),
|
||||
}
|
||||
|
||||
pub trait Query {
|
||||
fn get_single(self) -> Option<Vec<u8>>;
|
||||
fn get_all(self) -> Vec<Vec<u8>>;
|
||||
fn get_iter(self) -> impl Iterator<Item = Vec<u8>>;
|
||||
fn count(self) -> u64;
|
||||
fn get_single(self) -> impl Future<Output = Option<Bytes>>;
|
||||
fn get_all(self) -> impl Future<Output = Vec<(Bytes, Bytes)>>;
|
||||
fn get_iter(self) -> impl Iterator<Item = Bytes>;
|
||||
fn count(self) -> impl Future<Output = u64>;
|
||||
}
|
||||
|
||||
impl Into<Selector> for Vec<u8> {
|
||||
|
|
Loading…
Reference in a new issue