Compare commits

...

22 commits

Author SHA1 Message Date
2c49ddfcc0
BROKEN push last changes 2024-08-24 14:15:39 -07:00
df9251448b
Rework language (WIP) 2023-11-11 03:56:25 -08:00
6c4c73dd8f
more bugfixes 2023-10-13 15:37:26 -07:00
928bd3c08f
Box<str> 2023-10-10 00:18:32 -07:00
7ec1ec2473
Fix parsing bugs 2023-10-09 23:44:36 -07:00
fd8da62a88
redo json structure 2023-10-09 04:55:54 -07:00
1390287345
rewrite to s-expressions, add mark action 2023-10-09 04:32:52 -07:00
a88bcb4646
clean up code 2023-10-08 22:07:24 -07:00
2b1407d0f5
fix off by one error 2023-10-08 09:18:54 -07:00
1c6ac219a8
greater than and less than 2023-10-08 09:16:28 -07:00
e50e9af891
bugfixes, update readme 2023-10-08 07:19:58 -07:00
9ee44db6c9
fix panic 2023-10-07 13:44:55 -07:00
c83b552727
initial commit for the funny push rules 2023-10-07 13:22:26 -07:00
f77e26c525
Ran cargo fmt 2023-10-04 23:33:09 -07:00
8d7982900a
Fix table headers and pagination 2023-10-04 23:24:33 -07:00
84f259d45e
List pagination and html tables 2023-10-04 21:33:37 -07:00
755261e63f
Room directory admin commands 2023-10-04 20:46:12 -07:00
4c03aa80ab
reply to messages 2023-10-04 20:13:04 -07:00
910d166b5d
Admin room alias commands
- room alias set
- room alias remove
- room alias which
- room alias list
2023-10-03 20:42:31 -07:00
99fb0a2953
Add appservice show command to show config 2023-10-03 17:39:40 -07:00
b20f4d6305
Allow using languages in code blocks.
```yaml
This works now
```
2023-10-03 17:09:17 -07:00
032602de58
Rework admin commands to use subcommands.
This commit doesn't add, remove, or change any
commands, it only organizes them
2023-10-03 16:52:52 -07:00
24 changed files with 2900 additions and 1332 deletions

1168
Cargo.lock generated

File diff suppressed because it is too large

View file

@ -103,6 +103,7 @@ lazy_static = "1.4.0"
async-trait = "0.1.68"
sd-notify = { version = "0.4.1", optional = true }
bumpalo = { version = "3.14.0", features = ["collections"] }
[target.'cfg(unix)'.dependencies]
nix = { version = "0.26.2", features = ["resource"] }

130
README.md
View file

@ -1,4 +1,134 @@
# Conduit
## The p2 experiment
p2 (aka push v2, or piston) is an experimental push rule parser designed
to be faster and flexible-er than the current push rule system!
```json
{
"m.unread": {
"rule": "event.type in ['m.room.message', 'm.room.encrypted', 'm.room.name', 'm.room.topic'] && !('m.new_content' in event.content) && !(event.sender in account_data.'m.ignored_users')",
"actions": [],
"enabled": true
},
"m.highlight": {
"rule": "@'m.unread' && (user_id in event.content.'m.mentions'.user_ids || (event.content.'m.mentions'.room && can_notify('room')))",
"actions": ["notify", "store", "mark"],
"enabled": true
},
"m.notify": {
"rule": "@'m.highlight' || @'m.room' || 'm.direct' in state.'m.purpose'.purposes",
"actions": ["notify"],
"enabled": true
},
"m.invite": {
"rule": "event.type == 'm.room.member' && event.state_key == user_id && event.content.membership == 'invite'",
"actions": ["notify"],
"enabled": true
}
}
```
### what?
p2 is composed of **counters**, **rules**, and **actions**. Each
counter has one rule and n actions. In the above config, there are four
counters. When a rule matches an event, that counter is incremented.
Actions specify what the server should do when a counter is
incremented. Currently, there is `notify` for sending a push notification,
`store` for storing notifications in a list for later (`/notifications`),
and `mark` for adding a mark to an event's unsigned content. Perhaps
`count` could be split out from counters into its own action later on.
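Concretely, `mark` works in two steps (see `p2_transpose_marks` further
down this diff): when appending a pdu, the server writes every matching
user's marks into the event's unsigned data under `p2.marks.all_users`,
and when serving the event to one user it rewrites that into a plain
`p2.marks` list. A rough sketch of the two shapes, using a made-up user id:
```rust
use serde_json::json;

// What the server stores in `unsigned` at append time (all users' marks)...
let stored = json!({
    "p2.marks.all_users": { "@alice:example.org": ["m.highlight"] }
});
// ...and what that user sees after p2_transpose_marks rewrites it for them.
let served = json!({ "p2.marks": ["m.highlight"] });
```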
Instead of having a separate set of apis and `/sync` field for
notify/highlight counts, p2 takes place entirely in account data. Rules
are specified via the `p2.rules` event ~~and unread/notify counts are set
by the server in `p2.counts`.~~ Update: to fix threads and a few other
issues, counters are moved back into unread_counts. Per-room account
data can override global push rules.
The rules aren't meant to be machine-edited - make them read from account
data instead.
This implementation of p2 takes the ruleset and precompiles it to a
stack-based, bytecode-ish intermediate representation, which is then
evaluated on every new event. It lazy-loads state events and account
data. Most of the code is in a [single file](src/utils/p2.rs) currently.
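For illustration (this snippet isn't in the repo), a rule like
`event.type == 'm.room.message'` compiles to roughly the following, using
the `P2RuleOp`/`P2RuleData` types added in `src/utils/p2/parser.rs` later
in this diff; `eval.rs` then runs the ops as a postfix stack machine:
```rust
// Illustrative sketch of the compiled form of `event.type == 'm.room.message'`.
// Operands are interned into `data`, and `ops` is the postfix program.
let data = vec![
    P2RuleData::Ident("event".into()),
    P2RuleData::Ident("type".into()),
    P2RuleData::String("m.room.message".into()),
];
let ops = vec![
    P2RuleOp::Data(0), // push the event json
    P2RuleOp::Data(1), // push the key "type"
    P2RuleOp::Get,     // index the pushed container with the pushed key
    P2RuleOp::Data(2), // push "m.room.message"
    P2RuleOp::Eq,      // pop both sides and push the comparison result
];
```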
### why?
Matrix push rules in their current state are a crime against
humanity. They're overengineered, yet can't even tell you when a room
has unread messages. Rules are evaluated in an awful home-grown json
thing that is neither jsonpath nor jsonschema
but more limited than either.
All rules go into a single giant mess of an object - per-room
account data apparently doesn't exist. Rules are split into
override/sender/content/room/underride with no rhyme or reason. Want a
room to be muted? That rule goes in "override". Want a room to notify
for all messages? That rule goes in "room"!
Rules are hardcoded to be either `notify` or `highlight`. There's no way
to make custom/extensible rules, so counters/rules that don't exist in
the spec (e.g. `unread`) can't be implemented or worked around.
### why not?
The most obvious reason: this breaks compatibility with everything else in
the matrix ecosystem, so it requires forking and modifying clients. This
takes varying amounts of effort and can become incompatible or out of
sync with upstream.
In terms of implementation, this system is very much a work in progress
and syntax/rules may change and break. The code is a mess, and doesn't
properly handle read receipts nor does it atomically update the
`p2.counts` config event. It also isn't very performant in its current
state, needing an extra db lookup for every event to check for marks.
This system requires `m.mentions` to be in plaintext, which some people
may not want.
### spec?
This is a custom system that exists outside of the official matrix
client-server spec. It would be wonderful if a version of p2 were merged
into the official spec, but unfortunately that's a pipe dream.
### rant
Matrix has *so much* potential as a general-purpose protocol or even
an instant messaging protocol, but so many features seem to be
poorly designed. There are so many rough edges that it's hard for me to
recommend it to anyone who isn't willing to spend a bit of time working
around its idiosyncrasies.
I strongly believe `m.mentions` shouldn't have been encrypted, as a
form of it is functionally *required* by servers to accurately send
push notifications. It's frustrating that mentions are critical to
push notifications, but are one of the only pieces of metadata that
are encrypted. Unfortunately, [mentions probably won't be moved to
plaintext](https://github.com/matrix-org/matrix-spec-proposals/pull/3952).
I know I should probably be happy that matrix is fixing its chronically
bad plaintext leaks, but this really shouldn't have been done.
On the other hand, state events like name/topic/avatar - which would
cause problems for `/hierarchy` and summaries if encrypted - aren't as
critical or can be worked around. Reactions are also plaintext, though
the reaction keys could be encrypted. Room member nicknames/avatars are
stored as plaintext, assuming they aren't clobbered when a user changes
their global displayname/avatar_url.
All these little things add up to a frustrating experience. Despite
all that, I'm very hopeful for matrix and it's probably the best option
that exists.
---
original readme below:
### A Matrix homeserver written in Rust
#### What is Matrix?

View file

@ -1,4 +1,4 @@
use crate::{services, Error, Result, Ruma};
use crate::{services, Error, Result, Ruma, utils::p2::{P2Rules, P2Counters}};
use ruma::{
api::client::{
config::{
@ -26,6 +26,14 @@ pub async fn set_global_account_data_route(
let event_type = body.event_type.to_string();
match event_type.as_str() {
"p2.rules" => {
P2Rules::compile_value(data.clone())?;
services().rooms.timeline.p2_uncache_rules(None, &sender_user)?;
}
_ => (),
}
services().account_data.update(
None,
sender_user,
@ -51,6 +59,18 @@ pub async fn set_room_account_data_route(
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?;
let event_type = body.event_type.to_string();
match event_type.as_str() {
"p2.rules" => {
P2Rules::compile_value(data.clone())?;
services().rooms.timeline.p2_uncache_rules(Some(&body.room_id), &sender_user)?;
}
_ => (),
}
if event_type == "org.celery.p2.rules" {
let _rules = P2Rules::compile_value(data.clone())?;
}
services().account_data.update(
Some(&body.room_id),

View file

@ -430,3 +430,13 @@ pub async fn set_pushers_route(
Ok(set_pusher::v3::Response::default())
}
// TODO
// pub async fn p2_list_notifications_route(
// body: Ruma<()>
// ) -> Result<()> {
// let Some(user) = body.sender_user.as_ref() else {
// return Error(Error::BadRequest(ErrorKind::Unknown, ))
// }
// Ok(())
// }

View file

@ -57,4 +57,28 @@ impl service::rooms::alias::Data for KeyValueDatabase {
.map_err(|_| Error::bad_database("Invalid alias in aliasid_alias."))
}))
}
fn all_local_aliases<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, String)>> + 'a> {
Box::new(
self.alias_roomid
.iter()
.map(|(room_alias_bytes, room_id_bytes)| {
let room_alias_localpart = utils::string_from_bytes(&room_alias_bytes)
.map_err(|_| {
Error::bad_database("Invalid alias bytes in aliasid_alias.")
})?;
let room_id = utils::string_from_bytes(&room_id_bytes)
.map_err(|_| {
Error::bad_database("Invalid room_id bytes in aliasid_alias.")
})?
.try_into()
.map_err(|_| Error::bad_database("Invalid room_id in aliasid_alias."))?;
Ok((room_id, room_alias_localpart))
}),
)
}
}

View file

@ -1,150 +0,0 @@
use std::mem;
use ruma::{
events::receipt::ReceiptEvent, serde::Raw, CanonicalJsonObject, OwnedUserId, RoomId, UserId,
};
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let mut last_possible_key = prefix.clone();
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
// Remove old entry
if let Some((old, _)) = self
.readreceiptid_readreceipt
.iter_from(&last_possible_key, true)
.take_while(|(key, _)| key.starts_with(&prefix))
.find(|(key, _)| {
key.rsplit(|&b| b == 0xff)
.next()
.expect("rsplit always returns an element")
== user_id.as_bytes()
})
{
// This is the old room_latest
self.readreceiptid_readreceipt.remove(&old)?;
}
let mut room_latest_id = prefix;
room_latest_id.extend_from_slice(&services().globals.next_count()?.to_be_bytes());
room_latest_id.push(0xff);
room_latest_id.extend_from_slice(user_id.as_bytes());
self.readreceiptid_readreceipt.insert(
&room_latest_id,
&serde_json::to_vec(&event).expect("EduEvent::to_string always works"),
)?;
Ok(())
}
fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> Box<
dyn Iterator<
Item = Result<(
OwnedUserId,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a,
> {
let mut prefix = room_id.as_bytes().to_vec();
prefix.push(0xff);
let prefix2 = prefix.clone();
let mut first_possible_edu = prefix.clone();
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
Box::new(
self.readreceiptid_readreceipt
.iter_from(&first_possible_edu, false)
.take_while(move |(k, _)| k.starts_with(&prefix2))
.map(move |(k, v)| {
let count = utils::u64_from_bytes(
&k[prefix.len()..prefix.len() + mem::size_of::<u64>()],
)
.map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
let user_id = UserId::parse(
utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..])
.map_err(|_| {
Error::bad_database("Invalid readreceiptid userid bytes in db.")
})?,
)
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
let mut json =
serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| {
Error::bad_database(
"Read receipt in roomlatestid_roomlatest is invalid json.",
)
})?;
json.remove("room_id");
Ok((
user_id,
count,
Raw::from_json(
serde_json::value::to_raw_value(&json)
.expect("json is valid raw value"),
),
))
}),
)
}
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
self.roomuserid_privateread
.insert(&key, &count.to_be_bytes())?;
self.roomuserid_lastprivatereadupdate
.insert(&key, &services().globals.next_count()?.to_be_bytes())
}
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
self.roomuserid_privateread
.get(&key)?
.map_or(Ok(None), |v| {
Ok(Some(utils::u64_from_bytes(&v).map_err(|_| {
Error::bad_database("Invalid private read marker bytes")
})?))
})
}
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
Ok(self
.roomuserid_lastprivatereadupdate
.get(&key)?
.map(|bytes| {
utils::u64_from_bytes(&bytes).map_err(|_| {
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
})
})
.transpose()?
.unwrap_or(0))
}
}

View file

@ -1,11 +1,18 @@
use std::{collections::hash_map, mem::size_of, sync::Arc};
use std::{collections::hash_map, mem::size_of, sync::Arc, ops::DerefMut};
use ruma::{
api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId,
};
use serde::Deserialize;
use tracing::error;
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
use crate::{
database::KeyValueDatabase,
service::{self, globals::Data},
services,
utils::{self, p2::P2Rules},
Error, PduEvent, Result,
};
use service::rooms::timeline::PduCount;
@ -245,6 +252,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
pdu.remove_transaction_id()?;
}
pdu.add_age()?;
pdu.p2_transpose_marks(&user_id)?;
let count = pdu_count(&pdu_id)?;
Ok((count, pdu))
}),
@ -272,6 +280,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
pdu.remove_transaction_id()?;
}
pdu.add_age()?;
pdu.p2_transpose_marks(&user_id)?;
let count = pdu_count(&pdu_id)?;
Ok((count, pdu))
}),
@ -305,6 +314,124 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
.increment_batch(&mut highlights_batch.into_iter())?;
Ok(())
}
// FIXME: not transactional (can clobber user stuff), lots of tiny
// but amplified writes, and a menace to society in general
fn p2_increment_counters(
&self,
room_id: &RoomId,
counters: Vec<(OwnedUserId, Vec<Box<str>>)>,
) -> Result<()> {
let mut keys = vec![];
for (user_id, names) in counters {
for name in names {
let mut key = room_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(user_id.as_bytes());
key.push(0xff);
key.extend_from_slice(name.as_bytes());
keys.push(key);
}
}
self.p2_userroomidcounter_count.increment_batch(&mut keys.into_iter())?;
Ok(())
}
fn p2_add_store(
&self,
event_id: &EventId,
users: Vec<(OwnedUserId, Vec<Box<str>>)>,
) -> Result<()> {
let mut store_ids: Vec<Vec<u8>> = vec![];
let count = self.next_count()?.to_be_bytes();
for (user_id, counters) in users {
for counter in counters {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(counter.as_bytes());
key.push(0xff);
key.extend_from_slice(&count);
key.push(0xff);
key.extend_from_slice(event_id.as_bytes());
store_ids.push(key);
}
}
let mut store_iter = store_ids.into_iter().zip(std::iter::repeat_with(Vec::new));
self.p2_storeid.insert_batch(&mut store_iter)?;
Ok(())
}
fn p2_get_rules(&self, room_id: &RoomId, user_id: &UserId) -> Result<Arc<P2Rules>> {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(room_id.as_bytes());
if let Some(rules) = self
.p2_userroomid_rules_cache
.lock()
.unwrap()
.get_mut(key.as_slice())
{
return Ok(rules.clone());
}
#[derive(Default, Deserialize)]
struct P2RulesWrapper {
content: P2Rules,
}
let room_rules = services()
.account_data
.get(Some(room_id), user_id, "p2.rules".into())?
.map(|ev| serde_json::from_str::<P2RulesWrapper>(ev.get()).unwrap_or_default())
.map(|wrap| wrap.content);
let global_rules = services()
.account_data
.get(None, user_id, "p2.rules".into())?
.map(|ev| serde_json::from_str::<P2RulesWrapper>(ev.get()).unwrap_or_default())
.map(|wrap| wrap.content);
let rules = Arc::new(match (room_rules, global_rules) {
(Some(room), Some(global)) => room.merge(global),
(None, Some(rules)) => rules,
(Some(rules), None) => rules,
(None, None) => P2Rules::default(),
});
self.p2_userroomid_rules_cache
.lock()
.unwrap()
.insert(key.into_boxed_slice(), rules.clone());
Ok(rules)
}
fn p2_uncache_rules(&self, room_id: Option<&RoomId>, user_id: &UserId) -> Result<()> {
if let Some(room_id) = room_id {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(room_id.as_bytes());
self.p2_userroomid_rules_cache
.lock()
.unwrap()
.remove(key.as_slice());
} else {
let mut key = user_id.as_bytes().to_vec();
key.push(0xff);
let mut cache = self.p2_userroomid_rules_cache.lock().unwrap();
let mut tmp = lru_cache::LruCache::new(cache.capacity());
std::mem::swap(cache.deref_mut(), &mut tmp);
cache.extend(tmp.into_iter().filter(|(k, _)| !k.starts_with(&key)));
}
Ok(())
}
}
/// Returns the `count` of this pdu's id.

View file

@ -2,7 +2,7 @@ pub mod abstraction;
pub mod key_value;
use crate::{
service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services,
service::rooms::timeline::PduCount, services, utils::{self, p2::P2Rules}, Config, Error, PduEvent, Result, Services,
SERVICES,
};
use abstraction::{KeyValueDatabaseEngine, KvTree};
@ -108,6 +108,9 @@ pub struct KeyValueDatabase {
pub(super) userroomid_notificationcount: Arc<dyn KvTree>, // NotifyCount = u64
pub(super) userroomid_highlightcount: Arc<dyn KvTree>, // HightlightCount = u64
pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>, // LastNotificationRead = u64
pub(super) p2_userroomidcounter_count: Arc<dyn KvTree>,
pub(super) p2_storeid: Arc<dyn KvTree>, // StoreId = UserId + Counter + Count + EventId
/// Remember the current state hash of a room.
pub(super) roomid_shortstatehash: Arc<dyn KvTree>,
@ -172,6 +175,7 @@ pub struct KeyValueDatabase {
pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
pub(super) p2_userroomid_rules_cache: Mutex<LruCache<Box<[u8]>, Arc<P2Rules>>>,
}
impl KeyValueDatabase {
@ -336,6 +340,9 @@ impl KeyValueDatabase {
userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?,
userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
roomuserid_lastnotificationread: builder.open_tree("userroomid_highlightcount")?,
p2_storeid: builder.open_tree("p2_storeid")?,
p2_userroomidcounter_count: builder.open_tree("p2_userroomidcounter_count")?,
statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?,
@ -397,6 +404,9 @@ impl KeyValueDatabase {
our_real_users_cache: RwLock::new(HashMap::new()),
appservice_in_room_cache: RwLock::new(HashMap::new()),
lasttimelinecount_cache: Mutex::new(HashMap::new()),
p2_userroomid_rules_cache: Mutex::new(LruCache::new(
(100_000.0 * config.conduit_cache_capacity_modifier) as usize,
)),
});
let db = Box::leak(db_raw);

View file

@ -435,6 +435,10 @@ fn routes() -> Router {
"/_matrix/client/v3/rooms/:room_id/initialSync",
get(initial_sync),
)
// .route(
// "/_matrix/client/p2.0/notifications",
// get(client_server::p2_list_notifications_route),
// )
.route("/", get(it_works))
.fallback(not_found)
}

File diff suppressed because it is too large

View file

@ -14,7 +14,11 @@ use serde_json::{
json,
value::{to_raw_value, RawValue as RawJsonValue},
};
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
sync::Arc,
};
use tracing::warn;
/// Content hashes of a PDU.
@ -116,6 +120,27 @@ impl PduEvent {
Ok(())
}
pub fn p2_transpose_marks(&mut self, user_id: &UserId) -> crate::Result<()> {
let mut unsigned: BTreeMap<String, Box<RawJsonValue>> = self
.unsigned
.as_ref()
.map_or_else(|| Ok(BTreeMap::new()), |u| serde_json::from_str(u.get()))
.map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
let Some(all_marks): Option<HashMap<OwnedUserId, Vec<Box<str>>>> = unsigned.remove("p2.marks.all_users")
.map(|v| serde_json::from_str(v.get()).ok()).flatten() else {
return Ok(())
};
if let Some(marks) = all_marks.get(user_id) {
unsigned.insert("p2.marks".to_owned(), to_raw_value(marks).unwrap());
}
self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid"));
Ok(())
}
#[tracing::instrument(skip(self))]
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
let mut json = json!({

View file

@ -16,4 +16,9 @@ pub trait Data: Send + Sync {
&'a self,
room_id: &RoomId,
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a>;
/// Returns all local aliases on the server
fn all_local_aliases<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, String)>> + 'a>;
}

View file

@ -32,4 +32,11 @@ impl Service {
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a> {
self.db.local_aliases_for_room(room_id)
}
#[tracing::instrument(skip(self))]
pub fn all_local_aliases<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, String)>> + 'a> {
self.db.all_local_aliases()
}
}

View file

@ -1,36 +0,0 @@
use crate::Result;
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
pub trait Data: Send + Sync {
/// Replaces the previous read receipt.
fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()>;
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> Box<
dyn Iterator<
Item = Result<(
OwnedUserId,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a,
>;
/// Sets a private read marker at `count`.
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>;
/// Returns the private read marker.
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
/// Returns the count of the last typing update in this room.
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
}

View file

@ -1,55 +0,0 @@
mod data;
pub use data::Data;
use crate::Result;
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
pub struct Service {
pub db: &'static dyn Data,
}
impl Service {
/// Replaces the previous read receipt.
pub fn readreceipt_update(
&self,
user_id: &UserId,
room_id: &RoomId,
event: ReceiptEvent,
) -> Result<()> {
self.db.readreceipt_update(user_id, room_id, event)
}
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
#[tracing::instrument(skip(self))]
pub fn readreceipts_since<'a>(
&'a self,
room_id: &RoomId,
since: u64,
) -> impl Iterator<
Item = Result<(
OwnedUserId,
u64,
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
)>,
> + 'a {
self.db.readreceipts_since(room_id, since)
}
/// Sets a private read marker at `count`.
#[tracing::instrument(skip(self))]
pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
self.db.private_read_set(room_id, user_id, count)
}
/// Returns the private read marker.
#[tracing::instrument(skip(self))]
pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
self.db.private_read_get(room_id, user_id)
}
/// Returns the count of the last typing update in this room.
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
self.db.last_privateread_update(user_id, room_id)
}
}

View file

@ -2,7 +2,7 @@ use std::sync::Arc;
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId};
use crate::{PduEvent, Result};
use crate::{PduEvent, Result, utils::p2::P2Rules};
use super::PduCount;
@ -88,4 +88,32 @@ pub trait Data: Send + Sync {
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) -> Result<()>;
// this should probably be split into its own service
/// increment counters for rooms and users
fn p2_increment_counters(
&self,
room_id: &RoomId,
counters: Vec<(OwnedUserId, Vec<Box<str>>)>,
) -> Result<()>;
/// store event ids for /notifications
fn p2_add_store(
&self,
event_id: &EventId,
users: Vec<(OwnedUserId, Vec<Box<str>>)>,
) -> Result<()>;
fn p2_get_rules(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Arc<P2Rules>>;
fn p2_uncache_rules(
&self,
room_id: Option<&RoomId>,
user_id: &UserId,
) -> Result<()>;
}

View file

@ -38,7 +38,9 @@ use tracing::{error, info, warn};
use crate::{
api::server_server,
service::pdu::{EventHash, PduBuilder},
services, utils, Error, PduEvent, Result,
services,
utils::{self, p2::P2Action},
Error, PduEvent, Result,
};
use super::state_compressor::CompressedStateEvent;
@ -270,39 +272,6 @@ impl Service {
.state
.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
let mutex_insert = Arc::clone(
services()
.globals
.roomid_mutex_insert
.write()
.unwrap()
.entry(pdu.room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().unwrap();
let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if appending
// fails
services()
.rooms
.edus
.read_receipt
.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
services()
.rooms
.user
.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
let count2 = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
pdu_id.extend_from_slice(&count2.to_be_bytes());
// Insert pdu
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?;
drop(insert_lock);
// See if the event matches any known pushers
let power_levels: RoomPowerLevelsEventContent = services()
.rooms
@ -317,6 +286,8 @@ impl Service {
let sync_pdu = pdu.to_sync_room_event();
// legacy notification/highlight system
let mut notifies = Vec::new();
let mut highlights = Vec::new();
@ -372,15 +343,121 @@ impl Service {
if highlight {
highlights.push(user.clone());
}
for push_key in services().pusher.get_pushkeys(user) {
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
}
// pushers are now handled by p2, no need to duplicate notifications
}
self.db
.increment_notification_counts(&pdu.room_id, notifies, highlights)?;
// shiny new p2 notification system
eprintln!("check p2 stuff for pdu {}", pdu.event_id);
let mut p2_counters = Vec::new();
let mut p2_store = Vec::new();
let mut p2_marks = BTreeMap::new();
let mut p2_notify = Vec::new();
for user in services()
.rooms
.state_cache
.get_our_real_users(&pdu.room_id)?
.iter()
{
// Don't notify the user of their own events
if user == &pdu.sender {
continue;
}
eprintln!("check p2 stuff for user {}", user);
let rules = self.db.p2_get_rules(&pdu.room_id, user)?;
let matched = rules.eval(
&pdu.room_id,
user,
serde_json::to_value(&sync_pdu).expect("always valid"),
);
let should_notify = matched
.iter()
.any(|(_, actions)| actions.contains(&P2Action::Notify));
let counters_to_mark: Vec<_> = matched
.iter()
.filter(|(_, actions)| actions.contains(&P2Action::Mark))
.map(|(matched, _)| CanonicalJsonValue::String(matched.to_string()))
.collect();
let counters_to_store: Vec<_> = matched
.iter()
.filter(|(_, actions)| actions.contains(&P2Action::Store))
.map(|(matched, _)| matched.to_owned())
.collect();
if !counters_to_store.is_empty() {
p2_store.push((user.to_owned(), counters_to_store));
}
if !counters_to_mark.is_empty() {
p2_marks.insert(user.to_string(), CanonicalJsonValue::Array(counters_to_mark));
}
if !matched.is_empty() {
p2_counters.push((user.to_owned(), matched.into_keys().collect()));
}
if should_notify {
p2_notify.push(user.to_owned());
}
}
self.db.p2_add_store(&pdu.event_id, p2_store)?;
self.db.p2_increment_counters(&pdu.room_id, p2_counters)?;
if let CanonicalJsonValue::Object(unsigned) = pdu_json
.entry("unsigned".to_owned())
.or_insert_with(|| CanonicalJsonValue::Object(Default::default())) {
unsigned.insert("p2.marks.all_users".to_owned(), CanonicalJsonValue::Object(p2_marks));
}
let mutex_insert = Arc::clone(
services()
.globals
.roomid_mutex_insert
.write()
.unwrap()
.entry(pdu.room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().unwrap();
let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if appending
// fails
services()
.rooms
.edus
.read_receipt
.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
services()
.rooms
.user
.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
let count2 = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
pdu_id.extend_from_slice(&count2.to_be_bytes());
// Insert pdu
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?;
drop(insert_lock);
for user_id in &p2_notify {
for push_key in services().pusher.get_pushkeys(user_id) {
services().sending.send_push_pdu(&pdu_id, user_id, push_key?)?;
}
}
match pdu.kind {
TimelineEventType::RoomRedaction => {
if let Some(redact_id) = &pdu.redacts {
@ -466,7 +543,7 @@ impl Service {
&& services().globals.emergency_password().is_none();
if to_conduit && !from_conduit && admin_room.as_ref() == Some(&pdu.room_id) {
services().admin.process_message(body);
services().admin.process_message(body, pdu.event_id.clone());
}
}
}
@ -1199,4 +1276,8 @@ impl Service {
info!("Prepended backfill pdu");
Ok(())
}
pub fn p2_uncache_rules(&self, room_id: Option<&RoomId>, user_id: &UserId) -> Result<()> {
self.db.p2_uncache_rules(room_id, user_id)
}
}

View file

@ -80,6 +80,8 @@ pub enum Error {
#[cfg(feature = "conduit_bin")]
#[error("{0}")]
PathError(#[from] axum::extract::rejection::PathRejection),
#[error("{0}")]
P2Error(#[from] crate::utils::p2::error::P2RuleError),
}
impl Error {

View file

@ -1,4 +1,5 @@
pub mod error;
pub mod p2;
use argon2::{Config, Variant};
use cmp::Ordering;

22
src/utils/p2/error.rs Normal file
View file

@ -0,0 +1,22 @@
use thiserror::Error;
#[derive(Debug, Error)]
pub enum P2RuleError {
#[error("invalid json: {0}")]
InvalidJson(serde_json::Error),
#[error("invalid character at position {0}")]
InvalidChar(usize),
#[error("found invalid token at position {0}")]
InvalidToken(usize),
#[error("unexpected eof")]
UnexpectedEOF,
#[error("unknown function name or bad arity (at `{0}(...)`)")]
BadCall(String),
#[error("dependency doesn't exist or forms a cycle")]
BadDeps,
}

268
src/utils/p2/eval.rs Normal file
View file

@ -0,0 +1,268 @@
use std::collections::HashSet;
use crate::{services, utils::p2::parser::{P2RuleData, P2RuleOp}};
use ruma::{RoomId, UserId};
use serde_json::Value as JVal;
use super::parser::P2Rule;
impl P2Rule {
/// evaluate a json event against this rule
// TODO: clone less
pub(super) fn eval(&self, room: &RoomId, user: &UserId, json: &JVal, matched: &HashSet<Box<str>>) -> bool {
#[derive(Debug, Clone)]
enum Value {
Boolean(bool),
Int(i64),
Json(JVal),
Set(Vec<Value>),
AccountData,
RoomState(Option<String>),
}
impl Value {
fn null() -> Value {
Value::Json(JVal::Null)
}
fn unbox_bool(&self) -> Option<bool> {
match self {
Self::Boolean(true) => Some(true),
Self::Boolean(false) => Some(false),
Self::Json(JVal::Null) => Some(false), // stop null from poisoning (otherwise `foo.bar || true` is false if foo.bar is null)
Self::Json(JVal::Bool(b)) => Some(*b),
_ => None,
}
}
fn unbox_int(&self) -> Option<i64> {
match self {
Self::Json(JVal::Number(i)) => {
Some(i.as_i64().expect("canonical json enforces integers"))
}
Self::Int(i) => Some(*i),
_ => None,
}
}
fn unbox_str(&self) -> Option<&str> {
match self {
Self::Json(JVal::String(s)) => Some(s),
_ => None,
}
}
}
impl PartialEq for Value {
fn eq(&self, other: &Self) -> bool {
// FIXME: sets?
match (self, other) {
(Value::Json(a), Value::Json(b)) => a == b,
(a, b) => {
(a.unbox_bool() == b.unbox_bool() && a.unbox_bool().is_some())
|| (a.unbox_int() == b.unbox_int() && a.unbox_int().is_some())
}
}
}
}
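// Run the compiled postfix program: each op pops its operands (if any)
// from the stack and pushes exactly one result.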
let mut stack: Vec<Value> = Vec::new();
macro_rules! unboxed {
($unbox:ident, $expr:tt) => {{
let a = stack.pop().unwrap().$unbox();
let b = stack.pop().unwrap().$unbox();
match (a, b) {
(Some(a), Some(b)) => Value::Boolean(a $expr b),
(_, _) => Value::Json(JVal::Null),
}
}};
}
for op in &self.ops {
let next = match op {
P2RuleOp::Data(idx) => {
match self.data.get(*idx).unwrap() {
P2RuleData::String(s) => {
let val = serde_json::Value::String(s.to_string());
Value::Json(val)
}
P2RuleData::Integer(int) => Value::Int(*int),
P2RuleData::Ident(ident) => match ident.as_ref() {
"event" => Value::Json(json.clone()),
"account_data" => Value::AccountData,
"state" => Value::RoomState(None),
"user_id" => Value::Json(JVal::String(user.to_string())),
_ => Value::null(),
}
}
}
P2RuleOp::Get => {
enum Get {
Str(String),
Int(i64),
}
let g = match stack.pop().unwrap() {
Value::Json(JVal::String(s)) => Get::Str(s),
Value::Json(JVal::Number(i)) => {
Get::Int(i.as_i64().expect("enforced by canonical json"))
}
Value::Int(i) => Get::Int(i),
_ => {
stack.push(Value::null());
continue;
}
};
match (g, stack.pop().unwrap()) {
(Get::Str(s), Value::Json(JVal::Object(obj))) => {
Value::Json(obj.get(&s).cloned().unwrap_or(JVal::Null))
}
(Get::Int(i), Value::Json(JVal::Array(arr))) => match i.try_into() {
Ok(u) => Value::Json(arr.get::<usize>(u).cloned().unwrap_or(JVal::Null)),
Err(_) => Value::null(),
},
(Get::Int(i), Value::Set(items)) => match i.try_into() {
Ok(idx) => items.get::<usize>(idx).cloned().unwrap_or_else(Value::null),
Err(_) => Value::null(),
},
(Get::Str(s), Value::AccountData) => {
let data_type =
ruma::events::RoomAccountDataEventType::from(s.to_string());
let data = services()
.account_data
.get(Some(room), user, data_type.clone())
.ok()
.flatten()
.or_else(|| {
services()
.account_data
.get(None, user, data_type)
.ok()
.flatten()
})
.map(|v| {
serde_json::from_str(v.get())
.expect("database has invalid data")
})
.unwrap_or(JVal::Null);
Value::Json(data)
}
(Get::Str(s), Value::RoomState(None)) => Value::RoomState(Some(s.to_string())),
(Get::Str(s), Value::RoomState(Some(state_key))) => {
let ev = services()
.rooms
.state_accessor
.room_state_get(room, &s.to_owned().into(), &state_key)
.ok()
.flatten()
.map(|ev| serde_json::from_str(ev.content.get()).ok())
.flatten()
.unwrap_or(JVal::Null);
Value::Json(ev)
}
(_, _) => Value::null(),
}
}
P2RuleOp::Eq => Value::Boolean(stack.pop().unwrap() == stack.pop().unwrap()),
P2RuleOp::Neq => Value::Boolean(stack.pop().unwrap() != stack.pop().unwrap()),
P2RuleOp::Not => match stack.pop().unwrap().unbox_bool() {
Some(b) => Value::Boolean(!b),
None => Value::null(),
},
P2RuleOp::And => unboxed!(unbox_bool, &&),
P2RuleOp::Or => unboxed!(unbox_bool, ||),
P2RuleOp::Gt => unboxed!(unbox_int, >),
P2RuleOp::Lt => unboxed!(unbox_int, <),
P2RuleOp::Gte => unboxed!(unbox_int, >=),
P2RuleOp::Lte => unboxed!(unbox_int, <=),
// maybe, maybe not? regex would be better, but spec doesn't have regex
// P2RuleOp::Like => {
// let item = stack.pop().unwrap();
// let pattern = stack.pop().unwrap();
// match (item, pattern) {
// (Value::Json(JVal::String(text)), Value::Json(JVal::String(pat))) => {
// lazy_static! {
// static ref GLOB: regex::Regex = rgx!(r"_\*");
// }
// let reg = regex::escape(&pat)
// .replace("*", ".*")
// .replace("?", ".?");
// Value::Boolean(rgx!(&reg).is_match(text))
// }
// (_, _) => Value::Boolean(false),
// }
// },
P2RuleOp::Coalesce => todo!(),
// P2RuleOp::Coalesce(n) => stack
// .drain(stack.len() - n..)
// .reduce(|prev, val| match prev {
// Value::Json(JVal::Null) => val,
// not_null => not_null,
// })
// .unwrap(),
P2RuleOp::CanNotify => {
use ruma::events::{
room::power_levels::RoomPowerLevelsEventContent, StateEventType,
};
let key = stack.pop().unwrap();
let key = match key.unbox_str() {
Some(s) => s,
None => {
stack.push(Value::Boolean(false));
continue;
},
};
let power_levels: RoomPowerLevelsEventContent = services()
.rooms
.state_accessor
.room_state_get(room, &StateEventType::RoomPowerLevels, "")
.ok()
.flatten()
.map(|ev| serde_json::from_str(ev.content.get()).ok())
.flatten()
.unwrap_or_default();
let user_level: i64 = (*power_levels.users.get(user)
.unwrap_or_else(|| &power_levels.users_default))
.into();
let notif_level: i64 = power_levels.notifications.get(key)
.map(|&i| i.into())
.unwrap_or(50);
Value::Boolean(user_level >= notif_level)
}
P2RuleOp::Dataset(count) => {
let items: Vec<_> = stack.drain(stack.len() - count - 1..).collect();
Value::Set(items)
}
// check if a property is in an object or an item is in an array/set
P2RuleOp::In => match (stack.pop().unwrap(), stack.pop().unwrap()) {
(Value::Json(JVal::Array(a)), Value::Json(JVal::String(s2))) => {
Value::Boolean(a.iter().any(|v| v.as_str().is_some_and(|s1| s1 == s2)))
}
(int1, int2) if int1.unbox_int().is_some() && int2.unbox_int().is_some() => {
Value::Boolean(int1.unbox_int().unwrap() == int2.unbox_int().unwrap())
}
(Value::Json(JVal::Object(o)), Value::Json(JVal::String(s))) => {
Value::Boolean(o.contains_key(&s))
}
(_, _) => Value::Boolean(false),
},
P2RuleOp::Rule => {
let rule = stack.pop().unwrap();
let rule = rule.unbox_str().unwrap();
Value::Boolean(matched.contains(rule))
},
};
stack.push(next);
}
// only things that are explicitly false count as false
stack.pop().unwrap().unbox_bool().unwrap_or(true)
}
}

176
src/utils/p2/mod.rs Normal file
View file

@ -0,0 +1,176 @@
mod parser;
mod eval;
pub mod error;
use ruma::{RoomId, UserId};
use serde::{Deserialize, Serialize};
use serde_json::Value as JVal;
use std::collections::{HashMap, HashSet, VecDeque};
use parser::P2Rule;
use error::P2RuleError;
#[derive(Debug, Default, PartialEq, Eq)]
pub struct P2Rules {
entries: Vec<(Box<str>, P2Entry)>,
}
#[derive(Debug, Deserialize, PartialEq, Eq, Clone)]
pub struct P2Entry {
rule: P2Rule,
actions: Vec<P2Action>,
enabled: bool,
}
/*
endpoints (prefix = /_matrix/client/unstable/p2)
GET /_matrix/client/unstable/p2/inbox :: get events in an inbox
POST /_matrix/client/unstable/p2/inbox :: manually add an event to an inbox
DELETE /_matrix/client/unstable/p2/inbox/:eventid :: delete an event from an inbox
DELETE /_matrix/client/unstable/p2/inbox :: delete all events from an inbox
POST /_matrix/client/unstable/rooms/:roomid/ack :: reset (or manually set) counts in a room
POST /_matrix/client/unstable/ack_bulk :: reset (or manually set) counts in many rooms
*/
#[derive(Debug, Clone, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[serde(rename_all = "lowercase")]
pub enum P2Action {
/// increment a counter associated with this rule
Count,
/// send a push notification to the user
Notify,
/// store the event for later enumeration via `/inbox`
Store,
/// mark the event in its `unsigned` data under `p2.marks`
Mark,
/// a custom action that won't be handled by the server
Custom(Box<str>),
}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct P2Counters {
pub counters: HashMap<Box<str>, u64>,
#[serde(flatten)]
_other: HashMap<Box<str>, JVal>,
}
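// Kahn-style topological sort: reorders entries so every rule comes after the
// rules it depends on (referenced via `@'name'`); returns false if a
// dependency is missing or the dependency graph contains a cycle.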
fn topological_sort(vec: &mut Vec<(Box<str>, P2Entry)>) -> bool {
let (mut queue, mut missing_deps): (VecDeque<_>, VecDeque<_>) = std::mem::take(vec)
.into_iter()
.partition(|(_, ent)| ent.rule.depends.is_empty());
while let Some(node) = queue.pop_front() {
vec.push(node);
let (has_deps, new_missing_deps) = missing_deps.into_iter().partition(|(_, other)| {
other.rule.depends.iter().all(|dep| vec.iter().any(|(name, _)| name == dep))
});
missing_deps = new_missing_deps;
queue.extend(has_deps);
}
missing_deps.is_empty()
}
impl P2Rules {
pub fn compile_value(v: JVal) -> Result<P2Rules, P2RuleError> {
let mut rules: P2Rules = serde_json::from_value(v).map_err(P2RuleError::InvalidJson)?;
if !topological_sort(&mut rules.entries) {
return Err(P2RuleError::BadDeps);
}
Ok(rules)
}
pub fn eval(
&self,
room: &RoomId,
user: &UserId,
json: JVal,
) -> HashMap<Box<str>, HashSet<P2Action>> {
// TODO: don't use duplicate structures
let mut matched = HashMap::new();
let mut matched_set = HashSet::new();
for (name, ent) in &self.entries {
if ent.rule.eval(room, user, &json, &matched_set) {
matched_set.insert(name.clone());
if ent.actions.is_empty() {
matched.insert(name.clone(), HashSet::new());
} else {
matched.insert(name.clone(), ent.actions.iter().cloned().collect());
}
}
}
matched
}
/// "overlay" another set of rules on top of these rules. if there's a conflict, our rules win.
pub fn merge(mut self, other: P2Rules) -> P2Rules {
for (name, rule) in other.entries {
if self.entries.iter().find(|(i, _)| *i == name).is_none() {
self.entries.push((name, rule));
}
}
if !topological_sort(&mut self.entries) {
// FIXME: global on its own is fine, room on its own is fine,
// but it may be possible to get a cycle when they're merged
}
self
}
}
impl<'de> Deserialize<'de> for P2Rules {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize)]
pub struct FakeP2Rules {
rules: HashMap<Box<str>, P2Entry>,
}
let fake = FakeP2Rules::deserialize(deserializer)?;
let mut rules = P2Rules {
entries: fake.rules.into_iter().collect(),
};
if !topological_sort(&mut rules.entries) {
return Err(serde::de::Error::custom::<P2RuleError>(P2RuleError::BadDeps));
}
Ok(rules)
}
}
impl<'de> Deserialize<'de> for P2Action {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
// this is a hacky mess but until #912 is closed it's necessary
// https://github.com/serde-rs/serde/issues/912
#[derive(Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FakeP2Action {
Notify,
Store,
Mark,
}
#[derive(Deserialize)]
#[serde(untagged)]
pub enum FakeP2ActionWrapper {
Known(FakeP2Action),
Unknown(Box<str>),
}
let fake = FakeP2ActionWrapper::deserialize(deserializer)?;
Ok(match fake {
FakeP2ActionWrapper::Known(FakeP2Action::Notify) => P2Action::Notify,
FakeP2ActionWrapper::Known(FakeP2Action::Store) => P2Action::Store,
FakeP2ActionWrapper::Known(FakeP2Action::Mark) => P2Action::Mark,
FakeP2ActionWrapper::Unknown(val) => P2Action::Custom(val),
})
}
}

279
src/utils/p2/parser.rs Normal file
View file

@ -0,0 +1,279 @@
use std::{iter::Peekable, vec::IntoIter};
// use crate::utils::p2::error::P2RuleError;
use super::P2RuleError;
use lazy_static::lazy_static;
use serde::Deserialize;
macro_rules! rgx {
($pat:expr) => {
regex::RegexBuilder::new($pat)
.case_insensitive(true)
.dot_matches_new_line(true)
.build()
.expect("regex is always valid")
};
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct P2Rule {
pub(super) ops: Vec<P2RuleOp>,
pub(super) data: Vec<P2RuleData>,
pub(super) depends: Vec<Box<str>>,
}
#[derive(Debug, PartialEq, Eq, Clone)]
#[rustfmt::skip]
pub enum P2RuleOp {
Data(usize),
Dataset(usize),
Get,
Rule,
And, Or, Not, Coalesce,
Eq, Neq, Gt, Lt, Gte, Lte,
// Like, // glob?
// Match, // regex?
In, CanNotify,
}
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum P2RuleData {
Ident(Box<str>),
String(Box<str>),
Integer(i64),
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum P2TokenType {
String,
Integer,
Operator,
Rule,
Ident,
OpenParan,
CloseParan,
OpenBracket,
CloseBracket,
Space,
}
type P2Token<'a> = (P2TokenType, Option<&'a str>, usize);
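// Pratt-style (left, right) binding powers used by `generate` below: while
// parsing an operator's right-hand side, its right power is passed down as
// `parent_power`, and a following operator is only folded in if its left
// power is greater - so `.` binds tightest and `in` binds loosest.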
fn get_binding(op: &str) -> (u32, u32) {
match op {
"." => (10, 11),
"!" => (8, 0),
"&&" => (6, 7),
"||" => (4, 5),
"==" => (2, 3),
"!=" => (2, 3),
">" => (2, 3),
"<" => (2, 3),
">=" => (2, 3),
"<=" => (2, 3),
"in" => (0, 1),
_ => unreachable!(),
}
}
fn get_op(op: &str) -> P2RuleOp {
match op {
"." => P2RuleOp::Get,
"!" => P2RuleOp::Not,
"&&" => P2RuleOp::And,
"||" => P2RuleOp::Or,
"==" => P2RuleOp::Eq,
"!=" => P2RuleOp::Neq,
">" => P2RuleOp::Gt,
"<" => P2RuleOp::Lt,
">=" => P2RuleOp::Gte,
"<=" => P2RuleOp::Lte,
"in" => P2RuleOp::In,
_ => unreachable!(),
}
}
impl P2Rule {
/// Take an expression and tokenize, parse, and generate bytecode
/// for it
pub fn parse(s: &str) -> Result<P2Rule, P2RuleError> {
let mut tokens = P2Rule::tokenize(s)?.into_iter().peekable();
let mut ops = vec![];
let mut data = vec![];
let mut depends = vec![];
P2Rule::generate(&mut tokens, &mut ops, &mut data, &mut depends, 0)?;
dbg!(&ops, &data, &depends);
match tokens.next() {
None => {},
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
};
Ok(P2Rule { ops, data, depends })
}
fn tokenize<'a>(s: &'a str) -> Result<Vec<P2Token<'a>>, P2RuleError> {
lazy_static! {
static ref REGEXES: Vec<(P2TokenType, regex::Regex)> = vec![
(P2TokenType::Operator, rgx!(r"^(\.|&&|\|\||!|==|!=|>|<|>=|<=|in)")),
(P2TokenType::String, rgx!(r#"^"((:?[^\\]|\\.)+?)""#)),
(P2TokenType::String, rgx!(r"^'((:?[^\\]|\\.)+?)'")),
(P2TokenType::Integer, rgx!(r"^(-?[0-9]+?)")),
(P2TokenType::Ident, rgx!(r#"^([a-z0-9_-]+)"#)),
(P2TokenType::Rule, rgx!(r"^@")),
(P2TokenType::OpenParan, rgx!(r"^\(")),
(P2TokenType::CloseParan, rgx!(r"^\)")),
(P2TokenType::OpenBracket, rgx!(r"^\[")),
(P2TokenType::CloseBracket, rgx!(r"^\]")),
(P2TokenType::Space, rgx!(r"^\s+")),
];
}
let mut start = 0;
let mut tokens = vec![];
'search: while start < s.len() {
for (kind, regex) in REGEXES.iter() {
if let Some(found) = regex.captures(&s[start..]) {
let full = found.get(0).unwrap().as_str();
let capture = found.get(1).map(|c| c.as_str());
if *kind != P2TokenType::Space {
tokens.push((*kind, capture, start));
}
start += full.len();
continue 'search;
}
}
return Err(P2RuleError::InvalidChar(start));
}
Ok(tokens)
}
/// recursively walk through tokens, "unravelling" it into bytecode
fn generate(
tokens: &mut Peekable<IntoIter<P2Token<'_>>>,
ops: &mut Vec<P2RuleOp>,
data: &mut Vec<P2RuleData>,
depends: &mut Vec<Box<str>>,
parent_power: u32,
) -> Result<(), P2RuleError> {
match tokens.next() {
Some((P2TokenType::Ident, Some(ident), _)) => {
let idx = data.len();
data.push(P2RuleData::Ident(ident.to_string().into_boxed_str()));
ops.push(P2RuleOp::Data(idx));
// TODO: unhardcode
if tokens.next_if(|t| matches!(t, (P2TokenType::OpenParan, _, _))).is_some() {
if ident != "can_notify" {
return Err(P2RuleError::BadCall(ident.to_string()));
}
match tokens.next() {
Some((P2TokenType::CloseParan, _, _)) => {},
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
None => return Err(P2RuleError::UnexpectedEOF),
}
}
}
Some((P2TokenType::String, Some(string), _)) => {
let idx = data.len();
data.push(P2RuleData::String(string.to_string().into_boxed_str()));
ops.push(P2RuleOp::Data(idx));
}
Some((P2TokenType::Integer, Some(int), _)) => {
let idx = data.len();
data.push(P2RuleData::Integer(int.parse().expect("regex only matches valid ints")));
ops.push(P2RuleOp::Data(idx));
}
Some((P2TokenType::OpenParan, None, _)) => {
P2Rule::generate(tokens, ops, data, depends, 0)?;
match tokens.next() {
Some((P2TokenType::CloseParan, _, _)) => {},
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
None => return Err(P2RuleError::UnexpectedEOF),
}
}
Some((P2TokenType::OpenBracket, None, _)) => {
let mut set_size = 0;
loop {
match tokens.next() {
Some((P2TokenType::CloseBracket, None, _)) => break,
Some((_, _, _)) => P2Rule::generate(tokens, ops, data, depends, 0)?,
None => return Err(P2RuleError::UnexpectedEOF),
};
set_size += 1;
match tokens.next() {
Some((P2TokenType::CloseBracket, None, _)) => break,
Some((_, Some(","), _)) => {},
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
None => return Err(P2RuleError::UnexpectedEOF),
};
}
ops.push(P2RuleOp::Dataset(set_size))
}
Some((P2TokenType::Rule, None, _)) => {
let rule = match tokens.next() {
Some((P2TokenType::String, Some(s), _)) => s,
Some((P2TokenType::Ident, Some(s), _)) => s,
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
None => return Err(P2RuleError::UnexpectedEOF),
};
let bx = rule.to_string().into_boxed_str();
let idx = data.len();
data.push(P2RuleData::String(bx.clone()));
depends.push(bx);
ops.push(P2RuleOp::Data(idx));
ops.push(P2RuleOp::Rule);
}
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
None => return Err(P2RuleError::UnexpectedEOF),
};
loop {
match tokens.peek() {
Some((P2TokenType::Operator, Some(op), _)) => {
let (lhs_power, rhs_power) = get_binding(op);
// if the op on the left binds more powerfully than this op, they win
if parent_power >= lhs_power {
break;
}
let op = op.clone();
tokens.next();
P2Rule::generate(tokens, ops, data, depends, rhs_power)?;
ops.push(get_op(&op));
}
Some((P2TokenType::CloseParan | P2TokenType::CloseBracket, _, _)) | None => break,
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(*pos)),
}
}
Ok(())
}
}
impl<'de> Deserialize<'de> for P2Rule {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct V;
impl serde::de::Visitor<'_> for V {
type Value = P2Rule;
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(formatter, "valid push rule")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
P2Rule::parse(v).map_err(serde::de::Error::custom)
}
}
deserializer.deserialize_str(V)
}
}