fix events being dropped

This commit is contained in:
tezlm 2023-08-08 12:00:53 -07:00
parent f24e16c02b
commit 8ec03647d2
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
10 changed files with 294 additions and 46 deletions

View file

@ -160,6 +160,7 @@ pub async fn prepare_special(
Ok(DelayedAction::None)
}
/// the event will not be in the database yet
pub async fn commit_special(me: &Items, action: &DelayedAction) -> Result<(), Error> {
debug!("commit special {:?}", action);
// TODO (performance): batch transactions
@ -187,11 +188,7 @@ pub async fn commit_special(me: &Items, action: &DelayedAction) -> Result<(), Er
me.finish_event_create(crate::items::WipCreate {
item_ref: item_ref.clone(),
create: crate::items::Create {
event,
relations: Default::default(),
rowid: 0,
},
event,
action: DelayedAction::None,
})
.await?;
@ -203,6 +200,7 @@ pub async fn commit_special(me: &Items, action: &DelayedAction) -> Result<(), Er
Ok(())
}
/// event will still not exist in db
pub async fn derive(
me: &Items,
event: &Event,

View file

@ -35,7 +35,7 @@ pub enum Item {
#[derive(Debug)]
pub struct WipCreate {
pub item_ref: ItemRef,
create: Create,
event: Event,
action: DelayedAction,
}
@ -98,28 +98,25 @@ impl Items {
// handle special events
let action = events::prepare_special(self, &event, &relations).await?;
let rowid = self.db.event_create(&event).await?;
debug!("created event (rowid={})", rowid);
Ok(WipCreate {
item_ref: event.id.clone(),
create: Create {
event,
relations,
rowid,
},
event,
action,
})
}
/// finish extra processing, e.g. deriving metadata or indexing for fts
/// split out to continue in background
// FIXME: this code is ugly and may still have race conditions
#[async_recursion::async_recursion]
pub async fn finish_event_create(&self, wip: WipCreate) -> Result<Create, Error> {
let create = wip.create;
let event = create.event;
let event = wip.event;
let rowid = self.db.event_create(&event).await?;
debug!("created event (rowid={})", rowid);
events::commit_special(self, &wip.action).await?;
debug!("commit special cases");
let derived = if let EventContent::File(file) = &event.content {
debug!("begin derive");
@ -145,18 +142,14 @@ impl Items {
Derived::default()
};
let event = Event { derived, ..event };
let event = Event { derived: derived.clone(), ..event };
// TODO (performance): this exists to get the updated event derives, but could easily be done without a query
let relations = events::get_relations(&self.db, &event).await?;
update_search_index(self, &event, &relations).await?;
Ok(Create {
event,
relations: create.relations,
rowid: create.rowid,
})
Ok(Create { event, relations, rowid })
}
#[async_recursion::async_recursion]

View file

@ -6,9 +6,11 @@ use axum::body::HttpBody;
use axum::extract::{Json, Query, RawBody, State};
use axum::http::HeaderMap;
use bytes::Bytes;
use once_cell::sync::Lazy;
use reqwest::StatusCode;
use serde::Deserialize;
use serde_json::{json, Value};
use tokio::sync::Mutex;
use std::sync::Arc;
use tracing::{debug, error};
use ufh::event::WipEvent;
@ -16,6 +18,12 @@ use ufh::item::ItemRef;
type Response<T> = Result<T, Error>;
// for strictly increasing rowids in the stream
// see items/mod.rs: this isn't good for performance since this forces
// events to be processed one at a time. locking around sending rather
// than around processing would be better, but i need the rowid.
static ROWID_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
#[derive(Debug, Deserialize)]
pub struct Params {
// true by default for compatibility
@ -75,12 +83,10 @@ pub async fn route(
let wip: WipEvent = serde_json::from_slice(&blob)?;
let wip = state.items.begin_event_create(wip).await?;
// FIXME: the state.events channel can become out of order when ?wait=false
// then again, it can get out of order when multiple people upload. this is critical to fix
#[cfg(off)]
if !params.wait {
let item_ref = wip.item_ref.clone();
tokio::spawn(async move {
let _lock = ROWID_LOCK.lock().await;
let create = match state.items.finish_event_create(wip).await {
Ok(create) => create,
Err(err) => return error!("failed to finish creating event: {}", err),
@ -92,6 +98,7 @@ pub async fn route(
return Ok((StatusCode::ACCEPTED, Json(json!({ "ref": item_ref }))));
}
let _lock = ROWID_LOCK.lock().await;
let create = state.items.finish_event_create(wip).await?;
let item_ref = create.event.id.clone();
let _ = state

View file

@ -33,7 +33,6 @@ pub struct QueryResult {
relations: Option<HashMap<ItemRef, Event>>,
#[serde(skip_serializing_if = "Option::is_none")]
next: Option<String>,
_debug: &'static str,
}
impl IntoResponse for QueryResult {
@ -107,7 +106,6 @@ pub async fn route(
events: result.events,
relations: None,
next: Some(format!("{}.{}", next, 0)),
_debug: "skipping",
});
}
}
@ -207,7 +205,6 @@ pub async fn route(
events: vec![event],
relations: None,
next: Some(format!("{}.{}", rowid, rowid)),
_debug: "from poller",
});
}
(MatchType::Relation, event, rowid) => {
@ -216,7 +213,6 @@ pub async fn route(
events: Vec::new(),
relations: Some(HashMap::from([(event.id.clone(), event)])),
next: Some(format!("{}.{}", rowid, rowid)),
_debug: "from poller",
});
}
(MatchType::None, _, _) => unreachable!("handled by next_event(...)"),
@ -242,7 +238,6 @@ pub async fn route(
events: result.events,
relations,
next,
_debug: "from db",
})
}

View file

@ -25,7 +25,6 @@ pub fn routes() -> Router<Arc<ServerState>> {
.get(enumerate::route)
.head(enumerate::head),
)
.route("/sync", routing::post(watch::route))
.route("/check", routing::post(check::route))
.route("/query", routing::post(query::route))
.route("/:item_ref", routing::get(fetch::route))

View file

@ -32,6 +32,7 @@ pub struct QueryResult {
#[derive(Debug)]
pub enum DbItem {
Event(Box<Event>),
// Blob(u64),
Blob,
}

View file

@ -1,14 +1,14 @@
// TODO (future): i should find out how to split trait implementations
// between files before this gets comically long
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use bytes::Bytes;
use futures_util::TryStreamExt;
use serde_json::Value;
use sqlx::query as sql;
use sqlx::{sqlite::SqliteConnectOptions, QueryBuilder, Row, SqlitePool};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, trace};
use ufh::derived::Derived;
@ -16,7 +16,7 @@ use ufh::event::EventContent;
use ufh::query::QueryRelation;
use ufh::{item::ItemRef, query};
use super::{Database, DbItem, Thumbnail, Location};
use super::{Database, DbItem, Location, Thumbnail};
use crate::routes::things::thumbnail::ThumbnailSize;
use crate::{Error, Event};
@ -43,7 +43,10 @@ impl Sqlite {
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal);
let pool = SqlitePool::connect_with(options).await?;
sqlx::migrate!().run(&pool).await?;
Ok(Sqlite { pool, lock: Arc::new(RwLock::new(())) })
Ok(Sqlite {
pool,
lock: Arc::new(RwLock::new(())),
})
}
// leaky abstraction to be removed
@ -82,9 +85,8 @@ impl Database for Sqlite {
let mut sql_relations =
QueryBuilder::new("INSERT OR IGNORE INTO relations (ref_from, ref_to, rel_type) ");
sql_relations.push_values(&event.relations, |mut item, (target, info)| {
let target_str = target.to_string();
item.push_bind(item_ref_str.clone())
.push_bind(target_str)
.push_bind(target.to_string())
.push_bind(info.rel_type.clone());
});
sql_relations.build().execute(&mut tx).await?;
@ -230,11 +232,19 @@ impl Database for Sqlite {
}
builder.push(")");
}
match after {
Location::Beginning => builder.push(" ORDER BY events.rowid LIMIT ").push_bind(limit),
Location::Index(rowid) => builder.push(" AND events.rowid > ").push_bind(rowid).push(" ORDER BY events.rowid LIMIT ").push_bind(limit),
Location::Reverse => builder.push(" ORDER BY events.rowid DESC LIMIT ").push_bind(limit),
Location::Beginning => builder
.push(" ORDER BY events.rowid LIMIT ")
.push_bind(limit),
Location::Index(rowid) => builder
.push(" AND events.rowid > ")
.push_bind(rowid)
.push(" ORDER BY events.rowid LIMIT ")
.push_bind(limit),
Location::Reverse => builder
.push(" ORDER BY events.rowid DESC LIMIT ")
.push_bind(limit),
};
debug!("generated sql: {}", builder.sql());
@ -321,7 +331,10 @@ impl Database for Sqlite {
}
match after {
Location::Beginning => builder.push(" ORDER BY events_from.rowid"),
Location::Index(idx) => builder.push(" AND events_from.rowid > ").push_bind(idx).push(" ORDER BY events_from.rowid "),
Location::Index(idx) => builder
.push(" AND events_from.rowid > ")
.push_bind(idx)
.push(" ORDER BY events_from.rowid "),
Location::Reverse => unimplemented!("cannot currently get last relations"),
};
builder.push(" LIMIT ").push_bind(limit);
@ -408,7 +421,8 @@ impl Database for Sqlite {
// TODO (performance, future): batch sql queries?
for item_ref in item_refs {
self.derived_put(item_ref, "tags", serde_json::to_value(tags)?).await?;
self.derived_put(item_ref, "tags", serde_json::to_value(tags)?)
.await?;
}
let mut delete_query = QueryBuilder::new("DELETE FROM tags WHERE (ref) IN (");

213
server/test.patch Normal file
View file

@ -0,0 +1,213 @@
diff --git a/server/src/items/mod.rs b/server/src/items/mod.rs
index bd09009..959a56e 100644
--- a/server/src/items/mod.rs
+++ b/server/src/items/mod.rs
@@ -35,7 +35,8 @@ pub enum Item {
#[derive(Debug)]
pub struct WipCreate {
pub item_ref: ItemRef,
- create: Create,
+ event: Event,
+ relations: Relations,
action: DelayedAction,
}
@@ -98,16 +99,10 @@ impl Items {
// handle special events
let action = events::prepare_special(self, &event, &relations).await?;
- let rowid = self.db.event_create(&event).await?;
- debug!("created event (rowid={})", rowid);
-
Ok(WipCreate {
item_ref: event.id.clone(),
- create: Create {
- event,
- relations,
- rowid,
- },
+ event,
+ relations,
action,
})
}
@@ -116,9 +111,11 @@ impl Items {
/// split out to continue in background
#[async_recursion::async_recursion]
pub async fn finish_event_create(&self, wip: WipCreate) -> Result<Create, Error> {
- let create = wip.create;
- let event = create.event;
+ let event = wip.event;
+ let relations = wip.relations;
+ let rowid = self.db.event_create(&event).await?;
+ debug!("created event (rowid={})", rowid);
events::commit_special(self, &wip.action).await?;
let derived = if let EventContent::File(file) = &event.content {
@@ -154,7 +151,7 @@ impl Items {
Ok(Create {
event,
- relations: create.relations,
+ relations,
rowid: create.rowid,
})
}
diff --git a/server/src/middleware.rs b/server/src/middleware.rs
index 7879150..6c55782 100644
--- a/server/src/middleware.rs
+++ b/server/src/middleware.rs
@@ -31,7 +31,10 @@ pub async fn range<B>(
};
let mut iter = range.iter();
- let range = iter.next().expect("range request always has one range?");
+ let Some(range) = iter.next() else {
+ return axum::http::StatusCode::RANGE_NOT_SATISFIABLE.into_response();
+ };
+
if iter.next().is_some() {
return axum::http::StatusCode::RANGE_NOT_SATISFIABLE.into_response();
}
@@ -47,6 +50,8 @@ pub async fn range<B>(
use axum::headers::ContentRange;
use bytes::Bytes;
+
+ let content_length = data.len() as u64;
let slice = match range {
(Bound::Unbounded, Bound::Unbounded) => Bytes::from(data),
@@ -59,12 +64,10 @@ pub async fn range<B>(
_ => return axum::http::StatusCode::RANGE_NOT_SATISFIABLE.into_response(),
};
- let content_length = slice.len() as u64;
-
let server_range = match range {
(Bound::Unbounded, Bound::Unbounded) => ContentRange::bytes(.., Some(content_length)),
(Bound::Included(start), Bound::Unbounded) => {
- ContentRange::bytes(start.., Some(content_length))
+ ContentRange::bytes(start..start + slice.len() as u64, Some(content_length))
}
(Bound::Included(start), Bound::Included(end)) => {
ContentRange::bytes(start..=end, Some(content_length))
@@ -78,7 +81,7 @@ pub async fn range<B>(
parts.headers.insert(
"content-length",
- HeaderValue::from_str(&content_length.to_string()).unwrap(),
+ HeaderValue::from_str(&slice.len().to_string()).unwrap(),
);
parts.status = StatusCode::PARTIAL_CONTENT;
diff --git a/server/src/routes/things/create.rs b/server/src/routes/things/create.rs
index 4387ab1..134d0ed 100644
--- a/server/src/routes/things/create.rs
+++ b/server/src/routes/things/create.rs
@@ -75,9 +75,6 @@ pub async fn route(
let wip: WipEvent = serde_json::from_slice(&blob)?;
let wip = state.items.begin_event_create(wip).await?;
- // FIXME: the state.events channel can become out of order when ?wait=false
- // then again, it can get out of order when multiple people upload. this is critical to fix
- #[cfg(off)]
if !params.wait {
let item_ref = wip.item_ref.clone();
tokio::spawn(async move {
diff --git a/server/src/routes/things/enumerate.rs b/server/src/routes/things/enumerate.rs
index 156e096..5f525a0 100644
--- a/server/src/routes/things/enumerate.rs
+++ b/server/src/routes/things/enumerate.rs
@@ -33,7 +33,6 @@ pub struct QueryResult {
relations: Option<HashMap<ItemRef, Event>>,
#[serde(skip_serializing_if = "Option::is_none")]
next: Option<String>,
- _debug: &'static str,
}
impl IntoResponse for QueryResult {
@@ -107,7 +106,6 @@ pub async fn route(
events: result.events,
relations: None,
next: Some(format!("{}.{}", next, 0)),
- _debug: "skipping",
});
}
}
@@ -207,7 +205,6 @@ pub async fn route(
events: vec![event],
relations: None,
next: Some(format!("{}.{}", rowid, rowid)),
- _debug: "from poller",
});
}
(MatchType::Relation, event, rowid) => {
@@ -216,7 +213,6 @@ pub async fn route(
events: Vec::new(),
relations: Some(HashMap::from([(event.id.clone(), event)])),
next: Some(format!("{}.{}", rowid, rowid)),
- _debug: "from poller",
});
}
(MatchType::None, _, _) => unreachable!("handled by next_event(...)"),
@@ -242,7 +238,6 @@ pub async fn route(
events: result.events,
relations,
next,
- _debug: "from db",
})
}
diff --git a/server/src/routes/things/mod.rs b/server/src/routes/things/mod.rs
index 150dbe8..6522ee3 100644
--- a/server/src/routes/things/mod.rs
+++ b/server/src/routes/things/mod.rs
@@ -25,7 +25,7 @@ pub fn routes() -> Router<Arc<ServerState>> {
.get(enumerate::route)
.head(enumerate::head),
)
- .route("/sync", routing::post(watch::route))
+ // .route("/sync", routing::post(watch::route))
.route("/check", routing::post(check::route))
.route("/query", routing::post(query::route))
.route("/:item_ref", routing::get(fetch::route))
diff --git a/server/src/state/db/mod.rs b/server/src/state/db/mod.rs
index b6784c4..2d27cbd 100644
--- a/server/src/state/db/mod.rs
+++ b/server/src/state/db/mod.rs
@@ -32,6 +32,7 @@ pub struct QueryResult {
#[derive(Debug)]
pub enum DbItem {
Event(Box<Event>),
+ // Blob(u64),
Blob,
}
diff --git a/server/src/state/db/sqlite.rs b/server/src/state/db/sqlite.rs
index 3742eb1..a841105 100644
--- a/server/src/state/db/sqlite.rs
+++ b/server/src/state/db/sqlite.rs
@@ -73,18 +73,17 @@ impl Database for Sqlite {
.execute(&mut tx)
.await?;
debug!("insert into events");
- let insert = sql!("INSERT OR IGNORE INTO events (ref, sender, type, json, flags) VALUES (?, ?, ?, ?, ?) RETURNING rowid", item_ref_str, item_sender_str, item_type_str, event_str, 0)
+ let insert = sql!("INSERT OR REPLACE INTO events (ref, sender, type, json, flags) VALUES (?, ?, ?, ?, ?) RETURNING rowid", item_ref_str, item_sender_str, item_type_str, event_str, 0)
.fetch_one(&mut tx)
.await?;
if !event.relations.is_empty() && event.content.get_type() != "x.redact" {
debug!("insert into relations");
let mut sql_relations =
- QueryBuilder::new("INSERT OR IGNORE INTO relations (ref_from, ref_to, rel_type) ");
+ QueryBuilder::new("INSERT OR REPLACE INTO relations (ref_from, ref_to, rel_type) ");
sql_relations.push_values(&event.relations, |mut item, (target, info)| {
- let target_str = target.to_string();
item.push_bind(item_ref_str.clone())
- .push_bind(target_str)
+ .push_bind(target.to_string())
.push_bind(info.rel_type.clone());
});
sql_relations.build().execute(&mut tx).await?;

View file

@ -129,7 +129,7 @@
<svelte:component this={getComponent(selected.type)} bucket={selected} />
{/if}
{:catch err}
<div class="empty">failed to load that: {err}</div>
<div class="empty">failed to load: {err}</div>
{/await}
</main>
{#if $state.sidebar}

28
web/tooltip.ts Normal file
View file

@ -0,0 +1,28 @@
import { computePosition, flip, offset, shift, arrow } from "@floating-ui/dom";
/**
 * Svelte action that shows a minimal tooltip beside `node`.
 *
 * @param node    the element the tooltip is anchored to
 * @param options `{ text }` — the tooltip's text content
 * @returns Svelte action lifecycle handlers (`update`, `destroy`)
 */
export function tip(node, options) {
  const tooltip = document.createElement("div");
  Object.assign(tooltip.style, {
    position: "fixed",
    background: "red",
  });
  tooltip.innerText = options.text;
  document.body.append(tooltip);

  // Split out so the `update` hook can reposition after a text change.
  function position() {
    computePosition(node, tooltip, {
      placement: "left",
      // Must match the tooltip's `position: fixed`; the default
      // ("absolute") strategy computes coordinates relative to the
      // offset parent and drifts once the page scrolls.
      strategy: "fixed",
      // arrow() was dropped: it requires an `element` option pointing
      // at an arrow DOM node, which this tooltip does not create, so
      // it was a silent no-op.
      middleware: [offset(8), flip(), shift()],
    }).then(({ x, y }) => {
      Object.assign(tooltip.style, {
        left: `${x}px`,
        top: `${y}px`,
      });
    });
  }
  position();

  return {
    // Re-render when the action's options change (previously ignored).
    update(newOptions) {
      options = newOptions;
      tooltip.innerText = options.text;
      position();
    },
    destroy() {
      tooltip.remove();
    },
  };
}