blob server long polling

tezlm 2023-07-19 19:19:36 -07:00
parent 6ec1a76339
commit 60ace51b1f
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
13 changed files with 59 additions and 53 deletions

Cargo.lock (generated)
View file

@@ -1992,6 +1992,7 @@ version = "0.1.0"
 dependencies = [
  "async-recursion",
  "axum",
+ "bitflags 2.3.3",
  "bytes",
  "clap",
  "futures-util",

View file

@@ -25,7 +25,8 @@
 - [ ] full text search
 - [ ] website embed api?
 - [-] ui [0]
-- [-] image/file view
+- [x] fuse
+- [x] image/file view
 - [ ] link view (eg. bookmarks or rss feeds)
 - [ ] forum
 - [ ] tests [1]

View file

@@ -8,6 +8,7 @@ edition = "2021"
 [dependencies]
 async-recursion = "1.0.4"
 axum = { version = "0.6.18", features = ["macros", "headers", "ws"] }
+bitflags = "2.3.3"
 bytes = "1.4.0"
 clap = { version = "4.3.14", features = ["derive"] }
 futures-util = "0.3.28"

View file

@@ -10,6 +10,8 @@ use crate::Error
 // -> for this, i need to find out how to split blobs (every backend
 // -> gets a copy? each blob goes to 2 of 3 servers (or similar splitting)?)
+// TODO (maybe): receive events from blobserver via long polling
+
 #[derive(Debug, serde::Deserialize)]
 struct UploadResponse {
     #[serde(rename = "ref")]
@@ -60,7 +62,7 @@ impl Client {
         debug!("put blob");
         let bytes = item.to_bytes();
         let res = self.http
-            .post(format!("{}/blobs/upload", self.base_url))
+            .post(format!("{}/blobs", self.base_url))
             .header("content-length", bytes.len())
             .body(bytes.clone())
             .send()

View file

@@ -78,6 +78,11 @@ struct FFprobeDataWrapper (
 #[allow(unused)]
 /// derive tag information from media, like images/video/audio
 pub async fn derive_media(buffer: &[u8]) -> Option<DeriveMedia> {
+    match infer::get(buffer)?.matcher_type() {
+        MatcherType::Image | MatcherType::Audio | MatcherType::Video => (),
+        _ => return None,
+    };
+
     use tokio::process::Command;
     use std::process::{Stdio, ChildStdin};
     use std::io::{BufWriter, Cursor};
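
The added guard leans on the `infer` crate's magic-byte sniffing to bail out before any ffprobe work is spawned. A standalone sketch of the same check (illustrative only; `is_media` is my name for it, not the project's):

```rust
use infer::MatcherType;

// True when the buffer's magic bytes sniff as image, audio, or video;
// unrecognized or non-media buffers are rejected up front.
fn is_media(buffer: &[u8]) -> bool {
    matches!(
        infer::get(buffer).map(|t| t.matcher_type()),
        Some(MatcherType::Image | MatcherType::Audio | MatcherType::Video)
    )
}
```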

View file

@@ -55,7 +55,7 @@ pub async fn handle_special(db: &Sqlite, event: &Event, relations: &Relations) -
         debug!("validated redaction");
         db.redact_event(rel_ref).await?;
         // TODO: garbage collect unreferenced blobs
-    }
+    },
@@ -89,6 +89,8 @@ pub async fn derive(me: &Items, event: &Event, file: &FileEvent) -> Result<Deriv
     let bytes = get_blob(me, event, None).await?;
     let derived_file = derive_file(&bytes, file.name.as_deref());
     let derived_media = derive_media(&bytes).await;
+    // TODO: fts
+    // let derived_text = derive_text(&bytes).await;
     let derived = Derived {
         file: Some(derived_file),
         media: derived_media,

View file

@@ -130,8 +130,9 @@ async fn instance_info() -> Response<Json<Info>> {
             version: env!("CARGO_PKG_VERSION").to_string(),
         },
         features: Vec::from([
             "core".into(),
+            "thumbnail".into(),
             // "share".into(),
             // "alias".into(),
             // "account".into(),
             // "session".into(),
         ]),

View file

@@ -70,18 +70,10 @@ pub async fn route(
     loop {
         break match recv.recv().await {
             Err(err) => Err(err.into()),
-            // Ok((event, relations, rowid)) => match query.matches(&event, &relations) {
-            //     mt @ MatchType::Event => Ok((mt, event, rowid)),
-            //     mt @ MatchType::Relation => Ok((mt, event, rowid)),
-            //     MatchType::None => continue,
-            // },
-            Ok((event, relations, rowid)) => {
-                let m = query.matches(&event, &relations);
-                match m {
-                    mt @ MatchType::Event => Ok((mt, event, rowid)),
-                    mt @ MatchType::Relation => Ok((mt, event, rowid)),
-                    MatchType::None => continue,
-                }
-            }
+            Ok((event, relations, rowid)) => match query.matches(&event, &relations) {
+                mt @ MatchType::Event => Ok((mt, event, rowid)),
+                mt @ MatchType::Relation => Ok((mt, event, rowid)),
+                MatchType::None => continue,
+            },
         }
     }
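
The rewritten arm relies on `@` bindings, which name the value a pattern just matched so it can be returned directly. A tiny self-contained illustration (a stripped-down `MatchType` for demonstration only):

```rust
enum MatchType { Event, Relation, None }

fn keep(mt: MatchType) -> Option<MatchType> {
    match mt {
        // `m @ pattern` binds the whole matched value to `m`
        m @ MatchType::Event => Some(m),
        m @ MatchType::Relation => Some(m),
        MatchType::None => None,
    }
}
```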

View file

@@ -11,6 +11,17 @@ use self::perms::AuthLevel;
 use super::things::Error;
 use crate::items::Item;
 
+bitflags::bitflags! {
+    struct Permissions: u8 {
+        const READ = 0x01;
+        const WRITE = 0x02;
+        const LIST = 0x04;
+        const ALIAS = 0x08;
+        const SUDO = 0x10;
+        const ADMIN = 0x20;
+    }
+}
+
 pub mod perms {
     // surely theres a better method
     pub trait AuthLevel {
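
Since the new `Permissions` type isn't exercised anywhere else in this diff, here is a usage sketch of the bitflags 2.x API with the same flag layout (my example; the derives are added so the asserts compile):

```rust
bitflags::bitflags! {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct Permissions: u8 {
        const READ = 0x01;
        const WRITE = 0x02;
        const LIST = 0x04;
        const ALIAS = 0x08;
        const SUDO = 0x10;
        const ADMIN = 0x20;
    }
}

fn main() {
    // Combine with `|`, test with `contains`.
    let granted = Permissions::READ | Permissions::LIST;
    assert!(granted.contains(Permissions::READ));
    assert!(!granted.contains(Permissions::WRITE));

    // Round-trip through the raw u8, e.g. for persisting in sqlite.
    let raw = granted.bits();
    assert_eq!(Permissions::from_bits_truncate(raw), granted);
}
```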

View file

@@ -254,10 +254,14 @@ Example of an acl:
 }
 ```
 
-## db
+## features
 
-Store `WipEvent`s in blob storage and full `Event`s in db (with id and derived).
+Different servers can have different features. Here are the official
+ones so far:
 
-## code
-
-A lot of code is being duplicated, especially the `enum Item`s.
+- `core`: supports the core api (`/things/...`)
+- `alias`: server name -> hash mappings
+- `thumbnail`: generates small images/icons for x.file events
+- `account`: users can manage accounts
+- `session`: users can manage sessions
+- `share`: deprecated

View file

@@ -1 +1 @@
-CREATE TABLE blobs (id INTEGER PRIMARY KEY AUTOINCREMENT, hash TEXT NOT NULL);
+CREATE TABLE blobs (hash TEXT PRIMARY KEY NOT NULL);
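
A note on the schema change (mine, not from the commit): dropping the explicit `id INTEGER PRIMARY KEY AUTOINCREMENT` column still leaves SQLite's implicit `rowid` on an ordinary table, which is what the `ORDER BY rowid` pagination further down relies on; declaring the table `WITHOUT ROWID` would break it. A sketch of that keyset-style page fetch, using a runtime-checked sqlx query instead of the project's compile-time `query!` macro:

```rust
use sqlx::SqlitePool;

// Fetch up to `limit` hashes inserted after the `after` cursor,
// in insertion (rowid) order.
async fn next_page(pool: &SqlitePool, after: &str, limit: u32) -> sqlx::Result<Vec<String>> {
    let rows: Vec<(String,)> = sqlx::query_as(
        "SELECT hash FROM blobs \
         WHERE rowid > (SELECT rowid FROM blobs WHERE hash = ?) \
         ORDER BY rowid LIMIT ?",
    )
    .bind(after)
    .bind(limit)
    .fetch_all(pool)
    .await?;
    Ok(rows.into_iter().map(|(hash,)| hash).collect())
}
```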

View file

@@ -33,11 +33,11 @@ impl FileStore {
         Ok(Self {
             blob_path,
             ref_db: db,
-            stream: broadcast::channel(32).0,
+            stream: broadcast::channel(1).0, // set low for now, to check for bad code
         })
     }
 
-    pub async fn put(&mut self, blob: &[u8]) -> Result<ItemRef, Error> {
+    pub async fn put(&self, blob: &[u8]) -> Result<ItemRef, Error> {
         let hash = {
             let mut hasher = sha2::Sha224::default();
             hasher.update(blob);
@@ -48,30 +48,26 @@ impl FileStore {
         tokio::fs::write(path, blob).await?;
 
         let item_ref_str = item_ref.to_string();
-        query!("INSERT INTO blobs (hash) VALUES (?)", item_ref_str).execute(&self.ref_db).await?;
+        query!("INSERT OR IGNORE INTO blobs (hash) VALUES (?)", item_ref_str).execute(&self.ref_db).await?;
 
-        // for poller in &self.pollers {
-        //     println!("sending ref...");
-        //     _ = poller.send_async(item_ref.clone()).await;
-        //     println!("sent ref");
-        // }
+        let _ = self.stream.send(item_ref.clone());
 
         Ok(item_ref)
     }
 
-    pub async fn get(&mut self, item: &ItemRef) -> Result<Vec<u8>, Error> {
+    pub async fn get(&self, item: &ItemRef) -> Result<Vec<u8>, Error> {
         let file_path = self.blob_path.join(item.to_string());
         Ok(tokio::fs::read(file_path).await?)
     }
 
-    pub async fn delete(&mut self, item: &ItemRef) -> Result<(), Error> {
+    pub async fn delete(&self, item: &ItemRef) -> Result<(), Error> {
         let file_path = self.blob_path.join(item.to_string());
        let item_str = item.to_string();
         query!("DELETE FROM blobs WHERE hash = ?", item_str).execute(&self.ref_db).await?;
         Ok(tokio::fs::remove_file(file_path).await?)
     }
 
-    pub async fn list(&mut self, after: Option<ItemRef>, limit: usize, timeout: Option<u64>) -> Result<Vec<ItemRef>, Error> {
+    pub async fn list(&self, after: Option<ItemRef>, limit: usize, timeout: Option<u64>) -> Result<Vec<ItemRef>, Error> {
         // this code doesn't seem good but works
         use futures_util::TryStreamExt as _;
@@ -79,7 +75,7 @@ impl FileStore {
         if let Some(after) = after {
             let after_str = after.to_string();
             let limit = limit as u32;
-            let mut rows = query!("SELECT hash FROM blobs WHERE id > (SELECT id FROM blobs WHERE hash = ?) LIMIT ?", after_str, limit)
+            let mut rows = query!("SELECT hash FROM blobs WHERE rowid > (SELECT rowid FROM blobs WHERE hash = ?) ORDER BY rowid LIMIT ?", after_str, limit)
                 .fetch(&self.ref_db);
             while let Ok(Some(row)) = rows.try_next().await {
                 let item_str: String = row.hash;
@@ -88,7 +84,7 @@ impl FileStore {
             }
         } else {
             let limit = limit as u32;
-            let mut rows = query!("SELECT hash FROM blobs LIMIT ?", limit)
+            let mut rows = query!("SELECT hash FROM blobs ORDER BY rowid LIMIT ?", limit)
                 .fetch(&self.ref_db);
             while let Ok(Some(row)) = rows.try_next().await {
                 let item_str: String = row.hash;
@@ -98,13 +94,11 @@ impl FileStore {
         };
 
         if let Some(timeout) = timeout {
-            let timeout = std::time::Duration::from_millis(timeout);
             if entries.is_empty() {
+                let timeout = std::time::Duration::from_millis(timeout);
                 if let Ok(result) = tokio::time::timeout(timeout, self.stream.subscribe().recv()).await {
                     let item = result.map_err(|_| Error::Static("couldnt receive from channel"))?;
                     return Ok(vec![item]);
-                } else {
-                    return Ok(entries);
                 }
             }
         }
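
The heart of the commit is that empty-result branch: instead of returning an empty page, `list` now parks the caller on the broadcast channel until a blob arrives or the timeout fires. The same pattern in isolation, as a minimal sketch (my helper, not the project's API):

```rust
use std::time::Duration;
use tokio::sync::broadcast;

// Wait up to `timeout_ms` for the next value on a broadcast channel.
// None means the timeout elapsed, the receiver lagged, or the channel closed.
async fn wait_for_next<T: Clone>(tx: &broadcast::Sender<T>, timeout_ms: u64) -> Option<T> {
    let mut rx = tx.subscribe();
    tokio::time::timeout(Duration::from_millis(timeout_ms), rx.recv())
        .await
        .ok()? // outer Err: timed out
        .ok()  // inner Err: lagged or closed
}
```

One caveat: a broadcast receiver only sees values sent after `subscribe()`, so a blob written between the initial query and the subscription is missed until the next poll; subscribing before the query is the usual way to close that window.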

View file

@@ -3,8 +3,6 @@ use axum::http::StatusCode;
 use futures_util::StreamExt;
 use serde_json::{Value, json};
 use std::sync::Arc;
-use tokio::sync::Mutex;
 use ufh::item::ItemRef;
 
 mod fs;
@@ -16,19 +14,18 @@ mod error;
 // TODO: error handling enum
 struct UfhState {
-    store: Mutex<fs::FileStore>,
+    store: fs::FileStore,
 }
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cwd = std::env::current_dir().expect("couldn't get current working directory!");
     let state = UfhState {
-        store: Mutex::new(fs::FileStore::init(cwd.join("data")).await?),
+        store: fs::FileStore::init(cwd.join("data")).await?,
     };
 
     let router = axum::Router::new()
-        .route("/blobs/upload", axum::routing::post(blob_upload)) // TODO: rename to just `/blobs` instead of `/blobs/upload`?
+        .route("/blobs", axum::routing::get(blob_list).post(blob_upload))
         .route("/blobs/:item_ref", axum::routing::get(blob_download).delete(blob_delete))
-        .route("/blobs/", axum::routing::get(blob_list))
         .with_state(Arc::new(state));
 
     axum::Server::bind(&"0.0.0.0:3219".parse().unwrap())
         .serve(router.into_make_service())
@@ -58,8 +55,7 @@ async fn blob_upload(
         chunks.push(chunk);
     }
 
-    let item_ref = state.store.lock()
-        .await
+    let item_ref = state.store
         .put(&chunks.concat())
         .await
         .map_err(|err| (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": err.to_string() }))))?;
@@ -70,20 +66,17 @@ async fn blob_download(
     State(state): State<Arc<UfhState>>,
     path: Path<DownloadPath>,
 ) -> Result<Vec<u8>, (StatusCode, Json<Value>)> {
-    state.store.lock()
-        .await
+    state.store
         .get(&path.item_ref)
         .await
         .map_err(|err| (StatusCode::NOT_FOUND, Json(json!({ "error": err.to_string() }))))
 }
 
 #[axum::debug_handler]
 async fn blob_delete(
     State(state): State<Arc<UfhState>>,
     path: Path<DownloadPath>,
 ) -> Result<(), Json<Value>> {
-    state.store.lock()
-        .await
+    state.store
         .delete(&path.item_ref)
         .await
         .map_err(|err| Json(json!({ "error": err.to_string() })))
@@ -94,8 +87,7 @@ async fn blob_list(
     Query(query): Query<ListQuery>,
 ) -> Result<Json<Value>, Json<Value>> {
     let limit = query.limit.unwrap_or(20).min(100);
-    let list = state.store.lock()
-        .await
+    let list = state.store
         .list(query.after, limit, query.timeout)
         .await
         .map_err(|err| Json(json!({ "error": err.to_string() })))?;
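
With the routes above, a consumer can tail the blob server over plain HTTP. A hypothetical client loop (the response shape isn't visible in this diff, so the body is parsed loosely; the `limit`, `timeout`, and `after` query parameters mirror the `ListQuery` fields):

```rust
use serde_json::Value;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let http = reqwest::Client::new();
    let mut after: Option<String> = None;
    loop {
        let mut req = http
            .get("http://localhost:3219/blobs")
            // Ask the server to hold the request open for up to 30s.
            .query(&[("limit", "20"), ("timeout", "30000")]);
        if let Some(cursor) = &after {
            req = req.query(&[("after", cursor.as_str())]);
        }
        let body: Value = req.send().await?.json().await?;
        println!("got: {body}");
        // Advance the cursor to the last ref before looping; the exact
        // extraction depends on the (unshown) response shape.
        if let Some(last) = body.as_array().and_then(|a| a.last()).and_then(|v| v.as_str()) {
            after = Some(last.to_string());
        }
    }
}
```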