Begin work on improved receipts

tezlm 2023-12-07 07:16:44 -08:00
parent 04a4e3b7db
commit 1dda05d609
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
23 changed files with 275 additions and 533 deletions

8
PLANS.md Normal file
View file

@@ -0,0 +1,8 @@
# plans
- Replace rocksdb with something that isn't so awful to use; I don't want to write custom serde and indexes for all the data
- Replace search with tantivy
- Fix backfilled events not being correctly processed
- Clean up and potentially contribute code back upstream if it's Matrix-compatible (unfortunately, it probably won't be)
- Implement fast joins + tagging with state events
- Implement a custom room version

View file

@@ -1,5 +1,5 @@
use crate::{services, Result, Ruma};
use ruma::api::client::push::{get_pushers, set_pusher, get_inbox, set_ack};
use ruma::api::client::push::{get_inbox, get_pushers, set_ack, set_pusher};
/// # `GET /_matrix/client/r0/pushers`
///
@@ -34,9 +34,13 @@ pub async fn set_pushers_route(
/// # `POST /_matrix/client/v1/inbox`
///
/// Gets mentions and threads of interest to the sender user.
fn todo_1() {}
fn _todo_1() {}
/// # `POST /_matrix/client/v1/ack`
///
/// Marks rooms, mentions, and threads as read.
fn todo_2() {}
fn set_ack_route(
body: Ruma<set_ack::v3::Request>,
) -> Result<set_ack::v3::Response> {
todo!()
}
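For orientation, a hedged sketch of where this stub seems headed, reusing reset_notification_counts from the user service touched later in this commit. The `rooms` field on the request, the `pub async` signature, and the empty response body are all assumptions; the set_ack types are custom to this fork and not shown in this diff.

// Hypothetical sketch only; the commit itself leaves this handler as todo!().
pub async fn set_ack_route(
    body: Ruma<set_ack::v3::Request>,
) -> Result<set_ack::v3::Response> {
    let sender_user = body.sender_user.as_ref().expect("user is authenticated");
    // Assumed field: the rooms whose unread counters should be cleared.
    for room_id in &body.rooms {
        services()
            .rooms
            .user
            .reset_notification_counts(sender_user, room_id)?;
    }
    Ok(set_ack::v3::Response {})
}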

View file

@@ -4,7 +4,7 @@ use crate::{service::pdu::PduBuilder, services, Error, Result, Ruma, RumaRespons
use ruma::{
api::client::{
error::ErrorKind,
state::{get_state_events, get_state_events_for_key, send_state_event},
state::{get_state_events_for_key, send_state_event},
},
events::{
room::canonical_alias::RoomCanonicalAliasEventContent, AnyStateEventContent, StateEventType,

View file

@@ -993,31 +993,8 @@ async fn load_joined_room(
.filter_map(|r| r.ok()),
);
let notification_count = if send_notification_counts {
Some(
services()
.rooms
.user
.notification_count(sender_user, room_id)?
.try_into()
.expect("notification count can't go that high"),
)
} else {
None
};
let highlight_count = if send_notification_counts {
Some(
services()
.rooms
.user
.highlight_count(sender_user, room_id)?
.try_into()
.expect("highlight count can't go that high"),
)
} else {
None
};
let notification_count = None;
let highlight_count = None;
let prev_batch = timeline_pdus
.first()
@@ -1036,14 +1013,16 @@ async fn load_joined_room(
.map(|(_, pdu)| pdu.to_sync_room_event())
.collect();
let mut edus: Vec<_> = services()
.rooms
.edus
.read_receipt
.readreceipts_since(room_id, since)
.filter_map(|r| r.ok()) // Filter out buggy events
.map(|(_, _, v)| v)
.collect();
// let mut edus: Vec<_> = services()
// .rooms
// .edus
// .read_receipt
// .readreceipts_since(room_id, since)
// .filter_map(|r| r.ok()) // Filter out buggy events
// .map(|(_, _, v)| v)
// .collect();
let mut edus: Vec<_> = vec![];
if services().rooms.edus.typing.last_typing_update(room_id)? > since {
edus.push(
@@ -1622,31 +1601,29 @@ pub async fn sync_events_v4_route(
None
};
let unreads = services()
.rooms
.user
.get_counts(&sender_user, room_id)?;
rooms.insert(
room_id.clone(),
sync_events::v4::SlidingSyncRoom {
initial: Some(roomsince == &0),
invite_state: None,
unreads: UnreadNotificationsCount {
last_ack: None,
mention_user: Some(
services()
.rooms
.user
.highlight_count(&sender_user, room_id)?
.try_into()
.expect("notification count can't go that high"),
last_ack: Some(
services().rooms.short.get_eventid_from_short(
services()
.rooms
.user
.last_notification_read(&sender_user, room_id)?
)?.into()
),
mention_bulk: None,
notify: Some(
services()
.rooms
.user
.notification_count(&sender_user, room_id)?
.try_into()
.expect("notification count can't go that high"),
),
messages: None,
mention_user: Some(unreads.0.try_into().expect("notification count can't go that high")),
mention_bulk: Some(unreads.1.try_into().expect("notification count can't go that high")),
messages: Some(unreads.2.try_into().expect("notification count can't go that high")),
notify: Some(unreads.3.try_into().expect("notification count can't go that high")),
},
unreads_threaded: BTreeMap::new(),
timeline: room_events,
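Since the get_counts tuple is positional, a minimal destructuring sketch may help readers; the field order is inferred from the key-value implementation later in this commit, not stated in this hunk.

// Assumed tuple order: (mention_user, mention_bulk, messages, notify).
let (mention_user, mention_bulk, messages, notify) =
    services().rooms.user.get_counts(&sender_user, room_id)?;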

View file

@@ -788,48 +788,6 @@ pub async fn send_transaction_message_route(
{
match edu {
Edu::Presence(_) => {}
Edu::Receipt(receipt) => {
for (room_id, room_updates) in receipt.receipts {
for (user_id, user_updates) in room_updates.read {
if let Some((event_id, _)) = user_updates
.event_ids
.iter()
.filter_map(|id| {
services()
.rooms
.timeline
.get_pdu_count(id)
.ok()
.flatten()
.map(|r| (id, r))
})
.max_by_key(|(_, count)| *count)
{
let mut user_receipts = BTreeMap::new();
user_receipts.insert(user_id.clone(), user_updates.data);
let mut receipts = BTreeMap::new();
receipts.insert(ReceiptType::Read, user_receipts);
let mut receipt_content = BTreeMap::new();
receipt_content.insert(event_id.to_owned(), receipts);
let event = ReceiptEvent {
content: ReceiptEventContent(receipt_content),
room_id: room_id.clone(),
};
services()
.rooms
.edus
.read_receipt
.readreceipt_update(&user_id, &room_id, event)?;
} else {
// TODO fetch missing events
debug!("No known event ids in read receipt: {:?}", user_updates);
}
}
}
}
Edu::Typing(typing) => {
if services()
.rooms
@@ -933,6 +891,7 @@ pub async fn send_transaction_message_route(
)?;
}
}
Edu::Receipt(_) => {}
Edu::_Custom(_) => {}
}
}

View file

@@ -60,6 +60,8 @@ pub trait KvTree: Send + Sync {
fn increment(&self, key: &[u8]) -> Result<Vec<u8>>;
fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()>;
fn upsert(&self, key: &[u8], update: &dyn Fn(Option<&[u8]>) -> Vec<u8>) -> Result<()>;
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,

View file

@@ -228,6 +228,17 @@ impl KvTree for RocksDbEngineTree<'_> {
Ok(())
}
fn upsert(&self, key: &[u8], update: &dyn Fn(Option<&[u8]>) -> Vec<u8>) -> Result<()> {
let lock = self.write_lock.write().unwrap();
let old = self.db.rocks.get_cf(&self.cf(), key)?;
let new = update(old.as_deref());
self.db.rocks.put_cf(&self.cf(), key, &new)?;
drop(lock);
Ok(())
}
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,

View file

@@ -228,6 +228,18 @@ impl KvTree for SqliteTable {
Ok(())
}
fn upsert(&self, key: &[u8], update: &dyn Fn(Option<&[u8]>) -> Vec<u8>) -> Result<()> {
let guard = self.engine.write_lock();
let old = self.get_with_guard(&guard, &key)?;
let new = update(old.as_deref());
self.insert_with_guard(&guard, &key, &new)?;
drop(guard);
Ok(())
}
fn remove(&self, key: &[u8]) -> Result<()> {
let guard = self.engine.write_lock();
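Both backends run the update closure while holding the engine's write lock, which makes upsert a read-modify-write primitive. A minimal usage sketch, assuming tree: &dyn KvTree and the crate's Result alias; bump_counter is an illustrative helper, not part of the commit.

// Atomically increment a big-endian u64 counter stored at `key`.
fn bump_counter(tree: &dyn KvTree, key: &[u8]) -> Result<()> {
    tree.upsert(key, &|old| {
        let current = old
            .map(|b| u64::from_be_bytes(b.try_into().expect("counter is 8 bytes")))
            .unwrap_or(0);
        (current + 1).to_be_bytes().to_vec()
    })
}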

View file

@@ -63,10 +63,7 @@ impl service::globals::Data for KeyValueDatabase {
futures.push(self.userroomid_joined.watch_prefix(&userid_prefix));
futures.push(self.userroomid_invitestate.watch_prefix(&userid_prefix));
futures.push(self.userroomid_leftstate.watch_prefix(&userid_prefix));
futures.push(
self.userroomid_notificationcounts
.watch_prefix(&userid_prefix),
);
futures.push(self.userroomid_unreads.watch_prefix(&userid_prefix));
// Events for rooms we are in
for room_id in services()
@@ -95,8 +92,6 @@ impl service::globals::Data for KeyValueDatabase {
// EDUs
futures.push(self.roomid_lasttypingupdate.watch_prefix(&roomid_bytes));
futures.push(self.readreceiptid_readreceipt.watch_prefix(&roomid_prefix));
// Key changes
futures.push(self.keychangeid_userid.watch_prefix(&roomid_prefix));

View file

@@ -1,150 +1 @@
use std::mem;
use ruma::{
events::receipt::ReceiptEvent, serde::Raw, CanonicalJsonObject, OwnedUserId, RoomId, UserId,
};
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
// Remove old entry
if let Some((old, _)) = self
.readreceiptid_readreceipt
.iter_from(&last_possible_key, true)
.take_while(|(key, _)| key.starts_with(&prefix))
.find(|(key, _)| {
key.rsplit(|&b| b == 0xff)
.next()
.expect("rsplit always returns an element")
== user_id.as_bytes()
})
{
// This is the old room_latest
self.readreceiptid_readreceipt.remove(&old)?;
}
let mut room_latest_id = prefix;
room_latest_id.extend_from_slice(&services().globals.next_count()?.to_be_bytes());
room_latest_id.push(0xff);
room_latest_id.extend_from_slice(user_id.as_bytes());
self.readreceiptid_readreceipt.insert(
&room_latest_id,
&serde_json::to_vec(&event).expect("EduEvent::to_string always works"),
)?;
Ok(())
}
fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> Box<
dyn Iterator<
Item = Result<(
OwnedUserId,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a,
> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let prefix2 = prefix.clone();
let mut first_possible_edu = prefix.clone();
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
Box::new(
self.readreceiptid_readreceipt
.iter_from(&first_possible_edu, false)
.take_while(move |(k, _)| k.starts_with(&prefix2))
.map(move |(k, v)| {
let count = utils::u64_from_bytes(
&k[prefix.len()..prefix.len() + mem::size_of::<u64>()],
)
.map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
let user_id = UserId::parse(
utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..])
.map_err(|_| {
Error::bad_database("Invalid readreceiptid userid bytes in db.")
})?,
)
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
let mut json =
serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| {
Error::bad_database(
"Read receipt in roomlatestid_roomlatest is invalid json.",
)
})?;
json.remove("room_id");
Ok((
user_id,
count,
Raw::from_json(
serde_json::value::to_raw_value(&json)
.expect("json is valid raw value"),
),
))
}),
)
}
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
self.roomuserid_privateread
.insert(&key, &count.to_be_bytes())?;
self.roomuserid_lastprivatereadupdate
.insert(&key, &services().globals.next_count()?.to_be_bytes())
}
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
self.roomuserid_privateread
.get(&key)?
.map_or(Ok(None), |v| {
Ok(Some(utils::u64_from_bytes(&v).map_err(|_| {
Error::bad_database("Invalid private read marker bytes")
})?))
})
}
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
Ok(self
.roomuserid_lastprivatereadupdate
.get(&key)?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
})
})
.transpose()?
.unwrap_or(0))
}
}

View file

@@ -1,7 +1,7 @@
use std::{collections::hash_map, mem::size_of, sync::Arc};
use std::{collections::{hash_map, HashMap}, mem::size_of, sync::Arc};
use ruma::{
api::client::{error::ErrorKind, sync::sync_events::UnreadNotificationsCount}, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId,
api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId,
};
use tracing::error;
@@ -281,30 +281,37 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
fn increment_notification_counts(
&self,
room_id: &RoomId,
mentions_users: Vec<OwnedUserId>,
mentions_bulk: Vec<OwnedUserId>,
notify: Vec<OwnedUserId>,
messages: Vec<OwnedUserId>,
users: HashMap<OwnedUserId, (bool, bool, bool)>,
) -> Result<()> {
let mut mentions_users_batch = Vec::new();
for user in mentions_users {
for (user, (mention_user, mention_bulk, notify)) in users {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes());
mentions_users_batch.push(userroom_id);
self.userroomid_unreads.upsert(&userroom_id, &|counts| {
let mut counts = counts.map(ToOwned::to_owned).unwrap_or_else(|| vec![0; 40]);
let mentions_user_count = utils::u64_from_bytes(&counts[8..16]).expect("database has valid data");
let mentions_bulk_count = utils::u64_from_bytes(&counts[16..24]).expect("database has valid data");
let messages_count = utils::u64_from_bytes(&counts[24..32]).expect("database has valid data");
let notify_count = utils::u64_from_bytes(&counts[32..40]).expect("database has valid data");
if mention_user {
counts[8..16].copy_from_slice(&(mentions_user_count + 1).to_be_bytes());
}
if mention_bulk {
counts[16..24].copy_from_slice(&(mentions_bulk_count + 1).to_be_bytes());
}
counts[24..32].copy_from_slice(&(messages_count + 1).to_be_bytes());
if notify {
counts[32..40].copy_from_slice(&(notify_count + 1).to_be_bytes());
}
counts
})?;
}
// FIXME: notification counts
/*
get n userroomid_notificationcounts
update those n userroomid_notificationcounts
write those n userroomid_notificationcounts
*/
// let val = self.userroomid_notificationcounts.get(userroom_id)?.unwrap_or_default();
// u64::from_le_bytes();
// .increment_batch(&mut notifies_batch.into_iter())?;
Ok(())
}
}
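For reference, a sketch of the value layout that userroomid_unreads appears to use: five big-endian u64s (40 bytes), matching the tree comment added later in this commit ("Unreads = LastEventShortId + MentionUser + MentionBulk + Messages + Notify"). The struct and helper names are illustrative, not from the commit.

// Decode/encode helpers for the assumed 40-byte unreads value.
struct Unreads {
    last_event_shortid: u64, // bytes 0..8
    mention_user: u64,       // bytes 8..16
    mention_bulk: u64,       // bytes 16..24
    messages: u64,           // bytes 24..32
    notify: u64,             // bytes 32..40
}

impl Unreads {
    fn decode(bytes: &[u8]) -> Unreads {
        let at = |i: usize| u64::from_be_bytes(bytes[i..i + 8].try_into().expect("8 bytes"));
        Unreads {
            last_event_shortid: at(0),
            mention_user: at(8),
            mention_bulk: at(16),
            messages: at(24),
            notify: at(32),
        }
    }

    fn encode(&self) -> Vec<u8> {
        [self.last_event_shortid, self.mention_user, self.mention_bulk, self.messages, self.notify]
            .iter()
            .flat_map(|v| v.to_be_bytes())
            .collect()
    }
}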

View file

@@ -2,57 +2,36 @@ use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
// FIXME: use the new push system
impl service::rooms::user::Data for KeyValueDatabase {
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
let mut userroom_id = user_id.as_bytes().to_vec();
userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes());
let mut roomuser_id = room_id.as_bytes().to_vec();
roomuser_id.push(0xff);
roomuser_id.extend_from_slice(user_id.as_bytes());
// self.userroomid_notificationcount
// .insert(&userroom_id, &0_u64.to_be_bytes())?;
// self.userroomid_highlightcount
// .insert(&userroom_id, &0_u64.to_be_bytes())?;
self.roomuserid_lastnotificationread.insert(
&roomuser_id,
&services().globals.next_count()?.to_be_bytes(),
)?;
self.userroomid_unreads.upsert(&userroom_id, &|counts| {
let mut counts = counts.map(ToOwned::to_owned).unwrap_or_else(|| vec![0; 40]);
counts[8..].copy_from_slice(&[0; 32]);
counts
})?;
Ok(())
}
fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
let mut userroom_id = user_id.as_bytes().to_vec();
userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes());
// self.userroomid_notificationcount
// .get(&userroom_id)?
// .map(|bytes| {
// utils::u64_from_bytes(&bytes)
// .map_err(|_| Error::bad_database("Invalid notification count in db."))
// })
// .unwrap_or(Ok(0))
Ok(0)
}
fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
let mut userroom_id = user_id.as_bytes().to_vec();
userroom_id.push(0xff);
userroom_id.extend_from_slice(room_id.as_bytes());
// self.userroomid_highlightcount
// .get(&userroom_id)?
// .map(|bytes| {
// utils::u64_from_bytes(&bytes)
// .map_err(|_| Error::bad_database("Invalid highlight count in db."))
// })
// .unwrap_or(Ok(0))
Ok(0)
fn get_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<(u64, u64, u64, u64)> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(room_id.as_bytes());
Ok(self.userroomid_unreads
.get(&key)?
.map(|counts| {
let mentions_user_count = utils::u64_from_bytes(&counts[8..16]).expect("database has valid data");
let mentions_bulk_count = utils::u64_from_bytes(&counts[16..24]).expect("database has valid data");
let messages_count = utils::u64_from_bytes(&counts[24..32]).expect("database has valid data");
let notify_count = utils::u64_from_bytes(&counts[32..40]).expect("database has valid data");
(mentions_user_count, mentions_bulk_count, messages_count, notify_count)
})
.unwrap_or((0, 0, 0, 0)))
}
fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
@@ -60,15 +39,9 @@ impl service::rooms::user::Data for KeyValueDatabase {
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
Ok(self
.roomuserid_lastnotificationread
Ok(self.userroomid_unreads
.get(&key)?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
})
})
.transpose()?
.map(|bytes| utils::u64_from_bytes(&bytes[0..8]).expect("database isn't corrupted"))
.unwrap_or(0))
}

View file

@@ -64,9 +64,6 @@ pub struct KeyValueDatabase {
RwLock<BTreeMap<(OwnedUserId, OwnedDeviceId, String), CanonicalJsonValue>>,
//pub edus: RoomEdus,
pub(super) readreceiptid_readreceipt: Arc<dyn KvTree>, // ReadReceiptId = RoomId + Count + UserId
pub(super) roomuserid_privateread: Arc<dyn KvTree>, // RoomUserId = Room + User, PrivateRead = Count
pub(super) roomuserid_lastprivatereadupdate: Arc<dyn KvTree>, // LastPrivateReadUpdate = Count
pub(super) typingid_userid: Arc<dyn KvTree>, // TypingId = RoomId + TimeoutTime + Count
pub(super) roomid_lasttypingupdate: Arc<dyn KvTree>, // LastRoomTypingUpdate = Count
pub(super) presenceid_presence: Arc<dyn KvTree>, // PresenceId = RoomId + Count + UserId
@@ -104,9 +101,8 @@ pub struct KeyValueDatabase {
pub(super) lazyloadedids: Arc<dyn KvTree>, // LazyLoadedIds = UserId + DeviceId + RoomId + LazyLoadedUserId
pub(super) userroomid_notificationcounts: Arc<dyn KvTree>, // NotificationCounts = u64 + u64 + u64 + u64
pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>, // LastNotificationRead = u64
pub(super) userid_inbox: Arc<dyn KvTree>, // Inbox = u64
pub(super) userroomid_unreads: Arc<dyn KvTree>, // Unreads = LastEventShortId + MentionUser + MentionBulk + Messages + Notify
pub(super) userid_inboxevent: Arc<dyn KvTree>, // InboxEvent = u64
/// Remember the current state hash of a room.
pub(super) roomid_shortstatehash: Arc<dyn KvTree>,
@@ -296,10 +292,6 @@ impl KeyValueDatabase {
userdevicesessionid_uiaainfo: builder.open_tree("userdevicesessionid_uiaainfo")?,
userdevicesessionid_uiaarequest: RwLock::new(BTreeMap::new()),
readreceiptid_readreceipt: builder.open_tree("readreceiptid_readreceipt")?,
roomuserid_privateread: builder.open_tree("roomuserid_privateread")?, // "Private" read receipt
roomuserid_lastprivatereadupdate: builder
.open_tree("roomuserid_lastprivatereadupdate")?,
typingid_userid: builder.open_tree("typingid_userid")?,
roomid_lasttypingupdate: builder.open_tree("roomid_lasttypingupdate")?,
presenceid_presence: builder.open_tree("presenceid_presence")?,
@@ -333,9 +325,8 @@ impl KeyValueDatabase {
lazyloadedids: builder.open_tree("lazyloadedids")?,
userroomid_notificationcounts: builder.open_tree("userroomid_notificationcounts")?,
roomuserid_lastnotificationread: builder.open_tree("userroomid_highlightcount")?,
userid_inbox: builder.open_tree("userid_inbox")?,
userroomid_unreads: builder.open_tree("userroomid_unreads")?,
userid_inboxevent: builder.open_tree("userid_inboxevent")?,
statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?,

View file

@@ -63,7 +63,6 @@ impl Services {
directory: rooms::directory::Service { db },
edus: rooms::edus::Service {
presence: rooms::edus::presence::Service { db },
read_receipt: rooms::edus::read_receipt::Service { db },
typing: rooms::edus::typing::Service { db },
},
event_handler: rooms::event_handler::Service,

View file

@@ -1,11 +1,9 @@
pub mod presence;
pub mod read_receipt;
pub mod typing;
pub trait Data: presence::Data + read_receipt::Data + typing::Data + 'static {}
pub trait Data: presence::Data + typing::Data + 'static {}
pub struct Service {
pub presence: presence::Service,
pub read_receipt: read_receipt::Service,
pub typing: typing::Service,
}

View file

@@ -1,36 +1 @@
use crate::Result;
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
pub trait Data: Send + Sync {
/// Replaces the previous read receipt.
fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()>;
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> Box<
dyn Iterator<
Item = Result<(
OwnedUserId,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a,
>;
/// Sets a private read marker at `count`.
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>;
/// Returns the private read marker.
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the count of the last typing update in this room.
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
}

View file

@@ -1,55 +1 @@
mod data;
pub use data::Data;
use crate::Result;
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
pub struct Service {
pub db: &'static dyn Data,
}
impl Service {
/// Replaces the previous read receipt.
pub fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()> {
self.db.readreceipt_update(user_id, room_id, event)
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
#[tracing::instrument(skip(self))]
pub fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> impl Iterator<
Item = Result<(
OwnedUserId,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a {
self.db.readreceipts_since(room_id, since)
}
/// Sets a private read marker at `count`.
#[tracing::instrument(skip(self))]
pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
self.db.private_read_set(room_id, user_id, count)
}
/// Returns the private read marker.
#[tracing::instrument(skip(self))]
pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
self.db.private_read_get(room_id, user_id)
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.last_privateread_update(user_id, room_id)
}
}

View file

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, collections::HashMap};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId};
@@ -82,13 +82,9 @@ pub trait Data: Send + Sync {
from: PduCount,
) -> Result<Box<dyn Iterator<Item = Result<(PduCount, PduEvent)>> + 'a>>;
// FIXME: this desperately needs to be redone
fn increment_notification_counts(
&self,
room_id: &RoomId,
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
highlights2: Vec<OwnedUserId>,
highlights3: Vec<OwnedUserId>,
users: HashMap<OwnedUserId, (bool, bool, bool)>,
) -> Result<()>;
}

View file

@@ -283,11 +283,6 @@ impl Service {
let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if appending
// fails
services()
.rooms
.edus
.read_receipt
.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
services()
.rooms
.user
@@ -318,10 +313,7 @@ impl Service {
let sync_pdu = pdu.to_sync_room_event();
let mut mentions_user = Vec::new();
let mut mentions_bulk = Vec::new();
let mut notify = Vec::new();
let mut messages = Vec::new();
let mut unreads = HashMap::new();
for user in services()
.rooms
@@ -369,33 +361,30 @@ impl Service {
mentions: Mentions,
}
let mut did_mention_user = false;
let mut did_mention_bulk = false;
let mut mention_user = false;
let mut mention_bulk = false;
if let Ok(ExtractMentions { mentions }) = sync_pdu.deserialize_as() {
if mentions.room && sender_power >= power_levels.notifications.room {
mentions_bulk.push(user.clone());
did_mention_bulk = true;
} else if mentions.thread && sender_power >= power_levels.notifications.room {
// TODO
// mentions_bulk.push(user.clone());
// did_mention_bulk = true;
mention_bulk = true;
} else if mentions.thread && sender_power >= power_levels.notifications.thread {
// TODO: thread notifications
// mention_bulk = true;
}
if mentions.user_ids.contains(user) {
mentions_user.push(user.clone());
did_mention_user = true;
mention_user = true;
}
}
// NOTE: this is untested and may not work
// TODO: move this up or batch this
if let Ok(MessageEventContent { relations, .. }) = sync_pdu.deserialize_as() {
if !relations.iter().any(|r| matches!(r, Relation::Replacement(_))) {
messages.push(user.clone());
continue;
}
}
// TODO: space rules
let default_rule = Rule::default();
let rule = user_rules_room.as_ref().and_then(|i| i.rule.as_ref())
.or_else(|| user_rules_global.as_ref().and_then(|i| i.rule.as_ref()))
@@ -405,23 +394,23 @@ impl Service {
.or_else(|| user_rules_global.as_ref().map(|i| i.suppress_bulk))
.unwrap_or_default();
let should_notify = match rule {
let notify = match rule {
Rule::Everything => true,
Rule::Mentions => did_mention_user || (did_mention_bulk && !suppress_bulk),
Rule::Mentions => mention_user || (mention_bulk && !suppress_bulk),
_ => false,
};
if should_notify {
notify.push(user.clone());
if notify {
for push_key in services().pusher.get_pushkeys(user) {
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
}
}
unreads.insert(user.clone(), (mention_user, mention_bulk, notify));
}
self.db
.increment_notification_counts(&pdu.room_id, mentions_user, mentions_bulk, notify, messages)?;
.increment_notification_counts(&pdu.room_id, unreads)?;
match pdu.kind {
TimelineEventType::RoomRedaction => {

View file

@@ -2,11 +2,10 @@ use crate::Result;
use ruma::{OwnedRoomId, OwnedUserId, RoomId, UserId};
pub trait Data: Send + Sync {
/// <<<< old api
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
fn get_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<(u64, u64, u64, u64)>;
// Returns the count at which the last reset_notification_counts was called
fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;

View file

@@ -14,12 +14,8 @@ impl Service {
self.db.reset_notification_counts(user_id, room_id)
}
pub fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.notification_count(user_id, room_id)
}
pub fn highlight_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.highlight_count(user_id, room_id)
pub fn get_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<(u64, u64, u64, u64)> {
self.db.get_counts(user_id, room_id)
}
pub fn last_notification_read(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {

View file

@@ -284,70 +284,6 @@ impl Service {
.filter_map(|r| r.ok())
.filter(|user_id| user_id.server_name() == services().globals.server_name()),
);
// Look for read receipts in this room
for r in services()
.rooms
.edus
.read_receipt
.readreceipts_since(&room_id, since)
{
let (user_id, count, read_receipt) = r?;
if count > max_edu_count {
max_edu_count = count;
}
if user_id.server_name() != services().globals.server_name() {
continue;
}
let event: AnySyncEphemeralRoomEvent =
serde_json::from_str(read_receipt.json().get())
.map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
let federation_event = match event {
AnySyncEphemeralRoomEvent::Receipt(r) => {
let mut read = BTreeMap::new();
let (event_id, mut receipt) = r
.content
.0
.into_iter()
.next()
.expect("we only use one event per read receipt");
let receipt = receipt
.remove(&ReceiptType::Read)
.expect("our read receipts always set this")
.remove(&user_id)
.expect("our read receipts always have the user here");
read.insert(
user_id,
ReceiptData {
data: receipt.clone(),
event_ids: vec![event_id.clone()],
},
);
let receipt_map = ReceiptMap { read };
let mut receipts = BTreeMap::new();
receipts.insert(room_id.clone(), receipt_map);
Edu::Receipt(ReceiptContent { receipts })
}
_ => {
Error::bad_database("Invalid event type in read_receipts");
continue;
}
};
events.push(serde_json::to_vec(&federation_event).expect("json can be serialized"));
if events.len() >= 20 {
break 'outer;
}
}
}
for user_id in device_list_changes {

128
threadsort.rs Normal file
View file

@@ -0,0 +1,128 @@
// how threads should be sorted in inbox
/*
struct HistoryVisibility {
/// Whether people/guests can view this room without joining
world_readable: bool,
/// Whether this room should be crawled and archived publicly
robots_archive: bool,
/// Whether this room should be crawled and made searchable in some public directory
robots_index: bool,
}
struct JoinRules {
allow_join: Vec<JoinRule>,
allow_knock: Vec<JoinRule>,
}
// TODO: do I switch away from `m.` because I'm not Matrix?
enum JoinRule {
#[serde(rename = "m.public")]
Public,
#[serde(rename = "m.room_membership")]
InRoom {
room_id: RoomId
},
}
known m.purposes
## in spaces
- is group, folder
- is nsfw
rooms added to groups automatically set m.space.parent
## in normal rooms
- is dm, announcement, default
- is nsfw
fn is_room_private(room) {
for rule in room.join_rule.allow_join {
if rule == JoinRule::Public {
return false;
}
}
if let Some(space) = room.canonical_space {
is_room_private(space)
} else {
true
}
}
Impl:
1. Get threads
2. Add all mentions, in chronological order, to the beginning of the list
3. Filter out muted rooms and ignored rooms unless show ignore is enabled
4. Filter out unwatched threads if only watching is enabled
*/
/// the kind of room
enum RoomKind {
Standard,
Direct,
Announcement,
}
/// info about a thread
struct ThreadMetrics {
is_room_private: bool,
is_thread_watched: bool,
thread_created_at: u64,
thread_updated_at: u64,
thread_read_at: Option<u64>,
room_kind: RoomKind,
id: (),
}
/// get the next thread from db, alternating between threads from
/// public/private rooms and ordered by update time
fn next_thread() -> Option<ThreadMetrics> { todo!() }
// this feels overengineered/overcomplicated, and like it has performance problems
fn get_threads() -> Vec<()> {
let mut threads = vec![];
let sort_limit = 200;
let return_limit = 50;
while threads.len() < sort_limit {
let Some(metrics) = next_thread() else {
break;
};
let base_score = 1.0 / (metrics.thread_created_at as f32 / 360000.0 + 1.0).powf(0.6);
let bump_factor = 1.0 / (metrics.thread_updated_at as f32 / 360000.0 + 1.0).powf(0.6);
let room_factor = match (metrics.is_room_private, metrics.room_kind) {
(_, RoomKind::Direct) => 4.0,
(true, RoomKind::Announcement) => 5.0,
(true, RoomKind::Standard) => 2.5,
(false, RoomKind::Standard) => 1.0,
(false, RoomKind::Announcement) => 4.0,
};
let read_factor = match (metrics.thread_read_at, metrics.thread_updated_at) {
(None, _) => 1.0,
// cast to i64: `read` may predate the latest update, so u64 subtraction would underflow
(Some(read), updated) => ((read as i64 - updated as i64) as f32 / 360000.0).tanh(),
};
let score = base_score * room_factor * read_factor * bump_factor;
threads.push((metrics.id, score));
}
threads.sort_unstable_by(|(_, s1), (_, s2)| s2.total_cmp(s1)); // highest score first
threads
.into_iter()
.map(|(id, _)| id)
.take(return_limit)
.collect()
}
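// A quick worked example of the scoring above (illustrative numbers only):
// an unread standard thread in a private room (room_factor = 2.5,
// read_factor = 1.0), created and last bumped 360000 ticks ago:
//   base_score = bump_factor = 1.0 / 2.0f32.powf(0.6) ≈ 0.66
//   score ≈ 0.66 * 2.5 * 1.0 * 0.66 ≈ 1.09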
/*
another option: alternation
- have thread sets "dm/announce", "private", "public", ordered by creation date
- alternate between the three
*/