better range requests

tezlm 2023-08-10 19:06:04 -07:00
parent ee2542ed87
commit 9afa466887
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
15 changed files with 289 additions and 47 deletions

View file

@@ -25,7 +25,7 @@ use ufh::{
item::{HashType, ItemRef},
query::{Query, QueryRelation},
};
-use ascii_table::{AsciiTable, Align as AsciiAlign};
+// use ascii_table::{AsciiTable, Align as AsciiAlign};
use crate::net::PutItem;
pub type Error = Box<dyn std::error::Error>;

View file

@@ -3,7 +3,7 @@ use std::collections::{HashSet, HashMap};
// use crate::event::EventContent;
use serde_json::Value;
-#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
pub struct Derived {
#[serde(skip_serializing_if = "Option::is_none", default)]
pub file: Option<DeriveFile>,
@@ -21,7 +21,7 @@ pub struct Derived {
pub etc: HashMap<String, Value>,
}
-#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeriveFile {
pub size: u64,
pub mime: String,
@@ -30,7 +30,7 @@ pub struct DeriveFile {
#[serde(skip_serializing_if = "Option::is_none")]
pub width: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
-pub duration: Option<u64>,
+pub duration: Option<f64>,
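// fractional seconds now; this is also why Eq was dropped from the derives
// above (f64 isn't Eq)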
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
@@ -50,9 +50,6 @@ pub struct DeriveMedia {
// this seems to already be adding a lot of implementation details
// is it worth adding a ton of fields or is `exif: HashMap<String, String>` fine?
-// #[serde(skip_serializing_if = "HashMap::is_empty")] pub other: HashMap<DeriveComment>,
-// unneeded due to `/things/{item_ref}/thumbnail` existing already?
-// #[serde(skip_serializing_if = "Vec::is_empty")] thumbnails: Vec<String>,
}
impl Derived {

View file

@@ -4,15 +4,24 @@ use std::collections::HashMap;
use std::io::Cursor;
use std::process::Stdio;
use tokio::process::Command;
-use tracing::debug;
+use tracing::{debug, trace};
use ufh::derived::DeriveMedia;
#[derive(Debug, Serialize, Deserialize)]
pub struct Ffprobe {
pub packets: Vec<Packet>,
pub streams: Vec<Stream>,
pub format: Format,
}
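// one entry per packet from ffprobe's -show_packets; duration_time is the
// packet duration in seconds, serialized as a decimal string in ffprobe's
// JSON output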
#[derive(Debug, Serialize, Deserialize)]
pub struct Packet {
#[serde(rename = "stream_index")]
pub index: u64,
#[serde(rename = "duration_time")]
pub duration: Option<String>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Stream {
pub index: u64,
@@ -56,6 +65,8 @@ impl Ffprobe {
"json",
"-show_streams",
"-show_format",
// "-show_chapters",
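// -show_packets feeds the per-stream duration fallback in get_duration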
"-show_packets",
"-",
])
.stdin(Stdio::piped())
@@ -64,14 +75,29 @@
.spawn()
.expect("couldn't find ffprobe");
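// write stdin and read stdout concurrently; doing them one after the other
// can deadlock once ffprobe fills the stdout pipe while we're still writing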
-let mut cmd_stdin = cmd.stdin.take().expect("ffprobe should take stdin");
trace!("begin ffprobing");
let writer = async {
let mut stdin = cmd.stdin.take().expect("ffprobe should have stdin");
let mut cursor = Cursor::new(&buffer);
let _ = tokio::io::copy(&mut cursor, &mut stdin).await;
trace!("copy stdin");
drop(stdin);
};
let reader = async {
let mut stdout = cmd.stdout.take().expect("ffprobe should have stdout");
let mut buffer = Cursor::new(Vec::new());
let _ = tokio::io::copy(&mut stdout, &mut buffer).await;
drop(stdout);
buffer
};
let (_, buffer) = tokio::join!(writer, reader);
if !cmd.wait().await.unwrap().success() {
panic!("ffprobe didn't exit well");
}
-let mut cursor = Cursor::new(&buffer);
-let _ = tokio::io::copy(&mut cursor, &mut cmd_stdin).await;
-drop(cmd_stdin);
-let output = cmd.wait_with_output().await.expect("failed to read").stdout;
-let string = String::from_utf8(output).expect("ffprobe should output utf8");
trace!("read ffprobe output");
let string = String::from_utf8(buffer.into_inner()).expect("ffprobe should output utf8");
serde_json::from_str(&string).expect("ffprobe should output json")
}
@@ -90,7 +116,7 @@
info
}
-pub fn dimensions(&self) -> (Option<u64>, Option<u64>, Option<u64>) {
+pub fn dimensions(&self) -> (Option<u64>, Option<u64>, Option<f64>) {
let mut dimensions = (
None,
None,
@@ -100,11 +126,23 @@
let (width, height, duration) = stream.dimensions();
dimensions.0 = dimensions.0.or(width);
dimensions.1 = dimensions.1.or(height);
-dimensions.2 = dimensions.2.or(duration);
+dimensions.2 = dimensions.2.or(duration.map(|d| d as f64));
}
dimensions.2 = dimensions.2.or_else(|| self.get_duration());
dimensions
}
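// fallback for when no stream reports a duration: sum each stream's packet
// durations and take the longest stream's total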
fn get_duration(&self) -> Option<f64> {
let mut stream_lengths = vec![0.0; self.streams.len()];
for packet in &self.packets {
if let Some(Ok(duration)) = packet.duration.as_ref().map(|s| s.parse::<f64>()) {
stream_lengths[packet.index as usize] += duration;
}
}
stream_lengths.sort_unstable_by(f64::total_cmp);
stream_lengths.last().copied()
}
pub async fn thumbnail(&self, buffer: &[u8]) -> Option<Vec<u8>> {
debug!("generate thumbnail");
match self
@@ -156,7 +194,7 @@ impl Tags {
title: map.get("title").map(String::from),
artist: map.get("artist").map(String::from),
album: map.get("album").map(String::from),
-comment: map.get("comment").map(String::from),
+comment: map.get("synopsis").or_else(|| map.get("comment")).map(String::from),
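// prefer "synopsis", which tools like yt-dlp use when embedding the
// description, and fall back to the plain "comment" tag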
url: map.get("purl").map(String::from),
}
}

View file

@@ -29,7 +29,7 @@ pub async fn get_relations(db: &Sqlite, event: &Event) -> Result<Relations, Erro
let rel_info = &event.relations[&item_ref];
match item {
DbItem::Event(event) => Ok((item_ref, (*event, rel_info.clone()))),
-DbItem::Blob => Err(Error::Validation(
+DbItem::Blob(_) => Err(Error::Validation(
"some relations are to blobs instead of events",
)),
}
@@ -53,7 +53,7 @@
match &event.content {
EventContent::File(f) => {
match me.db.bulk_fetch(&f.chunks, false).await {
-Ok(items) if items.iter().all(|i| matches!(i, (_, DbItem::Blob))) => Ok(()),
+Ok(items) if items.iter().all(|i| matches!(i, (_, DbItem::Blob(_)))) => Ok(()),
Ok(_) => Err(Error::Validation("one or more chunk items isn't a blob")),
Err(Error::NotFound) => {
Err(Error::Validation("one or more chunk items doesn't exist"))

View file

@@ -117,7 +117,6 @@ async fn serve(port: u16) -> Result<(), Error> {
.nest("/search", routes::search::routes())
.nest("/p2p", routes::p2p::routes())
.with_state(Arc::new(state))
-.layer(axum::middleware::from_fn(middleware::range))
.layer(axum::middleware::from_fn(middleware::csp))
.layer(CorsLayer::permissive())
.layer(TraceLayer::new_for_http());

View file

@@ -18,6 +18,7 @@ pub async fn csp<B>(request: Request<B>, next: Next<B>) -> axum::response::Respo
// i hate range requests and working with range requests now
// this was initially so audio/video would stream properly
// but firefox is stupid and won't work anyway, even though chromium does
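// now unused: range handling moved into the blob route handler itself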
#[allow(unused)]
pub async fn range<B>(
range: Option<TypedHeader<axum::headers::Range>>,
request: Request<B>,

View file

@@ -97,7 +97,7 @@ pub async fn can_send_event(db: &Sqlite, wip: &WipEvent) -> Result<bool, Error>
let rel_info = &relations[&item_ref];
match item {
DbItem::Event(event) => Ok((item_ref, (*event, rel_info.clone()))),
-DbItem::Blob => Err(Error::Validation(
+DbItem::Blob(_) => Err(Error::Validation(
"some relations are to blobs instead of events",
)),
}

View file

@@ -58,15 +58,13 @@ async fn search(
.search(&params.q, limit, offset, sniplen)
.await?;
let item_refs: Vec<ItemRef> = results.iter().map(|i| i.item_ref.clone()).collect();
-let mut items = state.db.bulk_fetch(&item_refs, true).await?;
-let results: Vec<SearchDoc> = results
+let results = state.db.bulk_fetch(&item_refs, true)
+.await?
.into_iter()
-.map(|doc| {
-let item = items
-.remove(&doc.item_ref)
-.expect("search results has duplicate or nonexistent event");
+.zip(results)
+.map(|((_, item), doc)| {
match item {
-DbItem::Blob => panic!("search result returned a blob"),
+DbItem::Blob(_) => panic!("search result returned a blob"),
DbItem::Event(event) => SearchDoc {
score: doc.score,
snippet: doc.snippet,

View file

@@ -1,13 +1,18 @@
use crate::error::Error;
use crate::routes::util::get_blob;
use crate::state::db::{Database, DbItem};
use crate::ServerState;
use axum::extract::{Path, Query, State};
use axum::headers::Range;
use axum::http::{HeaderMap, HeaderValue, StatusCode};
use axum::TypedHeader;
use serde::Deserialize;
use std::ops::Bound;
use std::sync::Arc;
use tracing::debug;
use ufh::event::EventContent;
use ufh::item::ItemRef;
use axum::headers::HeaderMapExt;
use super::Response;
use crate::items::Item;
@@ -20,6 +25,7 @@ pub struct BlobParams {
#[tracing::instrument(skip_all, fields(item_ref))]
pub async fn route(
State(state): State<Arc<ServerState>>,
range: Option<TypedHeader<Range>>,
Path(item_ref): Path<ItemRef>,
Query(params): Query<BlobParams>,
) -> Response<(StatusCode, HeaderMap, Vec<u8>)> {
@@ -34,12 +40,12 @@ pub async fn route(
Item::Blob(_) => return Err(Error::Validation("this is not an event at all")),
};
-debug!("getting blob chunks");
-let blob = get_blob(&state.items, &event, params.via.as_deref()).await?;
let mime = event
.derived
.file
-.map_or_else(|| String::from("application/octet-stream"), |f| f.mime);
+.as_ref()
+.map_or_else(|| "application/octet-stream", |f| &f.mime);
// TODO: research and decide whether to use inline or attachment
let disposition = match &file.name {
None => HeaderValue::from_static("inline"),
@@ -55,9 +61,109 @@
let mut headers = HeaderMap::new();
headers.insert(
"content-type",
-mime.try_into().expect("probably is a header value"),
+mime.try_into().expect("invalid header value"),
);
headers.insert("content-disposition", disposition);
+headers.typed_insert(axum::headers::AcceptRanges::bytes());
-Ok((StatusCode::OK, headers, blob.to_vec()))
if let Some(range) = range {
let mut ranges = range.0.iter();
let range = ranges.next().expect("range request has at least one range");
if ranges.next().is_some() {
return Err(Error::Http(
StatusCode::RANGE_NOT_SATISFIABLE,
"multipart ranges aren't supported".into(),
));
}
let blobs: Vec<(ItemRef, usize)> = state
.db
.bulk_fetch(&file.chunks, false)
.await?
.into_iter()
.map(|(item_ref, item)| match item {
DbItem::Event(_) => {
unreachable!("all file chunks should've been validated to be blobs on create")
}
DbItem::Blob(size) => (item_ref, size as usize),
})
.collect();
let total_len: usize = blobs.iter().map(|i| i.1).sum();
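// the file's total size is the sum of its chunks' stored sizes; needed for
// the Content-Range header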
match range {
(Bound::Included(start), Bound::Included(end)) => {
debug!("getting blob range({}-{})", start, end);
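// HTTP ranges are inclusive on both ends, while slice() takes a half-open
// range, hence the +1 here and on the final byte count below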
let (intersection, offset) = slice(blobs, (start as usize, end as usize + 1));
let offset = offset.expect("always exists if range has intersection?");
let mut chunks = Vec::with_capacity(file.chunks.len());
for item_ref in &intersection {
let Item::Blob(blob) = state.items.get(item_ref, params.via.as_deref()).await? else {
unreachable!("file didn't reference a blob");
};
chunks.push(blob);
}
let content_range = axum::headers::ContentRange::bytes(start..=end, Some(total_len as u64)).unwrap();
headers.typed_insert(content_range);
Ok((StatusCode::PARTIAL_CONTENT, headers, chunks.concat()[offset..offset + (end - start) as usize + 1].to_vec()))
}
(Bound::Included(start), Bound::Unbounded) => {
debug!("getting blob range({}-)", start);
let (intersection, offset) = slice(blobs, (start as usize, total_len));
let offset = offset.expect("always exists if range has intersection?");
let mut chunks = Vec::with_capacity(file.chunks.len());
for item_ref in &intersection {
let Item::Blob(blob) = state.items.get(item_ref, params.via.as_deref()).await? else {
unreachable!("file didn't reference a blob");
};
chunks.push(blob);
}
let content_range = axum::headers::ContentRange::bytes(start.., Some(total_len as u64)).unwrap();
headers.typed_insert(content_range);
Ok((StatusCode::PARTIAL_CONTENT, headers, chunks.concat()[offset..].to_vec()))
}
_ => {
Err(Error::Http(
StatusCode::RANGE_NOT_SATISFIABLE,
"that kind of range isn't supported".into(),
))
}
}
} else {
debug!("getting blob chunks");
let blob = get_blob(&state.items, &event, params.via.as_deref()).await?;
Ok((StatusCode::OK, headers, blob.to_vec()))
}
}
use std::cmp;
/// assumes start1 < end1 && start2 < end2
fn get_overlap((start1, end1): (usize, usize), (start2, end2): (usize, usize)) -> usize {
let closest_end = cmp::min(end1, end2) as isize;
let furthest_start = cmp::max(start1, start2) as isize;
let diff = closest_end - furthest_start;
diff.abs() as usize
}
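// walk the ordered chunk list, keep the chunks that intersect the half-open
// byte range, and return the offset of `start` within the first kept chunk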
fn slice(
chunks: Vec<(ItemRef, usize)>,
(start, end): (usize, usize),
) -> (Vec<ItemRef>, Option<usize>) {
let mut start_idx = 0;
let mut intersection = Vec::new();
let mut offset = None;
for (item_ref, size) in chunks {
let end_idx = start_idx + size;
let overlap = get_overlap((start, end), (start_idx, end_idx));
if overlap > 0 {
if offset.is_none() {
offset = Some(start - start_idx);
}
intersection.push(item_ref);
}
start_idx = end_idx;
}
(intersection, offset)
}
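// worked example (hypothetical sizes): chunks [(a, 4), (b, 4), (c, 4)] with
// range (5, 10) yields ([b, c], Some(1)): bytes 5..10 start one byte into b
// and end two bytes into c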

View file

@@ -31,7 +31,8 @@ pub async fn route(
.db
.bulk_fetch(&options.refs, true)
.await?
-.into_keys()
+.into_iter()
+.map(|(item_ref, _)| item_ref)
.collect();
Ok(Json(BulkCheckResponse { have }))
}

View file

@@ -1,3 +1,5 @@
#![allow(unused)]
use crate::items::events::get_relations;
use crate::perms::can_view_event;
use crate::routes::util::{perms, Authenticate};

View file

@@ -167,7 +167,6 @@ impl<T: AuthLevel> Authenticate<T> {
}
}
-// TODO: allow fetching ranges of files more easily
pub async fn get_blob(
items: &Items,
file_event: &Event,

View file

@@ -32,8 +32,7 @@ pub struct QueryResult {
#[derive(Debug)]
pub enum DbItem {
Event(Box<Event>),
-// Blob(u64),
-Blob,
+Blob(u64),
}
#[derive(Debug)]
@@ -69,7 +68,7 @@ pub trait Database {
async fn query_events(&self, query: &Query, after: Location, limit: u32) -> Result<QueryResult, Self::Error>;
// return type is currently a bit of a kludge
async fn query_relations(&self, relations: &[QueryRelation], for_events: &[ItemRef], after: Location, limit: u32) -> Result<(HashMap<ItemRef, Event>, Option<u32>), Self::Error>;
-async fn bulk_fetch(&self, item_refs: &[ItemRef], partial: bool) -> Result<HashMap<ItemRef, DbItem>, Self::Error>;
+async fn bulk_fetch(&self, item_refs: &[ItemRef], partial: bool) -> Result<Vec<(ItemRef, DbItem)>, Self::Error>;
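// the returned Vec keeps the order of `item_refs`; callers like the search
// route rely on this when zipping results back together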
// routes::things::create has a lot of file-specific things
async fn tags_set(&self, item_refs: &[ItemRef], tags: &[String]) -> Result<(), Self::Error>;

View file

@@ -6,7 +6,7 @@ use futures_util::TryStreamExt;
use serde_json::Value;
use sqlx::query as sql;
use sqlx::{sqlite::SqliteConnectOptions, QueryBuilder, Row, SqlitePool};
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -365,7 +365,7 @@ impl Database for Sqlite {
&self,
item_refs: &[ItemRef],
partial: bool,
-) -> Result<HashMap<ItemRef, DbItem>, Self::Error> {
+) -> Result<Vec<(ItemRef, DbItem)>, Self::Error> {
let _lock = self.lock.read().await;
let mut builder = sqlx::QueryBuilder::new(
"
@@ -382,13 +382,14 @@
}
builder.push(")");
let records = builder.build().fetch_all(&self.pool).await?;
-let mut map = HashMap::with_capacity(records.len());
+let mut items = Vec::with_capacity(records.len());
for record in records {
let item_ref_str = record.get("item_ref");
let item_ref = ItemRef::from_str(item_ref_str)?;
let json: String = record.get("json");
let item = if json.is_empty() {
-DbItem::Blob
+let size: u32 = record.get("size");
+DbItem::Blob(size.into())
} else {
let event: Event = serde_json::from_str(&json)?;
let derived: Option<String> = record.get("derived");
@@ -398,14 +399,19 @@
.unwrap_or_default();
DbItem::Event(Box::new(Event { derived, ..event }))
};
-map.insert(item_ref, item);
+items.push((item_ref, item));
}
-let has_all = HashSet::<_>::from_iter(item_refs) == HashSet::<_>::from_iter(map.keys());
-if !partial && !has_all {
+if !partial && item_refs.len() != items.len() {
return Err(Error::NotFound);
}
// TODO (performance): ...
items.sort_by(|(a_ref, _), (b_ref, _)| {
let a_pos = item_refs.iter().position(|i| i == a_ref).unwrap();
let b_pos = item_refs.iter().position(|i| i == b_ref).unwrap();
a_pos.cmp(&b_pos)
});
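// note: position() makes this sort quadratic in the number of refs; a
// HashMap from ref to index would make each comparison O(1)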
-Ok(map)
+Ok(items)
}
async fn tags_set(&self, item_refs: &[ItemRef], tags: &[String]) -> Result<(), Self::Error> {

web/lib/api-next.ts (new file, 96 lines)
View file

@@ -0,0 +1,96 @@
import * as ed25519 from "@noble/ed25519";
import * as base64Normal from "uint8-to-base64";
import canonicalize from "canonicalize";
export * as ed25519 from "@noble/ed25519";
import EventEmitter from "events";
import TypedEmitter from "typed-emitter";
import log from "./log";
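// base64url (RFC 4648): swap "+/" for "-_" and strip padding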
export const base64 = {
encode(data: Uint8Array): string {
return base64Normal
.encode(data)
.replace(/\+/g, "-")
.replace(/\//g, "_")
.replace(/=/g, "")
},
decode(data: string): Uint8Array {
const cleaned = data
.replace(/\-/g, "+")
.replace(/_/g, "/");
return base64Normal.decode(cleaned);
},
}
type ref = string;
interface QueryOptions {
refs?: Array<ref>,
senders?: Array<string>,
types?: Array<string>,
tags?: Array<string>,
relations?: Array<[string, string] | [string, string, string]>,
ephemeral?: Array<[string, string] | [string, string, string]>,
}
type Relations = Record<ref, { type: string, key?: string }>;
type TypedEvents = {
"x.file": { chunks: Array<string>, name?: string },
"x.redact": { reason?: string },
"x.acl": {
roles?: Record<string, Array<[string, string, string]>>,
users?: Record<string, Array<string>>,
admins?: Array<string>,
},
// "x.user": { name?: string },
};
function todo(): never {
throw new Error("not implemented!");
}
// export class Event<
// EventType extends keyof TypedEvents | string,
// EventContent extends (EventType extends keyof TypedEvents ? TypedEvents[EventType] : Record<string, any>),
// > {
// id: string;
// type: EventType;
// content: EventContent | null;
export class Event {
id: string;
type: string;
content: Record<string, any> | null;
sender: string;
signature: string;
origin_ts: number;
derived?: Record<string, any>;
relations?: Relations;
constructor() {
throw new Error("you cannot use the constructor");
}
static from(json: Record<string, any>) {
// the constructor always throws, so build the instance without invoking it
return Object.assign(Object.create(Event.prototype) as Event, json);
}
getContent() { return this.derived?.update ?? this.content }
isRedacted () { return this.content === null }
async redact(reason?: string) { todo() }
async edit(content: Record<string, any>): Promise<ref> { todo() }
}
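// usage sketch: Event.from(await resp.json()).getContent() yields the latest
// derived edit when present, otherwise the original content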
export class Events extends Map<string, Event> {
async fetch(ref: string): Promise<Event> { todo() }
}
export class Query {
}
export class Client {
async createEvent(type: string, content: Record<string, any>, relations: Relations = {}): Promise<ref> { todo() }
async createBlob(blob: ArrayBuffer): Promise<ref> { todo() }
}