commit cac684eb95
tezlm 2023-10-21 23:09:16 -07:00
Signed by: tezlm (GPG key ID: 649733FCD94AFBBA)
7 changed files with 2538 additions and 0 deletions

.gitignore vendored Normal file (+3)

@@ -0,0 +1,3 @@
/target
/result
/test-db

Cargo.lock generated Normal file (+1998)

File diff suppressed because it is too large

Cargo.toml Normal file (+19)

@@ -0,0 +1,19 @@
[package]
name = "media"
version = "0.1.0"
edition = "2021"
rust-version = "1.73.0"

[dependencies]
axum = { version = "0.6.20", features = ["macros"] }
fastcdc = "3.1.0"
futures = "0.3.28"
moka = { version = "0.12.1", features = ["future"] }
rand = "0.8.5"
reqwest = { version = "0.11.22", features = ["json", "rustls-tls-native-roots"], default-features = false }
rocksdb = "0.21.0"
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107"
sha2 = "0.10.8"
thiserror = "1.0.50"
tokio = { version = "1.33.0", features = ["full"] }

flake.lock Normal file (+121)

@@ -0,0 +1,121 @@
{
  "nodes": {
    "crane": {
      "inputs": {
        "nixpkgs": [
          "nixpkgs"
        ]
      },
      "locked": {
        "lastModified": 1697840921,
        "narHash": "sha256-zXHwu104SQOxogkMgg+w22c3+zI/FvK83TAkfLmeKw0=",
        "owner": "ipetkov",
        "repo": "crane",
        "rev": "758ae442227103fa501276e8225609a11c99718e",
        "type": "github"
      },
      "original": {
        "owner": "ipetkov",
        "repo": "crane",
        "type": "github"
      }
    },
    "fenix": {
      "inputs": {
        "nixpkgs": [
          "nixpkgs"
        ],
        "rust-analyzer-src": "rust-analyzer-src"
      },
      "locked": {
        "lastModified": 1697869218,
        "narHash": "sha256-6xvEGBaIFg9nruyvmznfGjWgi5uSezbLyDqgEJKLV90=",
        "owner": "nix-community",
        "repo": "fenix",
        "rev": "595a9eed67a4bf54dfe3e5c3299362a695fef758",
        "type": "github"
      },
      "original": {
        "owner": "nix-community",
        "repo": "fenix",
        "type": "github"
      }
    },
    "flake-utils": {
      "inputs": {
        "systems": "systems"
      },
      "locked": {
        "lastModified": 1694529238,
        "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=",
        "owner": "numtide",
        "repo": "flake-utils",
        "rev": "ff7b65b44d01cf9ba6a71320833626af21126384",
        "type": "github"
      },
      "original": {
        "owner": "numtide",
        "repo": "flake-utils",
        "type": "github"
      }
    },
    "nixpkgs": {
      "locked": {
        "lastModified": 1697723726,
        "narHash": "sha256-SaTWPkI8a5xSHX/rrKzUe+/uVNy6zCGMXgoeMb7T9rg=",
        "owner": "NixOS",
        "repo": "nixpkgs",
        "rev": "7c9cc5a6e5d38010801741ac830a3f8fd667a7a0",
        "type": "github"
      },
      "original": {
        "owner": "NixOS",
        "ref": "nixos-unstable",
        "repo": "nixpkgs",
        "type": "github"
      }
    },
    "root": {
      "inputs": {
        "crane": "crane",
        "fenix": "fenix",
        "flake-utils": "flake-utils",
        "nixpkgs": "nixpkgs"
      }
    },
    "rust-analyzer-src": {
      "flake": false,
      "locked": {
        "lastModified": 1697820143,
        "narHash": "sha256-wcqAeCB+pv+BzVd1VkXYZ+fyPzMy+YdxaOuLS7XHIhY=",
        "owner": "rust-lang",
        "repo": "rust-analyzer",
        "rev": "954fb1d673107f3de5cab9b06cb3d1a2323eb6e0",
        "type": "github"
      },
      "original": {
        "owner": "rust-lang",
        "ref": "nightly",
        "repo": "rust-analyzer",
        "type": "github"
      }
    },
    "systems": {
      "locked": {
        "lastModified": 1681028828,
        "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
        "owner": "nix-systems",
        "repo": "default",
        "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
        "type": "github"
      },
      "original": {
        "owner": "nix-systems",
        "repo": "default",
        "type": "github"
      }
    }
  },
  "root": "root",
  "version": 7
}

flake.nix Normal file (+95)

@@ -0,0 +1,95 @@
# copied from conduit pretty much
{
  inputs = {
    nixpkgs.url = "github:NixOS/nixpkgs?ref=nixos-unstable";
    flake-utils.url = "github:numtide/flake-utils";

    fenix = {
      url = "github:nix-community/fenix";
      inputs.nixpkgs.follows = "nixpkgs";
    };

    crane = {
      url = "github:ipetkov/crane";
      inputs.nixpkgs.follows = "nixpkgs";
      inputs.flake-utils.follows = "flake-utils";
    };
  };

  outputs =
    { self
    , nixpkgs
    , flake-utils
    , fenix
    , crane
    }: flake-utils.lib.eachDefaultSystem (system:
      let
        pkgs = nixpkgs.legacyPackages.${system};

        # Use mold on Linux
        stdenv = if pkgs.stdenv.isLinux then
          pkgs.stdenvAdapters.useMoldLinker pkgs.stdenv
        else
          pkgs.stdenv;

        # Nix-accessible `Cargo.toml`
        cargoToml = builtins.fromTOML (builtins.readFile ./Cargo.toml);

        # The Rust toolchain to use
        toolchain = fenix.packages.${system}.toolchainOf {
          # Use the Rust version defined in `Cargo.toml`
          channel = cargoToml.package.rust-version;

          # THE rust-version HASH
          sha256 = "sha256-rLP8+fTxnPHoR96ZJiCa/5Ans1OojI7MLsmSqR2ip8o=";
        };

        # The system's RocksDB
        ROCKSDB_INCLUDE_DIR = "${pkgs.rocksdb}/include";
        ROCKSDB_LIB_DIR = "${pkgs.rocksdb}/lib";

        # Shared between the package and the devShell
        nativeBuildInputs = (with pkgs.rustPlatform; [
          bindgenHook
        ]);

        builder =
          ((crane.mkLib pkgs).overrideToolchain toolchain.toolchain).buildPackage;
      in
      {
        packages.default = builder {
          src = ./.;

          inherit
            stdenv
            nativeBuildInputs
            ROCKSDB_INCLUDE_DIR
            ROCKSDB_LIB_DIR;
        };

        devShells.default = (pkgs.mkShell.override { inherit stdenv; }) {
          # Rust Analyzer needs to be able to find the path to default crate
          # sources, and it can read this environment variable to do so
          RUST_SRC_PATH = "${toolchain.rust-src}/lib/rustlib/src/rust/library";

          inherit
            ROCKSDB_INCLUDE_DIR
            ROCKSDB_LIB_DIR;

          # Development tools
          nativeBuildInputs = nativeBuildInputs ++ (with toolchain; [
            cargo
            clippy
            rust-src
            rustc
            rustfmt
          ]);
        };

        checks = {
          packagesDefault = self.packages.${system}.default;
          devShellsDefault = self.devShells.${system}.default;
        };
      });
}

src/main.rs Normal file (+121)

@@ -0,0 +1,121 @@
use std::{sync::Arc, io::Cursor};

use axum::{routing::{get, post}, Json, extract::{State, Path, BodyStream}};
use serde::Deserialize;
use storage::{Metadata, Storage};
use serde_json::{json, Value};
use tokio::sync::Mutex;
use futures::stream::StreamExt;

mod storage;

struct Globals {
    storage: Mutex<Storage>,
}

#[tokio::main]
async fn main() {
    let mut options = rocksdb::Options::default();
    options.create_if_missing(true);
    options.set_enable_blob_files(true);

    let storage = Mutex::new(Storage::new(rocksdb::DB::open(&options, "test-db").unwrap()));
    let state = Globals { storage };

    let router = axum::Router::new()
        .route("/", get(route_it_works))
        .route("/_matrix/media/r0/config", get(route_config))
        .route("/_matrix/media/v3/config", get(route_config))
        .route("/_matrix/media/r0/upload", post(route_upload_sync))
        .route("/_matrix/media/v3/upload", post(route_upload_sync))
        .route("/_matrix/media/r0/download/:server/:mxc", get(route_download))
        .route("/_matrix/media/v3/download/:server/:mxc", get(route_download))
        .with_state(Arc::new(state));

    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(router.into_make_service())
        .await
        .unwrap();
}
async fn route_it_works() -> &'static str {
    "it works!"
}

async fn route_config() -> Json<Value> {
    Json(json!({
        "m.upload.size": 268435456,
    }))
}

async fn route_create() -> &'static str {
    todo!()
}

#[axum::debug_handler]
async fn route_upload_sync(State(globals): State<Arc<Globals>>, mut body: BodyStream) -> Result<(), ()> {
    let mut chunks = vec![];
    while let Some(Ok(body)) = body.next().await {
        chunks.extend_from_slice(&body);
    }

    let meta = Metadata {
        server: "celery.eu.org".into(),
        mxc: "a1b2c3".into(),
        user_id: None,
        size: chunks.len() as u64,
        content_type: None,
        file_name: None,
    };

    globals.storage.lock().await.insert(&mut Cursor::new(chunks), meta).await.unwrap();
    Ok(())
}

async fn route_upload_async() -> &'static str {
    todo!()
}

async fn route_download(State(globals): State<Arc<Globals>>, Path((server, mxc)): Path<(String, String)>) -> Result<Vec<u8>, ()> {
    let mut globals = globals.storage.lock().await;
    let (metadata, chunks) = globals.get_metadata(server, mxc).await.unwrap().unwrap();
    let data = globals.get_file(&chunks).await.unwrap().collect();
    Ok(data)
}

async fn route_thumbnail() -> &'static str {
    todo!()
}

async fn route_download_named() -> &'static str {
    todo!()
}

// not technically in spec
async fn route_thumbnail_named() -> &'static str {
    todo!()
}

async fn route_preview_url() -> &'static str {
    todo!()
}

// GET /admin/stats
async fn route_admin_stats() -> &'static str {
    todo!()
}

// POST /admin/purge
async fn route_admin_purge() -> &'static str {
    todo!()
}

// POST /admin/users/@mxid:server.tld
async fn route_admin_get_user_config() -> &'static str {
    todo!()
}

// PUT /admin/users/@mxid:server.tld
async fn route_admin_put_user_config() -> &'static str {
    todo!()
}
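
For reference, a minimal client sketch against the upload and download routes wired up above. It is a hypothetical usage example, not part of the diff: it assumes the server from main() is listening on 0.0.0.0:3000 and it uses the reqwest dependency already declared in Cargo.toml. Because route_upload_sync currently stores every upload under the hard-coded server "celery.eu.org" and media id "a1b2c3", the download URL below uses exactly that pair.

// Hypothetical client, not part of the commit: round-trips bytes through the
// upload and download routes defined in main.rs.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();

    // Upload: the handler ignores filename/content-type parameters and
    // persists the raw body under ("celery.eu.org", "a1b2c3").
    client
        .post("http://localhost:3000/_matrix/media/v3/upload")
        .body(vec![0u8; 100_000])
        .send()
        .await?;

    // Download the same media id back and check the size matches.
    let bytes = client
        .get("http://localhost:3000/_matrix/media/v3/download/celery.eu.org/a1b2c3")
        .send()
        .await?
        .bytes()
        .await?;
    assert_eq!(bytes.len(), 100_000);

    Ok(())
}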

src/storage.rs Normal file (+181)

@@ -0,0 +1,181 @@
use std::{io::Read, ops::Deref};

use rocksdb::WriteBatch;
use tokio::sync::Mutex;
use thiserror::Error;

pub struct Storage {
    db: rocksdb::DB,
    counter: Mutex<u64>,
}

#[derive(Debug)]
pub struct Metadata {
    /// the server this media originates from (mxc://[server.tld]/mediaId)
    pub server: String,
    /// the unique identifier after the server (mxc://server.tld/[mediaId])
    pub mxc: String,
    /// the user who uploaded this media, may be null if it's remote
    pub user_id: Option<String>,
    /// the cached size of this media
    pub size: u64,
    /// the supposed mime type, either sniffed or from the user
    pub content_type: Option<String>,
    /// the supplied file name from user or remote server
    pub file_name: Option<String>,
}

#[repr(u8)]
enum KeyGroup {
    /// a global counter for generating blob ids from
    Counter, // u8 (id) -> u64
    /// metadata for media
    MediaMetadata, // server + mxc -> metadata + (size + blobid)[]
    /// metadata for blobs
    BlobMetadata, // blobid -> hash + references
    /// the actual blobs themselves
    BlobContent, // blobid -> bytes
    /// an index to find a blob from a hash
    HashIndex, // hash -> blobid
}

#[derive(Debug, Error)]
pub enum StorageError {
    #[error("{0}")]
    Rocksdb(#[from] rocksdb::Error),
}
impl Storage {
    pub fn new(db: rocksdb::DB) -> Storage {
        Storage { db, counter: Mutex::new(0) }
    }

    pub async fn insert(&mut self, data: &mut (dyn Read + Send + Sync), metadata: Metadata) -> Result<(), StorageError> {
        let mut batch = WriteBatch::default();

        use fastcdc::v2020::StreamCDC;
        let mut size = 0;
        let mut blobs = vec![];
        let chunker = StreamCDC::new(data, 1024, 8192, 32768);
        for chunk in chunker {
            let chunk = chunk.unwrap();
            let mut key = vec![KeyGroup::BlobContent as u8];
            let hash_key = {
                use sha2::{Sha256, Digest};
                let mut s = Sha256::new();
                s.update(&chunk.data);
                let mut k = vec![KeyGroup::HashIndex as u8];
                k.extend_from_slice(&s.finalize());
                k
            };
            let id = if let Some(existing_id) = self.db.get_pinned(&hash_key)? {
                u64::from_be_bytes(existing_id.deref().try_into().unwrap())
            } else {
                let id = self.next_counter().await?;
                key.extend_from_slice(&id.to_be_bytes());
                batch.put(key, chunk.data);
                batch.put(hash_key, id.to_be_bytes());
                id
            };
            blobs.push(id);
            size += chunk.length;
        }

        let mut key = vec![KeyGroup::MediaMetadata as u8];
        let mut data = vec![];
        key.extend_from_slice(metadata.server.as_bytes());
        key.push(0xff);
        key.extend_from_slice(metadata.mxc.as_bytes());
        if let Some(user_id) = metadata.user_id {
            data.extend_from_slice(user_id.as_bytes());
        }
        data.push(0xff);
        data.extend_from_slice(&size.to_be_bytes());
        data.push(0xff);
        if let Some(content_type) = metadata.content_type {
            data.extend_from_slice(content_type.as_bytes());
        }
        data.push(0xff);
        if let Some(file_name) = metadata.file_name {
            data.extend_from_slice(file_name.as_bytes());
        }
        for blob in blobs {
            data.push(0xff);
            data.extend_from_slice(&blob.to_be_bytes());
        }
        batch.put(key, data);

        self.db.write(batch)?;
        Ok(())
    }
    pub async fn get_metadata(&mut self, server: String, mxc: String) -> Result<Option<(Metadata, Vec<u64>)>, StorageError> {
        let mut key = vec![KeyGroup::MediaMetadata as u8];
        key.extend_from_slice(server.as_bytes());
        key.push(0xff);
        key.extend_from_slice(mxc.as_bytes());

        let Some(bytes) = self.db.get_pinned(key)? else { return Ok(None) };
        let mut split = bytes.split(|b| *b == 0xff);
        let user_id = match split.next().unwrap() {
            bytes if bytes.is_empty() => None,
            bytes => Some(std::str::from_utf8(bytes).expect("is valid utf8").to_string()),
        };
        let size = u64::from_be_bytes(split.next().unwrap().try_into().unwrap());
        let content_type = match split.next().unwrap() {
            bytes if bytes.is_empty() => None,
            bytes => Some(std::str::from_utf8(bytes).expect("is valid utf8").to_string()),
        };
        let file_name = match split.next().unwrap() {
            bytes if bytes.is_empty() => None,
            bytes => Some(std::str::from_utf8(bytes).expect("is valid utf8").to_string()),
        };
        let blob_ids = split.map(|s| u64::from_be_bytes(s.try_into().unwrap())).collect();

        let metadata = Metadata {
            server,
            mxc,
            user_id,
            size,
            content_type,
            file_name,
        };
        Ok(Some((metadata, blob_ids)))
    }
    pub async fn get_file(&mut self, blob_ids: &[u64]) -> Result<Box<dyn Iterator<Item = u8>>, StorageError> {
        let keys = blob_ids.iter().map(|i| [KeyGroup::BlobContent as u8].into_iter().chain(i.to_be_bytes()).collect::<Vec<u8>>());
        let got = self.db.multi_get(keys).into_iter().collect::<Result<Option<Vec<Vec<u8>>>, _>>()?.unwrap();
        Ok(Box::new(got.into_iter().flatten()))
    }

    async fn next_counter(&self) -> Result<u64, StorageError> {
        let mut id = self.counter.lock().await;
        let next = match *id {
            0 => {
                let bytes = self.db.get_pinned([KeyGroup::Counter as u8])?;
                let next = u64::from_be_bytes(bytes.as_deref().unwrap_or(&[0, 0, 0, 0, 0, 0, 0, 0]).try_into().unwrap());
                next + 1
            },
            n => n + 1,
        };
        self.db.put([KeyGroup::Counter as u8], next.to_be_bytes())?;
        *id = next;
        Ok(next)
    }
}
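
A short, hypothetical round-trip sketch (not part of the diff) showing how Storage is meant to be driven by the handlers in main.rs: insert() chunks the stream with FastCDC and deduplicates chunks by their SHA-256 hash, get_metadata() returns the decoded Metadata plus the blob ids, and get_file() reassembles the original bytes from those ids. The server/mxc values and the database path here are made up for the example.

use std::io::Cursor;

// Hypothetical example, not part of the commit.
async fn round_trip() -> Result<(), StorageError> {
    let mut options = rocksdb::Options::default();
    options.create_if_missing(true);
    options.set_enable_blob_files(true);
    let mut storage = Storage::new(rocksdb::DB::open(&options, "example-db").unwrap());

    // Insert ~100 KiB of data; FastCDC splits it into several chunks and
    // identical chunks are stored only once.
    let data = vec![7u8; 100_000];
    let meta = Metadata {
        server: "example.org".into(), // made-up values for the sketch
        mxc: "abc123".into(),
        user_id: None,
        size: data.len() as u64,
        content_type: None,
        file_name: None,
    };
    storage.insert(&mut Cursor::new(data.clone()), meta).await?;

    // Look the media back up and reassemble it from its blob ids.
    let (meta, blob_ids) = storage
        .get_metadata("example.org".into(), "abc123".into())
        .await?
        .expect("media was just inserted");
    assert_eq!(meta.size, data.len() as u64);

    let bytes: Vec<u8> = storage.get_file(&blob_ids).await?.collect();
    assert_eq!(bytes, data);
    Ok(())
}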