bugfixes, update readme

tezlm 2023-10-08 07:19:58 -07:00
parent 9ee44db6c9
commit e50e9af891
Signed by: tezlm
GPG key ID: 649733FCD94AFBBA
5 changed files with 306 additions and 77 deletions

126
README.md

@ -1,4 +1,130 @@
# Conduit
## The p2 experiment
p2 (aka push v2, or piston) is an experimental push rule parser designed
to be faster and flexible-er than the current push rule system!
```json
{
"m.unread": {
"rule": "event.type in ['m.room.message', 'm.room.encrypted', 'm.room.name', 'm.room.topic'] && !('m.new_content' in event.content) && !(event.sender in account_data.'m.ignored_users')",
"actions": [],
"enabled": true
},
"m.notify": {
"rule": "@'m.unread' && user_id in event.content.'m.mentions'.user_ids",
"actions": ["notify", "store"],
"enabled": true
},
"m.invite": {
"rule": "event.type == 'm.room.member' && event.state_key == user_id && event.content.membership == 'invite'",
"actions": ["notify"],
"enabled": true
}
}
```
### what?
p2 is composed of **counters**, **rules**, and **actions**. Each
counter has one rule and n actions. In the above config, there are three
counters. When a rule matches an event, that counter is incremented.
Actions specify what the server should do when a counter is
incremented. Currently, there is `notify` for sending a push
notification and `store` for storing notifications in a list for later
(`/notifications`). Perhaps `count` could be split out from counters
into its own action later on.
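
As a rough illustration of the counter/rule/action model described above, here is a minimal sketch. It is not the code in `src/utils/p2.rs`: the `Event`, `Counter`, and `process` names are hypothetical, and rules are plain Rust predicates instead of compiled expressions.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a Matrix event; only the fields this sketch needs.
struct Event {
    kind: String,
    sender: String,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Action {
    Notify,
    Store,
}

/// One counter: a name, one rule (here a plain predicate), and n actions.
struct Counter {
    name: &'static str,
    rule: fn(&Event) -> bool,
    actions: &'static [Action],
}

/// Increment every counter whose rule matches the event and return the
/// union of the actions attached to the matching counters.
fn process(
    counters: &[Counter],
    counts: &mut HashMap<String, u64>,
    event: &Event,
) -> HashSet<Action> {
    let mut actions = HashSet::new();
    for counter in counters {
        if (counter.rule)(event) {
            *counts.entry(counter.name.to_string()).or_insert(0) += 1;
            actions.extend(counter.actions.iter().copied());
        }
    }
    actions
}

/// Example rule: matches any room message.
fn is_message(event: &Event) -> bool {
    event.kind == "m.room.message"
}

/// Example rule: matches messages from one (made up) user.
fn is_from_alice(event: &Event) -> bool {
    is_message(event) && event.sender == "@alice:example.org"
}
```

With an `m.unread` counter using `is_message` and no actions, and an `m.notify` counter using `is_from_alice` with `Notify` and `Store`, a message from anyone else bumps only `m.unread` and triggers no actions.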
Instead of having a separate set of apis and `/sync` field for
notify/highlight counts, p2 takes place entirely in account data. Rules
are specified via the `p2.rules` event and unread/notify counts are set
by the server in `p2.counts`. Per-room account data can override global
push rules.
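
One way to picture the per-room override is a name-level merge, sketched below. This assumes that a room rule simply replaces the global rule with the same name; the real `P2Rules::merge` may behave differently, and the `Rules` alias here is hypothetical.

```rust
use std::collections::HashMap;

/// Rules keyed by counter name; the rule body stays as source text here.
type Rules = HashMap<String, String>;

/// Merge room-level rules over global ones: a room entry replaces the
/// global entry with the same name, everything else is inherited.
fn merge(room: Rules, global: Rules) -> Rules {
    let mut merged = global;
    // `extend` overwrites existing keys, so room entries win.
    merged.extend(room);
    merged
}
```

Under this assumption, a room that only defines `m.unread` still inherits the global `m.notify` and `m.invite` rules.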
The rules aren't meant to be machine-edited - write rules that read
from account data instead.
The implementation of p2 takes the config and compiles it to
stack-based bytecode, and lazy loads state events and account data. Is
it overengineered? Probably! Does it work? Definitely! Most of the code
is in a [single file](src/utils/p2.rs) currently.
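
To give a feel for what "stack-based bytecode" means here, the following is a toy evaluator. It is only a sketch: the `Op` set is a hypothetical, much smaller cousin of the real `P2RuleOp`, it only knows the event type, and it does no lazy loading.

```rust
use std::collections::HashSet;

/// Hypothetical instruction set for this sketch.
enum Op {
    /// Push a string literal.
    Lit(&'static str),
    /// Push the event type.
    EventType,
    /// Push whether the named counter has already matched.
    Rule(&'static str),
    /// Pop two values, push whether they are equal.
    Eq,
    /// Pop two booleans, push their conjunction.
    And,
    /// Pop a boolean, push its negation.
    Not,
}

#[derive(PartialEq)]
enum Value {
    Str(String),
    Bool(bool),
}

/// Run the ops against one event; the rule matches if `true` is left on top.
fn eval(ops: &[Op], event_type: &str, matched: &HashSet<&str>) -> bool {
    let mut stack: Vec<Value> = Vec::new();
    for op in ops {
        let value = match op {
            Op::Lit(s) => Value::Str(s.to_string()),
            Op::EventType => Value::Str(event_type.to_string()),
            Op::Rule(name) => Value::Bool(matched.contains(*name)),
            Op::Eq => {
                let (b, a) = (stack.pop(), stack.pop());
                Value::Bool(a.is_some() && a == b)
            }
            Op::And => {
                let (b, a) = (stack.pop(), stack.pop());
                Value::Bool(
                    a == Some(Value::Bool(true)) && b == Some(Value::Bool(true)),
                )
            }
            // Anything other than `true` (including an empty stack) negates to `true`.
            Op::Not => Value::Bool(stack.pop() != Some(Value::Bool(true))),
        };
        stack.push(value);
    }
    stack.pop() == Some(Value::Bool(true))
}
```

In this toy encoding, a rule like `@'m.unread' && event.type == 'm.room.message'` would become `[Rule("m.unread"), EventType, Lit("m.room.message"), Eq, And]`. Operands are pushed first and operators consume them, so evaluation needs no recursion.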
### why?
Matrix push rules in their current state are a crime against
humanity. They're overengineered, yet aren't able to do things like
telling you when a room has unread messages. Rules are evaluated in
an awful home-grown json thing that is neither jsonpath nor jsonschema
but more limited than either.
All rules go into a single giant mess of an object - per-room
account data apparently doesn't exist. Rules are split into
override/sender/content/room/underride for no rhyme or reason. Want a
room to be muted? That rule goes in "override". Want a room to notify
for all messages? That rule goes in "room"!
Rules are hardcoded to either be `notify` or `highlight`. There's no way
to make custom/extensible rules, so counters/rules that don't exist in
spec (e.g. `unread`) can't be implemented or worked around.
### why not?
The most obvious reason: this breaks compatibility with everything else in
the matrix ecosystem, so it requires forking and modifying clients. This
takes varying amounts of effort and can become incompatible or out of
sync with upstream.
In terms of implementation, this system is very much a work in progress:
it can't handle things like `@room` mentions yet, and syntax/rules may
break. The code is a mess, doesn't properly handle read receipts, and
doesn't atomically update the `p2.counts` config event.
This system requires `m.mentions` to be in plaintext, which some people
may not want.
This system might be harder to retrofit things into. With the json push
rules, it's as simple as adding a `do_some_specific_thing`.
### spec?
This is a custom system that exists outside of the official matrix
client-server spec. It would be wonderful if a version of p2 got merged
into the official spec, but unfortunately that's a pipe dream.
### rant
Matrix has *so much* potential as a general purpose protocol or
even an instant messaging protocol, but so many features seem to be
poorly designed. There are so many rough edges that it's hard for me to
recommend it to anyone who isn't willing to spend a bit of time working
around its idiosyncrasies.
I strongly believe `m.mentions` shouldn't have been encrypted, as a
form of it is functionally *required* by servers to accurately send
push notifications. It's frustrating that mentions are critical to
push notifications, but are one of the only pieces of metadata that
are encrypted. Unfortunately, [mentions probably won't be moved to
plaintext](https://github.com/matrix-org/matrix-spec-proposals/pull/3952).
I know I should probably be happy that matrix is fixing its chronically
bad plaintext leaks, but this really shouldn't have been done.
On the other hand, state events like name/topic/avatar, which if
encrypted will have difficulties with `/hierarchy` or summaries, aren't
as critical or can be worked around. Reactions are also plaintext, though
the reaction keys could be encrypted. Room member nicknames/avatars are
stored as plaintext, assuming they aren't clobbered when a user changes
their global displayname/avatar_url.
All these little things add up to a frustrating experience. Despite
all that, I'm very hopeful for matrix and it's probably the best option
that exists.
---
original readme below:
### A Matrix homeserver written in Rust
#### What is Matrix?


@ -28,7 +28,7 @@ pub async fn set_global_account_data_route(
match event_type.as_str() {
"p2.rules" => {
let _rules = P2Rules::compile_value(data.clone())?;
P2Rules::compile_value(data.clone())?;
}
"p2.counters" => {
// unused for global account data
@ -64,7 +64,7 @@ pub async fn set_room_account_data_route(
match event_type.as_str() {
"p2.rules" => {
let _rules = P2Rules::compile_value(data.clone())?;
P2Rules::compile_value(data.clone())?;
}
"p2.counters" => {
if serde_json::from_value::<P2Counters>(data.clone()).is_err() {


@ -3,6 +3,7 @@ use std::{collections::hash_map, mem::size_of, sync::Arc};
use ruma::{
api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId,
};
use serde::Deserialize;
use tracing::error;
use crate::{
@ -329,7 +330,14 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.get(Some(room_id), &user_id, event_type.into())?
.unwrap_or_default();
let mut parsed: P2Counters = serde_json::from_str(raw_value.get()).unwrap_or_default();
#[derive(Deserialize)]
struct P2CountersWrapper {
content: P2Counters,
}
let mut parsed = serde_json::from_str::<P2CountersWrapper>(raw_value.get())
.map(|e| e.content)
.unwrap_or_default();
for name in names {
*parsed.counters.entry(name).or_insert(0) += 1;
@ -339,7 +347,10 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
Some(room_id),
&user_id,
event_type.into(),
&serde_json::to_value(parsed).expect("should always be serializable"),
&serde_json::json!({
"type": event_type,
"content": parsed,
}),
)?;
}
@ -374,7 +385,6 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
Ok(())
}
// TODO: cache
fn p2_get_rules(&self, room_id: &RoomId, user_id: &UserId) -> Result<Arc<P2Rules>> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
@ -383,21 +393,28 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
if let Some(rules) = self
.p2_userroomid_rules_cache
.lock()
.expect("FIXME: panic in lock")
.unwrap()
.get_mut(key.as_slice())
{
return Ok(rules.clone());
}
let room_rules: Option<P2Rules> = services()
#[derive(Deserialize)]
struct P2RulesWrapper {
content: P2Rules,
}
let room_rules = services()
.account_data
.get(Some(room_id), user_id, "p2.rules".into())?
.map(|ev| serde_json::from_str(ev.get()).expect("validated on input"));
.map(|ev| serde_json::from_str::<P2RulesWrapper>(ev.get()).expect("validated on input"))
.map(|wrap| wrap.content);
let global_rules: Option<P2Rules> = services()
let global_rules = services()
.account_data
.get(None, user_id, "p2.rules".into())?
.map(|ev| serde_json::from_str(ev.get()).expect("validated on input"));
.map(|ev| serde_json::from_str::<P2RulesWrapper>(ev.get()).expect("validated on input"))
.map(|wrap| wrap.content);
let rules = Arc::new(match (room_rules, global_rules) {
(Some(room), Some(global)) => room.merge(global),
@ -408,7 +425,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
self.p2_userroomid_rules_cache
.lock()
.expect("FIXME: panic in lock")
.unwrap()
.insert(key.into_boxed_slice(), rules.clone());
Ok(rules)


@ -38,7 +38,9 @@ use tracing::{error, info, warn};
use crate::{
api::server_server,
service::pdu::{EventHash, PduBuilder},
services, utils::{self, p2::P2Action}, Error, PduEvent, Result,
services,
utils::{self, p2::P2Action},
Error, PduEvent, Result,
};
use super::state_compressor::CompressedStateEvent;
@ -318,7 +320,7 @@ impl Service {
let sync_pdu = pdu.to_sync_room_event();
// legacy notification/highlight system
let mut notifies = Vec::new();
let mut highlights = Vec::new();
@ -374,18 +376,17 @@ impl Service {
if highlight {
highlights.push(user.clone());
}
for push_key in services().pusher.get_pushkeys(user) {
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
}
// pushers are now handled by p2, no need to duplicate notifications
}
self.db
.increment_notification_counts(&pdu.room_id, notifies, highlights)?;
// shiny new p2 notification system
let mut counters = Vec::new();
let mut store = Vec::new();
for user in services()
.rooms
@ -399,24 +400,40 @@ impl Service {
}
let rules = self.db.p2_get_rules(&pdu.room_id, user)?;
let (matched, actions) = rules.eval(&pdu.room_id, user, serde_json::to_value(&sync_pdu).expect("always valid"));
counters.push((user.to_owned(), matched.into_iter().collect()));
for action in actions {
match action {
P2Action::Store => {},
P2Action::Notify => {},
P2Action::Custom(_) => {},
}
let matched = rules.eval(
&pdu.room_id,
user,
serde_json::to_value(&sync_pdu).expect("always valid"),
);
let should_notify = matched
.iter()
.any(|(_, actions)| actions.contains(&P2Action::Notify));
let counters_to_store: Vec<_> = matched
.iter()
.filter(|(_, actions)| actions.contains(&P2Action::Store))
.map(|(matched, _)| matched.to_owned())
.collect();
if !counters_to_store.is_empty() {
store.push((user.to_owned(), counters_to_store));
}
for push_key in services().pusher.get_pushkeys(user) {
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
if !matched.is_empty() {
counters.push((user.to_owned(), matched.into_keys().collect()));
}
if should_notify {
for push_key in services().pusher.get_pushkeys(user) {
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
}
}
}
self.db.p2_add_store(&pdu.event_id, store)?;
self.db.p2_increment_counters(&pdu.room_id, counters)?;
match pdu.kind {
TimelineEventType::RoomRedaction => {
if let Some(redact_id) = &pdu.redacts {


@ -30,6 +30,7 @@ pub enum P2RuleOp {
Ident(String),
Set(Vec<String>),
Get(String),
GetDyn,
Rule(String),
Eq,
Neq,
@ -68,6 +69,7 @@ pub enum P2Action {
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct P2Counters {
pub counters: HashMap<Box<str>, u64>,
#[serde(flatten)]
_other: HashMap<String, JVal>,
}
@ -103,18 +105,27 @@ impl P2Rules {
Ok(rules)
}
pub fn eval(&self, room: &RoomId, user: &UserId, json: JVal) -> (HashSet<Box<str>>, HashSet<P2Action>) {
let mut matched = HashSet::new();
let mut actions = HashSet::new();
// pub fn eval(&self, room: &RoomId, user: &UserId, json: JVal) -> (HashSet<Box<str>>, HashSet<P2Action>) {
pub fn eval(
&self,
room: &RoomId,
user: &UserId,
json: JVal,
) -> HashMap<Box<str>, HashSet<P2Action>> {
// TODO: don't use duplicate structures
let mut matched = HashMap::new();
let mut matched_set = HashSet::new();
for (name, rule) in &self.rules {
if rule.eval(room, user, &json, &matched) {
matched.insert(name.clone());
if let Some(a) = self.actions.get(name) {
actions.extend(a.iter().cloned());
if rule.eval(room, user, &json, &matched_set) {
matched_set.insert(name.clone());
if let Some(acts) = self.actions.get(name) {
matched.insert(name.clone(), acts.iter().cloned().collect());
} else {
matched.insert(name.clone(), HashSet::new());
}
}
}
(matched, actions)
matched
}
pub fn merge(self, other: P2Rules) -> P2Rules {
@ -129,6 +140,8 @@ impl P2Rules {
}
impl P2Rule {
/// Take an expression and tokenize, parse, and generate bytecode
/// for it
fn parse(s: &str) -> Result<P2Rule, P2RuleError> {
macro_rules! rgx {
($pat:expr) => {
@ -172,11 +185,16 @@ impl P2Rule {
let mut ops = vec![];
let mut depends = vec![];
P2Rule::generate(&tokens, &mut ops, &mut depends, &mut 0, 0)?;
let mut start = 0;
P2Rule::generate(&tokens, &mut ops, &mut depends, &mut start, 0)?;
if tokens.get(start).is_some() {
return Err(P2RuleError::InvalidToken);
}
Ok(P2Rule { ops, depends })
}
/// recursively walk through tokens, "unravelling" it into bytecode
fn generate(
tokens: &[(&str, Option<&str>)],
ops: &mut Vec<P2RuleOp>,
@ -243,11 +261,23 @@ impl P2Rule {
loop {
match tokens.get(*start) {
Some(("closeparan", None)) => {
Some(("closeparan" | "closebracket", None)) => {
// rewind so the parent can see it
*start -= 1;
break;
}
Some(("openbracket", None)) => {
*start += 1;
P2Rule::generate(tokens, ops, depends, start, 0)?;
*start += 1;
match tokens.get(*start) {
Some(("closebracket", None)) => (),
Some((_, _)) => return Err(P2RuleError::InvalidToken),
None => return Err(P2RuleError::UnexpectedEOF),
};
dbg!(tokens.get(*start));
ops.push(P2RuleOp::GetDyn);
}
Some(("operator", Some(","))) => {
*start -= 1;
break;
@ -302,6 +332,7 @@ impl P2Rule {
Ok(())
}
/// evaluate a json event against this rule
fn eval(&self, room: &RoomId, user: &UserId, json: &JVal, matched: &HashSet<Box<str>>) -> bool {
use bumpalo::collections::Vec;
@ -347,46 +378,71 @@ impl P2Rule {
"event" => Value::Json(json),
"account" => Value::AccountData,
"state" => Value::RoomState(None),
"user_id" => Value::Json(bump.alloc(JVal::String(user.to_string()))),
_ => Value::Json(&JVal::Null),
},
P2RuleOp::Get(s) => match stack.pop().unwrap() {
Value::Json(JVal::Object(obj)) => {
Value::Json(obj.get(s).unwrap_or(&JVal::Null))
P2RuleOp::Get(_) | P2RuleOp::GetDyn => {
let s = match op {
P2RuleOp::Get(s) => s,
P2RuleOp::GetDyn => {
let s = stack
.pop()
.map(|s| match s {
Value::Json(JVal::String(s)) => Some(s),
_ => None,
})
.flatten();
match s {
Some(s) => s,
None => {
stack.push(Value::Json(&JVal::Null));
continue;
}
}
}
_ => unreachable!(),
};
match stack.pop().unwrap() {
Value::Json(JVal::Object(obj)) => {
Value::Json(obj.get(s).unwrap_or(&JVal::Null))
}
Value::AccountData => {
let data_type =
ruma::events::RoomAccountDataEventType::from(s.to_string());
let data = services()
.account_data
.get(Some(room), user, data_type.clone())
.ok()
.unwrap_or_else(|| {
services()
.account_data
.get(None, user, data_type)
.ok()
.flatten()
})
.map(|v| {
serde_json::from_str(v.get())
.expect("database has invalid data")
})
.unwrap_or(JVal::Null);
Value::Json(bump.alloc(data))
}
Value::RoomState(None) => Value::RoomState(Some(s)),
Value::RoomState(Some(state_key)) => {
let ev = services()
.rooms
.state_accessor
.room_state_get(room, &s.to_owned().into(), state_key.into())
.ok()
.flatten()
.map(|ev| serde_json::from_str(ev.content.get()).ok())
.flatten()
.unwrap_or(JVal::Null);
Value::Json(bump.alloc(ev))
}
_ => Value::Json(&JVal::Null),
}
Value::AccountData => {
let data_type = ruma::events::RoomAccountDataEventType::from(s.to_string());
let data = services()
.account_data
.get(Some(room), user, data_type.clone())
.ok()
.unwrap_or_else(|| {
services()
.account_data
.get(None, user, data_type)
.ok()
.flatten()
})
.map(|v| {
serde_json::from_str(v.get()).expect("database has invalid data")
})
.unwrap_or(JVal::Null);
Value::Json(bump.alloc(data))
}
Value::RoomState(None) => Value::RoomState(Some(s)),
Value::RoomState(Some(state_key)) => {
let ev = services()
.rooms
.state_accessor
.room_state_get(room, &s.to_owned().into(), state_key.into())
.ok()
.flatten()
.map(|ev| serde_json::from_str(ev.content.get()).ok())
.flatten()
.unwrap_or(JVal::Null);
Value::Json(bump.alloc(ev))
}
_ => Value::Json(&JVal::Null),
},
}
P2RuleOp::Eq => {
let a = stack.pop().unwrap();
let b = stack.pop().unwrap();
@ -494,6 +550,7 @@ impl<'de> Deserialize<'de> for P2Rule {
}
}
// TODO: fix tests
// #[cfg(test)]
#[cfg(off)]
mod tests {
@ -720,4 +777,16 @@ mod tests {
}))
.unwrap_err();
}
#[test]
fn test_dyn() {
let rules = P2Rules::compile_value(serde_json::json!({
"rules": {
"a": "foo[user_id] && bar",
},
"actions": {}
}))
.unwrap();
panic!("{:?}", rules);
}
}