more auth stuff, import old events on initial view

This commit is contained in:
tezlm 2023-08-20 12:32:06 -07:00
parent cc55eb6126
commit ddbc30ad85
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
23 changed files with 430 additions and 1526 deletions

1
.gitignore vendored
View file

@ -2,3 +2,4 @@ target
data
node_modules
local
server.toml

8
Cargo.lock generated
View file

@ -2484,9 +2484,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.183"
version = "1.0.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
dependencies = [
"serde_derive",
]
@ -2504,9 +2504,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.183"
version = "1.0.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
dependencies = [
"proc-macro2",
"quote",

View file

@ -1,3 +1,7 @@
# [package]
# name = "ufh"
# version = "0.1.0"
[workspace]
resolver = "2"
members = ["store-*", "server", "lib", "cli"]

View file

@ -17,7 +17,7 @@ json-canon = "0.1.3"
lru = "0.11.0"
once_cell = "1.18.0"
reqwest = { version = "0.11.18", features = ["json", "rustls-tls"], default-features = false }
serde = { version = "1.0.166", features = ["derive"] }
serde = { version = "=1.0.171", features = ["derive"] }
serde_json = "1.0.100"
sha2 = { version = "0.10.7", features = ["asm"] }
tokio = { version = "1.29.1", features = ["rt-multi-thread", "macros"] }

View file

@ -86,3 +86,21 @@ ones so far:
see <https://docs.rs/tantivy/latest/tantivy/query/struct.QueryParser.html>
deriving takes a lot of dependencies, which might not be desierable
## tagging
I'm not entirely sure if i should build tags into the protocol, but
since its here I might as well do it right.
Right now there is hierchical tagging (`foo.bar` includes `foo`) but
having mutex tagging (`things/foo` or `things/bar` but not both) would be nice.
- `foo.bar` includes `foo`
- `foo/bar` mutexes `foo/baz`
- `foo/bar/baz` mutexes `foo/baz/qux` (same prefix, different part after last `/`)
- `foo/bar/baz` doesn't mutex `foo/qux`
- `foo/bar/baz` doesn't mutex `foo/qux/baz`
- `foo.bar/baz` mutexes `foo.bar/qux`
- `foo.bar` includes `foo.bar/qux`
etc

View file

@ -11,7 +11,7 @@ ed25519-dalek = "1.0.1"
hex = "0.4.3"
json-canon = "0.1.3"
rand = "^0.7"
serde = { version = "1.0.164", features = ["derive"] }
serde = { version = "=1.0.171", features = ["derive"] }
serde_json = "1.0.100"
thiserror = "1.0.40"
tracing = "0.1.37"

View file

@ -13,7 +13,7 @@ use thiserror::Error;
#[derive(PartialEq, Eq, Clone, Hash)]
pub struct ActorId([u8; 32]);
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
#[derive(PartialEq, Eq, Clone, Hash)]
pub struct ActorSecret([u8; 64]);
#[derive(PartialEq, Eq, Clone, Hash, Serialize, Deserialize)]
@ -110,6 +110,12 @@ impl<'de> Deserialize<'de> for ActorSecret {
}
}
impl Debug for ActorSecret {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ActorSecret {{ {} }}", b64engine.encode(self.0))
}
}
impl FromStr for ActorId {
type Err = ActorIdParseError;

View file

@ -21,6 +21,9 @@ pub struct Event {
pub content: EventContent,
// FIXME: disallow empty hashmap (either have a populated map or missing key)
// this breaks signatures!
// currently, a map *must* be specified, but it might be nicer to
// either allow "map with keys" or "no map" instead of "always have
// a map"
// #[serde(skip_serializing_if = "HashMap::is_empty", default)]
pub relations: HashMap<ItemRef, RelInfo>,
#[serde(skip_serializing_if = "Derived::is_empty", default)]

View file

@ -18,6 +18,10 @@ pub struct Query {
/// after filtering, relations are fetched
#[serde(skip_serializing_if = "HashSet::is_empty", default)]
pub relations: HashSet<QueryRelation>,
// TODO: specify (forward) relations to fetch?
// #[serde(skip_serializing_if = "HashSet::is_empty", default)]
// pub fetch: HashSet<QueryRelation>,
// TODO: find a way to do this more elegantly?
#[serde(skip_serializing_if = "HashSet::is_empty", default)]
@ -32,18 +36,13 @@ pub struct QueryBuilder {
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
#[serde(untagged)]
pub enum QueryRelation {
FromRel(QueryFromRel),
FromRelTo(QueryFromRelTo),
/// (source_type, rel_type)
FromRel(String, String),
/// (source_type, rel_type, target_type)
FromRelTo(String, String, String),
}
/// (source_type, rel_type)
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct QueryFromRel(pub String, pub String);
/// (source_type, rel_type, target_type)
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Hash)]
pub struct QueryFromRelTo(pub String, pub String, pub String);
fn always_false() -> bool {
false
}
@ -84,7 +83,7 @@ impl Query {
pub fn matches_relation(&self, source: &Event, target: &Event, rel_info: &RelInfo) -> bool {
for rel_query in self.relations.iter().chain(&self.ephemeral) {
match rel_query {
QueryRelation::FromRel(QueryFromRel(source_type, rel_type)) => {
QueryRelation::FromRel(source_type, rel_type) => {
if &rel_info.rel_type == rel_type
&& source.content.get_type() == source_type
&& self.matches_relationless(target, true)
@ -92,7 +91,7 @@ impl Query {
return true;
}
}
QueryRelation::FromRelTo(QueryFromRelTo(source_type, rel_type, target_type)) => {
QueryRelation::FromRelTo(source_type, rel_type, target_type) => {
if &rel_info.rel_type == rel_type
&& source.content.get_type() == source_type
&& target.content.get_type() == target_type
@ -123,12 +122,12 @@ impl Query {
}
impl QueryRelation {
pub fn from_rel(source_type: String, rel_type: String) -> Self {
Self::FromRel(QueryFromRel(source_type, rel_type))
pub fn from_rel(source_type: impl Into<String>, rel_type: impl Into<String>) -> Self {
Self::FromRel(source_type.into(), rel_type.into())
}
pub fn from_rel_to(source_type: String, rel_type: String, target_type: String) -> Self {
Self::FromRelTo(QueryFromRelTo(source_type, rel_type, target_type))
pub fn from_rel_to(source_type: impl Into<String>, rel_type: impl Into<String>, target_type: impl Into<String>) -> Self {
Self::FromRelTo(source_type.into(), rel_type.into(), target_type.into())
}
}
@ -163,6 +162,11 @@ impl QueryBuilder {
}
self
}
pub fn with_relation(mut self, relation: &QueryRelation) -> Self {
self.query.relations.insert(relation.clone());
self
}
pub fn build(self) -> Query {
self.query

View file

@ -26,7 +26,7 @@ nanoid = "0.4.0"
once_cell = "1.18.0"
rand = "0.8.5"
reqwest = { version = "0.11.18", features = ["json", "rustls-tls"], default-features = false }
serde = { version = "1.0.164", features = ["derive"] }
serde = { version = "=1.0.171", features = ["derive"] }
serde-aux = "4.2.0"
serde_json = "1.0.97"
sha2 = { version = "0.10.7", features = ["asm"] }

View file

@ -1,4 +1,4 @@
use crate::{Error, routes::util::P2PAuth, consts::HEADER_GRANT};
use crate::{Error, routes::util::{P2PAuth, NodeAuth}, consts::{HEADER_GRANT, HEADER_NODE}};
use axum::http::StatusCode;
use bytes::Bytes;
use sha2::Digest;
@ -94,13 +94,15 @@ impl Client {
Ok(Some(Item::from_bytes(bytes)?))
}
pub async fn get_via(&self, item_ref: &ItemRef, via: &str, grant: Option<&P2PAuth>) -> Result<Option<Item>, Error> {
pub async fn get_via(&self, item_ref: &ItemRef, via: &str, node_auth: &NodeAuth, grant: Option<&P2PAuth>) -> Result<Option<Item>, Error> {
debug!("get blob through server {}", via);
let url = format!("http://{}/things/{}", via, item_ref);
trace!("auth: {:?}", node_auth);
trace!("grant: {:?}", grant);
trace!("target url: {}", url);
let mut req = self.http.get(url);
req = req.header(HEADER_NODE, serde_json::to_string(node_auth)?);
if let Some(grant) = grant {
req = req.header(HEADER_GRANT, serde_json::to_string(grant)?);
}

View file

@ -1,9 +1,13 @@
use crate::{
blobs,
consts::{HEADER_GRANT, HEADER_NODE},
items::events::update_search_index,
peer::{Contact, NodeId, RPCRequest, RPCResponse},
perms::can_send_event,
routes::{things::{thumbnail::ThumbnailSize, Error}, util::P2PAuth},
routes::{
things::{enumerate::QueryResult, thumbnail::ThumbnailSize, Error},
util::{NodeAuth, P2PAuth},
},
state::{
db::{sqlite::Sqlite, Database},
search::tantivy::Tantivy,
@ -15,10 +19,11 @@ use bytes::Bytes;
use events::DelayedAction;
use lru::LruCache;
use once_cell::sync::OnceCell;
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc};
use std::{collections::HashSet, num::NonZeroUsize, sync::Arc, time::SystemTime};
use tokio::sync::RwLock;
use tracing::{debug, error, info, trace};
use ufh::{
actor::ActorSecret,
derived::Derived,
event::{Event, EventContent, WipEvent},
item::ItemRef,
@ -63,15 +68,23 @@ pub struct Items {
search: Tantivy,
blobs: blobs::Client,
p2p: Arc<P2PState>,
server_secret: ActorSecret,
}
impl Items {
pub fn new(db: &Sqlite, blobs: &blobs::Client, search: &Tantivy, p2p: &Arc<P2PState>) -> Items {
pub fn new(
db: &Sqlite,
blobs: &blobs::Client,
search: &Tantivy,
p2p: &Arc<P2PState>,
secret: ActorSecret,
) -> Items {
Items {
db: db.clone(),
blobs: blobs.clone(),
search: search.clone(),
p2p: p2p.clone(),
server_secret: secret,
}
}
@ -205,7 +218,11 @@ impl Items {
}
#[tracing::instrument(skip_all, fields(item_ref))]
pub async fn get(&self, item_ref: &ItemRef, grant: Option<&P2PAuth>) -> Result<Option<Item>, Error> {
pub async fn get(
&self,
item_ref: &ItemRef,
grant: Option<&P2PAuth>,
) -> Result<Option<Item>, Error> {
static CACHE: OnceCell<RwLock<LruCache<ItemRef, Item>>> = OnceCell::new();
let cache = CACHE
.get_or_init(|| RwLock::new(LruCache::new(NonZeroUsize::new(CACHE_SIZE).unwrap())));
@ -227,7 +244,11 @@ impl Items {
// TODO (future, maybe): batch lookup requests per bucket
// this only makes sense when you have a lot of requests concerning one bucket at once
async fn get_uncached(&self, item_ref: &ItemRef, grant: Option<&P2PAuth>) -> Result<Option<Item>, Error> {
async fn get_uncached(
&self,
item_ref: &ItemRef,
grant: Option<&P2PAuth>,
) -> Result<Option<Item>, Error> {
tracing::Span::current().record("item_ref", item_ref.to_string());
if let Some(item) = self.db.event_fetch(item_ref).await? {
@ -237,7 +258,7 @@ impl Items {
match self.blobs.get(item_ref).await {
Ok(Some(blobs::Item::WipEvent(wip))) => {
debug!("event didn't exist in db, re-adding");
return Ok(Some(self.import_event(wip, grant).await?));
return Ok(Some(self.import_event(wip, grant, None).await?));
}
Ok(Some(blobs::Item::Raw(bytes))) => {
// make sure the blob is in db
@ -255,10 +276,19 @@ impl Items {
for hoster in find_hosters(&self.p2p, &NodeId::new_from_ref(item_ref)).await {
trace!("importing item from (host={})", hoster.host);
match self.blobs.get_via(item_ref, &hoster.host, grant).await {
trace!("using secret = {:?}", self.server_secret);
trace!("using id = {}", self.server_secret.get_id());
let auth = self.gen_node_auth();
match self
.blobs
.get_via(item_ref, &hoster.host, &auth, grant)
.await
{
Ok(Some(blobs::Item::WipEvent(wip))) => {
debug!("importing event from other server");
return Ok(Some(self.import_event(wip, grant).await?));
return Ok(Some(
self.import_event(wip, grant, Some(&hoster.host)).await?,
));
}
Ok(Some(blobs::Item::Raw(bytes))) => {
debug!("pulled new blob (size={})", bytes.len());
@ -275,7 +305,7 @@ impl Items {
}
Err(err) => {
trace!("error while fetching: {}", err);
},
}
};
}
@ -285,7 +315,12 @@ impl Items {
}
#[async_recursion::async_recursion]
async fn import_event(&self, wip: WipEvent, grant: Option<&'async_recursion P2PAuth>) -> Result<Item, Error> {
async fn import_event(
&self,
wip: WipEvent,
grant: Option<&'async_recursion P2PAuth>,
host: Option<&'async_recursion String>,
) -> Result<Item, Error> {
if !wip.has_valid_signature() {
return Err(Error::Validation("missing or invalid signature"));
}
@ -297,6 +332,7 @@ impl Items {
}
if let Some(rels) = &wip.relations {
debug!("pulling event (forward) relations");
for item_ref in rels.keys() {
self.get(item_ref, grant).await?;
}
@ -304,8 +340,85 @@ impl Items {
let event = self.create_event(wip).await?.event;
// NOTE: `subscribe` doesn't send old events, so i have to pull them here
// FIXME: i only pull events visible to one user, then never pull again
// FIXME: emit an RPCRequest::Subscribe to events every time a new user views an event for the first time
// note: the subscribe should only be for the root event
if let (Some(grant), Some(host)) = (grant, host) {
let auth = self.gen_node_auth();
let query = ufh::query::QueryBuilder::new()
.with_ref(&event.id)
.with_relation(&ufh::query::QueryRelation::from_rel("*", "*"))
.build();
debug!("pulling event (reverse) relations");
let url = format!("http://{}/things/query", host);
trace!("with query {:?}", query);
trace!("with auth {:?}", auth);
trace!("with grant {:?}", grant);
trace!("with url {}", url);
use serde::Deserialize;
#[derive(Deserialize)]
struct QueryInit {
query: String,
}
let init: QueryInit = reqwest::Client::new()
.post(url)
.header(HEADER_NODE, serde_json::to_string(&auth)?)
.header(HEADER_GRANT, serde_json::to_string(&grant)?)
.json(&query)
.send()
.await?
.error_for_status()?
.json()
.await?;
// TODO: keep pulling in a loop
// FIXME: urlencode query
let url = format!("http://{}/things?query={}", host, init.query);
trace!("init query, begin enumerate with url {}", url);
let query: QueryResult = reqwest::Client::new()
.get(url)
.header(HEADER_NODE, serde_json::to_string(&auth)?)
.header(HEADER_GRANT, serde_json::to_string(&grant)?)
.json(&query)
.send()
.await?
.error_for_status()?
.json()
.await?;
trace!("pulling relations: {:?}", query.relations);
for event in query.relations.unwrap_or_default().into_values() {
let wip = WipEvent {
content: event.content,
relations: Some(event.relations),
sender: event.sender,
derived: None,
signature: Some(event.signature),
origin_ts: event.origin_ts,
};
self.import_event(wip, Some(grant), Some(host)).await?;
}
// TODO: subscribe
// subscribe_to(&self.p2p, &NodeId::new_from_ref(&event.id)).await?;
}
Ok(Item::Event(event))
}
fn gen_node_auth(&self) -> NodeAuth {
let mut auth = NodeAuth {
actor_id: self.server_secret.get_id(),
expires_at: SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
+ 1000 * 60,
signature: None,
};
let json = json_canon::to_vec(&auth).expect("always serializable");
auth.signature = Some(self.server_secret.sign(&json));
auth
}
}
// TODO (performance): cancel if there's no progress on getting "closer" to the target?
@ -370,3 +483,45 @@ async fn find_hosters(p2p: &P2PState, key: &NodeId) -> Vec<Contact> {
vec![]
}
}
// TODO (performance): cancel if there's no progress on getting "closer" to the target?
#[allow(unused)]
async fn subscribe_to(p2p: &P2PState, key: &NodeId) -> Result<usize, Error> {
let hosters = find_hosters(p2p, key).await;
if hosters.is_empty() {
return Ok(0);
}
for hoster in &hosters {
hoster
.send(&p2p.contact, RPCRequest::Subscribe(vec![*key]))
.await?;
}
Ok(hosters.len())
}
#[test]
fn test_node_signature() {
use ufh::actor::ActorId;
let (_, secret) = ActorId::new();
let mut auth = NodeAuth {
actor_id: secret.get_id(),
expires_at: SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
+ 1000 * 60,
signature: None,
};
auth.signature = Some(secret.sign(&json_canon::to_vec(&auth).expect("always signable")));
let stripped = NodeAuth {
actor_id: auth.actor_id,
expires_at: auth.expires_at,
signature: None,
};
assert!(stripped.actor_id.verify(
&json_canon::to_vec(&stripped).expect("always signable"),
&auth.signature.unwrap()
));
}

View file

@ -35,7 +35,7 @@ use tokio::sync::broadcast;
use tokio::sync::{Mutex, RwLock};
use tower_http::{cors::CorsLayer, trace::TraceLayer};
use tracing::{debug, info};
use ufh::actor::ActorId;
use ufh::actor::{ActorId, ActorSecret};
use ufh::item::ItemRef;
use ufh::query::Query;
use figment::{Figment, providers::{Toml, Env}};
@ -70,17 +70,6 @@ type RowId = u32;
// panic!("size of contact: {}", std::mem::size_of::<Contact>());
// }
/*
#[derive(Debug, Serialize, Deserialize)]
pub struct Signed<T: Debug + Serialize + Deserialize> {
#[serde(skip_serialization_if="Option::is_none")]
signature: Option<ActorSignature>,
sender: ActorId,
#[serde(flatten)]
value: T,
}
*/
pub struct P2PState {
contact: Contact,
router: Mutex<peer::Router>,
@ -112,7 +101,7 @@ struct Config {
port: u16,
#[serde(default="default_host")]
host: String,
id: Option<ActorId>,
id: Option<ActorSecret>,
bootstrap: Vec<Contact>,
}
@ -145,7 +134,8 @@ async fn serve(config: &Config) -> Result<(), Error> {
let blob_client = blobs::Client::new(&format!("http://localhost:{}", config.store));
let search = state::search::tantivy::Tantivy::open("data/tantivy").await?;
let event = if let Some(actor_id) = &config.id {
let (event, secret) = if let Some(actor_secret) = &config.id {
let actor_id = actor_secret.get_id();
let query = Query::builder()
.with_sender(&actor_id)
.with_type("x.actor")
@ -164,7 +154,7 @@ async fn serve(config: &Config) -> Result<(), Error> {
}
_ => panic!("server id event type is not x.actor"),
}
event
(event, actor_secret.clone())
} else {
let tmp_p2p = Arc::new(P2PState {
router: Mutex::new(peer::Router::new(NodeId::new_from_rand())),
@ -174,9 +164,9 @@ async fn serve(config: &Config) -> Result<(), Error> {
},
map: Mutex::new(HashMap::new()),
});
let items_client = items::Items::new(&db, &blob_client, &search, &tmp_p2p);
let (actor_id, actor_secret) = ActorId::new();
info!("creating new actor of type Node, id is {}", actor_id);
info!("creating new actor of type Node, id is {}, secret is {:?}", actor_id, actor_secret);
let items_client = items::Items::new(&db, &blob_client, &search, &tmp_p2p, actor_secret.clone());
let mut wip = ufh::event::WipEvent {
content: ufh::event::EventContent::Actor(ufh::event::ActorEvent {
name: config.host.clone(),
@ -195,7 +185,7 @@ async fn serve(config: &Config) -> Result<(), Error> {
wip.signature = Some(actor_secret.sign(wip.to_json().as_bytes()));
let wip = items_client.begin_event_create(wip).await?;
let wip = items_client.commit_event_create(wip).await?;
items_client.finish_event_create(wip).await?.event
(items_client.finish_event_create(wip).await?.event, actor_secret)
};
let node_id = NodeId::new_from_actor(&event.sender);
@ -208,7 +198,7 @@ async fn serve(config: &Config) -> Result<(), Error> {
map: Mutex::new(HashMap::new()),
});
let items_client = items::Items::new(&db, &blob_client, &search, &p2p);
let items_client = items::Items::new(&db, &blob_client, &search, &p2p, secret);
let state = ServerState {
db,
@ -238,7 +228,9 @@ async fn serve(config: &Config) -> Result<(), Error> {
}
info!("bootstrapped {} nodes", contacts);
const ANNOUNCE_TTL: u64 = 1000 * 60 * 15; // 15 minutes
// TODO: increase ttl, it's really low for testing
// const ANNOUNCE_TTL: u64 = 1000 * 60 * 15; // 15 minutes
const ANNOUNCE_TTL: u64 = 1000 * 15; // 15 seconds
loop {
let time = std::time::SystemTime::now()

View file

@ -262,96 +262,3 @@ impl Router {
ret.into_iter().take(count).map(|i| i.0.clone()).collect()
}
}
/*
impl Node {
pub fn new(id: NodeId, host: String) -> Self {
Node {
// id,
router: Router::new(id),
store: HashMap::new(),
contact: Contact { id, host },
}
}
pub async fn bootstrap(&mut self, contact: Contact) {
self.send(&contact, RPCRequest::Ping).await;
self.router.update(contact);
}
async fn send(&mut self, contact: &Contact, message: RPCRequest) -> Option<RPCResponse> {
if let Ok(res) = contact.send(&self.contact, message).await {
return Some(res);
}
// node is dead
self.router.remove(&contact.id);
None
}
pub fn receive(&mut self, sender: &Contact, message: RPCRequest) -> RPCResponse {
self.router.update(sender.clone());
match message {
RPCRequest::Ping => RPCResponse::Ok,
RPCRequest::Store(key, value) => {
self.store.insert(key, value);
RPCResponse::Ok
}
RPCRequest::FindNode(id) => {
let contacts = self.router.find_closest(&id, 20);
RPCResponse::FindNode(contacts)
}
RPCRequest::FindValue(key) => {
if let Some(value) = self.store.get(&key) {
RPCResponse::FindValue(value.clone())
} else {
let contacts = self.router.find_closest(&key, 20);
RPCResponse::FindNode(contacts)
}
}
}
}
pub async fn set(&mut self, key: &NodeId, value: &str) {
let contacts = self.router.find_closest(key, 1);
for contact in contacts {
self.send(&contact, RPCRequest::Store(*key, value.to_owned()))
.await;
}
self.store.insert(*key, value.to_owned());
}
pub async fn get(&mut self, key: &NodeId) -> Option<String> {
if let Some(value) = self.store.get(key) {
Some(value.clone())
} else {
let mut queried = HashSet::new();
let mut nodes = self.router.find_closest(key, 1);
while !nodes.is_empty() {
let contact = nodes.remove(0);
if contact == self.contact {
continue;
}
let Some(response) = self.send(&contact, RPCRequest::FindValue(*key)).await else {
continue;
};
match response {
RPCResponse::FindNode(received) => {
queried.insert(contact.id);
for contact in received {
if !queried.contains(&contact.id) {
nodes.push(contact);
}
}
}
RPCResponse::FindValue(value) => {
return Some(value);
}
RPCResponse::Ok => (),
}
}
None
}
}
}
*/

View file

@ -163,7 +163,7 @@ async fn is_relation_valid(db: &Sqlite, event: &Event, ctx: &Context<'_>) -> Res
}
async fn get_acl(db: &Sqlite, item_ref: &ItemRef) -> Option<Acl> {
let relation = QueryRelation::from_rel("x.acl".into(), "acl".into());
let relation = QueryRelation::from_rel("x.acl", "acl");
let result = db
.query_relations(&[relation], &[item_ref.clone()], Location::Reverse, 1)
.await

View file

@ -69,7 +69,8 @@ async fn message(
}
RPCRequest::Subscribe(node_ids) => {
trace!("receive Subscribe (id={:?})", node_ids);
todo!()
// TODO
RPCResponse::Ok
}
};
Ok(Json(response))

View file

@ -26,13 +26,13 @@ pub struct QueryParams {
query: String,
}
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct QueryResult {
events: Vec<Event>,
pub events: Vec<Event>,
#[serde(skip_serializing_if = "Option::is_none")]
relations: Option<HashMap<ItemRef, Event>>,
pub relations: Option<HashMap<ItemRef, Event>>,
#[serde(skip_serializing_if = "Option::is_none")]
next: Option<String>,
pub next: Option<String>,
}
impl IntoResponse for QueryResult {

View file

@ -9,7 +9,7 @@ use crate::ServerState;
mod blob;
mod check;
pub mod create;
mod enumerate;
pub mod enumerate;
pub mod fetch;
mod query;
pub mod thumbnail;

View file

@ -10,7 +10,11 @@ use ufh::{
event::{Event, EventContent},
};
use crate::{items::Items, ServerState, consts::HEADER_GRANT};
use crate::{
consts::{HEADER_GRANT, HEADER_NODE},
items::Items,
ServerState,
};
use std::{
marker::PhantomData,
sync::Arc,
@ -111,6 +115,14 @@ pub struct P2PAuth {
pub expires_at: u128,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeAuth {
pub actor_id: ActorId,
#[serde(skip_serializing_if = "Option::is_none")]
pub signature: Option<ActorSignature>,
pub expires_at: u128,
}
#[axum::async_trait]
impl<T: AuthLevel> axum::extract::FromRequestParts<Arc<ServerState>> for Authenticate<T> {
type Rejection = (StatusCode, String);
@ -124,42 +136,72 @@ impl<T: AuthLevel> axum::extract::FromRequestParts<Arc<ServerState>> for Authent
.expect("infallible");
let header = <TypedHeader<Authorization<Bearer>>>::from_request_parts(parts, state).await;
trace!("headers = {:?}", headers);
let token = match (&header, headers.get(HEADER_GRANT)) {
let token = match (
&header,
(headers.get(HEADER_NODE), headers.get(HEADER_GRANT)),
) {
(Ok(header), _) => header.token(),
(Err(_), Some(auth)) => {
trace!("request has auth: {:?}", auth);
let auth: P2PAuth = serde_json::from_slice(auth.as_bytes())
(Err(_), (Some(node), Some(grant))) => {
trace!("request has node: {:?}", node);
trace!("request has grant: {:?}", grant);
let mut node: NodeAuth = serde_json::from_slice(node.as_bytes())
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
trace!("parsed auth: {:?}", auth);
let Some(signature) = auth.signature else {
return Err((StatusCode::BAD_REQUEST, "missing signature".into()));
};
let stripped = P2PAuth {
actor_id: auth.actor_id,
allows: auth.allows,
signature: None,
expires_at: auth.expires_at,
};
if !stripped
.actor_id
.verify(&json_canon::to_vec(&stripped).unwrap(), &signature)
{
return Err((StatusCode::BAD_REQUEST, "bad signature".into()));
}
let mut grant: P2PAuth = serde_json::from_slice(grant.as_bytes())
.map_err(|err| (StatusCode::BAD_REQUEST, err.to_string()))?;
trace!("request has parsed node: {:?}", node);
trace!("request has parsed grant: {:?}", grant);
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
if stripped.expires_at < time {
return Err((StatusCode::BAD_REQUEST, "expired auth".into()));
trace!("current system time (in milliseconds) = {}", time);
// verify node signature
let Some(node_signature) = node.signature.take() else {
trace!("foreign request is bad: missing node signature");
return Err((StatusCode::BAD_REQUEST, "missing node signature".into()));
};
if !node
.actor_id
.verify(&json_canon::to_vec(&node).unwrap(), &node_signature)
{
trace!("foreign request is bad: bad node signature");
return Err((StatusCode::BAD_REQUEST, "bad node signature".into()));
}
if node.expires_at < time {
trace!("foreign request is bad: expired node auth");
return Err((StatusCode::BAD_REQUEST, "expired node auth".into()));
}
// verify grant signature
let Some(grant_signature) = grant.signature.take() else {
trace!("foreign request is bad: missing grant signature");
return Err((StatusCode::BAD_REQUEST, "missing grant signature".into()));
};
if !grant
.actor_id
.verify(&json_canon::to_vec(&grant).unwrap(), &grant_signature)
{
trace!("foreign request is bad: bad grant signature");
return Err((StatusCode::BAD_REQUEST, "bad grant signature".into()));
}
if grant.expires_at < time {
trace!("foreign request is bad: expired grant auth");
return Err((StatusCode::BAD_REQUEST, "expired grant auth".into()));
}
if grant.allows != node.actor_id {
trace!("foreign request is bad: grant doesnt't allow node");
return Err((StatusCode::BAD_REQUEST, "grant doesn't allow node".into()));
}
return Ok(Authenticate {
id: None,
user: Some(stripped.actor_id),
user: Some(grant.actor_id),
level: 1, // servers can only read
_lvl: PhantomData,
});
},
}
(Err(_), _) if T::new().to_num() == 0 => "",
(Err(err), _) => return Err((StatusCode::BAD_REQUEST, err.to_string())),
};
@ -216,7 +258,11 @@ impl<T: AuthLevel> Authenticate<T> {
}
}
pub async fn get_blob(items: &Items, file_event: &Event, grant: Option<&P2PAuth>) -> Result<Bytes, Error> {
pub async fn get_blob(
items: &Items,
file_event: &Event,
grant: Option<&P2PAuth>,
) -> Result<Bytes, Error> {
let EventContent::File(file) = &file_event.content else {
return Err(Error::Validation("not a file event"));
};

44
server/src/signed.rs Normal file
View file

@ -0,0 +1,44 @@
// maybe in the future i could redo code like this?
use ufh::actor::{ActorId, ActorSignature, ActorSecret};
use serde::{Serialize, Deserialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Signed<T: Serialize> {
signature: ActorSignature,
sender: ActorId,
#[serde(flatten)]
value: T,
}
#[derive(Serialize, Deserialize)]
struct Unsigned<'a, T: Serialize> {
sender: &'a ActorId,
#[serde(flatten)]
value: &'a T,
}
impl<T: Serialize> Signed<T> {
pub fn new(secret: ActorSecret, value: T) -> Signed<T> {
let temp = Unsigned {
sender: &secret.get_id(),
value: &value,
};
let json = json_canon::to_vec(&temp).expect("Signed::T must be serializable!");
let signature = secret.sign(&json);
Signed {
sender: temp.sender,
signature,
value,
}
}
pub fn verify(&self) -> bool {
let temp = Unsigned {
sender: &self.sender,
value: &self.value,
};
let json = json_canon::to_vec(&temp).expect("Signed::T must be serializable!");
self.sender.verify(&json, &self.signature)
}
}

View file

@ -332,29 +332,72 @@ impl Database for Sqlite {
WHERE 1 = 1
",
);
builder.push(" AND events_from.flags & 1 = 0"); // don't send redacted events
let mut rels_from_rel = Vec::new();
let mut rels_from_rel_to = Vec::new();
// don't send redacted events
builder.push(" AND events_from.flags & 1 = 0 AND (1 = 0");
for relation in relations {
match relation {
QueryRelation::FromRel(q) => rels_from_rel.push(q),
QueryRelation::FromRelTo(q) => rels_from_rel_to.push(q),
// this code is a mess and should be cleaned up
enum Query<'a> {
FromRel(&'a str, &'a str),
FromRelTo(&'a str, &'a str, &'a str),
}
let matchable = match relation {
QueryRelation::FromRel(from_type, rel_type) => Query::FromRel(from_type.as_str(), rel_type.as_str()),
QueryRelation::FromRelTo(from_type, rel_type, to_type) => Query::FromRelTo(from_type.as_str(), rel_type.as_str(), to_type.as_str()),
};
match matchable {
Query::FromRel("*", "*") | Query::FromRelTo("*", "*", "*") => {
builder.push(" OR 1 = 1 ");
}
Query::FromRel("*", rel_type) | Query::FromRelTo("*", rel_type, "*") => {
builder
.push(" OR graph.rel_type = ")
.push_bind(rel_type);
}
Query::FromRel(from_type, "*") | Query::FromRelTo(from_type, "*", "*") => {
builder
.push(" OR events_from.type = ")
.push_bind(from_type);
}
Query::FromRelTo("*", "*", to_type) => {
builder
.push(" OR events_to.type = ")
.push_bind(to_type);
}
Query::FromRelTo(from_type, "*", to_type) => {
builder
.push(" OR (events_from.type = ")
.push_bind(from_type)
.push(" AND events_to.type = ")
.push_bind(to_type)
.push(")");
}
Query::FromRelTo("*", rel_type, to_type) => {
builder
.push(" OR (graph.rel_type = ")
.push_bind(rel_type)
.push(" AND events_to.type = ")
.push_bind(to_type)
.push(")");
}
Query::FromRel(from_type, rel_type) | Query::FromRelTo(from_type, rel_type, "*") => {
builder
.push(" OR (events_from.type = ")
.push_bind(from_type)
.push(" AND graph.rel_type = ")
.push_bind(rel_type)
.push(")");
}
Query::FromRelTo(from_type, rel_type, to_type) => {
builder
.push(" OR (events_from.type = ")
.push_bind(from_type)
.push(" AND graph.rel_type = ")
.push_bind(rel_type)
.push(" AND events_to.type = ")
.push_bind(to_type)
.push(")");
}
}
}
builder.push(" AND (1 = 0");
if !rels_from_rel.is_empty() {
builder.push(" OR (events_from.type, graph.rel_type) IN");
builder.push_tuples(&rels_from_rel, |mut q, tup| {
q.push_bind(tup.0.clone()).push_bind(tup.1.clone());
});
}
if !rels_from_rel_to.is_empty() {
builder.push(" OR (events_from.type, graph.rel_type, events_to.type) IN");
builder.push_tuples(rels_from_rel_to, |mut q, tup| {
q.push_bind(tup.0.clone())
.push_bind(tup.1.clone())
.push_bind(tup.2.clone());
});
}
builder.push(")");
match after {

1322
store-fs/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -9,7 +9,7 @@ edition = "2021"
[dependencies]
axum = { version = "0.6.18", features = ["macros", "headers"] }
futures-util = "0.3.28"
serde = { version = "1.0.163", features = ["derive"] }
serde = { version = "=1.0.171", features = ["derive"] }
serde_json = "1.0.96"
sha2 = { version = "0.10.6", features = ["asm"] }
sqlx = { version = "0.6.3", features = ["sqlite", "runtime-tokio-rustls", "offline"] }