better announcing
This commit is contained in:
parent
e7e4ac63f1
commit
7d1fb9d991
17 changed files with 340 additions and 49 deletions
150
Cargo.lock
generated
150
Cargo.lock
generated
|
@ -172,6 +172,12 @@ dependencies = [
|
|||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c59bdb34bc650a32731b31bd8f0829cc15d24a708ee31559e0bb34f2bc320cba"
|
||||
|
||||
[[package]]
|
||||
name = "autocfg"
|
||||
version = "1.1.0"
|
||||
|
@ -769,6 +775,12 @@ dependencies = [
|
|||
"zip",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equivalent"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5"
|
||||
|
||||
[[package]]
|
||||
name = "errno"
|
||||
version = "0.3.1"
|
||||
|
@ -844,6 +856,20 @@ dependencies = [
|
|||
"simd-adler32",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "figment"
|
||||
version = "0.10.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4547e226f4c9ab860571e070a9034192b3175580ecea38da34fcdb53a018c9a5"
|
||||
dependencies = [
|
||||
"atomic",
|
||||
"pear",
|
||||
"serde",
|
||||
"toml",
|
||||
"uncased",
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flate2"
|
||||
version = "1.0.26"
|
||||
|
@ -1069,7 +1095,7 @@ dependencies = [
|
|||
"futures-sink",
|
||||
"futures-util",
|
||||
"http",
|
||||
"indexmap",
|
||||
"indexmap 1.9.3",
|
||||
"slab",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
|
@ -1353,6 +1379,16 @@ dependencies = [
|
|||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "indexmap"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d5477fe2230a79769d8dc68e0eabf5437907c0457a5614a9e8dddb67f65eb65d"
|
||||
dependencies = [
|
||||
"equivalent",
|
||||
"hashbrown 0.14.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "infer"
|
||||
version = "0.14.0"
|
||||
|
@ -1362,6 +1398,12 @@ dependencies = [
|
|||
"cfb",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inlinable_string"
|
||||
version = "0.1.15"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb"
|
||||
|
||||
[[package]]
|
||||
name = "inout"
|
||||
version = "0.1.3"
|
||||
|
@ -1931,6 +1973,29 @@ dependencies = [
|
|||
"sha2 0.10.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pear"
|
||||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61a386cd715229d399604b50d1361683fe687066f42d56f54be995bc6868f71c"
|
||||
dependencies = [
|
||||
"inlinable_string",
|
||||
"pear_codegen",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pear_codegen"
|
||||
version = "0.2.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da9f0f13dac8069c139e8300a6510e3f4143ecf5259c60b116a9b271b4ca0d54"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"proc-macro2-diagnostics",
|
||||
"quote",
|
||||
"syn 2.0.27",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "percent-encoding"
|
||||
version = "2.3.0"
|
||||
|
@ -2047,6 +2112,19 @@ dependencies = [
|
|||
"unicode-ident",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro2-diagnostics"
|
||||
version = "0.10.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.27",
|
||||
"version_check",
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "qoi"
|
||||
version = "0.4.1"
|
||||
|
@ -2455,6 +2533,15 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_spanned"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_urlencoded"
|
||||
version = "0.7.1"
|
||||
|
@ -2478,6 +2565,7 @@ dependencies = [
|
|||
"bytes",
|
||||
"clap",
|
||||
"epub",
|
||||
"figment",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"html2text",
|
||||
|
@ -2681,7 +2769,7 @@ dependencies = [
|
|||
"futures-util",
|
||||
"hashlink",
|
||||
"hex",
|
||||
"indexmap",
|
||||
"indexmap 1.9.3",
|
||||
"itoa",
|
||||
"libc",
|
||||
"libsqlite3-sys",
|
||||
|
@ -3180,6 +3268,40 @@ dependencies = [
|
|||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.7.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c17e963a819c331dcacd7ab957d80bc2b9a9c1e71c804826d2f283dd65306542"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"toml_edit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_datetime"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_edit"
|
||||
version = "0.19.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8123f27e969974a3dfba720fdb560be359f57b44302d280ba72e76a74480e8a"
|
||||
dependencies = [
|
||||
"indexmap 2.0.0",
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
|
@ -3335,6 +3457,15 @@ dependencies = [
|
|||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uncased"
|
||||
version = "0.9.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9b9bc53168a4be7402ab86c3aad243a84dd7381d09be0eddc81280c1da95ca68"
|
||||
dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.6.0"
|
||||
|
@ -3675,6 +3806,15 @@ version = "0.48.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a"
|
||||
|
||||
[[package]]
|
||||
name = "winnow"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f46aab759304e4d7b2075a9aecba26228bb073ee8c50db796b2c72c676b5d807"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winreg"
|
||||
version = "0.10.1"
|
||||
|
@ -3701,6 +3841,12 @@ dependencies = [
|
|||
"markup5ever",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "yansi"
|
||||
version = "1.0.0-rc"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9ee746ad3851dd3bc40e4a028ab3b00b99278d929e48957bcb2d111874a7e43e"
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.6.1"
|
||||
|
|
|
@ -548,7 +548,7 @@ async fn main() -> Result<(), Error> {
|
|||
"x.annotate.local" => {
|
||||
EventContent::LocalAnnotate(serde_json::from_str(&content)?)
|
||||
}
|
||||
"x.user" => EventContent::User(serde_json::from_str(&content)?),
|
||||
"x.user" => EventContent::Actor(serde_json::from_str(&content)?),
|
||||
"x.acl" => EventContent::Acl(serde_json::from_str(&content)?),
|
||||
"x.redact" => EventContent::Redact(serde_json::from_str(&content)?),
|
||||
_ => EventContent::Other {
|
||||
|
|
|
@ -69,8 +69,11 @@ pub struct WipEvent {
|
|||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
#[serde(tag = "type", content = "content")]
|
||||
pub enum EventContent {
|
||||
#[serde(rename = "x.user")]
|
||||
User(UserEvent),
|
||||
#[serde(rename = "x.actor")]
|
||||
Actor(ActorEvent),
|
||||
// TODO: find better name
|
||||
#[serde(rename = "x.allow")]
|
||||
Allow(AllowEvent),
|
||||
#[serde(rename = "x.file")]
|
||||
File(FileEvent),
|
||||
#[serde(rename = "x.redact")]
|
||||
|
@ -105,8 +108,18 @@ pub struct RelInfo {
|
|||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub struct UserEvent {
|
||||
name: String,
|
||||
pub struct ActorEvent {
|
||||
pub name: String,
|
||||
#[serde(rename = "type")]
|
||||
pub actor_type: ActorType,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum ActorType {
|
||||
User,
|
||||
Node,
|
||||
Bot,
|
||||
}
|
||||
|
||||
// TODO: currently unused
|
||||
|
@ -130,11 +143,18 @@ pub struct FileEvent {
|
|||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub struct UpdateEvent(pub Value);
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
|
||||
pub struct AllowEvent {
|
||||
expires: u64,
|
||||
host: String,
|
||||
}
|
||||
|
||||
impl EventContent {
|
||||
pub fn get_type(&self) -> &str {
|
||||
match self {
|
||||
EventContent::Actor(_) => "x.actor",
|
||||
EventContent::Allow(_) => "x.allow",
|
||||
EventContent::File(_) => "x.file",
|
||||
EventContent::User(_) => "x.user",
|
||||
EventContent::Redact(_) => "x.redact",
|
||||
EventContent::Update(_) => "x.update",
|
||||
EventContent::Annotate(_) => "x.annotate",
|
||||
|
|
|
@ -13,6 +13,7 @@ bitflags = "2.3.3"
|
|||
bytes = "1.4.0"
|
||||
clap = { version = "4.3.14", features = ["derive"] }
|
||||
epub = "2.1.1"
|
||||
figment = { version = "0.10.10", features = ["toml", "env"] }
|
||||
futures-util = "0.3.28"
|
||||
hex = "0.4.3"
|
||||
html2text = "0.6.0"
|
||||
|
|
|
@ -3,7 +3,7 @@ use axum::http::StatusCode;
|
|||
use bytes::Bytes;
|
||||
use sha2::Digest;
|
||||
use std::ops::Deref;
|
||||
use tracing::debug;
|
||||
use tracing::{trace, debug};
|
||||
use ufh::{
|
||||
event::WipEvent,
|
||||
item::{HashType, ItemRef},
|
||||
|
@ -95,10 +95,8 @@ impl Client {
|
|||
|
||||
pub async fn get_via(&self, item_ref: &ItemRef, via: &str) -> Result<Item, Error> {
|
||||
debug!("get blob through server {}", via);
|
||||
let url = format!("https://changeme/things/{}", item_ref);
|
||||
let mut url = reqwest::Url::parse(&url).expect("invalid url?");
|
||||
url.set_host(Some(via))
|
||||
.map_err(|_| Error::Validation("invalid hostname"))?;
|
||||
let url = format!("http://{}/things/{}", via, item_ref);
|
||||
trace!("target url: {}", url);
|
||||
|
||||
let req = match self.http.get(url).send().await? {
|
||||
req if req.status() == StatusCode::GONE => req,
|
||||
|
|
|
@ -17,6 +17,8 @@ pub struct Command {
|
|||
pub enum Action {
|
||||
/// start a server
|
||||
Serve {
|
||||
#[arg(short, long, help = "the store port", default_value = "3219")]
|
||||
store: u16,
|
||||
#[arg(short, long, help = "which port to listen on", default_value = "3210")]
|
||||
port: u16,
|
||||
#[arg(short = 'n', long, help = "the hostname, for p2p", default_value = "127.0.0.1")]
|
||||
|
|
|
@ -35,7 +35,7 @@ pub enum Error {
|
|||
|
||||
impl From<reqwest::Error> for Error {
|
||||
fn from(err: reqwest::Error) -> Self {
|
||||
if err.status() == Some(StatusCode::NOT_FOUND) {
|
||||
if err.status().is_some_and(|code| code == StatusCode::NOT_FOUND) {
|
||||
Error::NotFound
|
||||
} else {
|
||||
Error::Reqwest(err)
|
||||
|
|
|
@ -51,7 +51,7 @@ pub async fn prepare_special(
|
|||
relations: &Relations,
|
||||
) -> Result<Option<DelayedAction>, Error> {
|
||||
match &event.content {
|
||||
EventContent::User(_) => {
|
||||
EventContent::Actor(_) => {
|
||||
let query = ufh::query::Query::builder()
|
||||
.with_type("x.user")
|
||||
.with_sender(&event.sender)
|
||||
|
|
|
@ -16,7 +16,7 @@ use lru::LruCache;
|
|||
use once_cell::sync::OnceCell;
|
||||
use std::{num::NonZeroUsize, sync::Arc, collections::HashSet};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{trace, debug, info};
|
||||
use tracing::{trace, debug, info, error};
|
||||
use ufh::{
|
||||
derived::Derived,
|
||||
event::{Event, EventContent, WipEvent},
|
||||
|
@ -188,7 +188,7 @@ impl Items {
|
|||
Ok(item_ref)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all, fields(item_ref, via))]
|
||||
#[tracing::instrument(skip_all, fields(item_ref))]
|
||||
pub async fn get(&self, item_ref: &ItemRef) -> Result<Item, Error> {
|
||||
static CACHE: OnceCell<RwLock<LruCache<ItemRef, Item>>> = OnceCell::new();
|
||||
// let mut cache = CACHE.get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap()))).lock().await;
|
||||
|
@ -226,14 +226,21 @@ impl Items {
|
|||
Ok(blobs::Item::Raw(bytes)) => {
|
||||
// make sure the blob is in db
|
||||
// TODO (performance): remove this
|
||||
debug!("re-inserting into db");
|
||||
self.db.create_blob(item_ref, bytes.len() as u32).await?;
|
||||
return Ok(Item::Blob(bytes));
|
||||
},
|
||||
Err(Error::NotFound) => (),
|
||||
Err(err) => return Err(err),
|
||||
Err(Error::NotFound) => {
|
||||
trace!("not found");
|
||||
},
|
||||
Err(err) => {
|
||||
error!("blobs::get failed: {}", err);
|
||||
return Err(err)
|
||||
}
|
||||
};
|
||||
|
||||
for hoster in find_hosters(&self.p2p, &NodeId::new_from_ref(item_ref)).await {
|
||||
trace!("importing item from (host={})", hoster.host);
|
||||
match self.blobs.get_via(item_ref, &hoster.host).await {
|
||||
Ok(blobs::Item::WipEvent(wip)) => {
|
||||
debug!("importing event from other server");
|
||||
|
@ -245,13 +252,18 @@ impl Items {
|
|||
self.db.create_blob(item_ref, bytes.len() as u32).await?;
|
||||
return Ok(Item::Blob(bytes));
|
||||
},
|
||||
Err(Error::NotFound) => {
|
||||
trace!("target item not found");
|
||||
},
|
||||
Err(Error::Reqwest(err)) if err.status().is_some_and(|s| s.is_client_error()) => {
|
||||
debug!("couldn't fetch from other server: {}", err);
|
||||
return Err(err.into());
|
||||
},
|
||||
Err(_) => (),
|
||||
};
|
||||
}
|
||||
|
||||
trace!("item not found");
|
||||
return Err(Error::NotFound);
|
||||
}
|
||||
|
||||
|
@ -280,15 +292,22 @@ impl Items {
|
|||
}
|
||||
|
||||
// TODO (performance): cancel if there's no progress on getting "closer" to the target?
|
||||
#[tracing::instrument(skip_all, fields(key))]
|
||||
async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec<Contact> {
|
||||
tracing::Span::current().record("key", key.to_string());
|
||||
|
||||
let map = p2p.map.lock().await;
|
||||
trace!("map: {:?}", map);
|
||||
|
||||
if let Some(value) = map.get(key) {
|
||||
trace!("found item locally");
|
||||
value.clone()
|
||||
} else {
|
||||
drop(map);
|
||||
let mut queried = HashSet::new();
|
||||
let router = p2p.router.lock().await;
|
||||
let mut nodes = router.find_closest(key, 3);
|
||||
trace!("found initial nodes: {:?}", nodes);
|
||||
drop(router);
|
||||
while !nodes.is_empty() {
|
||||
let contact = nodes.remove(0);
|
||||
|
@ -296,6 +315,7 @@ async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec<Contact> {
|
|||
continue;
|
||||
}
|
||||
let Ok(response) = contact.send(&p2p.contact, RPCRequest::FindValue(*key)).await else {
|
||||
trace!("failed to send, removing node (host={})", p2p.contact.host);
|
||||
let mut router = p2p.router.lock().await;
|
||||
router.remove(&contact.id);
|
||||
continue;
|
||||
|
@ -305,6 +325,7 @@ async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec<Contact> {
|
|||
drop(router);
|
||||
match response {
|
||||
RPCResponse::FindNode(received) => {
|
||||
trace!("node sent FindNode message (host={})", p2p.contact.host);
|
||||
queried.insert(contact.id);
|
||||
for contact in received {
|
||||
if !queried.contains(&contact.id) {
|
||||
|
@ -313,11 +334,15 @@ async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec<Contact> {
|
|||
}
|
||||
}
|
||||
RPCResponse::FindValue(value) => {
|
||||
trace!("node sent FindValue message (host={})", p2p.contact.host);
|
||||
return value.clone();
|
||||
}
|
||||
RPCResponse::Ok => (),
|
||||
RPCResponse::Ok => {
|
||||
trace!("node sent Ok message (host={})", p2p.contact.host);
|
||||
},
|
||||
}
|
||||
}
|
||||
trace!("no nodes");
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,6 +49,8 @@ pub(crate) use error::Error;
|
|||
use ufh::event::{Event, RelInfo};
|
||||
|
||||
use crate::peer::{Contact, RPCRequest};
|
||||
use crate::routes::things::create::notify_have;
|
||||
use crate::state::db::{Database, Location};
|
||||
|
||||
const MAX_SIZE: u64 = 1024 * 1024;
|
||||
|
||||
|
@ -89,18 +91,37 @@ async fn main() -> Result<(), Error> {
|
|||
.finish();
|
||||
tracing::subscriber::set_global_default(log_subscriber).expect("failed to setup logger");
|
||||
|
||||
// use figment::{Figment, providers::{Serialized, Toml, Env}};
|
||||
// let config: Config = figment::Figment::new()
|
||||
// .merge(Toml::file("/etc/config.toml"))
|
||||
// .merge(Toml::file("config.toml"))
|
||||
// .merge(Env::prefixed("UFH_SERVER_"))
|
||||
// .extract();
|
||||
|
||||
let command: cli::Command = cli::Command::parse();
|
||||
match command.action {
|
||||
cli::Action::Serve { id, host, port, bootstrap } => serve(id, host, port, bootstrap).await,
|
||||
cli::Action::Serve {
|
||||
id,
|
||||
host,
|
||||
port,
|
||||
store,
|
||||
bootstrap,
|
||||
} => serve(id, host, port, store, bootstrap).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn serve(id: Option<NodeId>, host: String, port: u16, bootstrap: Vec<Contact>) -> Result<(), Error> {
|
||||
async fn serve(
|
||||
id: Option<NodeId>,
|
||||
host: String,
|
||||
port: u16,
|
||||
store_port: u16,
|
||||
bootstrap: Vec<Contact>,
|
||||
) -> Result<(), Error> {
|
||||
info!("Hello, world!");
|
||||
|
||||
info!("load db");
|
||||
let db = state::db::sqlite::Sqlite::open("data/data.db").await?;
|
||||
let blob_client = blobs::Client::new("http://localhost:3219");
|
||||
let blob_client = blobs::Client::new(&format!("http://localhost:{}", store_port));
|
||||
let search = state::search::tantivy::Tantivy::open("data/tantivy").await?;
|
||||
|
||||
let node_id = id.unwrap_or_else(peer::NodeId::new_from_rand);
|
||||
|
@ -112,9 +133,9 @@ async fn serve(id: Option<NodeId>, host: String, port: u16, bootstrap: Vec<Conta
|
|||
},
|
||||
map: Mutex::new(HashMap::new()),
|
||||
});
|
||||
|
||||
|
||||
let items_client = items::Items::new(&db, &blob_client, &search, &p2p);
|
||||
|
||||
|
||||
let state = ServerState {
|
||||
db,
|
||||
search,
|
||||
|
@ -129,12 +150,35 @@ async fn serve(id: Option<NodeId>, host: String, port: u16, bootstrap: Vec<Conta
|
|||
tokio::spawn(async move {
|
||||
let contacts = bootstrap.len();
|
||||
for contact in bootstrap {
|
||||
if contact.send(&bootstrap_state.p2p.contact, RPCRequest::Ping).await.is_ok() {
|
||||
if contact
|
||||
.send(&bootstrap_state.p2p.contact, RPCRequest::Ping)
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
let mut router = bootstrap_state.p2p.router.lock().await;
|
||||
router.update(contact);
|
||||
}
|
||||
}
|
||||
info!("bootstrapped {} nodes", contacts);
|
||||
|
||||
let mut announced_ref_count = 0;
|
||||
let mut location = Location::Beginning;
|
||||
loop {
|
||||
let query = bootstrap_state
|
||||
.db
|
||||
.query_refs(location, 100)
|
||||
.await
|
||||
.expect("db failed to query!");
|
||||
announced_ref_count += query.refs.len();
|
||||
for item_ref in query.refs {
|
||||
notify_have(&bootstrap_state.p2p, &item_ref).await;
|
||||
}
|
||||
let Some(count) = query.next else {
|
||||
break;
|
||||
};
|
||||
location = Location::Index(count);
|
||||
}
|
||||
info!("announced {} refs", announced_ref_count);
|
||||
});
|
||||
|
||||
let router = Router::new()
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
// this code will have no networking, pretty much only pure logic/state machines
|
||||
|
||||
// TODO (future, important): use ipv6 instead of hostnames
|
||||
|
||||
#![allow(unused)] // TODO (commit): remove this before comitting
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
|
|
@ -29,6 +29,7 @@ async fn message(
|
|||
State(state): State<Arc<ServerState>>,
|
||||
Json(request): Json<Request>,
|
||||
) -> Response<Json<RPCResponse>> {
|
||||
// FIXME (security): hostname must be validated!
|
||||
let response = match request.info {
|
||||
RPCRequest::Ping => {
|
||||
let mut router = state.p2p.router.lock().await;
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use super::{perms, Authenticate};
|
||||
use crate::P2PState;
|
||||
pub(crate) use crate::error::Error;
|
||||
use crate::ServerState;
|
||||
use crate::MAX_SIZE;
|
||||
|
@ -14,7 +15,7 @@ use serde::Deserialize;
|
|||
use serde_json::{json, Value};
|
||||
use tokio::sync::Mutex;
|
||||
use std::sync::Arc;
|
||||
use tracing::{debug, error};
|
||||
use tracing::{trace, error};
|
||||
use ufh::event::WipEvent;
|
||||
use ufh::item::ItemRef;
|
||||
|
||||
|
@ -101,23 +102,7 @@ pub async fn route(
|
|||
let _ = state
|
||||
.events
|
||||
.send((create.event, create.relations, create.rowid));
|
||||
let key = NodeId::new_from_ref(&item_ref2);
|
||||
let mut map = state.p2p.map.lock().await;
|
||||
map.insert(key, vec![state.p2p.contact.clone()]);
|
||||
drop(map);
|
||||
let router = state.p2p.router.lock().await;
|
||||
let contacts = router.find_closest(&key, 4);
|
||||
drop(router);
|
||||
for contact in contacts {
|
||||
if contact == state.p2p.contact {
|
||||
continue;
|
||||
}
|
||||
|
||||
if contact.send(&state.p2p.contact, RPCRequest::Have(key)).await.is_err() {
|
||||
let mut router = state.p2p.router.lock().await;
|
||||
router.remove(&contact.id);
|
||||
}
|
||||
}
|
||||
notify_have(&state.p2p, &item_ref2).await;
|
||||
});
|
||||
return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref }))));
|
||||
}
|
||||
|
@ -129,11 +114,35 @@ pub async fn route(
|
|||
let _ = state
|
||||
.events
|
||||
.send((create.event, create.relations, create.rowid));
|
||||
debug!("notified pollers of event");
|
||||
trace!("notified pollers of event");
|
||||
notify_have(&state.p2p, &item_ref).await;
|
||||
|
||||
item_ref
|
||||
} else {
|
||||
state.items.create_blob(blob).await?
|
||||
let item_ref = state.items.create_blob(blob).await?;
|
||||
notify_have(&state.p2p, &item_ref).await;
|
||||
item_ref
|
||||
};
|
||||
Ok((StatusCode::CREATED, Json(json!({ "ref": item_ref }))))
|
||||
}
|
||||
|
||||
pub async fn notify_have(p2p: &P2PState, item_ref: &ItemRef) {
|
||||
let key = NodeId::new_from_ref(&item_ref);
|
||||
let mut map = p2p.map.lock().await;
|
||||
map.insert(key, vec![p2p.contact.clone()]);
|
||||
drop(map);
|
||||
let router = p2p.router.lock().await;
|
||||
let contacts = router.find_closest(&key, 4);
|
||||
drop(router);
|
||||
for contact in contacts {
|
||||
if contact == p2p.contact {
|
||||
continue;
|
||||
}
|
||||
|
||||
if contact.send(&p2p.contact, RPCRequest::Have(key)).await.is_err() {
|
||||
let mut router = p2p.router.lock().await;
|
||||
router.remove(&contact.id);
|
||||
}
|
||||
}
|
||||
trace!("notify dht of ref {}", item_ref);
|
||||
}
|
||||
|
|
|
@ -21,6 +21,12 @@ pub struct QueryResult {
|
|||
pub next: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct RefsResult {
|
||||
pub refs: Vec<ItemRef>,
|
||||
pub next: Option<u32>,
|
||||
}
|
||||
|
||||
// TODO (future): rework this trait
|
||||
// a lot of trait methods are extracted as-is into this trait and sqlite,
|
||||
// but it would be better to rework the methods and parameter types
|
||||
|
@ -44,6 +50,7 @@ pub enum Thumbnail {
|
|||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum Location {
|
||||
Beginning,
|
||||
// this is awful and should be removed, but is currently the only good way of getting an event's current acl
|
||||
Reverse,
|
||||
Index(u32),
|
||||
}
|
||||
|
@ -67,6 +74,7 @@ pub trait Database {
|
|||
async fn query_events(&self, query: &Query, after: Location, limit: u32) -> Result<QueryResult, Self::Error>;
|
||||
// return type is currently a bit of a kludge for now
|
||||
async fn query_relations(&self, relations: &[QueryRelation], for_events: &[ItemRef], after: Location, limit: u32) -> Result<(HashMap<ItemRef, Event>, Option<u32>), Self::Error>;
|
||||
async fn query_refs(&self, after: Location, limit: u32) -> Result<RefsResult, Self::Error>;
|
||||
|
||||
// misc
|
||||
async fn tags_set(&self, item_refs: &[ItemRef], tags: &[String]) -> Result<(), Self::Error>;
|
||||
|
|
|
@ -16,7 +16,7 @@ use ufh::event::EventContent;
|
|||
use ufh::query::QueryRelation;
|
||||
use ufh::{item::ItemRef, query};
|
||||
|
||||
use super::{Database, DbItem, Location, Thumbnail};
|
||||
use super::{Database, DbItem, Location, RefsResult, Thumbnail};
|
||||
use crate::routes::things::thumbnail::ThumbnailSize;
|
||||
use crate::{Error, Event};
|
||||
|
||||
|
@ -362,6 +362,37 @@ impl Database for Sqlite {
|
|||
Ok((map, last_rowid))
|
||||
}
|
||||
|
||||
async fn query_refs(&self, after: Location, limit: u32) -> Result<RefsResult, Self::Error> {
|
||||
match after {
|
||||
Location::Beginning => {
|
||||
let items = sql!(
|
||||
"SELECT rowid, ref as item_ref FROM refs ORDER BY rowid LIMIT ?",
|
||||
limit
|
||||
)
|
||||
.fetch_all(&self.pool)
|
||||
.await?;
|
||||
let mut refs = Vec::new();
|
||||
let mut next = None;
|
||||
for item in items {
|
||||
refs.push(item.item_ref.parse()?);
|
||||
next = Some(item.rowid as u32);
|
||||
}
|
||||
Ok(RefsResult { refs, next })
|
||||
}
|
||||
Location::Index(idx) => {
|
||||
let items = sql!("SELECT rowid, ref as item_ref FROM refs WHERE rowid > ? ORDER BY rowid LIMIT ?", idx, limit).fetch_all(&self.pool).await?;
|
||||
let mut refs = Vec::new();
|
||||
let mut next = None;
|
||||
for item in items {
|
||||
refs.push(item.item_ref.parse()?);
|
||||
next = Some(item.rowid as u32);
|
||||
}
|
||||
Ok(RefsResult { refs, next })
|
||||
}
|
||||
Location::Reverse => unimplemented!("don't do this."),
|
||||
}
|
||||
}
|
||||
|
||||
async fn bulk_fetch(
|
||||
&self,
|
||||
item_refs: &[ItemRef],
|
||||
|
|
|
@ -2,6 +2,7 @@ use axum::extract::{BodyStream, Json, Path, Query, State};
|
|||
use axum::http::StatusCode;
|
||||
use futures_util::StreamExt;
|
||||
use serde_json::{json, Value};
|
||||
use std::net::{SocketAddr, SocketAddrV4};
|
||||
use std::sync::Arc;
|
||||
use ufh::item::ItemRef;
|
||||
|
||||
|
@ -30,7 +31,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||
axum::routing::get(blob_download).delete(blob_delete),
|
||||
)
|
||||
.with_state(Arc::new(state));
|
||||
axum::Server::bind(&"0.0.0.0:3219".parse().unwrap())
|
||||
// TODO: real cli (or better yet, config file)
|
||||
let port = std::env::args().skip(1).next().as_deref().unwrap_or("3219").parse().expect("invalid number");
|
||||
let addr = SocketAddr::V4(SocketAddrV4::new("0.0.0.0".parse().unwrap(), port));
|
||||
axum::Server::bind(&addr)
|
||||
.serve(router.into_make_service())
|
||||
.await
|
||||
.expect("could not serve");
|
||||
|
|
|
@ -360,4 +360,4 @@ async function getClient() {
|
|||
}
|
||||
|
||||
export const api = await getClient();
|
||||
Object.assign(globalThis, { api, base64, ed25519 });
|
||||
Object.assign(globalThis, { api, base64, ed25519, canonicalize });
|
||||
|
|
Loading…
Reference in a new issue