servers can subscribe to events to receive updates
This commit is contained in:
parent
ddbc30ad85
commit
152b9dc670
12 changed files with 352 additions and 134 deletions
8
server/migrations/20230820214459_subscriptions.sql
Normal file
8
server/migrations/20230820214459_subscriptions.sql
Normal file
|
@ -0,0 +1,8 @@
|
|||
-- where to send each event
|
||||
-- TODO: redo this to add users and foreign key for sender
|
||||
CREATE TABLE subscriptions (
|
||||
ref TEXT PRIMARY KEY NOT NULL,
|
||||
user TEXT NOT NULL,
|
||||
node TEXT NOT NULL,
|
||||
FOREIGN KEY (ref) REFERENCES events(ref)
|
||||
);
|
|
@ -1,3 +1,4 @@
|
|||
pub const ALWAYS_VIEWABLE: [&str; 2] = ["x.file", "x.actor"];
|
||||
pub const IMMUTABLE_EVENTS: [&str; 3] = ["x.file", "x.redact", "x.update"];
|
||||
pub const HEADER_GRANT: &str = "x-ufh-grant";
|
||||
pub const HEADER_NODE: &str = "x-ufh-node";
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
// TODO: refactor this
|
||||
// this is turning into the code that handles all the logic
|
||||
|
||||
use crate::{
|
||||
blobs,
|
||||
consts::{HEADER_GRANT, HEADER_NODE},
|
||||
consts::{HEADER_GRANT, HEADER_NODE, IMMUTABLE_EVENTS},
|
||||
items::events::update_search_index,
|
||||
peer::{Contact, NodeId, RPCRequest, RPCResponse},
|
||||
perms::can_send_event,
|
||||
|
@ -55,7 +58,7 @@ pub struct DerivelessCreate {
|
|||
event: Event,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Create {
|
||||
pub event: Event,
|
||||
pub relations: Relations,
|
||||
|
@ -66,7 +69,7 @@ pub struct Create {
|
|||
pub struct Items {
|
||||
db: Sqlite,
|
||||
search: Tantivy,
|
||||
blobs: blobs::Client,
|
||||
pub blobs: blobs::Client,
|
||||
p2p: Arc<P2PState>,
|
||||
server_secret: ActorSecret,
|
||||
}
|
||||
|
@ -258,7 +261,8 @@ impl Items {
|
|||
match self.blobs.get(item_ref).await {
|
||||
Ok(Some(blobs::Item::WipEvent(wip))) => {
|
||||
debug!("event didn't exist in db, re-adding");
|
||||
return Ok(Some(self.import_event(wip, grant, None).await?));
|
||||
let create = self.import_event(wip, grant, None).await?;
|
||||
return Ok(Some(Item::Event(create.event)));
|
||||
}
|
||||
Ok(Some(blobs::Item::Raw(bytes))) => {
|
||||
// make sure the blob is in db
|
||||
|
@ -286,9 +290,8 @@ impl Items {
|
|||
{
|
||||
Ok(Some(blobs::Item::WipEvent(wip))) => {
|
||||
debug!("importing event from other server");
|
||||
return Ok(Some(
|
||||
self.import_event(wip, grant, Some(&hoster.host)).await?,
|
||||
));
|
||||
let create = self.import_event(wip, grant, Some(&hoster.host)).await?;
|
||||
return Ok(Some(Item::Event(create.event)));
|
||||
}
|
||||
Ok(Some(blobs::Item::Raw(bytes))) => {
|
||||
debug!("pulled new blob (size={})", bytes.len());
|
||||
|
@ -315,12 +318,12 @@ impl Items {
|
|||
}
|
||||
|
||||
#[async_recursion::async_recursion]
|
||||
async fn import_event(
|
||||
pub async fn import_event(
|
||||
&self,
|
||||
wip: WipEvent,
|
||||
grant: Option<&'async_recursion P2PAuth>,
|
||||
host: Option<&'async_recursion String>,
|
||||
) -> Result<Item, Error> {
|
||||
) -> Result<Create, Error> {
|
||||
if !wip.has_valid_signature() {
|
||||
return Err(Error::Validation("missing or invalid signature"));
|
||||
}
|
||||
|
@ -338,12 +341,14 @@ impl Items {
|
|||
}
|
||||
}
|
||||
|
||||
let event = self.create_event(wip).await?.event;
|
||||
let create = self.create_event(wip).await?;
|
||||
let event = &create.event;
|
||||
let is_root = event.relations.is_empty();
|
||||
|
||||
// NOTE: `subscribe` doesn't send old events, so i have to pull them here
|
||||
// TODO: self heading/idempotent: the server should automatically
|
||||
// heal/refetch data if its in an inconsistent state
|
||||
// FIXME: i only pull events visible to one user, then never pull again
|
||||
// FIXME: emit an RPCRequest::Subscribe to events every time a new user views an event for the first time
|
||||
// note: the subscribe should only be for the root event
|
||||
if let (Some(grant), Some(host)) = (grant, host) {
|
||||
let auth = self.gen_node_auth();
|
||||
let query = ufh::query::QueryBuilder::new()
|
||||
|
@ -398,14 +403,16 @@ impl Items {
|
|||
self.import_event(wip, Some(grant), Some(host)).await?;
|
||||
}
|
||||
|
||||
// TODO: subscribe
|
||||
// subscribe_to(&self.p2p, &NodeId::new_from_ref(&event.id)).await?;
|
||||
// x.file events are immutable
|
||||
if is_root && !IMMUTABLE_EVENTS.contains(&event.get_type()) {
|
||||
self.subscribe_to(&event.id, grant).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Item::Event(event))
|
||||
Ok(create)
|
||||
}
|
||||
|
||||
fn gen_node_auth(&self) -> NodeAuth {
|
||||
pub fn gen_node_auth(&self) -> NodeAuth {
|
||||
let mut auth = NodeAuth {
|
||||
actor_id: self.server_secret.get_id(),
|
||||
expires_at: SystemTime::now()
|
||||
|
@ -419,6 +426,27 @@ impl Items {
|
|||
auth.signature = Some(self.server_secret.sign(&json));
|
||||
auth
|
||||
}
|
||||
|
||||
async fn subscribe_to(&self, item_ref: &ItemRef, grant: &P2PAuth) -> Result<(), Error> {
|
||||
let key = NodeId::new_from_ref(item_ref);
|
||||
let hosters = find_hosters(&self.p2p, &key).await;
|
||||
if hosters.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
for hoster in &hosters {
|
||||
hoster
|
||||
.send_auth(
|
||||
&self.p2p.contact,
|
||||
RPCRequest::Subscribe(vec![item_ref.clone()]),
|
||||
&self.gen_node_auth(),
|
||||
grant,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// TODO (performance): cancel if there's no progress on getting "closer" to the target?
|
||||
|
@ -484,23 +512,6 @@ async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec<Contact> {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO (performance): cancel if there's no progress on getting "closer" to the target?
|
||||
#[allow(unused)]
|
||||
async fn subscribe_to(p2p: &P2PState, key: &NodeId) -> Result<usize, Error> {
|
||||
let hosters = find_hosters(p2p, key).await;
|
||||
if hosters.is_empty() {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
for hoster in &hosters {
|
||||
hoster
|
||||
.send(&p2p.contact, RPCRequest::Subscribe(vec![*key]))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(hosters.len())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_node_signature() {
|
||||
use ufh::actor::ActorId;
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
#![feature(async_fn_in_trait)] // ahh yes, experimental features
|
||||
#![allow(clippy::type_complexity)]
|
||||
|
||||
// general purpose lints
|
||||
#![warn(
|
||||
clippy::get_unwrap,
|
||||
|
@ -24,9 +23,14 @@ use axum::extract::{Json, State};
|
|||
use axum::{routing, Router, Server};
|
||||
use clap::Parser;
|
||||
use figment::providers::Format;
|
||||
use figment::{
|
||||
providers::{Env, Toml},
|
||||
Figment,
|
||||
};
|
||||
use items::Create;
|
||||
use peer::NodeId;
|
||||
use routes::util::P2PAuth;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::net::{SocketAddr, SocketAddrV4};
|
||||
use std::sync::Arc;
|
||||
|
@ -34,14 +38,14 @@ use std::time::{Duration, SystemTime};
|
|||
use tokio::sync::broadcast;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use tower_http::{cors::CorsLayer, trace::TraceLayer};
|
||||
use tracing::{debug, info};
|
||||
use tracing::{debug, info, trace};
|
||||
use ufh::actor::{ActorId, ActorSecret};
|
||||
use ufh::item::ItemRef;
|
||||
use ufh::query::Query;
|
||||
use figment::{Figment, providers::{Toml, Env}};
|
||||
|
||||
mod blobs;
|
||||
mod cli;
|
||||
mod consts;
|
||||
mod derive;
|
||||
mod error;
|
||||
mod items;
|
||||
|
@ -50,19 +54,19 @@ mod peer;
|
|||
mod perms;
|
||||
mod routes;
|
||||
mod state;
|
||||
mod consts;
|
||||
|
||||
pub(crate) use error::Error;
|
||||
use ufh::event::{ActorType, Event, RelInfo};
|
||||
|
||||
use crate::items::events::get_relations;
|
||||
use crate::peer::{Contact, RPCRequest};
|
||||
use crate::perms::can_view_event;
|
||||
use crate::routes::things::create::notify_have;
|
||||
use crate::state::db::{Database, Location};
|
||||
|
||||
const MAX_SIZE: u64 = 1024 * 1024;
|
||||
|
||||
type Relations = HashMap<ItemRef, (Event, RelInfo)>;
|
||||
type RowId = u32;
|
||||
|
||||
// #[test]
|
||||
// fn get_relations_size() {
|
||||
|
@ -82,24 +86,30 @@ pub struct ServerState {
|
|||
items: items::Items,
|
||||
queries: RwLock<HashMap<String, Query>>, // maybe move this to state?
|
||||
grants: RwLock<HashMap<ActorId, P2PAuth>>, // TODO: move this to db
|
||||
events: broadcast::Sender<(Event, Relations, RowId)>,
|
||||
events: broadcast::Sender<Create>,
|
||||
p2p: Arc<P2PState>,
|
||||
server_event: Event,
|
||||
}
|
||||
|
||||
// TODO (future): use generic database trait instead of `Sqlite`
|
||||
|
||||
fn default_port() -> u16 { 3210 }
|
||||
fn default_store() -> u16 { 3219 }
|
||||
fn default_host() -> String { "127.0.0.1".into() }
|
||||
fn default_port() -> u16 {
|
||||
3210
|
||||
}
|
||||
fn default_store() -> u16 {
|
||||
3219
|
||||
}
|
||||
fn default_host() -> String {
|
||||
"127.0.0.1".into()
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
struct Config {
|
||||
#[serde(default="default_store")]
|
||||
#[serde(default = "default_store")]
|
||||
store: u16,
|
||||
#[serde(default="default_port")]
|
||||
#[serde(default = "default_port")]
|
||||
port: u16,
|
||||
#[serde(default="default_host")]
|
||||
#[serde(default = "default_host")]
|
||||
host: String,
|
||||
id: Option<ActorSecret>,
|
||||
bootstrap: Vec<Contact>,
|
||||
|
@ -165,8 +175,12 @@ async fn serve(config: &Config) -> Result<(), Error> {
|
|||
map: Mutex::new(HashMap::new()),
|
||||
});
|
||||
let (actor_id, actor_secret) = ActorId::new();
|
||||
info!("creating new actor of type Node, id is {}, secret is {:?}", actor_id, actor_secret);
|
||||
let items_client = items::Items::new(&db, &blob_client, &search, &tmp_p2p, actor_secret.clone());
|
||||
info!(
|
||||
"creating new actor of type Node, id is {}, secret is {:?}",
|
||||
actor_id, actor_secret
|
||||
);
|
||||
let items_client =
|
||||
items::Items::new(&db, &blob_client, &search, &tmp_p2p, actor_secret.clone());
|
||||
let mut wip = ufh::event::WipEvent {
|
||||
content: ufh::event::EventContent::Actor(ufh::event::ActorEvent {
|
||||
name: config.host.clone(),
|
||||
|
@ -185,7 +199,10 @@ async fn serve(config: &Config) -> Result<(), Error> {
|
|||
wip.signature = Some(actor_secret.sign(wip.to_json().as_bytes()));
|
||||
let wip = items_client.begin_event_create(wip).await?;
|
||||
let wip = items_client.commit_event_create(wip).await?;
|
||||
(items_client.finish_event_create(wip).await?.event, actor_secret)
|
||||
(
|
||||
items_client.finish_event_create(wip).await?.event,
|
||||
actor_secret,
|
||||
)
|
||||
};
|
||||
|
||||
let node_id = NodeId::new_from_actor(&event.sender);
|
||||
|
@ -258,6 +275,65 @@ async fn serve(config: &Config) -> Result<(), Error> {
|
|||
}
|
||||
});
|
||||
|
||||
let mut watcher = state.events.subscribe();
|
||||
let watcher_state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
let state = watcher_state;
|
||||
loop {
|
||||
let Ok(create) = watcher.recv().await else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let state = state.clone();
|
||||
tokio::spawn(async move {
|
||||
handle_event(&state, &create.event, &create.event, &create.relations).await
|
||||
});
|
||||
|
||||
#[async_recursion::async_recursion]
|
||||
async fn handle_event(
|
||||
state: &ServerState,
|
||||
subscription: &Event,
|
||||
event: &Event,
|
||||
relations: &HashMap<ItemRef, (Event, RelInfo)>,
|
||||
) -> Result<(), Error> {
|
||||
if relations.is_empty() {
|
||||
// this is a top level/nexus event
|
||||
use sqlx::query as sql;
|
||||
trace!("handling subscription, subscription = {:?}, event = {:?}, relations = {:?}", subscription, event, relations);
|
||||
debug!("notify subscribers of ref {}", event.id);
|
||||
let sub_ref_str = subscription.id.to_string();
|
||||
let _lock = state.db.lock.read().await;
|
||||
let subs = sql!("SELECT * FROM subscriptions WHERE ref = ?", sub_ref_str)
|
||||
.fetch_all(state.db.pool())
|
||||
.await?;
|
||||
drop(_lock);
|
||||
for sub in subs {
|
||||
let user = sub.user.parse().expect("validated on input");
|
||||
trace!("sub = {:?}", sub);
|
||||
if can_view_event(&state.db, event, &user).await {
|
||||
trace!("can view, notifying");
|
||||
let contact: Contact =
|
||||
serde_json::from_str(&sub.node).expect("validated on input");
|
||||
let request = RPCRequest::Relation(user, vec![event.id.clone()]);
|
||||
let _ = contact.send(&state.p2p.contact, request).await;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
trace!(
|
||||
"finding subscription, event = {:?}, relations = {:?}",
|
||||
event,
|
||||
relations
|
||||
);
|
||||
for (relation, _) in relations.values() {
|
||||
let relations = get_relations(&state.db, relation).await?;
|
||||
handle_event(state, relation, event, &relations).await?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let router = Router::new()
|
||||
.route("/", routing::get(instance_info))
|
||||
.nest("/things", routes::things::routes())
|
||||
|
|
|
@ -8,6 +8,8 @@ use std::{fmt::{Display, Debug}, str::FromStr};
|
|||
use tracing::{trace, debug};
|
||||
use ufh::{item::ItemRef, actor::ActorId};
|
||||
|
||||
use crate::{routes::{util::{NodeAuth, P2PAuth}, actors::Error}, consts::{HEADER_NODE, HEADER_GRANT}};
|
||||
|
||||
/// the length of each key
|
||||
const KEY_LEN: usize = 20;
|
||||
|
||||
|
@ -58,7 +60,8 @@ pub struct Contact {
|
|||
pub enum RPCRequest {
|
||||
Ping,
|
||||
Announce(Vec<NodeId>),
|
||||
Subscribe(Vec<NodeId>),
|
||||
Subscribe(Vec<ItemRef>),
|
||||
Relation(ActorId, Vec<ItemRef>),
|
||||
// TODO: subscribe to relations
|
||||
// Subscribe(NodeId),
|
||||
// Unsubscribe(NodeId),
|
||||
|
@ -201,6 +204,37 @@ impl Contact {
|
|||
.json()
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_auth(
|
||||
&self,
|
||||
sender: &Self,
|
||||
message: RPCRequest,
|
||||
auth: &NodeAuth,
|
||||
grant: &P2PAuth,
|
||||
) -> Result<RPCResponse, Error> {
|
||||
#[derive(Debug, Serialize)]
|
||||
struct Request<'a> {
|
||||
info: RPCRequest,
|
||||
contact: &'a Contact,
|
||||
}
|
||||
|
||||
let request = Request {
|
||||
info: message,
|
||||
contact: sender,
|
||||
};
|
||||
|
||||
let response = reqwest::Client::new()
|
||||
.post(format!("http://{}/peer/send", self.host))
|
||||
.header(HEADER_NODE, serde_json::to_string(auth)?)
|
||||
.header(HEADER_GRANT, serde_json::to_string(grant)?)
|
||||
.json(&request)
|
||||
.send()
|
||||
.await?
|
||||
.json()
|
||||
.await?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for Contact {
|
||||
|
|
|
@ -20,7 +20,7 @@ use ufh::{
|
|||
// TODO: find out how to cache this
|
||||
#[async_recursion::async_recursion]
|
||||
#[tracing::instrument(skip_all, fields(event.id, user))]
|
||||
pub async fn can_view_event(db: &Sqlite, event: &Event, user: &ActorId) -> bool {
|
||||
pub async fn can_enumerate_event(db: &Sqlite, event: &Event, user: &ActorId) -> bool {
|
||||
// event is visible if user has sent the event
|
||||
if &event.sender == user {
|
||||
trace!("event matches because sender == user");
|
||||
|
@ -30,7 +30,7 @@ pub async fn can_view_event(db: &Sqlite, event: &Event, user: &ActorId) -> bool
|
|||
// or if event relates to any visible event
|
||||
let relations = get_relations(db, event).await.unwrap();
|
||||
for (event, _) in relations.values() {
|
||||
if can_view_event(db, event, user).await {
|
||||
if can_enumerate_event(db, event, user).await {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -54,12 +54,12 @@ pub async fn can_view_event(db: &Sqlite, event: &Event, user: &ActorId) -> bool
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn can_enumerate_event(db: &Sqlite, event: &Event, user: &ActorId) -> bool {
|
||||
pub async fn can_view_event(db: &Sqlite, event: &Event, user: &ActorId) -> bool {
|
||||
if ALWAYS_VIEWABLE.contains(&event.get_type()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
can_view_event(db, event, user).await
|
||||
can_enumerate_event(db, event, user).await
|
||||
}
|
||||
|
||||
struct Context<'a> {
|
||||
|
|
|
@ -5,10 +5,16 @@ use crate::{
|
|||
use axum::{extract::State, routing, Json, Router};
|
||||
use reqwest::StatusCode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{sync::Arc, time::{SystemTime, UNIX_EPOCH}};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
time::{SystemTime, UNIX_EPOCH},
|
||||
};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
use super::{things::Error, util::P2PAuth};
|
||||
use super::{
|
||||
things::Error,
|
||||
util::{perms, Authenticate, P2PAuth},
|
||||
};
|
||||
|
||||
type Response<T> = Result<T, Error>;
|
||||
|
||||
|
@ -29,9 +35,10 @@ struct Request {
|
|||
/// the main/only interface p2p will use
|
||||
async fn message(
|
||||
State(state): State<Arc<ServerState>>,
|
||||
auth: Authenticate<perms::None>,
|
||||
Json(request): Json<Request>,
|
||||
) -> Response<Json<RPCResponse>> {
|
||||
// FIXME (security): hostname must be validated!
|
||||
// FIXME (security): validate more things! (eg. hostname, user grant)
|
||||
trace!("receive message from {}", request.contact.host);
|
||||
let response = match request.info {
|
||||
RPCRequest::Ping => {
|
||||
|
@ -57,6 +64,10 @@ async fn message(
|
|||
RPCResponse::FindNode(contacts)
|
||||
}
|
||||
RPCRequest::FindValue(node_id) => {
|
||||
// FIXME (security): don't leak event ids
|
||||
// a server can pretend to have a node, then the server will
|
||||
// try to request with the full event id, leaking the ref
|
||||
|
||||
trace!("receive FindValue (id={})", node_id);
|
||||
let map = state.p2p.map.lock().await;
|
||||
if let Some(value) = map.get(&node_id) {
|
||||
|
@ -67,9 +78,80 @@ async fn message(
|
|||
RPCResponse::FindNode(contacts)
|
||||
}
|
||||
}
|
||||
RPCRequest::Subscribe(node_ids) => {
|
||||
trace!("receive Subscribe (id={:?})", node_ids);
|
||||
// TODO
|
||||
RPCRequest::Subscribe(refs) => {
|
||||
trace!("receive Subscribe (id={:?})", refs);
|
||||
let Some(user) = auth.user else {
|
||||
return Err(Error::Validation("no user to subscribe"));
|
||||
};
|
||||
trace!("with user = {}", user);
|
||||
trace!("with contact = {:?}", request.contact);
|
||||
trace!(
|
||||
"with serialized contact = {:?}",
|
||||
serde_json::to_string(&request.contact)
|
||||
);
|
||||
|
||||
use sqlx::query as sql;
|
||||
|
||||
let _lock = state.db.lock.write().await;
|
||||
let mut tx = state.db.pool().begin().await?;
|
||||
let user_str = user.to_string();
|
||||
let contact_str = serde_json::to_string(&request.contact).expect("always serializable");
|
||||
|
||||
for item_ref in refs {
|
||||
trace!("with ref = {}", item_ref);
|
||||
let item_ref_str = item_ref.to_string();
|
||||
sql!(
|
||||
"INSERT INTO subscriptions (ref, user, node) VALUES (?, ?, ?)",
|
||||
item_ref_str,
|
||||
user_str,
|
||||
contact_str
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
tx.commit().await?;
|
||||
trace!("commit");
|
||||
|
||||
RPCResponse::Ok
|
||||
}
|
||||
RPCRequest::Relation(actor, refs) => {
|
||||
debug!(
|
||||
"receive event ids from subscription (user = {}, refs = {:?})",
|
||||
actor, refs
|
||||
);
|
||||
let grants = state.grants.read().await;
|
||||
let grant = grants.get(&actor).cloned();
|
||||
drop(grants);
|
||||
for item_ref in refs {
|
||||
let Some(blob) = state
|
||||
.items
|
||||
.blobs
|
||||
.get_via(
|
||||
&item_ref,
|
||||
&request.contact.host,
|
||||
&state.items.gen_node_auth(),
|
||||
grant.as_ref(),
|
||||
)
|
||||
.await?
|
||||
else {
|
||||
return Err(Error::Validation("your item doesn't exist"));
|
||||
};
|
||||
|
||||
let wip = match blob {
|
||||
crate::blobs::Item::Raw(_) => {
|
||||
return Err(Error::Validation("your item is a blob but i want an event"))
|
||||
}
|
||||
crate::blobs::Item::WipEvent(wip) => wip,
|
||||
};
|
||||
|
||||
debug!("importing event from other server");
|
||||
let create = state
|
||||
.items
|
||||
.import_event(wip, grant.as_ref(), Some(&request.contact.host))
|
||||
.await?;
|
||||
let _ = state.events.send(create);
|
||||
}
|
||||
RPCResponse::Ok
|
||||
}
|
||||
};
|
||||
|
|
|
@ -103,9 +103,7 @@ pub async fn route(
|
|||
Ok(create) => create,
|
||||
Err(err) => return error!("failed to finish creating event: {}", err),
|
||||
};
|
||||
let _ = state
|
||||
.events
|
||||
.send((create.event, create.relations, create.rowid));
|
||||
let _ = state.events.send(create);
|
||||
notify_have_and_update_db(&state, &item_ref2).await;
|
||||
});
|
||||
return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref }))));
|
||||
|
@ -115,9 +113,7 @@ pub async fn route(
|
|||
let wip = state.items.commit_event_create(wip).await?;
|
||||
let create = state.items.finish_event_create(wip).await?;
|
||||
let item_ref = create.event.id.clone();
|
||||
let _ = state
|
||||
.events
|
||||
.send((create.event, create.relations, create.rowid));
|
||||
let _ = state.events.send(create);
|
||||
trace!("notified pollers of event");
|
||||
notify_have_and_update_db(&state, &item_ref).await;
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ pub async fn route(
|
|||
tracing::Span::current().record("query", ¶ms.query);
|
||||
debug!("enumerate");
|
||||
|
||||
let user = auth.user.unwrap();
|
||||
let user = auth.user.ok_or_else(|| Error::Validation("no user"))?;
|
||||
let queries = state.queries.read().await;
|
||||
let query = queries
|
||||
.get(¶ms.query)
|
||||
|
@ -174,16 +174,16 @@ pub async fn route(
|
|||
loop {
|
||||
break match recv.recv().await {
|
||||
Err(err) => Err(err.into()),
|
||||
Ok((event, relations, rowid))
|
||||
if can_enumerate_event(&state.db, &event, &user).await =>
|
||||
Ok(create)
|
||||
if can_enumerate_event(&state.db, &create.event, &user).await =>
|
||||
{
|
||||
match query.matches(&event, &relations) {
|
||||
mt @ MatchType::Event => Ok((mt, event, rowid)),
|
||||
mt @ MatchType::Relation => Ok((mt, event, rowid)),
|
||||
match query.matches(&create.event, &create.relations) {
|
||||
mt @ MatchType::Event => Ok((mt, create.event, create.rowid)),
|
||||
mt @ MatchType::Relation => Ok((mt, create.event, create.rowid)),
|
||||
MatchType::None => {
|
||||
trace!("received non-matching event");
|
||||
if check_relations(&state, &query, &relations).await? {
|
||||
Ok((MatchType::Relation, event, rowid))
|
||||
if check_relations(&state, &query, &create.relations).await? {
|
||||
Ok((MatchType::Relation, create.event, create.rowid))
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#![allow(unused)]
|
||||
|
||||
use crate::items::Create;
|
||||
use crate::items::events::get_relations;
|
||||
use crate::perms::can_view_event;
|
||||
use crate::routes::util::{perms, Authenticate};
|
||||
|
@ -56,7 +57,7 @@ async fn sync(state: Arc<ServerState>, ws: &mut WebSocket) -> Result<(), Error>
|
|||
Noop,
|
||||
Close,
|
||||
Client(Request),
|
||||
Server((Event, Relations, u32)),
|
||||
Server(Create),
|
||||
}
|
||||
|
||||
let mut user: Option<ActorId> = None;
|
||||
|
@ -115,7 +116,7 @@ async fn sync(state: Arc<ServerState>, ws: &mut WebSocket) -> Result<(), Error>
|
|||
}
|
||||
},
|
||||
},
|
||||
Msg::Server((event, relations, _)) => {
|
||||
Msg::Server(Create { event, relations, .. }) => {
|
||||
let Some(user) = &user else {
|
||||
continue;
|
||||
};
|
||||
|
|
|
@ -100,6 +100,7 @@ pub struct Authenticate<T: perms::AuthLevel> {
|
|||
pub id: Option<String>,
|
||||
pub user: Option<ActorId>,
|
||||
pub level: u8,
|
||||
pub is_server: bool,
|
||||
_lvl: PhantomData<T>,
|
||||
}
|
||||
|
||||
|
@ -136,74 +137,74 @@ impl<T: AuthLevel> axum::extract::FromRequestParts<Arc<ServerState>> for Authent
|
|||
.expect("infallible");
|
||||
let header = <TypedHeader<Authorization<Bearer>>>::from_request_parts(parts, state).await;
|
||||
trace!("headers = {:?}", headers);
|
||||
let token = match (
|
||||
&header,
|
||||
(headers.get(HEADER_NODE), headers.get(HEADER_GRANT)),
|
||||
) {
|
||||
(Ok(header), _) => header.token(),
|
||||
(Err(_), (Some(node), Some(grant))) => {
|
||||
trace!("request has node: {:?}", node);
|
||||
trace!("request has grant: {:?}", grant);
|
||||
let mut node: NodeAuth = serde_json::from_slice(node.as_bytes())
|
||||
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
|
||||
let mut grant: P2PAuth = serde_json::from_slice(grant.as_bytes())
|
||||
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
|
||||
trace!("request has parsed node: {:?}", node);
|
||||
trace!("request has parsed grant: {:?}", grant);
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
trace!("current system time (in milliseconds) = {}", time);
|
||||
if let (Some(node), Some(grant)) = (headers.get(HEADER_NODE), headers.get(HEADER_GRANT)) {
|
||||
trace!("request has node: {:?}", node);
|
||||
trace!("request has grant: {:?}", grant);
|
||||
let mut node: NodeAuth = serde_json::from_slice(node.as_bytes())
|
||||
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
|
||||
let mut grant: P2PAuth = serde_json::from_slice(grant.as_bytes())
|
||||
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
|
||||
trace!("request has parsed node: {:?}", node);
|
||||
trace!("request has parsed grant: {:?}", grant);
|
||||
let time = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
trace!("current system time (in milliseconds) = {}", time);
|
||||
|
||||
// verify node signature
|
||||
let Some(node_signature) = node.signature.take() else {
|
||||
trace!("foreign request is bad: missing node signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "missing node signature".into()));
|
||||
};
|
||||
// verify node signature
|
||||
let Some(node_signature) = node.signature.take() else {
|
||||
trace!("foreign request is bad: missing node signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "missing node signature".into()));
|
||||
};
|
||||
|
||||
if !node
|
||||
.actor_id
|
||||
.verify(&json_canon::to_vec(&node).unwrap(), &node_signature)
|
||||
{
|
||||
trace!("foreign request is bad: bad node signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "bad node signature".into()));
|
||||
}
|
||||
if node.expires_at < time {
|
||||
trace!("foreign request is bad: expired node auth");
|
||||
return Err((StatusCode::BAD_REQUEST, "expired node auth".into()));
|
||||
}
|
||||
if !node
|
||||
.actor_id
|
||||
.verify(&json_canon::to_vec(&node).unwrap(), &node_signature)
|
||||
{
|
||||
trace!("foreign request is bad: bad node signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "bad node signature".into()));
|
||||
}
|
||||
if node.expires_at < time {
|
||||
trace!("foreign request is bad: expired node auth");
|
||||
return Err((StatusCode::BAD_REQUEST, "expired node auth".into()));
|
||||
}
|
||||
|
||||
// verify grant signature
|
||||
let Some(grant_signature) = grant.signature.take() else {
|
||||
trace!("foreign request is bad: missing grant signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "missing grant signature".into()));
|
||||
};
|
||||
if !grant
|
||||
.actor_id
|
||||
.verify(&json_canon::to_vec(&grant).unwrap(), &grant_signature)
|
||||
{
|
||||
trace!("foreign request is bad: bad grant signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "bad grant signature".into()));
|
||||
}
|
||||
if grant.expires_at < time {
|
||||
trace!("foreign request is bad: expired grant auth");
|
||||
return Err((StatusCode::BAD_REQUEST, "expired grant auth".into()));
|
||||
}
|
||||
if grant.allows != node.actor_id {
|
||||
trace!("foreign request is bad: grant doesnt't allow node");
|
||||
return Err((StatusCode::BAD_REQUEST, "grant doesn't allow node".into()));
|
||||
}
|
||||
// verify grant signature
|
||||
let Some(grant_signature) = grant.signature.take() else {
|
||||
trace!("foreign request is bad: missing grant signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "missing grant signature".into()));
|
||||
};
|
||||
if !grant
|
||||
.actor_id
|
||||
.verify(&json_canon::to_vec(&grant).unwrap(), &grant_signature)
|
||||
{
|
||||
trace!("foreign request is bad: bad grant signature");
|
||||
return Err((StatusCode::BAD_REQUEST, "bad grant signature".into()));
|
||||
}
|
||||
if grant.expires_at < time {
|
||||
trace!("foreign request is bad: expired grant auth");
|
||||
return Err((StatusCode::BAD_REQUEST, "expired grant auth".into()));
|
||||
}
|
||||
if grant.allows != node.actor_id {
|
||||
trace!("foreign request is bad: grant doesnt't allow node");
|
||||
return Err((StatusCode::BAD_REQUEST, "grant doesn't allow node".into()));
|
||||
}
|
||||
|
||||
if T::new().to_num() <= 1 {
|
||||
return Ok(Authenticate {
|
||||
id: None,
|
||||
user: Some(grant.actor_id),
|
||||
level: 1, // servers can only read
|
||||
is_server: true,
|
||||
_lvl: PhantomData,
|
||||
});
|
||||
}
|
||||
(Err(_), _) if T::new().to_num() == 0 => "",
|
||||
(Err(err), _) => return Err((StatusCode::BAD_REQUEST, err.to_string())),
|
||||
}
|
||||
let token = match &header {
|
||||
Ok(header) => header.token(),
|
||||
Err(_) if T::new().to_num() == 0 => "",
|
||||
Err(err) => return Err((StatusCode::BAD_REQUEST, err.to_string())),
|
||||
};
|
||||
Authenticate::from_token(&state.db, token).await
|
||||
}
|
||||
|
@ -232,6 +233,7 @@ impl<T: AuthLevel> Authenticate<T> {
|
|||
id: None,
|
||||
user: None,
|
||||
level: 0,
|
||||
is_server: false,
|
||||
_lvl: PhantomData,
|
||||
});
|
||||
} else {
|
||||
|
@ -253,6 +255,7 @@ impl<T: AuthLevel> Authenticate<T> {
|
|||
id: Some(token.into()),
|
||||
user: Some(query.user),
|
||||
level: query.level,
|
||||
is_server: false,
|
||||
_lvl: PhantomData,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ pub struct Sqlite {
|
|||
pool: sqlx::SqlitePool,
|
||||
// yes, this ruins concurrency. no, i don't feel like dealing with this properly
|
||||
// in the future i'd tell everyone to use postgres
|
||||
lock: Arc<RwLock<()>>,
|
||||
pub lock: Arc<RwLock<()>>,
|
||||
}
|
||||
|
||||
// TODO: actually implement and use these flags
|
||||
|
@ -71,6 +71,12 @@ impl Sqlite {
|
|||
lock: Arc::new(RwLock::new(())),
|
||||
})
|
||||
}
|
||||
|
||||
// TODO (future): this is nice for prototyping, but should be
|
||||
// refactored into db/mod.rs later on
|
||||
pub fn pool(&self) -> &SqlitePool {
|
||||
&self.pool
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: confirm that every function here is idempotent...
|
||||
|
|
Loading…
Reference in a new issue