diff --git a/server/migrations/20230708052532_shares.sql b/server/migrations/20230708052532_shares.sql deleted file mode 100644 index b1a28c8..00000000 --- a/server/migrations/20230708052532_shares.sql +++ /dev/null @@ -1,7 +0,0 @@ --- Add migration script here -CREATE TABLE shares ( - id TEXT PRIMARY KEY NOT NULL, - ref TEXT NOT NULL, - expires_at INT, - FOREIGN KEY (ref) REFERENCES refs(ref) -) diff --git a/server/migrations/20230708060857_sessions.sql b/server/migrations/20230708060857_sessions.sql index 747bfd1..188a14a 100644 --- a/server/migrations/20230708060857_sessions.sql +++ b/server/migrations/20230708060857_sessions.sql @@ -5,5 +5,3 @@ CREATE TABLE sessions ( user TEXT NOT NULL, level INT NOT NULL ); --- TODO: proper session management -INSERT INTO sessions VALUES ('hunter2', '%2zO9HaRwyyA2-AOGcLNKnoTbpnQ6q7gTv6ds_OR-djU', 2); diff --git a/server/src/cli.rs b/server/src/cli.rs index 1bbd72b..60dc06b 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -1,5 +1,7 @@ use clap::{Parser, Subcommand}; +use crate::peer::Contact; + #[derive(Debug, Parser)] #[command( name = "ufh", @@ -17,5 +19,7 @@ pub enum Action { Serve { #[arg(short, long, help = "which port to listen on", default_value = "3210")] port: u16, + #[arg(short, long, help = "constact json of nodes to bootstrap")] + bootstrap: Vec, }, } diff --git a/server/src/items/events.rs b/server/src/items/events.rs index 060933e..41e22ab 100644 --- a/server/src/items/events.rs +++ b/server/src/items/events.rs @@ -42,14 +42,13 @@ pub enum DelayedAction { Redact(Vec), Tag(Vec, Vec), Edit(Vec, Value), - None, } pub async fn prepare_special( me: &Items, event: &Event, relations: &Relations, -) -> Result { +) -> Result, Error> { match &event.content { EventContent::User(_) => { let query = ufh::query::Query::builder() @@ -98,7 +97,7 @@ pub async fn prepare_special( refs.push(rel_ref.clone()); } - return Ok(DelayedAction::Redact(refs)); + return Ok(Some(DelayedAction::Redact(refs))); } EventContent::LocalTag(content) => { if event.relations.is_empty() { @@ -113,10 +112,10 @@ pub async fn prepare_special( targets.push(item_ref.clone()); } - return Ok(DelayedAction::Tag(targets, content.tags.clone())); + return Ok(Some(DelayedAction::Tag(targets, content.tags.clone()))); } EventContent::Acl(_) => { - return Ok(DelayedAction::None); + return Ok(None); } EventContent::Update(content) => { if relations @@ -159,10 +158,10 @@ pub async fn prepare_special( "content": content.0, }))?; - return Ok(DelayedAction::Edit( + return Ok(Some(DelayedAction::Edit( relations.keys().cloned().collect(), content.0.clone(), - )); + ))); } EventContent::Other { event_type, .. } => { if event_type.starts_with("x.") { @@ -177,7 +176,7 @@ pub async fn prepare_special( debug!("validated {} event", event.get_type()); - Ok(DelayedAction::None) + Ok(None) } /// the event will not be in the database yet @@ -213,7 +212,6 @@ pub async fn commit_special(me: &Items, action: &DelayedAction) -> Result<(), Er .await?; } } - DelayedAction::None => (), }; Ok(()) @@ -225,7 +223,7 @@ pub async fn derive( event: &Event, file: &FileEvent, ) -> Result<(DeriveFile, Option, Option), Error> { - let bytes = get_blob(me, event, None).await?; + let bytes = get_blob(me, event).await?; let deriver = Deriver::begin(bytes, file.name.as_deref()).await; let file = deriver.get_file().await; let media = deriver.get_media().await; @@ -260,7 +258,7 @@ pub async fn update_search_index( } pub async fn reindex(me: &Items, event: &Event) -> Result<(), Error> { - let bytes = get_blob(me, event, None).await?; + let bytes = get_blob(me, event).await?; let file = match &event.content { EventContent::File(f) => Some(f), _ => None, diff --git a/server/src/items/mod.rs b/server/src/items/mod.rs index a5dbc2b..d2ccb2c 100644 --- a/server/src/items/mod.rs +++ b/server/src/items/mod.rs @@ -7,15 +7,15 @@ use crate::{ db::{sqlite::Sqlite, Database}, search::tantivy::Tantivy, }, - Relations, + Relations, peer::{NodeId, Contact, RPCResponse, RPCRequest}, P2PState, }; use axum::http::StatusCode; use bytes::Bytes; use events::DelayedAction; use lru::LruCache; use once_cell::sync::OnceCell; -use std::num::NonZeroUsize; -use tokio::sync::Mutex; +use std::{num::NonZeroUsize, sync::Arc, collections::HashSet}; +use tokio::sync::RwLock; use tracing::{trace, debug, info}; use ufh::{ derived::Derived, @@ -36,7 +36,7 @@ pub enum Item { pub struct WipCreate { pub item_ref: ItemRef, event: Event, - action: DelayedAction, + action: Option, } #[derive(Debug)] @@ -58,14 +58,16 @@ pub struct Items { db: Sqlite, search: Tantivy, blobs: blobs::Client, + p2p: Arc, } impl Items { - pub fn new(db: &Sqlite, blobs: &blobs::Client, search: &Tantivy) -> Items { + pub fn new(db: &Sqlite, blobs: &blobs::Client, search: &Tantivy, p2p: &Arc) -> Items { Items { db: db.clone(), blobs: blobs.clone(), search: search.clone(), + p2p: p2p.clone(), } } @@ -107,6 +109,9 @@ impl Items { // handle special events let action = events::prepare_special(self, &event, &relations).await?; + if action.is_some() { + debug!("prepared special-case action"); + } Ok(WipCreate { item_ref: event.id.clone(), @@ -122,9 +127,11 @@ impl Items { let rowid = self.db.event_create(&event).await?; debug!("created event (rowid={})", rowid); - - events::commit_special(self, &wip.action).await?; - debug!("commit special cases"); + + if let Some(action) = &wip.action { + events::commit_special(self, action).await?; + debug!("commit special cases"); + } Ok(DerivelessCreate { item_ref: wip.item_ref, rowid, event }) } @@ -181,27 +188,32 @@ impl Items { Ok(item_ref) } - pub async fn get(&self, item_ref: &ItemRef, via: Option<&str>) -> Result { - static CACHE: OnceCell>> = OnceCell::new(); + pub async fn get(&self, item_ref: &ItemRef) -> Result { + static CACHE: OnceCell>> = OnceCell::new(); // let mut cache = CACHE.get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap()))).lock().await; - let mut cache = CACHE - .get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(1).unwrap()))) - .lock() + let cache = CACHE + .get_or_init(|| RwLock::new(LruCache::new(NonZeroUsize::new(1).unwrap()))) + .read() .await; - match cache.peek(item_ref).cloned() { + let cached = cache.peek(item_ref).cloned(); + drop(cache); + match cached { Some(item) => { - cache.promote(item_ref); + CACHE.get().expect("already init").write().await.promote(item_ref); Ok(item) } None => { - let item = self.get_uncached(item_ref, via).await?; - cache.put(item_ref.clone(), item.clone()); + let item = self.get_uncached(item_ref).await?; + CACHE.get().expect("already init").write().await.put(item_ref.clone(), item.clone()); Ok(item) } } } - async fn get_uncached(&self, item_ref: &ItemRef, via: Option<&str>) -> Result { + #[tracing::instrument(skip_all, fields(item_ref, via))] + async fn get_uncached(&self, item_ref: &ItemRef) -> Result { + tracing::Span::current().record("item_ref", item_ref.to_string()); + if let Some(item) = self.db.event_fetch(item_ref).await? { return Ok(Item::Event(item)); } @@ -209,67 +221,103 @@ impl Items { match self.blobs.get(item_ref).await { Ok(blobs::Item::WipEvent(wip)) => { debug!("event didn't exist in db, re-adding"); - let event = self.create_event(wip).await?.event; - return Ok(Item::Event(event)); + return self.import_event(wip).await; } - Ok(blobs::Item::Raw(bytes)) => return Ok(Item::Blob(bytes)), + Ok(blobs::Item::Raw(bytes)) => { + // make sure the blob is in db + // TODO (performance): remove this + self.db.create_blob(item_ref, bytes.len() as u32).await?; + return Ok(Item::Blob(bytes)); + }, Err(Error::NotFound) => (), Err(err) => return Err(err), }; - if let Some(via) = via { - match self.blobs.get_via(item_ref, via).await { - Ok(blobs::Item::WipEvent(wip)) => return self.process(wip, via).await, - Ok(blobs::Item::Raw(bytes)) => return Ok(Item::Blob(bytes)), - Err(Error::NotFound) => (), - Err(err) => return Err(err), + for hoster in find_hosters(&self.p2p, &NodeId::new_from_ref(item_ref)).await { + match self.blobs.get_via(item_ref, &hoster.host).await { + Ok(blobs::Item::WipEvent(wip)) => { + debug!("importing event from other server"); + return self.import_event(wip).await; + }, + Ok(blobs::Item::Raw(bytes)) => { + debug!("pulled new blob (size={})", bytes.len()); + self.blobs.put(blobs::Item::Raw(bytes.clone())).await?; + self.db.create_blob(item_ref, bytes.len() as u32).await?; + return Ok(Item::Blob(bytes)); + }, + Err(_) => (), }; } - Err(Error::NotFound) + return Err(Error::NotFound); } - async fn process(&self, wip: WipEvent, via: &str) -> Result { + #[async_recursion::async_recursion] + async fn import_event(&self, wip: WipEvent) -> Result { if !wip.has_valid_signature() { return Err(Error::Validation("missing or invalid signature")); } if let EventContent::File(file) = &wip.content { for item_ref in &file.chunks { - self.pull_blob(item_ref, via).await?; - } - }; - - if let Some(rels) = &wip.relations { - for item_ref in rels.keys() { - let item = self.blobs.get_via(item_ref, via).await?; - self.blobs.put(item).await?; + self.get(item_ref).await?; } } + // FIXME: pull relations + // beware of infinite loops (because x.acl) and pull failures (also because x.acl) + // if let Some(rels) = &wip.relations { + // for item_ref in rels.keys() { + // let item = self.blobs.get_via(item_ref, via).await?; + // self.blobs.put(item).await?; + // } + // } + let event = self.create_event(wip).await?.event; Ok(Item::Event(event)) } +} - async fn pull_blob(&self, item_ref: &ItemRef, via: &str) -> Result<(), Error> { - let blob = match self.blobs.get(item_ref).await { - Ok(blobs::Item::WipEvent(_)) => panic!("expected blob, got event"), - Ok(blobs::Item::Raw(blob)) => blob, - Err(Error::NotFound) => match self.blobs.get_via(item_ref, via).await? { - blobs::Item::WipEvent(_) => { - return Err(Error::Validation("expected blob, got event")) +// TODO (performance): cancel if there's no progress on getting "closer" to the target? +async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec { + let map = p2p.map.lock().await; + if let Some(value) = map.get(key) { + value.clone() + } else { + drop(map); + let mut queried = HashSet::new(); + let router = p2p.router.lock().await; + let mut nodes = router.find_closest(key, 3); + drop(router); + while !nodes.is_empty() { + let contact = nodes.remove(0); + if contact == p2p.contact { + continue; + } + let Ok(response) = contact.send(&p2p.contact, RPCRequest::FindValue(*key)).await else { + let mut router = p2p.router.lock().await; + router.remove(&contact.id); + continue; + }; + let mut router = p2p.router.lock().await; + router.update(contact.clone()); + drop(router); + match response { + RPCResponse::FindNode(received) => { + queried.insert(contact.id); + for contact in received { + if !queried.contains(&contact.id) { + nodes.push(contact); + } + } } - blobs::Item::Raw(blob) => { - self.blobs.put(blobs::Item::Raw(blob.clone())).await?; - blob + RPCResponse::FindValue(value) => { + return value.clone(); } - }, - Err(err) => return Err(err), - }; - - self.db.create_blob(item_ref, blob.len() as u32).await?; - - Ok(()) + RPCResponse::Ok => (), + } + } + vec![] } } diff --git a/server/src/main.rs b/server/src/main.rs index fef2711..3029e25 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,9 +1,5 @@ #![feature(async_fn_in_trait)] // ahh yes, experimental features - -#![allow( - clippy::type_complexity -)] - +#![allow(clippy::type_complexity)] // general purpose lints #![warn( clippy::get_unwrap, @@ -26,6 +22,7 @@ use axum::extract::Json; use axum::{routing, Router, Server}; use clap::Parser; +use peer::NodeId; use serde::Serialize; use std::collections::HashMap; use std::net::{SocketAddr, SocketAddrV4}; @@ -51,6 +48,8 @@ mod state; pub(crate) use error::Error; use ufh::event::{Event, RelInfo}; +use crate::peer::{Contact, RPCRequest}; + const MAX_SIZE: u64 = 1024 * 1024; // TODO (future): maybe use a websocket instead of long polling? @@ -59,13 +58,19 @@ const MAX_SIZE: u64 = 1024 * 1024; type Relations = HashMap; type RowId = u32; +pub struct P2PState { + contact: Contact, + router: Mutex, + map: Mutex>>, +} + pub struct ServerState { db: state::db::sqlite::Sqlite, search: state::search::tantivy::Tantivy, items: items::Items, queries: RwLock>, // maybe move this to state? events: broadcast::Sender<(Event, Relations, RowId)>, - p2p: Mutex, + p2p: Arc, } // TODO: replace sharing system with aliases @@ -86,27 +91,51 @@ async fn main() -> Result<(), Error> { let command: cli::Command = cli::Command::parse(); match command.action { - cli::Action::Serve { port } => serve(port).await, + cli::Action::Serve { port, bootstrap } => serve(port, bootstrap).await, } } -async fn serve(port: u16) -> Result<(), Error> { +async fn serve(port: u16, bootstrap: Vec) -> Result<(), Error> { info!("Hello, world!"); info!("load db"); let db = state::db::sqlite::Sqlite::open("data/data.db").await?; let blob_client = blobs::Client::new("http://localhost:3219"); let search = state::search::tantivy::Tantivy::open("data/tantivy").await?; - let items_client = items::Items::new(&db, &blob_client, &search); - let p2p = peer::Node::new(peer::NodeId::new(), port); + + let node_id = peer::NodeId::new_from_rand(); + let p2p = Arc::new(P2PState { + router: Mutex::new(peer::Router::new(node_id)), + contact: Contact { + id: node_id, + host: format!("127.0.0.1:{}", port), + }, + map: Mutex::new(HashMap::new()), + }); + + let items_client = items::Items::new(&db, &blob_client, &search, &p2p); + let state = ServerState { db, search, items: items_client, queries: RwLock::new(HashMap::new()), events: tokio::sync::broadcast::channel(64).0, - p2p: Mutex::new(p2p), + p2p, }; + let state = Arc::new(state); + + let bootstrap_state = state.clone(); + tokio::spawn(async move { + let contacts = bootstrap.len(); + for contact in bootstrap { + if contact.send(&bootstrap_state.p2p.contact, RPCRequest::Ping).await.is_ok() { + let mut router = bootstrap_state.p2p.router.lock().await; + router.update(contact); + } + } + info!("bootstrapped {} nodes", contacts); + }); let router = Router::new() .route("/", routing::get(instance_info)) @@ -114,7 +143,7 @@ async fn serve(port: u16) -> Result<(), Error> { .nest("/alias", routes::aliases::routes()) .nest("/search", routes::search::routes()) .nest("/p2p", routes::p2p::routes()) - .with_state(Arc::new(state)) + .with_state(state) .layer(axum::middleware::from_fn(middleware::csp)) .layer(CorsLayer::permissive()) .layer(TraceLayer::new_for_http()); diff --git a/server/src/peer/mod.rs b/server/src/peer/mod.rs index 6956fff..38117d4 100644 --- a/server/src/peer/mod.rs +++ b/server/src/peer/mod.rs @@ -1,10 +1,14 @@ // this code will have no networking, pretty much only pure logic/state machines -// #![allow(unused)] // TODO (commit): remove this before comitting +#![allow(unused)] // TODO (commit): remove this before comitting use serde::{Deserialize, Serialize}; use sha2::Digest; -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + str::FromStr, +}; +use ufh::item::ItemRef; /// the length of each key const KEY_LEN: usize = 20; @@ -25,28 +29,31 @@ pub struct NodeId([u8; KEY_LEN]); struct Distance([u8; KEY_LEN]); #[derive(Debug)] -struct Router { +pub struct Router { for_id: NodeId, buckets: Vec>, } -#[derive(Debug)] -pub struct Node { - pub contact: Contact, - router: Router, - store: HashMap, -} +// #[derive(Debug)] +// pub struct Node { +// pub contact: Contact, +// router: Router, +// store: HashMap, +// } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Contact { - id: NodeId, - host: String, + pub id: NodeId, + pub host: String, } #[derive(Debug, Serialize, Deserialize)] pub enum RPCRequest { Ping, - Store(NodeId, String), + Have(NodeId), + // TODO: subscribe to relations + // Subscribe(NodeId), + // Unsubscribe(NodeId), FindNode(NodeId), FindValue(NodeId), } @@ -55,11 +62,11 @@ pub enum RPCRequest { pub enum RPCResponse { Ok, FindNode(Vec), - FindValue(String), + FindValue(Vec), } impl NodeId { - pub fn new() -> NodeId { + pub fn new_from_rand() -> NodeId { NodeId(rand::random()) } @@ -72,6 +79,10 @@ impl NodeId { let trimmed: [u8; KEY_LEN] = hash[0..KEY_LEN].try_into().unwrap(); NodeId(trimmed) } + + pub fn new_from_ref(item_ref: &ItemRef) -> Self { + NodeId::new_from_str(&item_ref.to_string()) + } } impl Distance { @@ -100,7 +111,7 @@ impl Distance { impl Contact { // TODO (future): i really should split apart network logic - async fn send( + pub async fn send( &self, sender: &Self, message: RPCRequest, @@ -117,7 +128,7 @@ impl Contact { }; reqwest::Client::new() - .post(format!("http://{}/p2p/recv", self.host)) + .post(format!("http://{}/p2p/send", self.host)) .json(&request) .send() .await? @@ -126,15 +137,23 @@ impl Contact { } } +impl FromStr for Contact { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + serde_json::from_str(s) + } +} + impl Router { - fn new(for_id: NodeId) -> Router { + pub fn new(for_id: NodeId) -> Router { Router { buckets: vec![0; N_BUCKETS].iter().map(|_| Vec::new()).collect(), for_id, } } - fn update(&mut self, contact: Contact) { + pub fn update(&mut self, contact: Contact) { let id = contact.id; let prefix_length = Distance::between(&id, &self.for_id).clz(); let bucket = &mut self.buckets[prefix_length]; @@ -150,7 +169,7 @@ impl Router { } } - fn remove(&mut self, id: &NodeId) { + pub fn remove(&mut self, id: &NodeId) { let prefix_length = Distance::between(id, &self.for_id).clz(); let bucket = &mut self.buckets[prefix_length]; let element_idx = bucket.iter().position(|i| &i.id == id); @@ -159,7 +178,7 @@ impl Router { } } - fn find_closest(&self, target: &NodeId, count: usize) -> Vec { + pub fn find_closest(&self, target: &NodeId, count: usize) -> Vec { let mut ret = Vec::new(); for bucket in &self.buckets { for contact in bucket { @@ -173,16 +192,14 @@ impl Router { } } +/* impl Node { - pub fn new(id: NodeId, port: u16) -> Self { + pub fn new(id: NodeId, host: String) -> Self { Node { // id, router: Router::new(id), store: HashMap::new(), - contact: Contact { - id, - host: format!("127.0.0.1:{}", port), - }, + contact: Contact { id, host }, } } @@ -241,7 +258,7 @@ impl Node { let mut nodes = self.router.find_closest(key, 1); while !nodes.is_empty() { let contact = nodes.remove(0); - if self.contact == contact { + if contact == self.contact { continue; } let Some(response) = self.send(&contact, RPCRequest::FindValue(*key)).await else { @@ -261,10 +278,9 @@ impl Node { } RPCResponse::Ok => (), } - dbg!("loop"); } - dbg!("not found"); None } } } +*/ diff --git a/server/src/perms.rs b/server/src/perms.rs index 7a30340..6bd9c76 100644 --- a/server/src/perms.rs +++ b/server/src/perms.rs @@ -97,7 +97,13 @@ pub async fn can_send_event(db: &Sqlite, wip: &WipEvent) -> Result }; let rel_ids: Vec<_> = relations.keys().cloned().collect(); - let rel_events = db.bulk_fetch(&rel_ids, false).await?; + let rel_events = match db.bulk_fetch(&rel_ids, false).await { + Ok(rel_events) => rel_events, + // Err(Error::NotFound) => { + // todo!(); + // }, + Err(err) => return Err(err), + }; let relations = rel_events .into_iter() .map(|(item_ref, item)| { diff --git a/server/src/routes/p2p.rs b/server/src/routes/p2p.rs index d8bd7c1..b128a22 100644 --- a/server/src/routes/p2p.rs +++ b/server/src/routes/p2p.rs @@ -1,13 +1,10 @@ use crate::{ - peer::{Contact, NodeId, RPCRequest, RPCResponse}, + peer::{Contact, RPCRequest, RPCResponse}, ServerState, }; -use axum::{ - extract::{ConnectInfo, Path, State}, - routing, Json, Router, -}; +use axum::{extract::State, routing, Json, Router}; +use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use std::net::SocketAddr; use std::sync::Arc; use tracing::debug; @@ -17,8 +14,7 @@ type Response = Result; pub fn routes() -> Router> { Router::new() - .route("/recv", routing::post(message)) - .route("/kv/:key", routing::post(set).get(get)) + .route("/send", routing::post(message)) .route("/bootstrap", routing::post(bootstrap)) .route("/info", routing::get(info)) } @@ -29,58 +25,58 @@ struct Request { contact: Contact, } -#[derive(Deserialize, Serialize)] -struct KeySet { - value: String, -} - async fn message( - ConnectInfo(_info): ConnectInfo, State(state): State>, - Json(req): Json, + Json(request): Json, ) -> Response> { - debug!("handle p2p message"); - // NOTE: p2p must NEVER send a request to itself, or it will deadlock - println!("receive {:?}", req); - let mut p2p = state.p2p.lock().await; - let res = p2p.receive(&req.contact, req.info); - Ok(Json(res)) -} - -async fn set( - ConnectInfo(_info): ConnectInfo, - State(state): State>, - Path(key): Path, - Json(req): Json, -) -> Response<()> { - debug!("handle p2p set"); - let key = NodeId::new_from_str(&key); - let mut p2p = state.p2p.lock().await; - p2p.set(&key, &req.value).await; - Ok(()) -} - -async fn get(State(state): State>, Path(key): Path) -> Response { - debug!("handle p2p get"); - let key = NodeId::new_from_str(&key); - let mut p2p = state.p2p.lock().await; - let res = p2p.get(&key).await; - Ok(res.unwrap_or_else(|| String::from("no value"))) + let response = match request.info { + RPCRequest::Ping => RPCResponse::Ok, + RPCRequest::Have(node_id) => { + let mut map = state.p2p.map.lock().await; + map.entry(node_id) + .or_insert_with(Vec::new) + .push(request.contact); + RPCResponse::Ok + } + RPCRequest::FindNode(node_id) => { + let router = state.p2p.router.lock().await; + let contacts = router.find_closest(&node_id, 20); + RPCResponse::FindNode(contacts) + } + RPCRequest::FindValue(node_id) => { + let map = state.p2p.map.lock().await; + if let Some(value) = map.get(&node_id) { + RPCResponse::FindValue(value.clone()) + } else { + let router = state.p2p.router.lock().await; + let contacts = router.find_closest(&node_id, 20); + RPCResponse::FindNode(contacts) + } + } + }; + Ok(Json(response)) } async fn bootstrap( - ConnectInfo(_info): ConnectInfo, State(state): State>, Json(contact): Json, -) -> Response<()> { +) -> Response { debug!("handle p2p bootstrap"); - let mut p2p = state.p2p.lock().await; - p2p.bootstrap(contact).await; - Ok(()) + if contact + .send(&state.p2p.contact, RPCRequest::Ping) + .await + .is_ok() + { + let mut router = state.p2p.router.lock().await; + router.update(contact); + } else { + let mut router = state.p2p.router.lock().await; + router.remove(&contact.id); + } + Ok(StatusCode::NO_CONTENT) } async fn info(State(state): State>) -> Response> { - debug!("handle p2p bootstrap"); - let p2p = state.p2p.lock().await; - Ok(Json(p2p.contact.clone())) + debug!("get contact info"); + Ok(Json(state.p2p.contact.clone())) } diff --git a/server/src/routes/things/blob.rs b/server/src/routes/things/blob.rs index aabad54..cec2df8 100644 --- a/server/src/routes/things/blob.rs +++ b/server/src/routes/things/blob.rs @@ -2,11 +2,10 @@ use crate::error::Error; use crate::routes::util::get_blob; use crate::state::db::{Database, DbItem}; use crate::ServerState; -use axum::extract::{Path, Query, State}; +use axum::extract::{Path, State}; use axum::headers::Range; use axum::http::{HeaderMap, HeaderValue, StatusCode}; use axum::TypedHeader; -use serde::Deserialize; use std::ops::Bound; use std::sync::Arc; use tracing::debug; @@ -17,22 +16,16 @@ use axum::headers::HeaderMapExt; use super::Response; use crate::items::Item; -#[derive(Debug, Deserialize)] -pub struct BlobParams { - via: Option, -} - #[tracing::instrument(skip_all, fields(item_ref))] pub async fn route( State(state): State>, range: Option>, Path(item_ref): Path, - Query(params): Query, ) -> Response<(StatusCode, HeaderMap, Vec)> { tracing::Span::current().record("item_ref", &item_ref.to_string()); debug!("fetch blob"); - let (event, file) = match state.items.get(&item_ref, params.via.as_deref()).await? { + let (event, file) = match state.items.get(&item_ref).await? { Item::Event(event) => match event.content.clone() { EventContent::File(file) => (event, file), _ => return Err(Error::Validation("this is not a x.file event")), @@ -96,7 +89,7 @@ pub async fn route( let mut chunks = Vec::with_capacity(file.chunks.len()); for item_ref in &intersection { - let Item::Blob(blob) = state.items.get(item_ref, params.via.as_deref()).await? else { + let Item::Blob(blob) = state.items.get(item_ref).await? else { unreachable!("file didn't reference a blob"); }; chunks.push(blob); @@ -113,7 +106,7 @@ pub async fn route( let mut chunks = Vec::with_capacity(file.chunks.len()); for item_ref in &intersection { - let Item::Blob(blob) = state.items.get(item_ref, params.via.as_deref()).await? else { + let Item::Blob(blob) = state.items.get(item_ref).await? else { unreachable!("file didn't reference a blob"); }; chunks.push(blob); @@ -132,7 +125,7 @@ pub async fn route( } } else { debug!("getting blob chunks"); - let blob = get_blob(&state.items, &event, params.via.as_deref()).await?; + let blob = get_blob(&state.items, &event).await?; Ok((StatusCode::OK, headers, blob.to_vec())) } } diff --git a/server/src/routes/things/create.rs b/server/src/routes/things/create.rs index 526a2c6..8208bb0 100644 --- a/server/src/routes/things/create.rs +++ b/server/src/routes/things/create.rs @@ -2,6 +2,8 @@ use super::{perms, Authenticate}; pub(crate) use crate::error::Error; use crate::ServerState; use crate::MAX_SIZE; +use crate::peer::NodeId; +use crate::peer::RPCRequest; use axum::body::HttpBody; use axum::extract::{Json, Query, RawBody, State}; use axum::http::HeaderMap; @@ -85,6 +87,7 @@ pub async fn route( if !params.wait { let item_ref = wip.item_ref.clone(); + let item_ref2 = wip.item_ref.clone(); tokio::spawn(async move { let _lock = ROWID_LOCK.lock().await; let wip = match state.items.commit_event_create(wip).await { @@ -98,6 +101,23 @@ pub async fn route( let _ = state .events .send((create.event, create.relations, create.rowid)); + let key = NodeId::new_from_ref(&item_ref2); + let mut map = state.p2p.map.lock().await; + map.insert(key, vec![state.p2p.contact.clone()]); + drop(map); + let router = state.p2p.router.lock().await; + let contacts = router.find_closest(&key, 4); + drop(router); + for contact in contacts { + if contact == state.p2p.contact { + continue; + } + + if contact.send(&state.p2p.contact, RPCRequest::Have(key)).await.is_err() { + let mut router = state.p2p.router.lock().await; + router.remove(&contact.id); + } + } }); return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref })))); } diff --git a/server/src/routes/things/fetch.rs b/server/src/routes/things/fetch.rs index cdc9f07..9df8ac7 100644 --- a/server/src/routes/things/fetch.rs +++ b/server/src/routes/things/fetch.rs @@ -1,11 +1,10 @@ use axum::{ - extract::{Path, Query, State}, + extract::{Path, State}, headers::ContentType, TypedHeader, }; use bytes::Bytes; use reqwest::StatusCode; -use serde::Deserialize; use std::sync::Arc; use tracing::debug; use ufh::item::ItemRef; @@ -18,23 +17,17 @@ use crate::{ ServerState, }; -#[derive(Debug, Deserialize)] -pub struct FetchParams { - via: Option, -} - #[tracing::instrument(skip_all, fields(item_ref))] pub async fn route( State(state): State>, auth: Authenticate, Path(item_ref): Path, - Query(params): Query, ) -> Response<(StatusCode, TypedHeader, Bytes)> { tracing::Span::current().record("item_ref", &item_ref.to_string()); debug!("fetch"); - let item = state.items.get(&item_ref, params.via.as_deref()).await?; - + let item = state.items.get(&item_ref).await?; + match item { Item::Blob(blob) => { debug!("got blob"); diff --git a/server/src/routes/things/thumbnail.rs b/server/src/routes/things/thumbnail.rs index 190afdc..03cd039 100644 --- a/server/src/routes/things/thumbnail.rs +++ b/server/src/routes/things/thumbnail.rs @@ -42,7 +42,7 @@ pub async fn route( .event_fetch(&item_ref) .await? .ok_or(Error::NotFound)?; - let blob = get_blob(&state.items, &event, params.via.as_deref()).await?; + let blob = get_blob(&state.items, &event).await?; debug!("generate thumbnail"); let thumb = generate_thumb(&blob, &size)?; state.db.thumbnail_put(&item_ref, &size, &thumb).await?; @@ -66,7 +66,6 @@ fn generate_thumb(bytes: &Bytes, size: &ThumbnailSize) -> Result { pub struct ThumbnailParams { height: u32, width: u32, - via: Option, } #[derive(Debug, Serialize, Deserialize, Clone, Copy)] diff --git a/server/src/routes/util.rs b/server/src/routes/util.rs index af8bb5d..5eaf169 100644 --- a/server/src/routes/util.rs +++ b/server/src/routes/util.rs @@ -170,14 +170,13 @@ impl Authenticate { pub async fn get_blob( items: &Items, file_event: &Event, - via: Option<&str>, ) -> Result { let EventContent::File(file) = &file_event.content else { return Err(Error::Validation("not a file event")); }; let mut chunks = Vec::with_capacity(file.chunks.len()); for item_ref in &file.chunks { - let Item::Blob(blob) = items.get(item_ref, via).await? else { + let Item::Blob(blob) = items.get(item_ref).await? else { // possibly unreachable!(), assuming everything validated properly on insert return Err(Error::Validation("file didn't reference a blob")); }; diff --git a/web/Nav.svelte b/web/Nav.svelte index 8e9b343..d0d7f47 100644 --- a/web/Nav.svelte +++ b/web/Nav.svelte @@ -108,7 +108,7 @@ {#if item.id === selectedResParentId} {@const name = getName(selectedRes) || "unnamed"} diff --git a/web/Pins.svelte b/web/Pins.svelte index 4bdacc3..8f4d7b9 100644 --- a/web/Pins.svelte +++ b/web/Pins.svelte @@ -9,17 +9,17 @@ contain: strict; display: flex; flex-direction: column; - padding: 8px; - gap: 8px; + padding: calc(8px / var(--resize)); + gap: calc(8px / var(--resize)); overflow-y: auto; width: var(--pin-size); background: var(--bg-quartiary); & > .item { background: var(--bg-primary); - min-height: 48px; - width: 48px; - border-radius: 4px; + min-height: calc(48px / var(--resize)); + width: calc(48px / var(--resize)); + border-radius: calc(4px / var(--resize)); } } diff --git a/web/scenes/Forum/Comments.svelte b/web/scenes/Forum/Comments.svelte index f1d68ae..1f30c33 100644 --- a/web/scenes/Forum/Comments.svelte +++ b/web/scenes/Forum/Comments.svelte @@ -4,13 +4,26 @@ export let state: any; export let event: Event; export let comments: Array; - $: children = comments - .filter(comment => filterRels(comment, "comment").indexOf(event.id) !== -1) + $: children = filterChildren(comments, event) .sort((a, b) => a.origin_ts - b.origin_ts); $: author = api.actors.fetch(event.sender); $: isFromOp = $state.opId === event.sender; $: replied = $state.replyId === event.id; - let collapsed = false; + $: collapsed = $state.collapsed.has(event.id); + + function filterChildren(comments: Array, parent: Event): Array { + return comments.filter(comment => filterRels(comment, "comment").includes(parent.id)); + } + + function countChildren(comments: Array, parent: Event): number { + const toCheck = [parent]; + let count = 0; + while (toCheck.length) { + toCheck.push(...filterChildren(comments, toCheck.pop()!)); + count++; + } + return count; + }
- + {#if collapsed} + [{countChildren(comments, event)}] + {/if}
{#await author} loading... @@ -79,6 +95,7 @@ .comment { border-left: solid var(--borders) 1px; margin-left: -1px; + margin-bottom: -1px; // &.fromop { // // TODO: find a good color @@ -110,6 +127,10 @@ } } + & > .childCount { + color: var(--fg-dimmed); + } + & > .author > .green { color: #77c57d; } diff --git a/web/scenes/Forum/Post.svelte b/web/scenes/Forum/Post.svelte index d3354c7..db92a1e 100644 --- a/web/scenes/Forum/Post.svelte +++ b/web/scenes/Forum/Post.svelte @@ -13,6 +13,7 @@ const state = new Reduxer({ replyId: null as null | string, opId: bucket.sender, + collapsed: new Set(), }, { reply(_state, replyId: string | null) { if (replyId) { @@ -20,6 +21,27 @@ } return { replyId }; }, + collapse(state, commentId: string) { + state.collapsed.add(commentId); + return state; + }, + expand(state, commentId: string) { + state.collapsed.delete(commentId); + return state; + }, + expandAll(state) { + state.collapsed.clear(); + return state; + }, + collapseTopLevel(state) { + const collapsed = [...state.collapsed]; + for (const comment of $comments) { + if (topLevelComments.find(i => i.id === filterRels(comment, "comment")[0])) { + collapsed.push(comment.id); + } + } + return { ...state, collapsed: new Set(collapsed) }; + }, }); const comments = query({ @@ -51,6 +73,7 @@

{bucket.content.body || "no body"}


+ {$comments.length} comments -
    {#each topLevelComments as event (event.id)}