From 60ace51b1f122e22a9244f8ee808bbe09d5bd8d9 Mon Sep 17 00:00:00 2001
From: tezlm
Date: Wed, 19 Jul 2023 19:19:36 -0700
Subject: [PATCH] blob server long polling

---
 Cargo.lock                                   |  1 +
 laundry.md                                   |  3 ++-
 server/Cargo.toml                            |  1 +
 server/src/blobs.rs                          |  4 ++-
 server/src/derive.rs                         |  5 ++++
 server/src/items/events.rs                   |  4 ++-
 server/src/main.rs                           |  3 ++-
 server/src/routes/things/enumerate.rs        | 16 +++---------
 server/src/routes/util.rs                    | 11 +++++++++
 spec/notes.md                                | 14 +++++++----
 store-fs/migrations/20230603122256_blobs.sql |  2 +-
 store-fs/src/fs.rs                           | 26 ++++++++------------
 store-fs/src/main.rs                         | 22 ++++++-----------
 13 files changed, 59 insertions(+), 53 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 697a70e..2d5ec7c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1992,6 +1992,7 @@ version = "0.1.0"
 dependencies = [
  "async-recursion",
  "axum",
+ "bitflags 2.3.3",
  "bytes",
  "clap",
  "futures-util",
diff --git a/laundry.md b/laundry.md
index a146b28..0f019df 100644
--- a/laundry.md
+++ b/laundry.md
@@ -25,7 +25,8 @@
 - [ ] full text search
 - [ ] website embed api?
 - [-] ui [0]
-  - [-] image/file view
+  - [x] fuse
+  - [x] image/file view
   - [ ] link view (eg. bookmarks or rss feeds)
   - [ ] forum
 - [ ] tests [1]
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 59808fb..7fa4608 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
 [dependencies]
 async-recursion = "1.0.4"
 axum = { version = "0.6.18", features = ["macros", "headers", "ws"] }
+bitflags = "2.3.3"
 bytes = "1.4.0"
 clap = { version = "4.3.14", features = ["derive"] }
 futures-util = "0.3.28"
diff --git a/server/src/blobs.rs b/server/src/blobs.rs
index 84ac5d0..1d9bbeb 100644
--- a/server/src/blobs.rs
+++ b/server/src/blobs.rs
@@ -10,6 +10,8 @@ use crate::Error;
 // -> for this, i need to find out how to split blobs (every backend
 // -> gets a copy? each blob goes to 2 of 3 servers (or similar splitting)?)
+// TODO (maybe): receive events from blobserver via long polling
+
 
 #[derive(Debug, serde::Deserialize)]
 struct UploadResponse {
     #[serde(rename = "ref")]
@@ -60,7 +62,7 @@ impl Client {
         debug!("put blob");
         let bytes = item.to_bytes();
         let res = self.http
-            .post(format!("{}/blobs/upload", self.base_url))
+            .post(format!("{}/blobs", self.base_url))
             .header("content-length", bytes.len())
             .body(bytes.clone())
             .send()
diff --git a/server/src/derive.rs b/server/src/derive.rs
index 6814178..e53474b 100644
--- a/server/src/derive.rs
+++ b/server/src/derive.rs
@@ -78,6 +78,11 @@ struct FFprobeDataWrapper (
 #[allow(unused)]
 /// derive tag information from media, like images/video/audio
 pub async fn derive_media(buffer: &[u8]) -> Option {
+    match infer::get(buffer)?.matcher_type() {
+        MatcherType::Image | MatcherType::Audio | MatcherType::Video => (),
+        _ => return None,
+    };
+
     use tokio::process::Command;
     use std::process::{Stdio, ChildStdin};
     use std::io::{BufWriter, Cursor};
diff --git a/server/src/items/events.rs b/server/src/items/events.rs
index 7f92694..a8a9463 100644
--- a/server/src/items/events.rs
+++ b/server/src/items/events.rs
@@ -55,7 +55,7 @@ pub async fn handle_special(db: &Sqlite, event: &Event, relations: &Relations)
             debug!("validated redaction");
             db.redact_event(rel_ref).await?;
-
+            // TODO: garbage collect unreferenced blobs
         }
     },
@@ -89,6 +89,8 @@ pub async fn derive(me: &Items, event: &Event, file: &FileEvent) -> Result
diff --git a/server/src/main.rs b/server/src/main.rs
             version: env!("CARGO_PKG_VERSION").to_string(),
         },
         features: Vec::from([
+            "core".into(),
             "thumbnail".into(),
-            // "share".into(),
+            // "alias".into(),
             // "account".into(),
             // "session".into(),
         ]),
diff --git a/server/src/routes/things/enumerate.rs b/server/src/routes/things/enumerate.rs
index 9f4096b..36026d3 100644
--- a/server/src/routes/things/enumerate.rs
+++ b/server/src/routes/things/enumerate.rs
@@ -70,18 +70,10 @@ pub async fn route(
     loop {
         break match recv.recv().await {
             Err(err) => Err(err.into()),
-            // Ok((event, relations, rowid)) => match query.matches(&event, &relations) {
-            //     mt @ MatchType::Event => Ok((mt, event, rowid)),
-            //     mt @ MatchType::Relation => Ok((mt, event, rowid)),
-            //     MatchType::None => continue,
-            // },
-            Ok((event, relations, rowid)) => {
-                let m= query.matches(&event, &relations);
-                match m {
-                    mt @ MatchType::Event => Ok((mt, event, rowid)),
-                    mt @ MatchType::Relation => Ok((mt, event, rowid)),
-                    MatchType::None => continue,
-                }
+            Ok((event, relations, rowid)) => match query.matches(&event, &relations) {
+                mt @ MatchType::Event => Ok((mt, event, rowid)),
+                mt @ MatchType::Relation => Ok((mt, event, rowid)),
+                MatchType::None => continue,
             },
         }
     }
diff --git a/server/src/routes/util.rs b/server/src/routes/util.rs
index 3fc6520..20835de 100644
--- a/server/src/routes/util.rs
+++ b/server/src/routes/util.rs
@@ -11,6 +11,17 @@ use self::perms::AuthLevel;
 use super::things::Error;
 use crate::items::Item;
 
+bitflags::bitflags! {
+    struct Permissions: u8 {
+        const READ = 0x01;
+        const WRITE = 0x02;
+        const LIST = 0x04;
+        const ALIAS = 0x08;
+        const SUDO = 0x10;
+        const ADMIN = 0x20;
+    }
+}
+
 pub mod perms {
     // surely theres a better method
     pub trait AuthLevel {
diff --git a/spec/notes.md b/spec/notes.md
index 15ec391..a47117b 100644
--- a/spec/notes.md
+++ b/spec/notes.md
@@ -254,10 +254,14 @@ Example of an acl:
 }
 ```
 
-## db
+## features
 
-Store `WipEvent`s in blob storage and full `Event`s in db (with id and derived).
+Different servers can have different features. Here are the official
+ones so far:
 
-## code
-
-A lot of code is being duplicated, especially the `enum Item`s.
+- `core`: supports the core api (`/things/...`)
+- `alias`: server name -> hash mappings
+- `thumbnail`: generates small images/icons for `x.file` events
+- `account`: users can manage accounts
+- `session`: users can manage sessions
+- `share`: deprecated
diff --git a/store-fs/migrations/20230603122256_blobs.sql b/store-fs/migrations/20230603122256_blobs.sql
index 2e6abbc..0930516 100644
--- a/store-fs/migrations/20230603122256_blobs.sql
+++ b/store-fs/migrations/20230603122256_blobs.sql
@@ -1 +1 @@
-CREATE TABLE blobs (id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT NOT NULL);
+CREATE TABLE blobs (hash TEXT PRIMARY KEY NOT NULL);
diff --git a/store-fs/src/fs.rs b/store-fs/src/fs.rs
index 2be7c04..eef3c9a 100644
--- a/store-fs/src/fs.rs
+++ b/store-fs/src/fs.rs
@@ -33,11 +33,11 @@ impl FileStore {
         Ok(Self {
             blob_path,
             ref_db: db,
-            stream: broadcast::channel(32).0,
+            stream: broadcast::channel(1).0, // set low for now, to check for bad code
         })
     }
 
-    pub async fn put(&mut self, blob: &[u8]) -> Result<ItemRef, Error> {
+    pub async fn put(&self, blob: &[u8]) -> Result<ItemRef, Error> {
         let hash = {
             let mut hasher = sha2::Sha224::default();
             hasher.update(blob);
@@ -48,30 +48,26 @@ impl FileStore {
         tokio::fs::write(path, blob).await?;
 
         let item_ref_str = item_ref.to_string();
-        query!("INSERT INTO blobs (hash) VALUES (?)", item_ref_str).execute(&self.ref_db).await?;
+        query!("INSERT OR IGNORE INTO blobs (hash) VALUES (?)", item_ref_str).execute(&self.ref_db).await?;
 
-        // for poller in &self.pollers {
-        //     println!("sending ref...");
-        //     _ = poller.send_async(item_ref.clone()).await;
-        //     println!("sent ref");
-        // }
+        let _ = self.stream.send(item_ref.clone());
 
         Ok(item_ref)
     }
 
-    pub async fn get(&mut self, item: &ItemRef) -> Result<Vec<u8>, Error> {
+    pub async fn get(&self, item: &ItemRef) -> Result<Vec<u8>, Error> {
         let file_path = self.blob_path.join(item.to_string());
         Ok(tokio::fs::read(file_path).await?)
     }
 
-    pub async fn delete(&mut self, item: &ItemRef) -> Result<(), Error> {
+    pub async fn delete(&self, item: &ItemRef) -> Result<(), Error> {
         let file_path = self.blob_path.join(item.to_string());
         let item_str = item.to_string();
         query!("DELETE FROM blobs WHERE hash = ?", item_str).execute(&self.ref_db).await?;
         Ok(tokio::fs::remove_file(file_path).await?)
     }
 
-    pub async fn list(&mut self, after: Option<ItemRef>, limit: usize, timeout: Option<u64>) -> Result<Vec<ItemRef>, Error> {
+    pub async fn list(&self, after: Option<ItemRef>, limit: usize, timeout: Option<u64>) -> Result<Vec<ItemRef>, Error> {
         // this code doesn't seem good but works
         use futures_util::TryStreamExt as _;
@@ -79,7 +75,7 @@ impl FileStore {
         if let Some(after) = after {
             let after_str = after.to_string();
             let limit = limit as u32;
-            let mut rows = query!("SELECT hash FROM blobs WHERE id > (SELECT id FROM blobs WHERE hash = ?) LIMIT ?", after_str, limit)
+            let mut rows = query!("SELECT hash FROM blobs WHERE rowid > (SELECT rowid FROM blobs WHERE hash = ?) ORDER BY rowid LIMIT ?", after_str, limit)
ORDER BY rowid LIMIT ?", after_str, limit) .fetch(&self.ref_db); while let Ok(Some(row)) = rows.try_next().await { let item_str: String = row.hash; @@ -88,7 +84,7 @@ impl FileStore { } } else { let limit = limit as u32; - let mut rows = query!("SELECT hash FROM blobs LIMIT ?", limit) + let mut rows = query!("SELECT hash FROM blobs ORDER BY rowid LIMIT ?", limit) .fetch(&self.ref_db); while let Ok(Some(row)) = rows.try_next().await { let item_str: String = row.hash; @@ -98,13 +94,11 @@ impl FileStore { }; if let Some(timeout) = timeout { - let timeout = std::time::Duration::from_millis(timeout); if entries.is_empty() { + let timeout = std::time::Duration::from_millis(timeout); if let Ok(result) = tokio::time::timeout(timeout, self.stream.subscribe().recv()).await { let item = result.map_err(|_| Error::Static("couldnt receive from channel"))?; return Ok(vec![item]); - } else { - return Ok(entries); } } } diff --git a/store-fs/src/main.rs b/store-fs/src/main.rs index f5549ca..8b77ebf 100644 --- a/store-fs/src/main.rs +++ b/store-fs/src/main.rs @@ -3,8 +3,6 @@ use axum::http::StatusCode; use futures_util::StreamExt; use serde_json::{Value, json}; use std::sync::Arc; -use tokio::sync::Mutex; - use ufh::item::ItemRef; mod fs; @@ -16,19 +14,18 @@ mod error; // TODO: error handling enum struct UfhState { - store: Mutex, + store: fs::FileStore, } #[tokio::main] async fn main() -> Result<(), Box> { let cwd = std::env::current_dir().expect("couldn't get current working directory!"); let state = UfhState { - store: Mutex::new(fs::FileStore::init(cwd.join("data")).await?), + store: fs::FileStore::init(cwd.join("data")).await?, }; let router = axum::Router::new() - .route("/blobs/upload", axum::routing::post(blob_upload)) // TODO: rename to just `/blobs` instead of `/blobs/upload`? + .route("/blobs", axum::routing::get(blob_list).post(blob_upload)) .route("/blobs/:item_ref", axum::routing::get(blob_download).delete(blob_delete)) - .route("/blobs/", axum::routing::get(blob_list)) .with_state(Arc::new(state)); axum::Server::bind(&"0.0.0.0:3219".parse().unwrap()) .serve(router.into_make_service()) @@ -58,8 +55,7 @@ async fn blob_upload( chunks.push(chunk); } - let item_ref = state.store.lock() - .await + let item_ref = state.store .put(&chunks.concat()) .await .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": err.to_string() }))))?; @@ -70,20 +66,17 @@ async fn blob_download( State(state): State>, path: Path, ) -> Result, (StatusCode, Json)> { - state.store.lock() - .await + state.store .get(&path.item_ref) .await .map_err(|err| (StatusCode::NOT_FOUND, Json(json!({ "error": err.to_string() })))) } -#[axum::debug_handler] async fn blob_delete( State(state): State>, path: Path, ) -> Result<(), Json> { - state.store.lock() - .await + state.store .delete(&path.item_ref) .await .map_err(|err| Json(json!({ "error": err.to_string() }))) @@ -94,8 +87,7 @@ async fn blob_list( Query(query): Query, ) -> Result, Json> { let limit = query.limit.unwrap_or(20).min(100); - let list = state.store.lock() - .await + let list = state.store .list(query.after, limit, query.timeout) .await .map_err(|err| Json(json!({ "error": err.to_string() })))?;