fix cyclic networks, some ideas and cleanup

This commit is contained in:
tezlm 2023-08-21 21:11:24 -07:00
parent 366ddb4f93
commit e7b305ecc2
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
11 changed files with 210 additions and 72 deletions

83
docs/crdt.md Normal file
View file

@ -0,0 +1,83 @@
# crdts?
Should I make the json-format mroe crdts-like?
Importantly, this allows patch-like instead of put-like updates. If 2
people try to update the same event twice only one person's update will
be fully applied. This might be desirable, but if you want that behavior
you can specify the fields to keep in the patch.
For example, when editing a document, the conflicts will be merged rather
than one person's edit overwriting everything.
## changes
Arrays are unordered, use objects if you need order
```js
{
unordered: ["foo", "bar", "baz"],
ordered: {
"a": "foo",
"b": "bar",
"c": "baz",
}
}
```
Null no longer exists except for unsetting a key
```js
{ unset_me: "hello" } + { unset_me: null } == {}
```
In terms of object structure, this might be it? However, documents have
strings (`hello ~bold{world}`) which could be merged.
```
> hello ~bold{world}
+ !
+ asdfg
= hello ~bold{asdfg}!
```
Would be nice but I'm not sure how to define this, especially considering
you wouldn't get insert position/offests you'd get
```
> hello ~bold{world}
+ hello ~bold{world}!
+ hello ~bold{asdfg}
= hello ~bold{asdfg}!
```
so some diffing algorithm would need to be implemented
## tree-style edits
```
current impl
| l.thing
| + x.update
| + x.update
| + x.update
| + x.update
tree style impl
| l.thing
| + x.update
| | + x.update
| | | + x.update
| + x.update
```
updates (and annotations) have prev_events/replace other updates
this reduces a need for trusting timestamps (which could still be used
as a tiebreaker)
other than that im not sure if i should make this a tree (keep in mind
you're letting people edit your stuff, so you probably trust them to
at least have the correct timestamp). the biggest annnoyance would be
someone making an edit with a really big timestamp because then everyone
else would need big timestamps to overwrite it and it would become a mess

View file

@ -16,9 +16,34 @@ type Relations = HashMap<ItemRef, (Event, RelInfo)>;
pub struct Acl {
pub roles: HashMap<RoleId, Role>,
pub users: HashMap<ActorId, HashSet<RoleId>>,
// TODO (maybe): remove admins, since they're not special cased in any way
// admins can be implemented with a special `admin` role
pub admins: Vec<ActorId>, // HashSet would be better, but is unordered (bad with canonical json)
// pub visibility: Visibility,
}
// TODO: default roles
// allow specifying roles that would be the default, and manually setting non-default roles on users
// eg. public visibility + "can send post" role = everyone can send posts
// then you can override roles for each person to eg. ban someone from posting
// NOTE: this might be a bad way of banning people, since the acl will keep growing
// in that case i'd need to rework acls in general, maybe make the main
// "acl" thing only define roles and admins, then allow specifying which
// users have which roles outside of that (acl event + annotate?).
// #[derive(Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
// pub enum Visibility {
// /// only people allowed to view can view (default)
// #[default]
// Private,
//
// /// anyone can view
// Public,
//
// /// visible + the server (and bots) can index it/make it searchable
// Index,
// }
// FIXME: allow missing fields but not null
// see: https://github.com/serde-rs/json/issues/376
// see: https://github.com/serde-rs/serde/issues/984#issuecomment-314143738

View file

@ -396,6 +396,7 @@ impl Items {
trace!("pulling relations: {:?}", query.relations);
// FIXME: ordering can be wrong here, breaking things like x.update
// FIXME: between importing events and subscribing, some events may be lost
for event in query.relations.unwrap_or_default().into_values() {
let wip = WipEvent {
content: event.content,
@ -457,37 +458,47 @@ impl Items {
}
for hoster in &hosters {
trace!("on hoster {:?}", hoster);
hoster
.send_auth(
&self.p2p.contact,
RPCRequest::Subscribe(vec![item_ref.clone()]),
&self.gen_node_auth(),
grant,
)
.await?;
use sqlx::query as sql;
let item_ref_str = item_ref.to_string();
let user_str = grant.actor_id.to_string();
let contact_str = serde_json::to_string(hoster).expect("always serializable");
trace!("insert into subs (incoming)");
let _lock = self.db.lock.write().await;
let query = sql!(
"INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) VALUES (?, ?, ?, 'i')",
item_ref_str,
user_str,
contact_str,
)
.execute(self.db.pool())
.await;
if let Err(err) = query {
error!("db error: {}", err);
}
self.subscribe(&vec![item_ref.clone()], grant, hoster).await?;
}
trace!("finished sending subscribes");
Ok(())
}
pub async fn subscribe(
&self,
refs: &[ItemRef],
grant: &P2PAuth,
target: &Contact,
) -> Result<(), Error> {
trace!("subscribe to refs = {:?}, grant = {:?}, contact {:?}", refs, grant, target);
let msg = RPCRequest::Subscribe(refs.into());
target
.send_auth(&self.p2p.contact, msg, &self.gen_node_auth(), grant)
.await?;
let mut query = sqlx::QueryBuilder::new("INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) ");
let contact_str =
serde_json::to_string(&target).expect("always serializable");
query.push_values(refs, |mut query, item_ref| {
query
.push_bind(item_ref.to_string())
.push_bind(grant.actor_id.to_string())
.push_bind(&contact_str)
.push_bind("i");
});
trace!("insert into subs (incoming), sql = {}", query.sql());
let _lock = self.db.lock.write().await;
let query = query.build().execute(self.db.pool()).await;
if let Err(err) = query {
error!("db error: {}", err);
return Err(err.into());
}
Ok(())
}
}
// TODO (performance): cancel if there's no progress on getting "closer" to the target?

View file

@ -86,7 +86,17 @@ pub struct ServerState {
items: items::Items,
queries: RwLock<HashMap<String, Query>>, // maybe move this to state?
grants: RwLock<HashMap<ActorId, P2PAuth>>, // TODO: move this to db
/// events (that will be sent to other event)
// TODO (future): i *do* want to send foreign events to other
// servers (instead of full mesh), but need a good way to prevent
// cycles. currently, not having this will cause an infinite ping/pong
// between two servers
local_events: broadcast::Sender<Create>,
/// all events
events: broadcast::Sender<Create>,
p2p: Arc<P2PState>,
server_event: Event,
}
@ -224,6 +234,7 @@ async fn serve(config: &Config) -> Result<(), Error> {
queries: RwLock::new(HashMap::new()),
grants: RwLock::new(HashMap::new()),
events: tokio::sync::broadcast::channel(64).0,
local_events: tokio::sync::broadcast::channel(64).0,
p2p,
server_event: event,
};
@ -274,7 +285,7 @@ async fn serve(config: &Config) -> Result<(), Error> {
}
});
let mut watcher = state.events.subscribe();
let mut watcher = state.local_events.subscribe();
let watcher_state = state.clone();
tokio::spawn(async move {
let state = watcher_state;

View file

@ -69,6 +69,7 @@ pub struct Contact {
pub enum RPCRequest {
Ping,
Announce(Vec<NodeId>),
// TODO (future): batch grants
Subscribe(Vec<ItemRef>),
Relation(ActorId, Vec<ItemRef>),
// Unsubscribe(NodeId),

View file

@ -7,10 +7,12 @@ use axum::{extract::State, routing, Json, Router};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tracing::{debug, error, trace};
use ufh::{actor::ActorId, item::ItemRef};
use super::{
things::Error,
@ -97,24 +99,39 @@ async fn message(
let mut tx = state.db.pool().begin().await?;
let user_str = user.to_string();
let contact_str = serde_json::to_string(&request.contact).expect("always serializable");
let mut missing = Vec::new();
let mut missing: HashMap<ActorId, Vec<ItemRef>> = HashMap::new();
for item_ref in refs {
trace!("with ref = {}", item_ref);
let item_ref_str = item_ref.to_string();
trace!("select count");
// FIXME: this is extremely basic - it only will send
// a subscribe if there are no current subscriptiosns
let query = sql!(
"SELECT count(*) as count FROM subscriptions WHERE ref = ? AND direction = 'i'",
// FIXME: this will send subscriptions for *all* users,
// even if its not visible to them
// FIXME: there isn't a path here for resending subscriptions
// i'll add one in /peer/grant
let records = sql!(
"SELECT user FROM subscriptions WHERE ref = ? AND direction = 'i'",
item_ref_str,
)
.fetch_one(&mut tx)
.fetch_all(&mut tx)
.await?;
if query.count == 0 {
trace!("ref is missing, will refetch");
missing.push(item_ref.clone());
let existing_users: HashSet<ActorId> = records
.into_iter()
.map(|r| r.user.parse().expect("validated on input"))
.collect();
let grants = state.grants.read().await;
let users: Vec<_> = grants
.keys()
.filter(|actor| !existing_users.contains(actor))
.cloned()
.collect();
if !users.is_empty() {
trace!("ref is missing for users {:?}, will resubscribe", users);
for user in users {
missing.entry(user).or_default().push(item_ref.clone());
}
}
drop(grants);
trace!("insert into subs (outgoing)");
let query = sql!(
"INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) VALUES (?, ?, ?, 'o')",
@ -145,37 +162,11 @@ async fn message(
missing
);
trace!("i have {} users's grants", grants.len());
for grant in grants.values() {
trace!(
"send back subscribe with grant {:?} for items {:?}",
grant,
missing
);
let sub = RPCRequest::Subscribe(missing.clone());
let res = request
.contact
.send_auth(&state.p2p.contact, sub, &state.items.gen_node_auth(), grant)
.await;
if res.is_err() {
break;
}
let mut query = sqlx::QueryBuilder::new("INSERT OR IGNORE INTO subscriptions (ref, user, node, direction) ");
let contact_str =
serde_json::to_string(&request.contact).expect("always serializable");
query.push_values(&missing, |mut query, item_ref| {
query
.push_bind(item_ref.to_string())
.push_bind(grant.actor_id.to_string())
.push_bind(&contact_str)
.push_bind("i");
});
trace!("insert into subs (incoming), sql = {}", query.sql());
let _lock = state.db.lock.write().await;
let query = query.build().execute(state.db.pool()).await;
if let Err(err) = query {
error!("db error: {}", err);
for (user, refs) in missing {
let grant = grants.get(&user).expect("existed a moment ago");
trace!("send back subscribe");
let sub = state.items.subscribe(&refs, grant, &request.contact).await;
if sub.is_err() {
break;
}
}

View file

@ -103,7 +103,8 @@ pub async fn route(
Ok(create) => create,
Err(err) => return error!("failed to finish creating event: {}", err),
};
let _ = state.events.send(create);
let _ = state.events.send(create.clone());
let _ = state.local_events.send(create);
notify_have_and_update_db(&state, &item_ref2).await;
});
return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref }))));
@ -113,7 +114,8 @@ pub async fn route(
let wip = state.items.commit_event_create(wip).await?;
let create = state.items.finish_event_create(wip).await?;
let item_ref = create.event.id.clone();
let _ = state.events.send(create);
let _ = state.events.send(create.clone());
let _ = state.local_events.send(create);
trace!("notified pollers of event");
notify_have_and_update_db(&state, &item_ref).await;

View file

@ -451,7 +451,7 @@ impl Database for Sqlite {
if let Some(announce_time) = announce_time {
builder
.push(" AND (last_announce == NULL OR last_announce < ")
.push(" AND (last_announce = NULL OR last_announce < ")
.push_bind(announce_time as i64)
.push(")");
}

View file

@ -129,6 +129,7 @@
padding: 2px 0;
overflow-y: auto;
contain: strict;
padding-bottom: 64px;
& > .wrapper {
padding: 2px 4px;

View file

@ -1,5 +1,18 @@
<script lang="ts">
// TODO: rename this to Workspaces.svelte?
// TODO: implement
// workspaces are a cross between (discord server, matrix space) and browser window
let workspaces: Array<Workspace> = [];
type ref = string;
interface Workspace {
name: string,
// icon: ref,
events: Array<ref>,
}
</script>
<nav id="pins">
{#each new Array(10).fill(null) as _}
{#each workspaces as _workspace}
<div class="item"></div>
{/each}
</nav>

View file

@ -24,8 +24,8 @@
</script>
<div class="wrapper">
<form on:submit|preventDefault={handlePost}>
<em>new post</em>
<table>
<tr><td><em>new post</em></td></tr>
<tr><td>title:</td><td><input bind:value={submitTitle} /></td></tr>
<tr><td>body:</td><td><textarea bind:value={submitBody} ></textarea></td></tr>
<tr><td></td><td><input type="submit" value="post"></td></tr>