different room types in cli
This commit is contained in:
parent
9b02e93906
commit
99f0b4b8c7
7 changed files with 236 additions and 140 deletions
360
src/bin/dag.rs
360
src/bin/dag.rs
|
@ -1,6 +1,5 @@
|
|||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
rc::Rc,
|
||||
fmt::Debug, path::{Path, PathBuf}, rc::Rc
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
|
@ -9,7 +8,7 @@ use dag_resolve::{
|
|||
actor::{ActorId, ActorSecret},
|
||||
event::{CoreContent, Event, HashType, SignatureType},
|
||||
resolver::Resolver,
|
||||
resolvers::kv::{KVEventContent, KVResolver},
|
||||
resolvers::{forum::ForumResolver, kv::KVResolver},
|
||||
room::Room,
|
||||
Error,
|
||||
};
|
||||
|
@ -19,32 +18,40 @@ use rusqlite::Connection;
|
|||
#[clap(version, about)]
|
||||
enum Cli {
|
||||
/// Initialize a new repository
|
||||
Init { repo: PathBuf },
|
||||
|
||||
/// Set a key in a repository
|
||||
Set {
|
||||
Init {
|
||||
repo: PathBuf,
|
||||
key: String,
|
||||
value: String,
|
||||
resolver: String,
|
||||
},
|
||||
|
||||
/// Send arbitrary event data to a room
|
||||
Send {
|
||||
repo: PathBuf,
|
||||
|
||||
/// the json as a string
|
||||
data: String,
|
||||
},
|
||||
|
||||
/// Get info about a repository
|
||||
Info { repo: PathBuf },
|
||||
|
||||
/// Synchronize two repositories
|
||||
Sync { repo: PathBuf, other: PathBuf },
|
||||
}
|
||||
|
||||
struct State {
|
||||
#[derive(Debug)]
|
||||
struct State<R: Resolver> {
|
||||
db: Rc<Connection>,
|
||||
room: Room<KVResolver>,
|
||||
room: Room<R>,
|
||||
secret: ActorSecret,
|
||||
store: Box<dag_resolve::store::sqlite::Database>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn create_event(&mut self, content: CoreContent<KVEventContent>) -> Result<()> {
|
||||
impl<R: Resolver> State<R> {
|
||||
fn create_event(&mut self, content: CoreContent<R::EventType>) -> Result<()> {
|
||||
match self.room.create_event(content, &self.secret) {
|
||||
Ok(event) => {
|
||||
self.db.execute(
|
||||
"INSERT INTO events (id, json) VALUES (?, ?)",
|
||||
"INSERT INTO _events (id, json) VALUES (?, ?)",
|
||||
(event.id().to_string(), serde_json::to_string(&event)?),
|
||||
)?;
|
||||
Ok(())
|
||||
|
@ -54,11 +61,11 @@ impl State {
|
|||
}
|
||||
}
|
||||
|
||||
fn append_event(&mut self, event: Event<KVEventContent>) -> Result<()> {
|
||||
fn append_event(&mut self, event: Event<R::EventType>) -> Result<()> {
|
||||
match self.room.append_event(event) {
|
||||
Ok(event) => {
|
||||
self.db.execute(
|
||||
"INSERT INTO events (id, json) VALUES (?, ?)",
|
||||
"INSERT INTO _events (id, json) VALUES (?, ?)",
|
||||
(event.id().to_string(), serde_json::to_string(&event)?),
|
||||
)?;
|
||||
Ok(())
|
||||
|
@ -67,133 +74,220 @@ impl State {
|
|||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
fn init(resolver: R, path: &Path) -> Result<Self> {
|
||||
// create new repo
|
||||
println!("init new repo!");
|
||||
let db = rusqlite::Connection::open(path)?;
|
||||
db.execute_batch(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS _config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value ANY
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS _events (
|
||||
id TEXT PRIMARY KEY,
|
||||
json TEXT
|
||||
);
|
||||
"#,
|
||||
)?;
|
||||
let (actor, secret) = ActorId::new(SignatureType::Ed25519);
|
||||
let room = Room::builder()
|
||||
.with_resolver(resolver)
|
||||
.with_hasher(HashType::Sha256)
|
||||
.with_signer(SignatureType::Ed25519)
|
||||
.create(&secret)?;
|
||||
db.execute(
|
||||
"INSERT INTO _config VALUES (?, ?)",
|
||||
("actor_id", actor.to_string()),
|
||||
)?;
|
||||
db.execute(
|
||||
"INSERT INTO _config VALUES (?, ?)",
|
||||
("actor_secret", serde_json::to_string(&secret)?),
|
||||
)?;
|
||||
for event in &room.events {
|
||||
db.execute(
|
||||
"INSERT INTO _events (id, json) VALUES (?, ?)",
|
||||
(event.id().to_string(), serde_json::to_string(&event)?),
|
||||
)?;
|
||||
}
|
||||
let db = Rc::new(db);
|
||||
let store = dag_resolve::store::sqlite::Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
Ok(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn get_repo(path: &Path, create: bool) -> Result<State> {
|
||||
match (create, path.try_exists()?) {
|
||||
(true, true) => panic!("repo already exists"),
|
||||
(false, false) => panic!("repo does not exist"),
|
||||
(true, false) => {
|
||||
// create new repo
|
||||
let db = rusqlite::Connection::open(path)?;
|
||||
db.execute_batch(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS config (
|
||||
key TEXT PRIMARY KEY,
|
||||
value ANY
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id TEXT PRIMARY KEY,
|
||||
json TEXT
|
||||
);
|
||||
"#,
|
||||
)?;
|
||||
let (actor, secret) = ActorId::new(SignatureType::Ed25519);
|
||||
let resolver = KVResolver::new();
|
||||
let room = Room::builder()
|
||||
.with_resolver(resolver)
|
||||
.with_hasher(HashType::Sha256)
|
||||
.with_signer(SignatureType::Ed25519)
|
||||
.create(&secret)?;
|
||||
db.execute(
|
||||
"INSERT INTO config VALUES (?, ?)",
|
||||
("actor_id", actor.to_string()),
|
||||
)?;
|
||||
db.execute(
|
||||
"INSERT INTO config VALUES (?, ?)",
|
||||
("actor_secret", serde_json::to_string(&secret)?),
|
||||
)?;
|
||||
for event in &room.events {
|
||||
db.execute(
|
||||
"INSERT INTO events (id, json) VALUES (?, ?)",
|
||||
(event.id().to_string(), serde_json::to_string(&event)?),
|
||||
)?;
|
||||
}
|
||||
let db = Rc::new(db);
|
||||
let store = dag_resolve::store::sqlite::Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
Ok(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
})
|
||||
}
|
||||
(false, true) => {
|
||||
// restore repo
|
||||
let db = rusqlite::Connection::open(path)?;
|
||||
let actor: ActorId =
|
||||
db.query_row("SELECT value FROM config WHERE key='actor_id'", [], |row| {
|
||||
row.get(0).map(|s: String| s.parse())
|
||||
})??;
|
||||
let secret: ActorSecret = db.query_row(
|
||||
"SELECT value FROM config WHERE key='actor_secret'",
|
||||
[],
|
||||
|row| row.get(0).map(|s: String| serde_json::from_str(&s)),
|
||||
)??;
|
||||
dbg!(&actor, &secret);
|
||||
let mut q = db.prepare("SELECT json FROM events")?;
|
||||
let mut rows = q.query([])?;
|
||||
let mut room: Option<Room<KVResolver>> = None;
|
||||
while let Some(row) = rows.next()? {
|
||||
let s: String = dbg!(row.get(0)?);
|
||||
let ev = dbg!(serde_json::from_str(&s)?);
|
||||
match &mut room {
|
||||
Some(r) => {
|
||||
r.append_event(ev)?;
|
||||
}
|
||||
None => {
|
||||
let resolver = KVResolver::new();
|
||||
let r = Room::from_root(resolver, ev)?;
|
||||
room = Some(r);
|
||||
}
|
||||
fn open(path: impl AsRef<Path>) -> Result<Opened> {
|
||||
// restore repo
|
||||
let db = rusqlite::Connection::open(path)?;
|
||||
let actor: ActorId =
|
||||
db.query_row("SELECT value FROM _config WHERE key='actor_id'", [], |row| {
|
||||
row.get(0).map(|s: String| s.parse())
|
||||
})??;
|
||||
let secret: ActorSecret = db.query_row(
|
||||
"SELECT value FROM _config WHERE key='actor_secret'",
|
||||
[],
|
||||
|row| row.get(0).map(|s: String| serde_json::from_str(&s)),
|
||||
)??;
|
||||
dbg!(&actor, &secret);
|
||||
let mut q = db.prepare("SELECT json FROM _events")?;
|
||||
let mut rows = q.query([])?;
|
||||
|
||||
let event_json: String = rows.next()?.expect("there's always one root event").get(0)?;
|
||||
let event: Event<()> = serde_json::from_str(&event_json)?;
|
||||
match event.content() {
|
||||
CoreContent::Create(c) => match c.resolver.as_str() {
|
||||
"kv" => {
|
||||
let mut room = Room::from_root(KVResolver, serde_json::from_str(&event_json)?)?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let s: String = row.get(0)?;
|
||||
let ev = serde_json::from_str(&s)?;
|
||||
room.append_event(ev)?;
|
||||
}
|
||||
}
|
||||
drop(rows);
|
||||
drop(q);
|
||||
let room = room.unwrap();
|
||||
let db = Rc::new(db);
|
||||
dbg!(&room);
|
||||
let store = dag_resolve::store::sqlite::Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
Ok(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
})
|
||||
}
|
||||
drop(event);
|
||||
drop(rows);
|
||||
drop(q);
|
||||
dbg!(&room);
|
||||
let db = Rc::new(db);
|
||||
let store = dag_resolve::store::sqlite::Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
Ok(Opened::Kv(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
}))
|
||||
},
|
||||
"forum-v0" => {
|
||||
let mut room = Room::from_root(ForumResolver, serde_json::from_str(&event_json)?)?;
|
||||
while let Some(row) = rows.next()? {
|
||||
let s: String = row.get(0)?;
|
||||
let ev = serde_json::from_str(&s)?;
|
||||
room.append_event(ev)?;
|
||||
}
|
||||
drop(event);
|
||||
drop(rows);
|
||||
drop(q);
|
||||
dbg!(&room);
|
||||
let db = Rc::new(db);
|
||||
let store = dag_resolve::store::sqlite::Database::from_conn(db.clone())
|
||||
.init(room.get_resolver().get_state_config())?;
|
||||
Ok(Opened::Forum(State {
|
||||
db,
|
||||
room,
|
||||
secret,
|
||||
store,
|
||||
}))
|
||||
},
|
||||
_ => unimplemented!("unknown resolver"),
|
||||
},
|
||||
_ => panic!("tried to start with non-create event"),
|
||||
}
|
||||
}
|
||||
|
||||
enum Opened {
|
||||
Kv(State<KVResolver>),
|
||||
Forum(State<ForumResolver>),
|
||||
}
|
||||
|
||||
fn startup(path: impl AsRef<Path>, create: Option<&str>) -> Result<Opened> {
|
||||
let path = path.as_ref();
|
||||
match (create, path.try_exists()?) {
|
||||
(Some(_), true) => panic!("repo already exists"),
|
||||
(None, false) => panic!("repo does not exist"),
|
||||
(Some(resolver_name), false) => {
|
||||
let state = match resolver_name {
|
||||
"kv" => Opened::Kv(State::init(KVResolver, path)?),
|
||||
"forum" => Opened::Forum(State::init(ForumResolver, path)?),
|
||||
_ => unimplemented!("unknown resolver"),
|
||||
};
|
||||
Ok(state)
|
||||
}
|
||||
(None, true) => open(path),
|
||||
}
|
||||
}
|
||||
|
||||
fn sync_state<R: Resolver + Debug>(mut from: State<R>, mut to: State<R>) -> Result<()> {
|
||||
if from.room.get_root() != to.room.get_root() {
|
||||
panic!("cannot sync events from two different rooms");
|
||||
}
|
||||
for event in &from.room.events {
|
||||
to.append_event(event.clone())?;
|
||||
}
|
||||
for event in &to.room.events {
|
||||
from.append_event(event.clone())?;
|
||||
}
|
||||
dbg!(
|
||||
from.room.resolve_state(*from.store),
|
||||
to.room.resolve_state(*to.store)
|
||||
);
|
||||
dbg!(from.room);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_event<R: Resolver + Debug>(mut state: State<R>, data: &str) -> Result<()> {
|
||||
state.create_event(dbg!(serde_json::from_str(dbg!(data))?))?;
|
||||
dbg!(&state.room);
|
||||
dbg!(&state.room.resolve_state(*state.store));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn print_info<R: Resolver + Debug>(state: State<R>) -> Result<()> {
|
||||
let event_count: u64 = state.db.query_row("SELECT count(*) FROM _events", [], |r| r.get(0))?;
|
||||
println!("room type: {}", state.room.get_resolver().name());
|
||||
println!("room events: {} total", event_count);
|
||||
println!("room state schema:");
|
||||
for (table_name, table_schema) in state.room.get_resolver().get_state_config().tables {
|
||||
print!(" {} (", table_name);
|
||||
for (idx, (column_name, ctype)) in table_schema.columns.iter().enumerate() {
|
||||
if idx != 0 {
|
||||
print!(", ");
|
||||
}
|
||||
print!("{} {:?}", column_name, ctype);
|
||||
}
|
||||
print!(") indexed on (");
|
||||
for (idx, index) in table_schema.indexes.iter().enumerate() {
|
||||
if idx != 0 {
|
||||
print!(", ");
|
||||
}
|
||||
print!("{} {:?}", index.column, index.index_type);
|
||||
}
|
||||
println!(")");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
let mut repo = match &cli {
|
||||
Cli::Init { repo } => get_repo(repo, true)?,
|
||||
Cli::Set { repo, .. } | Cli::Sync { repo, .. } => get_repo(repo, false)?,
|
||||
let opened = match &cli {
|
||||
Cli::Init { repo, resolver } => startup(repo, Some(resolver))?,
|
||||
Cli::Send { repo, .. } | Cli::Info { repo, .. } | Cli::Sync { repo, .. } => startup(repo, None)?,
|
||||
};
|
||||
match cli {
|
||||
Cli::Init { .. } => {}
|
||||
Cli::Set { key, value, .. } => {
|
||||
repo.create_event(CoreContent::Custom(KVEventContent::Set(key, value)))?;
|
||||
dbg!(&repo.room);
|
||||
dbg!(&repo.room.resolve_state(*repo.store));
|
||||
}
|
||||
Cli::Sync { other, .. } => {
|
||||
let mut other = get_repo(&other, false)?;
|
||||
for event in &repo.room.events {
|
||||
other.append_event(event.clone())?;
|
||||
match (cli, opened) {
|
||||
(Cli::Init { .. }, _) => {},
|
||||
(Cli::Send { repo: _, data }, Opened::Kv(state)) => send_event(state, &data)?,
|
||||
(Cli::Send { repo: _, data }, Opened::Forum(state)) => send_event(state, &data)?,
|
||||
(Cli::Info { repo: _ }, Opened::Kv(state)) => print_info(state)?,
|
||||
(Cli::Info { repo: _ }, Opened::Forum(state)) => print_info(state)?,
|
||||
(Cli::Sync { repo: _, other }, Opened::Kv(our_state)) => {
|
||||
match startup(other, None)? {
|
||||
Opened::Kv(their_state) => sync_state(our_state, their_state)?,
|
||||
_ => panic!("cannot sync different room types"),
|
||||
}
|
||||
for event in &other.room.events {
|
||||
repo.append_event(event.clone())?;
|
||||
},
|
||||
(Cli::Sync { repo: _, other }, Opened::Forum(our_state)) => {
|
||||
match startup(other, None)? {
|
||||
Opened::Forum(their_state) => sync_state(our_state, their_state)?,
|
||||
_ => panic!("cannot sync different room types"),
|
||||
}
|
||||
dbg!(
|
||||
repo.room.resolve_state(*repo.store),
|
||||
other.room.resolve_state(*other.store)
|
||||
);
|
||||
dbg!(repo.room);
|
||||
}
|
||||
};
|
||||
},
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -65,6 +65,7 @@ pub struct EventVerify<T> {
|
|||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
/// This defines content of an event as either a builtin event or application-specific event.
|
||||
// NOTE: move <T> somewhere else, so that it's possible to deserialize CreateContent more easily?
|
||||
pub enum CoreContent<T> {
|
||||
Create(CreateContent),
|
||||
Custom(T),
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use serde::Serialize;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::table::{State, StateConfig, TableRow};
|
||||
use crate::event::{Event, EventId};
|
||||
|
@ -6,7 +6,7 @@ use std::{cmp::Ordering, collections::HashSet, fmt::Debug};
|
|||
|
||||
/// small shards of code designed to resolve state
|
||||
pub trait Resolver {
|
||||
type EventType: Debug + Serialize + Clone;
|
||||
type EventType: Clone + Debug + Serialize + for<'a> Deserialize<'a>;
|
||||
|
||||
/// Given a set of ordered events, resolve the final state
|
||||
fn resolve<S: State>(&self, state: &S, event: &Event<Self::EventType>) -> Vec<ResolverEffect>;
|
||||
|
|
|
@ -71,9 +71,9 @@ impl<R: Resolver> Room<R> {
|
|||
{
|
||||
let resolver = self.get_resolver();
|
||||
let sorted = sort(|a, b| resolver.tiebreak(a, b), &self.events);
|
||||
let mut state = initial_state;
|
||||
let state = initial_state;
|
||||
for event in sorted {
|
||||
let effects = resolver.resolve(&mut state, event);
|
||||
let effects = resolver.resolve(&state, event);
|
||||
for effect in effects {
|
||||
match effect {
|
||||
ResolverEffect::Insert { table, row } => {
|
||||
|
|
|
@ -76,7 +76,7 @@ pub struct IndexConfig {
|
|||
pub column: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct TableRow {
|
||||
pub values: HashMap<String, ColumnValue>,
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ impl Resolver for ForumResolver {
|
|||
}
|
||||
|
||||
fn name(&self) -> &str {
|
||||
"forum-v1"
|
||||
"forum-v0"
|
||||
}
|
||||
|
||||
fn get_state_config(&self) -> StateConfig {
|
||||
|
|
|
@ -96,6 +96,7 @@ impl Table for MemoryTable {
|
|||
}
|
||||
|
||||
fn insert(&mut self, row: TableRow) -> Result<(), Self::Err> {
|
||||
Ok(self.map.write().unwrap().push(row))
|
||||
self.map.write().unwrap().push(row);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue