improved fuse

tezlm 2023-07-18 21:52:09 -07:00
parent c8394eb0b3
commit 327e38c627
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
4 changed files with 287 additions and 144 deletions

13
Cargo.lock (generated)

@@ -398,6 +398,8 @@ dependencies = [
"fuser",
"humantime",
"json-canon",
"lru 0.11.0",
"once_cell",
"reqwest",
"serde",
"serde_json",
@@ -1278,6 +1280,15 @@ dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "lru"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eedb2bdbad7e0634f83989bf596f497b070130daaa398ab22d84c39e266deec5"
dependencies = [
"hashbrown 0.14.0",
]
[[package]]
name = "matchers"
version = "0.1.0"
@@ -1987,7 +1998,7 @@ dependencies = [
"image",
"infer",
"json-canon",
"lru",
"lru 0.10.1",
"mime_guess",
"nanoid",
"once_cell",


@@ -16,6 +16,8 @@ ed25519-dalek = "1.0.1"
fuser = "0.12.0"
humantime = "2.1.0"
json-canon = "0.1.3"
lru = "0.11.0"
once_cell = "1.18.0"
reqwest = { version = "0.11.18", features = ["json", "rustls-tls"], default-features = false }
serde = { version = "1.0.166", features = ["derive"] }
serde_json = "1.0.100"


@@ -3,31 +3,70 @@
// TODO (future): proper fuse support
// have by-ref/, by-name/, by-tag/ directories
use std::{path::PathBuf, time::{Duration, SystemTime}, str::FromStr, collections::{HashMap, HashSet}};
use std::{path::PathBuf, time::{Duration, SystemTime}, collections::{HashMap, HashSet}, sync::Arc, ffi::c_int, num::NonZeroUsize};
use fuser::{FileAttr, BackgroundSession};
use ufh::{item::ItemRef, event::{EventContent, Event, FileEvent}, actor::ActorSecret, query::Query};
use ufh::{event::{EventContent, Event}, actor::ActorSecret, query::Query, item::ItemRef};
use crate::net::Item;
use tokio::runtime::Handle;
use tokio::{runtime::Handle, sync::Mutex};
use once_cell::sync::OnceCell;
use lru::LruCache;
use crate::net::Client;
#[derive(Debug)]
pub struct Filesystem {
client: Client,
client: Arc<Client>,
rt: Handle,
inodes: HashMap<u64, Event>,
key: ActorSecret,
list: Option<(Vec<u64>, String, Option<String>)>,
inodes: Arc<Mutex<HashMap<u64, TreeItem>>>,
poller: Option<tokio::task::JoinHandle<()>>,
}
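// A node in the in-memory tree: a synthetic directory, a fixed text file, or a file event fetched from the server.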
#[derive(Debug)]
enum TreeItem {
Directory { name: String, kind: DirKind, entries: Vec<u64> },
Text { name: String, content: String },
Event(Event),
}
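// Controls how a directory names its children: by event id (Default) or by the file's original name, falling back to "unnamed" (ShowNames).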
#[derive(Debug, Clone, Copy)]
enum DirKind {
Default,
ShowNames,
}
impl TreeItem {
pub fn get_name(&self, in_dir: DirKind) -> String {
match self {
TreeItem::Directory { name, .. } => name.to_string(),
TreeItem::Text { name, .. } => name.to_string(),
TreeItem::Event(event) => {
let EventContent::File(file) = &event.content else {
panic!();
};
match in_dir {
DirKind::Default => event.id.to_string(),
DirKind::ShowNames => file.name.as_deref().unwrap_or("unnamed").to_string(),
}
},
}
}
}
impl Filesystem {
pub fn new(rt: Handle, client: Client, key: ActorSecret) -> Filesystem {
let root = TreeItem::Directory {
name: "root".into(),
kind: DirKind::Default,
entries: Vec::new(),
};
Filesystem {
client,
client: Arc::new(client),
rt,
inodes: HashMap::new(),
list: None,
inodes: Arc::new(Mutex::new(HashMap::from([(1, root)]))),
key,
poller: None,
}
}
@@ -37,31 +76,158 @@ impl Filesystem {
}
pub fn spawn_mount(self, mountpoint: &PathBuf) -> Result<BackgroundSession, std::io::Error> {
fuser::spawn_mount2(self, mountpoint, &[])
use fuser::MountOption;
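// The filesystem is read-only; NoAtime skips access-time updates on reads.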
fuser::spawn_mount2(self, mountpoint, &[MountOption::RO, MountOption::NoAtime])
}
fn get_attr(&self, ino: u64, uid: u32, gid: u32) -> Option<FileAttr> {
let attr = FileAttr {
ino,
size: 0,
blocks: 0,
atime: SystemTime::now(),
mtime: SystemTime::now(),
ctime: SystemTime::now(),
crtime: SystemTime::now(),
kind: fuser::FileType::RegularFile,
perm: 0o440,
nlink: 1,
uid,
gid,
rdev: 0,
blksize: 1024 * 1024,
flags: 0
};
let inodes = self.inodes.blocking_lock();
let entry = inodes.get(&ino)?;
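// Specialize the default attributes above based on what kind of tree item this inode is.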
match &entry {
TreeItem::Directory { .. } => {
Some(FileAttr {
kind: fuser::FileType::Directory,
perm: 0o550,
..attr
})
},
TreeItem::Text { content, .. } => {
Some(FileAttr { size: content.len() as u64, ..attr })
},
TreeItem::Event(event) => {
let EventContent::File(file) = &event.content else {
panic!("validated at input");
};
Some(FileAttr {
size: event.derived.file.as_ref().unwrap().size,
blocks: file.chunks.len() as u64,
mtime: to_time(event.origin_ts),
ctime: to_time(event.origin_ts),
crtime: to_time(event.origin_ts),
..attr
})
},
}
}
}
impl fuser::Filesystem for Filesystem {
fn lookup(&mut self, req: &fuser::Request<'_>, _parent: u64, name: &std::ffi::OsStr, reply: fuser::ReplyEntry) {
fn init(&mut self, _req: &fuser::Request<'_>, _config: &mut fuser::KernelConfig) -> Result<(), c_int> {
println!("init()");
let query = Query {
refs: None,
senders: Some(HashSet::from([self.key.get_id()])),
types: Some(HashSet::from(["x.file".to_owned()])),
tags: None,
relations: HashSet::new(),
with_redacts: false,
};
let mut inodes = self.inodes.blocking_lock();
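// Inodes are allocated sequentially; inode 1 is the root directory created in new().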
let mut create_entry = |item: TreeItem, parent: u64| -> u64 {
let inode = inodes.len() as u64 + 1;
inodes.insert(inode, item);
let TreeItem::Directory { entries, .. } = inodes.get_mut(&parent).unwrap() else {
panic!();
};
entries.push(inode);
inode
};
let dir_root = 1;
let dir_by_ref = create_entry(TreeItem::Directory { name: "by-ref".into(), kind: DirKind::Default, entries: Vec::new() }, dir_root);
let dir_by_name = create_entry(TreeItem::Directory { name: "by-name".into(), kind: DirKind::ShowNames, entries: Vec::new() }, dir_root);
let dir_by_tag = create_entry(TreeItem::Directory { name: "by-tag".into(), kind: DirKind::Default, entries: Vec::new() }, dir_root);
create_entry(TreeItem::Text { name: "README".into(), content: "hello, world!".into() }, 1);
create_entry(TreeItem::Text { name: "TODO".into(), content: "currently unimplemented!".into() }, dir_by_tag);
let client = self.client.clone();
let inodes = self.inodes.clone();
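// Background poller: page through matching file events and add each one to the by-ref and by-name directories.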
let handle = self.rt.spawn(async move {
let query = client.query(&query).await.unwrap();
let mut after: Option<String> = None;
loop {
let items = client.list(&query, None, after.clone(), None).await.unwrap();
let mut inodes = inodes.lock().await;
for event in items.events {
if let EventContent::File(_) = &event.content {
let inode = {
let inode = inodes.len() as u64 + 1;
let entry = TreeItem::Event(event);
inodes.insert(inode, entry);
inode
};
let mut push_dir = |inode: u64, parent: u64| {
let TreeItem::Directory { entries, .. } = inodes.get_mut(&parent).unwrap() else {
panic!();
};
entries.push(inode);
};
push_dir(inode, dir_by_name);
push_dir(inode, dir_by_ref);
}
}
if items.next.is_some() {
after = items.next;
}
}
});
self.poller = Some(handle);
Ok(())
}
fn destroy(&mut self) {
println!("destroy()");
self.poller.as_ref().unwrap().abort();
}
fn lookup(&mut self, req: &fuser::Request<'_>, parent: u64, name: &std::ffi::OsStr, reply: fuser::ReplyEntry) {
let Some(name) = name.to_str() else {
return reply.error(1);
};
let Ok(item_ref) = ItemRef::from_str(name) else {
println!("lookup(name={})", name);
let lock = self.inodes.blocking_lock();
let TreeItem::Directory { kind, entries, .. } = lock.get(&parent).unwrap() else {
panic!("not a directory");
};
let Some((inode, _)) = entries
.iter()
.map(|ino| (ino, lock.get(&ino).unwrap()))
.find(|(_, entry)| name == entry.get_name(*kind)) else {
return reply.error(1);
};
let inode = u64::from_be_bytes(item_ref.to_hash()[0..8].try_into().unwrap());
let Ok(item) = self.rt.block_on(self.client.get(&item_ref)) else {
return reply.error(1);
};
let Item::Event(event) = item else {
return reply.error(1);
};
let EventContent::File(file) = &event.content else {
return reply.error(1);
};
self.inodes.insert(inode, event.clone());
let attr = event_to_attr(&event, &file, req.uid(), req.gid());
let inode = *inode;
drop(lock);
let attr = self.get_attr(inode, req.uid(), req.gid()).unwrap();
reply.entry(&Duration::from_secs(60 * 5), &attr, 0);
}
@@ -76,20 +242,26 @@ impl fuser::Filesystem for Filesystem {
_lock_owner: Option<u64>,
reply: fuser::ReplyData,
) {
// println!("read {}", ino);
println!("read(ino={}, offset={}, size={})", ino, offset, size);
let Some(event) = self.inodes.get(&ino) else {
return reply.error(1);
};
// TODO (performance): cache blobs (lru) and sizes (always) then calc to specific blobs when needed
let Ok(buffer) = self.rt.block_on(get_blob(self, event)) else {
return reply.error(1);
};
let offset = offset as usize;
let size = size as usize;
reply.data(&buffer[(offset).max(0)..(offset + size).min(buffer.len())]);
let inodes = self.inodes.blocking_lock();
let item = inodes.get(&ino).unwrap();
match &item {
TreeItem::Directory { .. } => panic!(),
TreeItem::Text { content, .. } => reply.data(&content.as_bytes()[(offset).max(0)..(offset + size).min(content.len())]),
TreeItem::Event(event) => {
// TODO (performance): cache blobs (lru) and sizes (always) then calc to specific blobs when needed
// TODO (performance): this blocks the mutex!
let Ok(buffer) = self.rt.block_on(get_blob(self, &event)) else {
return reply.error(1);
};
reply.data(&buffer[(offset).max(0)..(offset + size).min(buffer.len())]);
},
};
}
fn readdir(
@@ -99,101 +271,65 @@ impl fuser::Filesystem for Filesystem {
_fh: u64,
offset: i64,
mut reply: fuser::ReplyDirectory,
) {
if ino != 1 {
return reply.error(1);
) {
println!("readdir(ino={}, offset={})", ino, offset);
let inodes = self.inodes.blocking_lock();
let directory = inodes.get(&ino).unwrap();
let TreeItem::Directory { entries, kind, .. } = &directory else { panic!() };
let start = offset as usize;
let end = (start + 100).min(entries.len());
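// The offset passed to reply.add is the offset of the *next* entry, hence start + idx + 1.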
for (idx, inode) in entries[start..end].iter().enumerate() {
let entry = inodes.get(&inode).unwrap();
let _ = reply.add(*inode, (start + idx + 1) as i64, fuser::FileType::RegularFile, entry.get_name(*kind));
}
reply.ok();
}
let (events, query, after) = match &mut self.list {
Some(events) => events,
None => {
let query = Query {
refs: None,
senders: Some(HashSet::from([self.key.get_id()])),
types: Some(HashSet::from(["x.file".to_owned()])),
tags: None,
relations: HashSet::new(),
with_redacts: false,
};
let query = self.rt.block_on(self.client.query(&query)).unwrap();
let list = (Vec::new(), query, None);
self.list = Some(list);
self.list.as_mut().unwrap()
},
};
let start = offset as usize + 1;
let end = (start + 10).min(events.len());
// TODO (performance): use long polling instead of fetching on *every* directory read
if start >= end {
let items = self.rt.block_on(self.client.list(&query, None, after.to_owned(), None)).unwrap();
for event in items.events {
let inode = u64::from_be_bytes(event.id.to_hash()[0..8].try_into().unwrap());
if event.content.get_type() == "x.file" && !event.is_redacted() {
self.inodes.insert(inode, event.clone());
events.push(inode);
}
}
if items.next.is_some() {
*after = items.next;
}
}
let end = (start + 10).min(events.len());
for (idx, inode) in events[start..end].iter().enumerate() {
let _ = reply.add(*inode, (start + idx) as i64, fuser::FileType::RegularFile, self.inodes.get(&inode).unwrap().id.to_string());
fn readdirplus(
&mut self,
req: &fuser::Request<'_>,
ino: u64,
_fh: u64,
offset: i64,
mut reply: fuser::ReplyDirectoryPlus,
) {
println!("readdirplus(ino={}, offset={})", ino, offset);
let inodes = self.inodes.blocking_lock();
let directory = inodes.get(&ino).unwrap();
let TreeItem::Directory { entries, kind, .. } = &directory else { panic!() };
let start = offset as usize;
let end = (start + 100).min(entries.len());
for (idx, inode) in entries[start..end].iter().enumerate() {
let entry = inodes.get(&inode).unwrap();
let _ = reply.add(*inode, (start + idx + 1) as i64, entry.get_name(*kind), &Duration::from_secs(60 * 5), &self.get_attr(*inode, req.uid(), req.gid()).unwrap(), 0);
}
reply.ok();
}
fn opendir(&mut self, _req: &fuser::Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) {
if ino == 1 {
reply.opened(0, 0);
println!("opendir(ino={})", ino);
let inodes = self.inodes.blocking_lock();
if matches!(inodes.get(&ino), Some(TreeItem::Directory { .. })) {
return reply.opened(0, 0);
} else {
reply.error(1);
}
}
fn statfs(&mut self, _req: &fuser::Request<'_>, _ino: u64, reply: fuser::ReplyStatfs) {
reply.error(1);
}
fn getattr(&mut self, req: &fuser::Request<'_>, ino: u64, reply: fuser::ReplyAttr) {
if ino == 1 {
let attr = FileAttr {
ino: 1,
size: 0,
blocks: 0,
atime: SystemTime::now(),
mtime: SystemTime::now(),
ctime: SystemTime::now(),
crtime: SystemTime::now(),
kind: fuser::FileType::Directory,
perm: 0o550,
nlink: 1,
uid: req.uid(),
gid: req.gid(),
rdev: 0,
blksize: 0,
flags: 0
};
reply.attr(&Duration::from_secs(60 * 5), &attr);
return;
}
let Some(event) = self.inodes.get(&ino) else {
return reply.error(1);
};
}
}
let EventContent::File(file) = &event.content else {
panic!("validated at input");
};
fn getattr(&mut self, req: &fuser::Request<'_>, ino: u64, reply: fuser::ReplyAttr) {
println!("getattr(ino={})", ino);
let attr = event_to_attr(&event, file, req.uid(), req.gid());
reply.attr(&Duration::from_secs(60 * 5), &attr);
match self.get_attr(ino, req.uid(), req.gid()) {
Some(attr) => reply.attr(&Duration::from_secs(60 * 5), &attr),
None => reply.error(1),
}
}
}
@@ -201,35 +337,29 @@ fn to_time(time: u64) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_millis(time)
}
fn event_to_attr(event: &Event, file: &FileEvent, uid: u32, gid: u32) -> FileAttr {
let inode = u64::from_be_bytes(event.id.to_hash()[0..8].try_into().unwrap());
FileAttr {
ino: inode,
size: event.derived.file.as_ref().unwrap().size,
blocks: file.chunks.len() as u64,
atime: SystemTime::now(),
mtime: to_time(event.origin_ts),
ctime: to_time(event.origin_ts),
crtime: to_time(event.origin_ts),
kind: fuser::FileType::RegularFile,
perm: 0o440,
nlink: 1,
uid,
gid,
rdev: 0,
blksize: 1024 * 1024,
flags: 0,
}
}
async fn get_blob(fs: &Filesystem, event: &Event) -> Result<Vec<u8>, ()> {
static CACHE: OnceCell<Mutex<LruCache<ItemRef, bytes::Bytes>>> = OnceCell::new();
let cache = CACHE.get_or_init(|| Mutex::new(LruCache::new(NonZeroUsize::new(100).unwrap())));
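// Process-wide LRU cache of fetched chunk blobs, keyed by item ref (capacity 100).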
let EventContent::File(file) = &event.content else {
return Err(());
};
let mut chunks = Vec::with_capacity(file.chunks.len());
for item_ref in &file.chunks {
let Ok(Item::Blob(blob)) = fs.client.get(item_ref).await else {
return Err(());
let mut lock = cache.lock().await;
let cached = lock.peek(&item_ref).cloned();
let blob = match cached {
Some(blob) => {
lock.promote(&item_ref);
blob
},
None => {
let Ok(Item::Blob(blob)) = fs.client.get(item_ref).await else {
return Err(());
};
lock.put(item_ref.clone(), blob.clone());
blob
},
};
chunks.push(blob);
}

2
web/.gitignore (vendored)

@@ -1,3 +1,3 @@
.parcel-cache
.vite
node_modules
dist