implement basic p2p (x.file with no rels work now)

This commit is contained in:
tezlm 2023-08-14 08:56:08 -07:00
parent 561376b679
commit 3c828e137d
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
18 changed files with 337 additions and 201 deletions

View file

@ -1,7 +0,0 @@
-- Add migration script here
CREATE TABLE shares (
id TEXT PRIMARY KEY NOT NULL,
ref TEXT NOT NULL,
expires_at INT,
FOREIGN KEY (ref) REFERENCES refs(ref)
)

View file

@ -5,5 +5,3 @@ CREATE TABLE sessions (
user TEXT NOT NULL, user TEXT NOT NULL,
level INT NOT NULL level INT NOT NULL
); );
-- TODO: proper session management
INSERT INTO sessions VALUES ('hunter2', '%2zO9HaRwyyA2-AOGcLNKnoTbpnQ6q7gTv6ds_OR-djU', 2);

View file

@ -1,5 +1,7 @@
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use crate::peer::Contact;
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
#[command( #[command(
name = "ufh", name = "ufh",
@ -17,5 +19,7 @@ pub enum Action {
Serve { Serve {
#[arg(short, long, help = "which port to listen on", default_value = "3210")] #[arg(short, long, help = "which port to listen on", default_value = "3210")]
port: u16, port: u16,
#[arg(short, long, help = "constact json of nodes to bootstrap")]
bootstrap: Vec<Contact>,
}, },
} }

View file

@ -42,14 +42,13 @@ pub enum DelayedAction {
Redact(Vec<ItemRef>), Redact(Vec<ItemRef>),
Tag(Vec<ItemRef>, Vec<String>), Tag(Vec<ItemRef>, Vec<String>),
Edit(Vec<ItemRef>, Value), Edit(Vec<ItemRef>, Value),
None,
} }
pub async fn prepare_special( pub async fn prepare_special(
me: &Items, me: &Items,
event: &Event, event: &Event,
relations: &Relations, relations: &Relations,
) -> Result<DelayedAction, Error> { ) -> Result<Option<DelayedAction>, Error> {
match &event.content { match &event.content {
EventContent::User(_) => { EventContent::User(_) => {
let query = ufh::query::Query::builder() let query = ufh::query::Query::builder()
@ -98,7 +97,7 @@ pub async fn prepare_special(
refs.push(rel_ref.clone()); refs.push(rel_ref.clone());
} }
return Ok(DelayedAction::Redact(refs)); return Ok(Some(DelayedAction::Redact(refs)));
} }
EventContent::LocalTag(content) => { EventContent::LocalTag(content) => {
if event.relations.is_empty() { if event.relations.is_empty() {
@ -113,10 +112,10 @@ pub async fn prepare_special(
targets.push(item_ref.clone()); targets.push(item_ref.clone());
} }
return Ok(DelayedAction::Tag(targets, content.tags.clone())); return Ok(Some(DelayedAction::Tag(targets, content.tags.clone())));
} }
EventContent::Acl(_) => { EventContent::Acl(_) => {
return Ok(DelayedAction::None); return Ok(None);
} }
EventContent::Update(content) => { EventContent::Update(content) => {
if relations if relations
@ -159,10 +158,10 @@ pub async fn prepare_special(
"content": content.0, "content": content.0,
}))?; }))?;
return Ok(DelayedAction::Edit( return Ok(Some(DelayedAction::Edit(
relations.keys().cloned().collect(), relations.keys().cloned().collect(),
content.0.clone(), content.0.clone(),
)); )));
} }
EventContent::Other { event_type, .. } => { EventContent::Other { event_type, .. } => {
if event_type.starts_with("x.") { if event_type.starts_with("x.") {
@ -177,7 +176,7 @@ pub async fn prepare_special(
debug!("validated {} event", event.get_type()); debug!("validated {} event", event.get_type());
Ok(DelayedAction::None) Ok(None)
} }
/// the event will not be in the database yet /// the event will not be in the database yet
@ -213,7 +212,6 @@ pub async fn commit_special(me: &Items, action: &DelayedAction) -> Result<(), Er
.await?; .await?;
} }
} }
DelayedAction::None => (),
}; };
Ok(()) Ok(())
@ -225,7 +223,7 @@ pub async fn derive(
event: &Event, event: &Event,
file: &FileEvent, file: &FileEvent,
) -> Result<(DeriveFile, Option<DeriveMedia>, Option<Bytes>), Error> { ) -> Result<(DeriveFile, Option<DeriveMedia>, Option<Bytes>), Error> {
let bytes = get_blob(me, event, None).await?; let bytes = get_blob(me, event).await?;
let deriver = Deriver::begin(bytes, file.name.as_deref()).await; let deriver = Deriver::begin(bytes, file.name.as_deref()).await;
let file = deriver.get_file().await; let file = deriver.get_file().await;
let media = deriver.get_media().await; let media = deriver.get_media().await;
@ -260,7 +258,7 @@ pub async fn update_search_index(
} }
pub async fn reindex(me: &Items, event: &Event) -> Result<(), Error> { pub async fn reindex(me: &Items, event: &Event) -> Result<(), Error> {
let bytes = get_blob(me, event, None).await?; let bytes = get_blob(me, event).await?;
let file = match &event.content { let file = match &event.content {
EventContent::File(f) => Some(f), EventContent::File(f) => Some(f),
_ => None, _ => None,

View file

@ -7,15 +7,15 @@ use crate::{
db::{sqlite::Sqlite, Database}, db::{sqlite::Sqlite, Database},
search::tantivy::Tantivy, search::tantivy::Tantivy,
}, },
Relations, Relations, peer::{NodeId, Contact, RPCResponse, RPCRequest}, P2PState,
}; };
use axum::http::StatusCode; use axum::http::StatusCode;
use bytes::Bytes; use bytes::Bytes;
use events::DelayedAction; use events::DelayedAction;
use lru::LruCache; use lru::LruCache;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use std::num::NonZeroUsize; use std::{num::NonZeroUsize, sync::Arc, collections::HashSet};
use tokio::sync::Mutex; use tokio::sync::RwLock;
use tracing::{trace, debug, info}; use tracing::{trace, debug, info};
use ufh::{ use ufh::{
derived::Derived, derived::Derived,
@ -36,7 +36,7 @@ pub enum Item {
pub struct WipCreate { pub struct WipCreate {
pub item_ref: ItemRef, pub item_ref: ItemRef,
event: Event, event: Event,
action: DelayedAction, action: Option<DelayedAction>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -58,14 +58,16 @@ pub struct Items {
db: Sqlite, db: Sqlite,
search: Tantivy, search: Tantivy,
blobs: blobs::Client, blobs: blobs::Client,
p2p: Arc<P2PState>,
} }
impl Items { impl Items {
pub fn new(db: &Sqlite, blobs: &blobs::Client, search: &Tantivy) -> Items { pub fn new(db: &Sqlite, blobs: &blobs::Client, search: &Tantivy, p2p: &Arc<P2PState>) -> Items {
Items { Items {
db: db.clone(), db: db.clone(),
blobs: blobs.clone(), blobs: blobs.clone(),
search: search.clone(), search: search.clone(),
p2p: p2p.clone(),
} }
} }
@ -107,6 +109,9 @@ impl Items {
// handle special events // handle special events
let action = events::prepare_special(self, &event, &relations).await?; let action = events::prepare_special(self, &event, &relations).await?;
if action.is_some() {
debug!("prepared special-case action");
}
Ok(WipCreate { Ok(WipCreate {
item_ref: event.id.clone(), item_ref: event.id.clone(),
@ -122,9 +127,11 @@ impl Items {
let rowid = self.db.event_create(&event).await?; let rowid = self.db.event_create(&event).await?;
debug!("created event (rowid={})", rowid); debug!("created event (rowid={})", rowid);
events::commit_special(self, &wip.action).await?; if let Some(action) = &wip.action {
debug!("commit special cases"); events::commit_special(self, action).await?;
debug!("commit special cases");
}
Ok(DerivelessCreate { item_ref: wip.item_ref, rowid, event }) Ok(DerivelessCreate { item_ref: wip.item_ref, rowid, event })
} }
@ -181,27 +188,32 @@ impl Items {
Ok(item_ref) Ok(item_ref)
} }
pub async fn get(&self, item_ref: &ItemRef, via: Option<&str>) -> Result<Item, Error> { pub async fn get(&self, item_ref: &ItemRef) -> Result<Item, Error> {
static CACHE: OnceCell<Mutex<LruCache<ItemRef, Item>>> = OnceCell::new(); static CACHE: OnceCell<RwLock<LruCache<ItemRef, Item>>> = OnceCell::new();
// let mut cache = CACHE.get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap()))).lock().await; // let mut cache = CACHE.get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap()))).lock().await;
let mut cache = CACHE let cache = CACHE
.get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(1).unwrap()))) .get_or_init(|| RwLock::new(LruCache::new(NonZeroUsize::new(1).unwrap())))
.lock() .read()
.await; .await;
match cache.peek(item_ref).cloned() { let cached = cache.peek(item_ref).cloned();
drop(cache);
match cached {
Some(item) => { Some(item) => {
cache.promote(item_ref); CACHE.get().expect("already init").write().await.promote(item_ref);
Ok(item) Ok(item)
} }
None => { None => {
let item = self.get_uncached(item_ref, via).await?; let item = self.get_uncached(item_ref).await?;
cache.put(item_ref.clone(), item.clone()); CACHE.get().expect("already init").write().await.put(item_ref.clone(), item.clone());
Ok(item) Ok(item)
} }
} }
} }
async fn get_uncached(&self, item_ref: &ItemRef, via: Option<&str>) -> Result<Item, Error> { #[tracing::instrument(skip_all, fields(item_ref, via))]
async fn get_uncached(&self, item_ref: &ItemRef) -> Result<Item, Error> {
tracing::Span::current().record("item_ref", item_ref.to_string());
if let Some(item) = self.db.event_fetch(item_ref).await? { if let Some(item) = self.db.event_fetch(item_ref).await? {
return Ok(Item::Event(item)); return Ok(Item::Event(item));
} }
@ -209,67 +221,103 @@ impl Items {
match self.blobs.get(item_ref).await { match self.blobs.get(item_ref).await {
Ok(blobs::Item::WipEvent(wip)) => { Ok(blobs::Item::WipEvent(wip)) => {
debug!("event didn't exist in db, re-adding"); debug!("event didn't exist in db, re-adding");
let event = self.create_event(wip).await?.event; return self.import_event(wip).await;
return Ok(Item::Event(event));
} }
Ok(blobs::Item::Raw(bytes)) => return Ok(Item::Blob(bytes)), Ok(blobs::Item::Raw(bytes)) => {
// make sure the blob is in db
// TODO (performance): remove this
self.db.create_blob(item_ref, bytes.len() as u32).await?;
return Ok(Item::Blob(bytes));
},
Err(Error::NotFound) => (), Err(Error::NotFound) => (),
Err(err) => return Err(err), Err(err) => return Err(err),
}; };
if let Some(via) = via { for hoster in find_hosters(&self.p2p, &NodeId::new_from_ref(item_ref)).await {
match self.blobs.get_via(item_ref, via).await { match self.blobs.get_via(item_ref, &hoster.host).await {
Ok(blobs::Item::WipEvent(wip)) => return self.process(wip, via).await, Ok(blobs::Item::WipEvent(wip)) => {
Ok(blobs::Item::Raw(bytes)) => return Ok(Item::Blob(bytes)), debug!("importing event from other server");
Err(Error::NotFound) => (), return self.import_event(wip).await;
Err(err) => return Err(err), },
Ok(blobs::Item::Raw(bytes)) => {
debug!("pulled new blob (size={})", bytes.len());
self.blobs.put(blobs::Item::Raw(bytes.clone())).await?;
self.db.create_blob(item_ref, bytes.len() as u32).await?;
return Ok(Item::Blob(bytes));
},
Err(_) => (),
}; };
} }
Err(Error::NotFound) return Err(Error::NotFound);
} }
async fn process(&self, wip: WipEvent, via: &str) -> Result<Item, Error> { #[async_recursion::async_recursion]
async fn import_event(&self, wip: WipEvent) -> Result<Item, Error> {
if !wip.has_valid_signature() { if !wip.has_valid_signature() {
return Err(Error::Validation("missing or invalid signature")); return Err(Error::Validation("missing or invalid signature"));
} }
if let EventContent::File(file) = &wip.content { if let EventContent::File(file) = &wip.content {
for item_ref in &file.chunks { for item_ref in &file.chunks {
self.pull_blob(item_ref, via).await?; self.get(item_ref).await?;
}
};
if let Some(rels) = &wip.relations {
for item_ref in rels.keys() {
let item = self.blobs.get_via(item_ref, via).await?;
self.blobs.put(item).await?;
} }
} }
// FIXME: pull relations
// beware of infinite loops (because x.acl) and pull failures (also because x.acl)
// if let Some(rels) = &wip.relations {
// for item_ref in rels.keys() {
// let item = self.blobs.get_via(item_ref, via).await?;
// self.blobs.put(item).await?;
// }
// }
let event = self.create_event(wip).await?.event; let event = self.create_event(wip).await?.event;
Ok(Item::Event(event)) Ok(Item::Event(event))
} }
}
async fn pull_blob(&self, item_ref: &ItemRef, via: &str) -> Result<(), Error> { // TODO (performance): cancel if there's no progress on getting "closer" to the target?
let blob = match self.blobs.get(item_ref).await { async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec<Contact> {
Ok(blobs::Item::WipEvent(_)) => panic!("expected blob, got event"), let map = p2p.map.lock().await;
Ok(blobs::Item::Raw(blob)) => blob, if let Some(value) = map.get(key) {
Err(Error::NotFound) => match self.blobs.get_via(item_ref, via).await? { value.clone()
blobs::Item::WipEvent(_) => { } else {
return Err(Error::Validation("expected blob, got event")) drop(map);
let mut queried = HashSet::new();
let router = p2p.router.lock().await;
let mut nodes = router.find_closest(key, 3);
drop(router);
while !nodes.is_empty() {
let contact = nodes.remove(0);
if contact == p2p.contact {
continue;
}
let Ok(response) = contact.send(&p2p.contact, RPCRequest::FindValue(*key)).await else {
let mut router = p2p.router.lock().await;
router.remove(&contact.id);
continue;
};
let mut router = p2p.router.lock().await;
router.update(contact.clone());
drop(router);
match response {
RPCResponse::FindNode(received) => {
queried.insert(contact.id);
for contact in received {
if !queried.contains(&contact.id) {
nodes.push(contact);
}
}
} }
blobs::Item::Raw(blob) => { RPCResponse::FindValue(value) => {
self.blobs.put(blobs::Item::Raw(blob.clone())).await?; return value.clone();
blob
} }
}, RPCResponse::Ok => (),
Err(err) => return Err(err), }
}; }
vec![]
self.db.create_blob(item_ref, blob.len() as u32).await?;
Ok(())
} }
} }

View file

@ -1,9 +1,5 @@
#![feature(async_fn_in_trait)] // ahh yes, experimental features #![feature(async_fn_in_trait)] // ahh yes, experimental features
#![allow(clippy::type_complexity)]
#![allow(
clippy::type_complexity
)]
// general purpose lints // general purpose lints
#![warn( #![warn(
clippy::get_unwrap, clippy::get_unwrap,
@ -26,6 +22,7 @@
use axum::extract::Json; use axum::extract::Json;
use axum::{routing, Router, Server}; use axum::{routing, Router, Server};
use clap::Parser; use clap::Parser;
use peer::NodeId;
use serde::Serialize; use serde::Serialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4}; use std::net::{SocketAddr, SocketAddrV4};
@ -51,6 +48,8 @@ mod state;
pub(crate) use error::Error; pub(crate) use error::Error;
use ufh::event::{Event, RelInfo}; use ufh::event::{Event, RelInfo};
use crate::peer::{Contact, RPCRequest};
const MAX_SIZE: u64 = 1024 * 1024; const MAX_SIZE: u64 = 1024 * 1024;
// TODO (future): maybe use a websocket instead of long polling? // TODO (future): maybe use a websocket instead of long polling?
@ -59,13 +58,19 @@ const MAX_SIZE: u64 = 1024 * 1024;
type Relations = HashMap<ItemRef, (Event, RelInfo)>; type Relations = HashMap<ItemRef, (Event, RelInfo)>;
type RowId = u32; type RowId = u32;
pub struct P2PState {
contact: Contact,
router: Mutex<peer::Router>,
map: Mutex<HashMap<NodeId, Vec<Contact>>>,
}
pub struct ServerState { pub struct ServerState {
db: state::db::sqlite::Sqlite, db: state::db::sqlite::Sqlite,
search: state::search::tantivy::Tantivy, search: state::search::tantivy::Tantivy,
items: items::Items, items: items::Items,
queries: RwLock<HashMap<String, Query>>, // maybe move this to state? queries: RwLock<HashMap<String, Query>>, // maybe move this to state?
events: broadcast::Sender<(Event, Relations, RowId)>, events: broadcast::Sender<(Event, Relations, RowId)>,
p2p: Mutex<peer::Node>, p2p: Arc<P2PState>,
} }
// TODO: replace sharing system with aliases // TODO: replace sharing system with aliases
@ -86,27 +91,51 @@ async fn main() -> Result<(), Error> {
let command: cli::Command = cli::Command::parse(); let command: cli::Command = cli::Command::parse();
match command.action { match command.action {
cli::Action::Serve { port } => serve(port).await, cli::Action::Serve { port, bootstrap } => serve(port, bootstrap).await,
} }
} }
async fn serve(port: u16) -> Result<(), Error> { async fn serve(port: u16, bootstrap: Vec<Contact>) -> Result<(), Error> {
info!("Hello, world!"); info!("Hello, world!");
info!("load db"); info!("load db");
let db = state::db::sqlite::Sqlite::open("data/data.db").await?; let db = state::db::sqlite::Sqlite::open("data/data.db").await?;
let blob_client = blobs::Client::new("http://localhost:3219"); let blob_client = blobs::Client::new("http://localhost:3219");
let search = state::search::tantivy::Tantivy::open("data/tantivy").await?; let search = state::search::tantivy::Tantivy::open("data/tantivy").await?;
let items_client = items::Items::new(&db, &blob_client, &search);
let p2p = peer::Node::new(peer::NodeId::new(), port); let node_id = peer::NodeId::new_from_rand();
let p2p = Arc::new(P2PState {
router: Mutex::new(peer::Router::new(node_id)),
contact: Contact {
id: node_id,
host: format!("127.0.0.1:{}", port),
},
map: Mutex::new(HashMap::new()),
});
let items_client = items::Items::new(&db, &blob_client, &search, &p2p);
let state = ServerState { let state = ServerState {
db, db,
search, search,
items: items_client, items: items_client,
queries: RwLock::new(HashMap::new()), queries: RwLock::new(HashMap::new()),
events: tokio::sync::broadcast::channel(64).0, events: tokio::sync::broadcast::channel(64).0,
p2p: Mutex::new(p2p), p2p,
}; };
let state = Arc::new(state);
let bootstrap_state = state.clone();
tokio::spawn(async move {
let contacts = bootstrap.len();
for contact in bootstrap {
if contact.send(&bootstrap_state.p2p.contact, RPCRequest::Ping).await.is_ok() {
let mut router = bootstrap_state.p2p.router.lock().await;
router.update(contact);
}
}
info!("bootstrapped {} nodes", contacts);
});
let router = Router::new() let router = Router::new()
.route("/", routing::get(instance_info)) .route("/", routing::get(instance_info))
@ -114,7 +143,7 @@ async fn serve(port: u16) -> Result<(), Error> {
.nest("/alias", routes::aliases::routes()) .nest("/alias", routes::aliases::routes())
.nest("/search", routes::search::routes()) .nest("/search", routes::search::routes())
.nest("/p2p", routes::p2p::routes()) .nest("/p2p", routes::p2p::routes())
.with_state(Arc::new(state)) .with_state(state)
.layer(axum::middleware::from_fn(middleware::csp)) .layer(axum::middleware::from_fn(middleware::csp))
.layer(CorsLayer::permissive()) .layer(CorsLayer::permissive())
.layer(TraceLayer::new_for_http()); .layer(TraceLayer::new_for_http());

View file

@ -1,10 +1,14 @@
// this code will have no networking, pretty much only pure logic/state machines // this code will have no networking, pretty much only pure logic/state machines
// #![allow(unused)] // TODO (commit): remove this before comitting #![allow(unused)] // TODO (commit): remove this before comitting
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::Digest; use sha2::Digest;
use std::collections::{HashMap, HashSet}; use std::{
collections::{HashMap, HashSet},
str::FromStr,
};
use ufh::item::ItemRef;
/// the length of each key /// the length of each key
const KEY_LEN: usize = 20; const KEY_LEN: usize = 20;
@ -25,28 +29,31 @@ pub struct NodeId([u8; KEY_LEN]);
struct Distance([u8; KEY_LEN]); struct Distance([u8; KEY_LEN]);
#[derive(Debug)] #[derive(Debug)]
struct Router { pub struct Router {
for_id: NodeId, for_id: NodeId,
buckets: Vec<Vec<Contact>>, buckets: Vec<Vec<Contact>>,
} }
#[derive(Debug)] // #[derive(Debug)]
pub struct Node { // pub struct Node {
pub contact: Contact, // pub contact: Contact,
router: Router, // router: Router,
store: HashMap<NodeId, String>, // store: HashMap<NodeId, String>,
} // }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Contact { pub struct Contact {
id: NodeId, pub id: NodeId,
host: String, pub host: String,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum RPCRequest { pub enum RPCRequest {
Ping, Ping,
Store(NodeId, String), Have(NodeId),
// TODO: subscribe to relations
// Subscribe(NodeId),
// Unsubscribe(NodeId),
FindNode(NodeId), FindNode(NodeId),
FindValue(NodeId), FindValue(NodeId),
} }
@ -55,11 +62,11 @@ pub enum RPCRequest {
pub enum RPCResponse { pub enum RPCResponse {
Ok, Ok,
FindNode(Vec<Contact>), FindNode(Vec<Contact>),
FindValue(String), FindValue(Vec<Contact>),
} }
impl NodeId { impl NodeId {
pub fn new() -> NodeId { pub fn new_from_rand() -> NodeId {
NodeId(rand::random()) NodeId(rand::random())
} }
@ -72,6 +79,10 @@ impl NodeId {
let trimmed: [u8; KEY_LEN] = hash[0..KEY_LEN].try_into().unwrap(); let trimmed: [u8; KEY_LEN] = hash[0..KEY_LEN].try_into().unwrap();
NodeId(trimmed) NodeId(trimmed)
} }
pub fn new_from_ref(item_ref: &ItemRef) -> Self {
NodeId::new_from_str(&item_ref.to_string())
}
} }
impl Distance { impl Distance {
@ -100,7 +111,7 @@ impl Distance {
impl Contact { impl Contact {
// TODO (future): i really should split apart network logic // TODO (future): i really should split apart network logic
async fn send( pub async fn send(
&self, &self,
sender: &Self, sender: &Self,
message: RPCRequest, message: RPCRequest,
@ -117,7 +128,7 @@ impl Contact {
}; };
reqwest::Client::new() reqwest::Client::new()
.post(format!("http://{}/p2p/recv", self.host)) .post(format!("http://{}/p2p/send", self.host))
.json(&request) .json(&request)
.send() .send()
.await? .await?
@ -126,15 +137,23 @@ impl Contact {
} }
} }
impl FromStr for Contact {
type Err = serde_json::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_json::from_str(s)
}
}
impl Router { impl Router {
fn new(for_id: NodeId) -> Router { pub fn new(for_id: NodeId) -> Router {
Router { Router {
buckets: vec![0; N_BUCKETS].iter().map(|_| Vec::new()).collect(), buckets: vec![0; N_BUCKETS].iter().map(|_| Vec::new()).collect(),
for_id, for_id,
} }
} }
fn update(&mut self, contact: Contact) { pub fn update(&mut self, contact: Contact) {
let id = contact.id; let id = contact.id;
let prefix_length = Distance::between(&id, &self.for_id).clz(); let prefix_length = Distance::between(&id, &self.for_id).clz();
let bucket = &mut self.buckets[prefix_length]; let bucket = &mut self.buckets[prefix_length];
@ -150,7 +169,7 @@ impl Router {
} }
} }
fn remove(&mut self, id: &NodeId) { pub fn remove(&mut self, id: &NodeId) {
let prefix_length = Distance::between(id, &self.for_id).clz(); let prefix_length = Distance::between(id, &self.for_id).clz();
let bucket = &mut self.buckets[prefix_length]; let bucket = &mut self.buckets[prefix_length];
let element_idx = bucket.iter().position(|i| &i.id == id); let element_idx = bucket.iter().position(|i| &i.id == id);
@ -159,7 +178,7 @@ impl Router {
} }
} }
fn find_closest(&self, target: &NodeId, count: usize) -> Vec<Contact> { pub fn find_closest(&self, target: &NodeId, count: usize) -> Vec<Contact> {
let mut ret = Vec::new(); let mut ret = Vec::new();
for bucket in &self.buckets { for bucket in &self.buckets {
for contact in bucket { for contact in bucket {
@ -173,16 +192,14 @@ impl Router {
} }
} }
/*
impl Node { impl Node {
pub fn new(id: NodeId, port: u16) -> Self { pub fn new(id: NodeId, host: String) -> Self {
Node { Node {
// id, // id,
router: Router::new(id), router: Router::new(id),
store: HashMap::new(), store: HashMap::new(),
contact: Contact { contact: Contact { id, host },
id,
host: format!("127.0.0.1:{}", port),
},
} }
} }
@ -241,7 +258,7 @@ impl Node {
let mut nodes = self.router.find_closest(key, 1); let mut nodes = self.router.find_closest(key, 1);
while !nodes.is_empty() { while !nodes.is_empty() {
let contact = nodes.remove(0); let contact = nodes.remove(0);
if self.contact == contact { if contact == self.contact {
continue; continue;
} }
let Some(response) = self.send(&contact, RPCRequest::FindValue(*key)).await else { let Some(response) = self.send(&contact, RPCRequest::FindValue(*key)).await else {
@ -261,10 +278,9 @@ impl Node {
} }
RPCResponse::Ok => (), RPCResponse::Ok => (),
} }
dbg!("loop");
} }
dbg!("not found");
None None
} }
} }
} }
*/

View file

@ -97,7 +97,13 @@ pub async fn can_send_event(db: &Sqlite, wip: &WipEvent) -> Result<bool, Error>
}; };
let rel_ids: Vec<_> = relations.keys().cloned().collect(); let rel_ids: Vec<_> = relations.keys().cloned().collect();
let rel_events = db.bulk_fetch(&rel_ids, false).await?; let rel_events = match db.bulk_fetch(&rel_ids, false).await {
Ok(rel_events) => rel_events,
// Err(Error::NotFound) => {
// todo!();
// },
Err(err) => return Err(err),
};
let relations = rel_events let relations = rel_events
.into_iter() .into_iter()
.map(|(item_ref, item)| { .map(|(item_ref, item)| {

View file

@ -1,13 +1,10 @@
use crate::{ use crate::{
peer::{Contact, NodeId, RPCRequest, RPCResponse}, peer::{Contact, RPCRequest, RPCResponse},
ServerState, ServerState,
}; };
use axum::{ use axum::{extract::State, routing, Json, Router};
extract::{ConnectInfo, Path, State}, use reqwest::StatusCode;
routing, Json, Router,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use tracing::debug; use tracing::debug;
@ -17,8 +14,7 @@ type Response<T> = Result<T, Error>;
pub fn routes() -> Router<Arc<ServerState>> { pub fn routes() -> Router<Arc<ServerState>> {
Router::new() Router::new()
.route("/recv", routing::post(message)) .route("/send", routing::post(message))
.route("/kv/:key", routing::post(set).get(get))
.route("/bootstrap", routing::post(bootstrap)) .route("/bootstrap", routing::post(bootstrap))
.route("/info", routing::get(info)) .route("/info", routing::get(info))
} }
@ -29,58 +25,58 @@ struct Request {
contact: Contact, contact: Contact,
} }
#[derive(Deserialize, Serialize)]
struct KeySet {
value: String,
}
async fn message( async fn message(
ConnectInfo(_info): ConnectInfo<SocketAddr>,
State(state): State<Arc<ServerState>>, State(state): State<Arc<ServerState>>,
Json(req): Json<Request>, Json(request): Json<Request>,
) -> Response<Json<RPCResponse>> { ) -> Response<Json<RPCResponse>> {
debug!("handle p2p message"); let response = match request.info {
// NOTE: p2p must NEVER send a request to itself, or it will deadlock RPCRequest::Ping => RPCResponse::Ok,
println!("receive {:?}", req); RPCRequest::Have(node_id) => {
let mut p2p = state.p2p.lock().await; let mut map = state.p2p.map.lock().await;
let res = p2p.receive(&req.contact, req.info); map.entry(node_id)
Ok(Json(res)) .or_insert_with(Vec::new)
} .push(request.contact);
RPCResponse::Ok
async fn set( }
ConnectInfo(_info): ConnectInfo<SocketAddr>, RPCRequest::FindNode(node_id) => {
State(state): State<Arc<ServerState>>, let router = state.p2p.router.lock().await;
Path(key): Path<String>, let contacts = router.find_closest(&node_id, 20);
Json(req): Json<KeySet>, RPCResponse::FindNode(contacts)
) -> Response<()> { }
debug!("handle p2p set"); RPCRequest::FindValue(node_id) => {
let key = NodeId::new_from_str(&key); let map = state.p2p.map.lock().await;
let mut p2p = state.p2p.lock().await; if let Some(value) = map.get(&node_id) {
p2p.set(&key, &req.value).await; RPCResponse::FindValue(value.clone())
Ok(()) } else {
} let router = state.p2p.router.lock().await;
let contacts = router.find_closest(&node_id, 20);
async fn get(State(state): State<Arc<ServerState>>, Path(key): Path<String>) -> Response<String> { RPCResponse::FindNode(contacts)
debug!("handle p2p get"); }
let key = NodeId::new_from_str(&key); }
let mut p2p = state.p2p.lock().await; };
let res = p2p.get(&key).await; Ok(Json(response))
Ok(res.unwrap_or_else(|| String::from("no value")))
} }
async fn bootstrap( async fn bootstrap(
ConnectInfo(_info): ConnectInfo<SocketAddr>,
State(state): State<Arc<ServerState>>, State(state): State<Arc<ServerState>>,
Json(contact): Json<Contact>, Json(contact): Json<Contact>,
) -> Response<()> { ) -> Response<StatusCode> {
debug!("handle p2p bootstrap"); debug!("handle p2p bootstrap");
let mut p2p = state.p2p.lock().await; if contact
p2p.bootstrap(contact).await; .send(&state.p2p.contact, RPCRequest::Ping)
Ok(()) .await
.is_ok()
{
let mut router = state.p2p.router.lock().await;
router.update(contact);
} else {
let mut router = state.p2p.router.lock().await;
router.remove(&contact.id);
}
Ok(StatusCode::NO_CONTENT)
} }
async fn info(State(state): State<Arc<ServerState>>) -> Response<Json<Contact>> { async fn info(State(state): State<Arc<ServerState>>) -> Response<Json<Contact>> {
debug!("handle p2p bootstrap"); debug!("get contact info");
let p2p = state.p2p.lock().await; Ok(Json(state.p2p.contact.clone()))
Ok(Json(p2p.contact.clone()))
} }

View file

@ -2,11 +2,10 @@ use crate::error::Error;
use crate::routes::util::get_blob; use crate::routes::util::get_blob;
use crate::state::db::{Database, DbItem}; use crate::state::db::{Database, DbItem};
use crate::ServerState; use crate::ServerState;
use axum::extract::{Path, Query, State}; use axum::extract::{Path, State};
use axum::headers::Range; use axum::headers::Range;
use axum::http::{HeaderMap, HeaderValue, StatusCode}; use axum::http::{HeaderMap, HeaderValue, StatusCode};
use axum::TypedHeader; use axum::TypedHeader;
use serde::Deserialize;
use std::ops::Bound; use std::ops::Bound;
use std::sync::Arc; use std::sync::Arc;
use tracing::debug; use tracing::debug;
@ -17,22 +16,16 @@ use axum::headers::HeaderMapExt;
use super::Response; use super::Response;
use crate::items::Item; use crate::items::Item;
#[derive(Debug, Deserialize)]
pub struct BlobParams {
via: Option<String>,
}
#[tracing::instrument(skip_all, fields(item_ref))] #[tracing::instrument(skip_all, fields(item_ref))]
pub async fn route( pub async fn route(
State(state): State<Arc<ServerState>>, State(state): State<Arc<ServerState>>,
range: Option<TypedHeader<Range>>, range: Option<TypedHeader<Range>>,
Path(item_ref): Path<ItemRef>, Path(item_ref): Path<ItemRef>,
Query(params): Query<BlobParams>,
) -> Response<(StatusCode, HeaderMap, Vec<u8>)> { ) -> Response<(StatusCode, HeaderMap, Vec<u8>)> {
tracing::Span::current().record("item_ref", &item_ref.to_string()); tracing::Span::current().record("item_ref", &item_ref.to_string());
debug!("fetch blob"); debug!("fetch blob");
let (event, file) = match state.items.get(&item_ref, params.via.as_deref()).await? { let (event, file) = match state.items.get(&item_ref).await? {
Item::Event(event) => match event.content.clone() { Item::Event(event) => match event.content.clone() {
EventContent::File(file) => (event, file), EventContent::File(file) => (event, file),
_ => return Err(Error::Validation("this is not a x.file event")), _ => return Err(Error::Validation("this is not a x.file event")),
@ -96,7 +89,7 @@ pub async fn route(
let mut chunks = Vec::with_capacity(file.chunks.len()); let mut chunks = Vec::with_capacity(file.chunks.len());
for item_ref in &intersection { for item_ref in &intersection {
let Item::Blob(blob) = state.items.get(item_ref, params.via.as_deref()).await? else { let Item::Blob(blob) = state.items.get(item_ref).await? else {
unreachable!("file didn't reference a blob"); unreachable!("file didn't reference a blob");
}; };
chunks.push(blob); chunks.push(blob);
@ -113,7 +106,7 @@ pub async fn route(
let mut chunks = Vec::with_capacity(file.chunks.len()); let mut chunks = Vec::with_capacity(file.chunks.len());
for item_ref in &intersection { for item_ref in &intersection {
let Item::Blob(blob) = state.items.get(item_ref, params.via.as_deref()).await? else { let Item::Blob(blob) = state.items.get(item_ref).await? else {
unreachable!("file didn't reference a blob"); unreachable!("file didn't reference a blob");
}; };
chunks.push(blob); chunks.push(blob);
@ -132,7 +125,7 @@ pub async fn route(
} }
} else { } else {
debug!("getting blob chunks"); debug!("getting blob chunks");
let blob = get_blob(&state.items, &event, params.via.as_deref()).await?; let blob = get_blob(&state.items, &event).await?;
Ok((StatusCode::OK, headers, blob.to_vec())) Ok((StatusCode::OK, headers, blob.to_vec()))
} }
} }

View file

@ -2,6 +2,8 @@ use super::{perms, Authenticate};
pub(crate) use crate::error::Error; pub(crate) use crate::error::Error;
use crate::ServerState; use crate::ServerState;
use crate::MAX_SIZE; use crate::MAX_SIZE;
use crate::peer::NodeId;
use crate::peer::RPCRequest;
use axum::body::HttpBody; use axum::body::HttpBody;
use axum::extract::{Json, Query, RawBody, State}; use axum::extract::{Json, Query, RawBody, State};
use axum::http::HeaderMap; use axum::http::HeaderMap;
@ -85,6 +87,7 @@ pub async fn route(
if !params.wait { if !params.wait {
let item_ref = wip.item_ref.clone(); let item_ref = wip.item_ref.clone();
let item_ref2 = wip.item_ref.clone();
tokio::spawn(async move { tokio::spawn(async move {
let _lock = ROWID_LOCK.lock().await; let _lock = ROWID_LOCK.lock().await;
let wip = match state.items.commit_event_create(wip).await { let wip = match state.items.commit_event_create(wip).await {
@ -98,6 +101,23 @@ pub async fn route(
let _ = state let _ = state
.events .events
.send((create.event, create.relations, create.rowid)); .send((create.event, create.relations, create.rowid));
let key = NodeId::new_from_ref(&item_ref2);
let mut map = state.p2p.map.lock().await;
map.insert(key, vec![state.p2p.contact.clone()]);
drop(map);
let router = state.p2p.router.lock().await;
let contacts = router.find_closest(&key, 4);
drop(router);
for contact in contacts {
if contact == state.p2p.contact {
continue;
}
if contact.send(&state.p2p.contact, RPCRequest::Have(key)).await.is_err() {
let mut router = state.p2p.router.lock().await;
router.remove(&contact.id);
}
}
}); });
return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref })))); return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref }))));
} }

View file

@ -1,11 +1,10 @@
use axum::{ use axum::{
extract::{Path, Query, State}, extract::{Path, State},
headers::ContentType, headers::ContentType,
TypedHeader, TypedHeader,
}; };
use bytes::Bytes; use bytes::Bytes;
use reqwest::StatusCode; use reqwest::StatusCode;
use serde::Deserialize;
use std::sync::Arc; use std::sync::Arc;
use tracing::debug; use tracing::debug;
use ufh::item::ItemRef; use ufh::item::ItemRef;
@ -18,23 +17,17 @@ use crate::{
ServerState, ServerState,
}; };
#[derive(Debug, Deserialize)]
pub struct FetchParams {
via: Option<String>,
}
#[tracing::instrument(skip_all, fields(item_ref))] #[tracing::instrument(skip_all, fields(item_ref))]
pub async fn route( pub async fn route(
State(state): State<Arc<ServerState>>, State(state): State<Arc<ServerState>>,
auth: Authenticate<perms::None>, auth: Authenticate<perms::None>,
Path(item_ref): Path<ItemRef>, Path(item_ref): Path<ItemRef>,
Query(params): Query<FetchParams>,
) -> Response<(StatusCode, TypedHeader<ContentType>, Bytes)> { ) -> Response<(StatusCode, TypedHeader<ContentType>, Bytes)> {
tracing::Span::current().record("item_ref", &item_ref.to_string()); tracing::Span::current().record("item_ref", &item_ref.to_string());
debug!("fetch"); debug!("fetch");
let item = state.items.get(&item_ref, params.via.as_deref()).await?; let item = state.items.get(&item_ref).await?;
match item { match item {
Item::Blob(blob) => { Item::Blob(blob) => {
debug!("got blob"); debug!("got blob");

View file

@ -42,7 +42,7 @@ pub async fn route(
.event_fetch(&item_ref) .event_fetch(&item_ref)
.await? .await?
.ok_or(Error::NotFound)?; .ok_or(Error::NotFound)?;
let blob = get_blob(&state.items, &event, params.via.as_deref()).await?; let blob = get_blob(&state.items, &event).await?;
debug!("generate thumbnail"); debug!("generate thumbnail");
let thumb = generate_thumb(&blob, &size)?; let thumb = generate_thumb(&blob, &size)?;
state.db.thumbnail_put(&item_ref, &size, &thumb).await?; state.db.thumbnail_put(&item_ref, &size, &thumb).await?;
@ -66,7 +66,6 @@ fn generate_thumb(bytes: &Bytes, size: &ThumbnailSize) -> Result<Bytes, Error> {
pub struct ThumbnailParams { pub struct ThumbnailParams {
height: u32, height: u32,
width: u32, width: u32,
via: Option<String>,
} }
#[derive(Debug, Serialize, Deserialize, Clone, Copy)] #[derive(Debug, Serialize, Deserialize, Clone, Copy)]

View file

@ -170,14 +170,13 @@ impl<T: AuthLevel> Authenticate<T> {
pub async fn get_blob( pub async fn get_blob(
items: &Items, items: &Items,
file_event: &Event, file_event: &Event,
via: Option<&str>,
) -> Result<Bytes, Error> { ) -> Result<Bytes, Error> {
let EventContent::File(file) = &file_event.content else { let EventContent::File(file) = &file_event.content else {
return Err(Error::Validation("not a file event")); return Err(Error::Validation("not a file event"));
}; };
let mut chunks = Vec::with_capacity(file.chunks.len()); let mut chunks = Vec::with_capacity(file.chunks.len());
for item_ref in &file.chunks { for item_ref in &file.chunks {
let Item::Blob(blob) = items.get(item_ref, via).await? else { let Item::Blob(blob) = items.get(item_ref).await? else {
// possibly unreachable!(), assuming everything validated properly on insert // possibly unreachable!(), assuming everything validated properly on insert
return Err(Error::Validation("file didn't reference a blob")); return Err(Error::Validation("file didn't reference a blob"));
}; };

View file

@ -108,7 +108,7 @@
{#if item.id === selectedResParentId} {#if item.id === selectedResParentId}
{@const name = getName(selectedRes) || "unnamed"} {@const name = getName(selectedRes) || "unnamed"}
<a <a
href="#/{item.id}" href="#/{selectedRes.id}"
class="wrapper selected child" class="wrapper selected child"
on:contextmenu|preventDefault|stopPropagation={openContext(selectedRes)} on:contextmenu|preventDefault|stopPropagation={openContext(selectedRes)}
> >

View file

@ -9,17 +9,17 @@
contain: strict; contain: strict;
display: flex; display: flex;
flex-direction: column; flex-direction: column;
padding: 8px; padding: calc(8px / var(--resize));
gap: 8px; gap: calc(8px / var(--resize));
overflow-y: auto; overflow-y: auto;
width: var(--pin-size); width: var(--pin-size);
background: var(--bg-quartiary); background: var(--bg-quartiary);
& > .item { & > .item {
background: var(--bg-primary); background: var(--bg-primary);
min-height: 48px; min-height: calc(48px / var(--resize));
width: 48px; width: calc(48px / var(--resize));
border-radius: 4px; border-radius: calc(4px / var(--resize));
} }
} }
</style> </style>

View file

@ -4,13 +4,26 @@
export let state: any; export let state: any;
export let event: Event; export let event: Event;
export let comments: Array<Event>; export let comments: Array<Event>;
$: children = comments $: children = filterChildren(comments, event)
.filter(comment => filterRels(comment, "comment").indexOf(event.id) !== -1)
.sort((a, b) => a.origin_ts - b.origin_ts); .sort((a, b) => a.origin_ts - b.origin_ts);
$: author = api.actors.fetch(event.sender); $: author = api.actors.fetch(event.sender);
$: isFromOp = $state.opId === event.sender; $: isFromOp = $state.opId === event.sender;
$: replied = $state.replyId === event.id; $: replied = $state.replyId === event.id;
let collapsed = false; $: collapsed = $state.collapsed.has(event.id);
function filterChildren(comments: Array<Event>, parent: Event): Array<Event> {
return comments.filter(comment => filterRels(comment, "comment").includes(parent.id));
}
function countChildren(comments: Array<Event>, parent: Event): number {
const toCheck = [parent];
let count = 0;
while (toCheck.length) {
toCheck.push(...filterChildren(comments, toCheck.pop()!));
count++;
}
return count;
}
</script> </script>
<div <div
class="comment" class="comment"
@ -19,9 +32,12 @@
class:collapsed class:collapsed
> >
<header> <header>
<button class="collapse" on:click={() => collapsed = !collapsed}> <button class="collapse" on:click={state.curry(collapsed ? "expand" : "collapse", event.id)}>
{collapsed ? "+" : "-"} {collapsed ? "+" : "-"}
</button> </button>
{#if collapsed}
<span class="childCount">[{countChildren(comments, event)}]</span>
{/if}
<div class="author"> <div class="author">
{#await author} {#await author}
<i>loading...</i> <i>loading...</i>
@ -79,6 +95,7 @@
.comment { .comment {
border-left: solid var(--borders) 1px; border-left: solid var(--borders) 1px;
margin-left: -1px; margin-left: -1px;
margin-bottom: -1px;
// &.fromop { // &.fromop {
// // TODO: find a good color // // TODO: find a good color
@ -110,6 +127,10 @@
} }
} }
& > .childCount {
color: var(--fg-dimmed);
}
& > .author > .green { & > .author > .green {
color: #77c57d; color: #77c57d;
} }

View file

@ -13,6 +13,7 @@
const state = new Reduxer({ const state = new Reduxer({
replyId: null as null | string, replyId: null as null | string,
opId: bucket.sender, opId: bucket.sender,
collapsed: new Set(),
}, { }, {
reply(_state, replyId: string | null) { reply(_state, replyId: string | null) {
if (replyId) { if (replyId) {
@ -20,6 +21,27 @@
} }
return { replyId }; return { replyId };
}, },
collapse(state, commentId: string) {
state.collapsed.add(commentId);
return state;
},
expand(state, commentId: string) {
state.collapsed.delete(commentId);
return state;
},
expandAll(state) {
state.collapsed.clear();
return state;
},
collapseTopLevel(state) {
const collapsed = [...state.collapsed];
for (const comment of $comments) {
if (topLevelComments.find(i => i.id === filterRels(comment, "comment")[0])) {
collapsed.push(comment.id);
}
}
return { ...state, collapsed: new Set(collapsed) };
},
}); });
const comments = query({ const comments = query({
@ -51,6 +73,7 @@
<p>{bucket.content.body || "no body"}</p> <p>{bucket.content.body || "no body"}</p>
</article> </article>
<hr /> <hr />
{$comments.length} comments - <button on:click={state.curry("collapseTopLevel")}>collapse</button> <button on:click={state.curry("expandAll")}>expand</button>
<ul class="comments"> <ul class="comments">
{#each topLevelComments as event (event.id)} {#each topLevelComments as event (event.id)}
<li class="toplevel"><Comments {state} comments={$comments} {event} /></li> <li class="toplevel"><Comments {state} comments={$comments} {event} /></li>