diff --git a/docs/crdt.md b/docs/crdt.md new file mode 100644 index 00000000..88a521c --- /dev/null +++ b/docs/crdt.md @@ -0,0 +1,83 @@ +# crdts? + +Should I make the json-format mroe crdts-like? + +Importantly, this allows patch-like instead of put-like updates. If 2 +people try to update the same event twice only one person's update will +be fully applied. This might be desirable, but if you want that behavior +you can specify the fields to keep in the patch. + +For example, when editing a document, the conflicts will be merged rather +than one person's edit overwriting everything. + +## changes + +Arrays are unordered, use objects if you need order + +```js +{ + unordered: ["foo", "bar", "baz"], + ordered: { + "a": "foo", + "b": "bar", + "c": "baz", + } +} +``` + +Null no longer exists except for unsetting a key + +```js +{ unset_me: "hello" } + { unset_me: null } == {} +``` + +In terms of object structure, this might be it? However, documents have +strings (`hello ~bold{world}`) which could be merged. + +``` +> hello ~bold{world} ++ ! ++ asdfg += hello ~bold{asdfg}! +``` + +Would be nice but I'm not sure how to define this, especially considering +you wouldn't get insert position/offests you'd get + +``` +> hello ~bold{world} ++ hello ~bold{world}! ++ hello ~bold{asdfg} += hello ~bold{asdfg}! +``` + +so some diffing algorithm would need to be implemented + +## tree-style edits + +``` +current impl +| l.thing +| + x.update +| + x.update +| + x.update +| + x.update + +tree style impl +| l.thing +| + x.update +| | + x.update +| | | + x.update +| + x.update +``` + +updates (and annotations) have prev_events/replace other updates + +this reduces a need for trusting timestamps (which could still be used +as a tiebreaker) + +other than that im not sure if i should make this a tree (keep in mind +you're letting people edit your stuff, so you probably trust them to +at least have the correct timestamp). the biggest annnoyance would be +someone making an edit with a really big timestamp because then everyone +else would need big timestamps to overwrite it and it would become a mess diff --git a/lib/src/acl.rs b/lib/src/acl.rs index 64a531b..867e039 100644 --- a/lib/src/acl.rs +++ b/lib/src/acl.rs @@ -16,9 +16,34 @@ type Relations = HashMap; pub struct Acl { pub roles: HashMap, pub users: HashMap>, + // TODO (maybe): remove admins, since they're not special cased in any way + // admins can be implemented with a special `admin` role pub admins: Vec, // HashSet would be better, but is unordered (bad with canonical json) + // pub visibility: Visibility, } +// TODO: default roles +// allow specifying roles that would be the default, and manually setting non-default roles on users +// eg. public visibility + "can send post" role = everyone can send posts +// then you can override roles for each person to eg. ban someone from posting +// NOTE: this might be a bad way of banning people, since the acl will keep growing +// in that case i'd need to rework acls in general, maybe make the main +// "acl" thing only define roles and admins, then allow specifying which +// users have which roles outside of that (acl event + annotate?). + +// #[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)] +// pub enum Visibility { +// /// only people allowed to view can view (default) +// #[default] +// Private, +// +// /// anyone can view +// Public, +// +// /// visible + the server (and bots) can index it/make it searchable +// Index, +// } + // FIXME: allow missing fields but not null // see: https://github.com/serde-rs/json/issues/376 // see: https://github.com/serde-rs/serde/issues/984#issuecomment-314143738 diff --git a/server/src/items/mod.rs b/server/src/items/mod.rs index 6922405..56fc664 100644 --- a/server/src/items/mod.rs +++ b/server/src/items/mod.rs @@ -396,6 +396,7 @@ impl Items { trace!("pulling relations: {:?}", query.relations); // FIXME: ordering can be wrong here, breaking things like x.update + // FIXME: between importing events and subscribing, some events may be lost for event in query.relations.unwrap_or_default().into_values() { let wip = WipEvent { content: event.content, @@ -457,37 +458,47 @@ impl Items { } for hoster in &hosters { - trace!("on hoster {:?}", hoster); - hoster - .send_auth( - &self.p2p.contact, - RPCRequest::Subscribe(vec![item_ref.clone()]), - &self.gen_node_auth(), - grant, - ) - .await?; - use sqlx::query as sql; - let item_ref_str = item_ref.to_string(); - let user_str = grant.actor_id.to_string(); - let contact_str = serde_json::to_string(hoster).expect("always serializable"); - trace!("insert into subs (incoming)"); - let _lock = self.db.lock.write().await; - let query = sql!( - "INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) VALUES (?, ?, ?, 'i')", - item_ref_str, - user_str, - contact_str, - ) - .execute(self.db.pool()) - .await; - if let Err(err) = query { - error!("db error: {}", err); - } + self.subscribe(&vec![item_ref.clone()], grant, hoster).await?; } trace!("finished sending subscribes"); Ok(()) } + + pub async fn subscribe( + &self, + refs: &[ItemRef], + grant: &P2PAuth, + target: &Contact, + ) -> Result<(), Error> { + trace!("subscribe to refs = {:?}, grant = {:?}, contact {:?}", refs, grant, target); + + let msg = RPCRequest::Subscribe(refs.into()); + target + .send_auth(&self.p2p.contact, msg, &self.gen_node_auth(), grant) + .await?; + + let mut query = sqlx::QueryBuilder::new("INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) "); + let contact_str = + serde_json::to_string(&target).expect("always serializable"); + query.push_values(refs, |mut query, item_ref| { + query + .push_bind(item_ref.to_string()) + .push_bind(grant.actor_id.to_string()) + .push_bind(&contact_str) + .push_bind("i"); + }); + + trace!("insert into subs (incoming), sql = {}", query.sql()); + let _lock = self.db.lock.write().await; + let query = query.build().execute(self.db.pool()).await; + if let Err(err) = query { + error!("db error: {}", err); + return Err(err.into()); + } + + Ok(()) + } } // TODO (performance): cancel if there's no progress on getting "closer" to the target? diff --git a/server/src/main.rs b/server/src/main.rs index 6ef57e7..f8014dc 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -86,7 +86,17 @@ pub struct ServerState { items: items::Items, queries: RwLock>, // maybe move this to state? grants: RwLock>, // TODO: move this to db + + /// events (that will be sent to other event) + // TODO (future): i *do* want to send foreign events to other + // servers (instead of full mesh), but need a good way to prevent + // cycles. currently, not having this will cause an infinite ping/pong + // between two servers + local_events: broadcast::Sender, + + /// all events events: broadcast::Sender, + p2p: Arc, server_event: Event, } @@ -224,6 +234,7 @@ async fn serve(config: &Config) -> Result<(), Error> { queries: RwLock::new(HashMap::new()), grants: RwLock::new(HashMap::new()), events: tokio::sync::broadcast::channel(64).0, + local_events: tokio::sync::broadcast::channel(64).0, p2p, server_event: event, }; @@ -274,7 +285,7 @@ async fn serve(config: &Config) -> Result<(), Error> { } }); - let mut watcher = state.events.subscribe(); + let mut watcher = state.local_events.subscribe(); let watcher_state = state.clone(); tokio::spawn(async move { let state = watcher_state; diff --git a/server/src/peer/mod.rs b/server/src/peer/mod.rs index 8b5d2a2..002b1e0 100644 --- a/server/src/peer/mod.rs +++ b/server/src/peer/mod.rs @@ -69,6 +69,7 @@ pub struct Contact { pub enum RPCRequest { Ping, Announce(Vec), + // TODO (future): batch grants Subscribe(Vec), Relation(ActorId, Vec), // Unsubscribe(NodeId), diff --git a/server/src/routes/p2p.rs b/server/src/routes/p2p.rs index 9b0bb94..48bbca9 100644 --- a/server/src/routes/p2p.rs +++ b/server/src/routes/p2p.rs @@ -7,10 +7,12 @@ use axum::{extract::State, routing, Json, Router}; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use std::{ + collections::{HashMap, HashSet}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; use tracing::{debug, error, trace}; +use ufh::{actor::ActorId, item::ItemRef}; use super::{ things::Error, @@ -97,24 +99,39 @@ async fn message( let mut tx = state.db.pool().begin().await?; let user_str = user.to_string(); let contact_str = serde_json::to_string(&request.contact).expect("always serializable"); - let mut missing = Vec::new(); + let mut missing: HashMap> = HashMap::new(); for item_ref in refs { trace!("with ref = {}", item_ref); let item_ref_str = item_ref.to_string(); trace!("select count"); - // FIXME: this is extremely basic - it only will send - // a subscribe if there are no current subscriptiosns - let query = sql!( - "SELECT count(*) as count FROM subscriptions WHERE ref = ? AND direction = 'i'", + // FIXME: this will send subscriptions for *all* users, + // even if its not visible to them + // FIXME: there isn't a path here for resending subscriptions + // i'll add one in /peer/grant + let records = sql!( + "SELECT user FROM subscriptions WHERE ref = ? AND direction = 'i'", item_ref_str, ) - .fetch_one(&mut tx) + .fetch_all(&mut tx) .await?; - if query.count == 0 { - trace!("ref is missing, will refetch"); - missing.push(item_ref.clone()); + let existing_users: HashSet = records + .into_iter() + .map(|r| r.user.parse().expect("validated on input")) + .collect(); + let grants = state.grants.read().await; + let users: Vec<_> = grants + .keys() + .filter(|actor| !existing_users.contains(actor)) + .cloned() + .collect(); + if !users.is_empty() { + trace!("ref is missing for users {:?}, will resubscribe", users); + for user in users { + missing.entry(user).or_default().push(item_ref.clone()); + } } + drop(grants); trace!("insert into subs (outgoing)"); let query = sql!( "INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) VALUES (?, ?, ?, 'o')", @@ -145,37 +162,11 @@ async fn message( missing ); trace!("i have {} users's grants", grants.len()); - for grant in grants.values() { - trace!( - "send back subscribe with grant {:?} for items {:?}", - grant, - missing - ); - let sub = RPCRequest::Subscribe(missing.clone()); - let res = request - .contact - .send_auth(&state.p2p.contact, sub, &state.items.gen_node_auth(), grant) - .await; - if res.is_err() { - break; - } - - let mut query = sqlx::QueryBuilder::new("INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) "); - let contact_str = - serde_json::to_string(&request.contact).expect("always serializable"); - query.push_values(&missing, |mut query, item_ref| { - query - .push_bind(item_ref.to_string()) - .push_bind(grant.actor_id.to_string()) - .push_bind(&contact_str) - .push_bind("i"); - }); - - trace!("insert into subs (incoming), sql = {}", query.sql()); - let _lock = state.db.lock.write().await; - let query = query.build().execute(state.db.pool()).await; - if let Err(err) = query { - error!("db error: {}", err); + for (user, refs) in missing { + let grant = grants.get(&user).expect("existed a moment ago"); + trace!("send back subscribe"); + let sub = state.items.subscribe(&refs, grant, &request.contact).await; + if sub.is_err() { break; } } diff --git a/server/src/routes/things/create.rs b/server/src/routes/things/create.rs index cd9b6ed..7fc717b 100644 --- a/server/src/routes/things/create.rs +++ b/server/src/routes/things/create.rs @@ -103,7 +103,8 @@ pub async fn route( Ok(create) => create, Err(err) => return error!("failed to finish creating event: {}", err), }; - let _ = state.events.send(create); + let _ = state.events.send(create.clone()); + let _ = state.local_events.send(create); notify_have_and_update_db(&state, &item_ref2).await; }); return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref })))); @@ -113,7 +114,8 @@ pub async fn route( let wip = state.items.commit_event_create(wip).await?; let create = state.items.finish_event_create(wip).await?; let item_ref = create.event.id.clone(); - let _ = state.events.send(create); + let _ = state.events.send(create.clone()); + let _ = state.local_events.send(create); trace!("notified pollers of event"); notify_have_and_update_db(&state, &item_ref).await; diff --git a/server/src/state/db/sqlite.rs b/server/src/state/db/sqlite.rs index f25db9f..0224baa 100644 --- a/server/src/state/db/sqlite.rs +++ b/server/src/state/db/sqlite.rs @@ -451,7 +451,7 @@ impl Database for Sqlite { if let Some(announce_time) = announce_time { builder - .push(" AND (last_announce == NULL OR last_announce < ") + .push(" AND (last_announce = NULL OR last_announce < ") .push_bind(announce_time as i64) .push(")"); } diff --git a/web/Nav.svelte b/web/Nav.svelte index d0d7f47..f3f7247 100644 --- a/web/Nav.svelte +++ b/web/Nav.svelte @@ -129,6 +129,7 @@ padding: 2px 0; overflow-y: auto; contain: strict; + padding-bottom: 64px; & > .wrapper { padding: 2px 4px; diff --git a/web/Pins.svelte b/web/Pins.svelte index 8f4d7b9..9740ecb 100644 --- a/web/Pins.svelte +++ b/web/Pins.svelte @@ -1,5 +1,18 @@ + diff --git a/web/scenes/Forum/Forum.svelte b/web/scenes/Forum/Forum.svelte index 868df4b..88b8c73 100644 --- a/web/scenes/Forum/Forum.svelte +++ b/web/scenes/Forum/Forum.svelte @@ -24,8 +24,8 @@
- new post +
new post
title:
body: