Compare commits
22 commits
master
...
custom-pus
Author | SHA1 | Date | |
---|---|---|---|
2c49ddfcc0 | |||
df9251448b | |||
6c4c73dd8f | |||
928bd3c08f | |||
7ec1ec2473 | |||
fd8da62a88 | |||
1390287345 | |||
a88bcb4646 | |||
2b1407d0f5 | |||
1c6ac219a8 | |||
e50e9af891 | |||
9ee44db6c9 | |||
c83b552727 | |||
f77e26c525 | |||
8d7982900a | |||
84f259d45e | |||
755261e63f | |||
4c03aa80ab | |||
910d166b5d | |||
99fb0a2953 | |||
b20f4d6305 | |||
032602de58 |
24 changed files with 2900 additions and 1332 deletions
1168
Cargo.lock
generated
1168
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -103,6 +103,7 @@ lazy_static = "1.4.0"
|
|||
async-trait = "0.1.68"
|
||||
|
||||
sd-notify = { version = "0.4.1", optional = true }
|
||||
bumpalo = { version = "3.14.0", features = ["collections"] }
|
||||
|
||||
[target.'cfg(unix)'.dependencies]
|
||||
nix = { version = "0.26.2", features = ["resource"] }
|
||||
|
|
130
README.md
130
README.md
|
@ -1,4 +1,134 @@
|
|||
# Conduit
|
||||
|
||||
## The p2 experiment
|
||||
|
||||
p2 (aka push v2, or piston) is an experimental push rule parser designed
|
||||
to be faster and flexible-er than the current push rule system!
|
||||
|
||||
```json
|
||||
{
|
||||
"m.unread": {
|
||||
"rule": "event.type in ['m.room.message', 'm.room.encrypted', 'm.room.name', 'm.room.topic'] && !('m.new_content' in event.content) && !(event.sender in account_data.'m.ignored_users')",
|
||||
"actions": [],
|
||||
"enabled": true
|
||||
},
|
||||
"m.highlight": {
|
||||
"rule": "@'m.unread' && (user_id in event.content.'m.mentions'.user_ids || (event.content.'m.mentions'.room && can_notify('room')))",
|
||||
"actions": ["notify", "store", "mark"],
|
||||
"enabled": true
|
||||
},
|
||||
"m.notify": {
|
||||
"rule": "@'m.highlight' || @'m.room' || 'm.direct' in state.'m.purpose'.purposes",
|
||||
"actions": ["notify"],
|
||||
"enabled": true
|
||||
},
|
||||
"m.invite": {
|
||||
"rule": "event.type == 'm.room.member' && event.state_key == user_id && event.content.membership == 'invite'",
|
||||
"actions": ["notify"],
|
||||
"enabled": true
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### what?
|
||||
|
||||
p2 is composed of **counters**, **rules**, and **actions**. Each
|
||||
counter has one rule and n actions. In the above config, there are four
|
||||
counters. When a rule matches an event, that counter is incremented.
|
||||
|
||||
Actions specify what actions the server should take when a counter is
|
||||
incremented. Currently, there is `notify` for sending a push notification,
|
||||
`store` for storing notifications in a list for later (`/notifications`),
|
||||
and `mark` for adding a mark to an event's unsigned content. Perhaps
|
||||
`count` could be split out from counters into its own action later on.
|
||||
|
||||
Instead of having a separate set of apis and `/sync` field for
|
||||
notify/highlight counts, p2 takes place entirely in account data. Rules
|
||||
are specified via the `p2.rules` event ~~and unread/notify counts are set
|
||||
by the server in `p2.counts`.~~ Update: to fix threads and a few other
|
||||
issues, counters are moved back into unread_counts. Per-room account
|
||||
data can override global push rules.
|
||||
|
||||
The rules aren't mean to be machine-edited - make them read from account
|
||||
data instead.
|
||||
|
||||
This implementation of p2 takes the ruleset and precompiles it to
|
||||
stack-based bytecode-ish intermediate representation, which is then
|
||||
evaluated on all new events. It lazy loads state events and account
|
||||
data. Most of the code is in a [single file](src/utils/p2.rs) currently.
|
||||
|
||||
### why?
|
||||
|
||||
Matrix push rules in their current state are a crime against
|
||||
humanity. They're overengineered, yet aren't able to do things like
|
||||
telling you when a rooms has unread messages. Rules are evaluated in
|
||||
an awful home grown json thing that is not jsonpath nor jsonschema
|
||||
but more limited than either.
|
||||
|
||||
All rules go into a single giant mess of an object - per room
|
||||
account data apparently doesn't exist. Rules are split into
|
||||
override/sender/content/room/underride for no rhyme or reason. Want a
|
||||
room to be muted? That rule goes in "override". Want a room to notify
|
||||
for all messages? That rule goes in "rooms"!
|
||||
|
||||
Rules are hardcoded to either be `notify` or `highlight`. There's no way
|
||||
to make custom/extensible rules, so counters/rules that don't exist in
|
||||
spec (ie. `unread`) can't be implemented or worked around.
|
||||
|
||||
### why not?
|
||||
|
||||
The most obvious reason: this breaks compatibility with everything else in
|
||||
the matrix ecosystem, so it requires forking and modifying clients. This
|
||||
takes varying amounts of effort and can become incompatible or out of
|
||||
sync with upstream.
|
||||
|
||||
In terms of implementation, this system is very much a work in progress
|
||||
and syntax/rules may change and break. The code is a mess, and doesn't
|
||||
properly handle read receipts nor does it atomically update the
|
||||
`p2.counts` config event. It also isn't very performant in its current
|
||||
state, needing an extra db lookup for every event to check for marks.
|
||||
|
||||
This system requires `m.mentions` to be in plaintext, which some people
|
||||
may not want.
|
||||
|
||||
### spec?
|
||||
|
||||
This is a custom system that exists outside of the official matrix
|
||||
client-server spec. If a version of p2 gets merged into the official
|
||||
spec it would be wonderful, but unfortunately it would be a pipe dream.
|
||||
|
||||
### rant
|
||||
|
||||
Matrix is has *so much* potential as a general purpose protocol or
|
||||
event an instant messaging protocol, but so many features seem to be
|
||||
poorly designed. There are so many rough edges that it's hard for me to
|
||||
recommend it to anyone who isn't willing to spend a bit of time working
|
||||
around its idiosyncraties.
|
||||
|
||||
I strongly believe `m.mentions` shouldn't have been encrypted, as a
|
||||
form of it is functionally *required* by servers to accurately send
|
||||
push notifications. It's frustrating that mentions are critical to
|
||||
push notifications, but are one of the only pieces of metadata that
|
||||
are encrypted. Unfortunately, [mentions probably won't be moved to
|
||||
plaintext](https://github.com/matrix-org/matrix-spec-proposals/pull/3952).
|
||||
I know I should probably be happy that matrix is fixing its chronically
|
||||
bad plaintext leaks, but this really shouldn't have been done.
|
||||
|
||||
On the other hand, state events like name/topic/avatar, which if
|
||||
encrypted will have difficulties with `/hierarchy` or summaries, aren't
|
||||
as critical or can be worked around. Reactions are also plaintext, though
|
||||
the reaction keys could be encrypted. Room member nicknames/avatars are
|
||||
stored as plaintext, assuming they aren't clobbered when a user changes
|
||||
their global displayname/avatar_url.
|
||||
|
||||
All these little things add up to a frustrating experience. Despite
|
||||
all that, I'm very hopeful for matrix and it's probably the best option
|
||||
that exists.
|
||||
|
||||
---
|
||||
|
||||
original readme below:
|
||||
|
||||
### A Matrix homeserver written in Rust
|
||||
|
||||
#### What is Matrix?
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use crate::{services, Error, Result, Ruma};
|
||||
use crate::{services, Error, Result, Ruma, utils::p2::{P2Rules, P2Counters}};
|
||||
use ruma::{
|
||||
api::client::{
|
||||
config::{
|
||||
|
@ -26,6 +26,14 @@ pub async fn set_global_account_data_route(
|
|||
|
||||
let event_type = body.event_type.to_string();
|
||||
|
||||
match event_type.as_str() {
|
||||
"p2.rules" => {
|
||||
P2Rules::compile_value(data.clone())?;
|
||||
services().rooms.timeline.p2_uncache_rules(None, &sender_user)?;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
services().account_data.update(
|
||||
None,
|
||||
sender_user,
|
||||
|
@ -51,6 +59,18 @@ pub async fn set_room_account_data_route(
|
|||
.map_err(|_| Error::BadRequest(ErrorKind::BadJson, "Data is invalid."))?;
|
||||
|
||||
let event_type = body.event_type.to_string();
|
||||
|
||||
match event_type.as_str() {
|
||||
"p2.rules" => {
|
||||
P2Rules::compile_value(data.clone())?;
|
||||
services().rooms.timeline.p2_uncache_rules(Some(&body.room_id), &sender_user)?;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
||||
if event_type == "org.celery.p2.rules" {
|
||||
let _rules = P2Rules::compile_value(data.clone())?;
|
||||
}
|
||||
|
||||
services().account_data.update(
|
||||
Some(&body.room_id),
|
||||
|
|
|
@ -430,3 +430,13 @@ pub async fn set_pushers_route(
|
|||
|
||||
Ok(set_pusher::v3::Response::default())
|
||||
}
|
||||
|
||||
// TODO
|
||||
// pub async fn p2_list_notifications_route(
|
||||
// body: Ruma<()>
|
||||
// ) -> Result<()> {
|
||||
// let Some(user) = body.sender_user.as_ref() else {
|
||||
// return Error(Error::BadRequest(ErrorKind::Unknown, ))
|
||||
// }
|
||||
// Ok(())
|
||||
// }
|
||||
|
|
|
@ -57,4 +57,28 @@ impl service::rooms::alias::Data for KeyValueDatabase {
|
|||
.map_err(|_| Error::bad_database("Invalid alias in aliasid_alias."))
|
||||
}))
|
||||
}
|
||||
|
||||
fn all_local_aliases<'a>(
|
||||
&'a self,
|
||||
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, String)>> + 'a> {
|
||||
Box::new(
|
||||
self.alias_roomid
|
||||
.iter()
|
||||
.map(|(room_alias_bytes, room_id_bytes)| {
|
||||
let room_alias_localpart = utils::string_from_bytes(&room_alias_bytes)
|
||||
.map_err(|_| {
|
||||
Error::bad_database("Invalid alias bytes in aliasid_alias.")
|
||||
})?;
|
||||
|
||||
let room_id = utils::string_from_bytes(&room_id_bytes)
|
||||
.map_err(|_| {
|
||||
Error::bad_database("Invalid room_id bytes in aliasid_alias.")
|
||||
})?
|
||||
.try_into()
|
||||
.map_err(|_| Error::bad_database("Invalid room_id in aliasid_alias."))?;
|
||||
|
||||
Ok((room_id, room_alias_localpart))
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,150 +0,0 @@
|
|||
use std::mem;
|
||||
|
||||
use ruma::{
|
||||
events::receipt::ReceiptEvent, serde::Raw, CanonicalJsonObject, OwnedUserId, RoomId, UserId,
|
||||
};
|
||||
|
||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, Result};
|
||||
|
||||
impl service::rooms::edus::read_receipt::Data for KeyValueDatabase {
|
||||
fn readreceipt_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
event: ReceiptEvent,
|
||||
) -> Result<()> {
|
||||
let mut prefix = room_id.as_bytes().to_vec();
|
||||
prefix.push(0xff);
|
||||
|
||||
let mut last_possible_key = prefix.clone();
|
||||
last_possible_key.extend_from_slice(&u64::MAX.to_be_bytes());
|
||||
|
||||
// Remove old entry
|
||||
if let Some((old, _)) = self
|
||||
.readreceiptid_readreceipt
|
||||
.iter_from(&last_possible_key, true)
|
||||
.take_while(|(key, _)| key.starts_with(&prefix))
|
||||
.find(|(key, _)| {
|
||||
key.rsplit(|&b| b == 0xff)
|
||||
.next()
|
||||
.expect("rsplit always returns an element")
|
||||
== user_id.as_bytes()
|
||||
})
|
||||
{
|
||||
// This is the old room_latest
|
||||
self.readreceiptid_readreceipt.remove(&old)?;
|
||||
}
|
||||
|
||||
let mut room_latest_id = prefix;
|
||||
room_latest_id.extend_from_slice(&services().globals.next_count()?.to_be_bytes());
|
||||
room_latest_id.push(0xff);
|
||||
room_latest_id.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
self.readreceiptid_readreceipt.insert(
|
||||
&room_latest_id,
|
||||
&serde_json::to_vec(&event).expect("EduEvent::to_string always works"),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn readreceipts_since<'a>(
|
||||
&'a self,
|
||||
room_id: &RoomId,
|
||||
since: u64,
|
||||
) -> Box<
|
||||
dyn Iterator<
|
||||
Item = Result<(
|
||||
OwnedUserId,
|
||||
u64,
|
||||
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
|
||||
)>,
|
||||
> + 'a,
|
||||
> {
|
||||
let mut prefix = room_id.as_bytes().to_vec();
|
||||
prefix.push(0xff);
|
||||
let prefix2 = prefix.clone();
|
||||
|
||||
let mut first_possible_edu = prefix.clone();
|
||||
first_possible_edu.extend_from_slice(&(since + 1).to_be_bytes()); // +1 so we don't send the event at since
|
||||
|
||||
Box::new(
|
||||
self.readreceiptid_readreceipt
|
||||
.iter_from(&first_possible_edu, false)
|
||||
.take_while(move |(k, _)| k.starts_with(&prefix2))
|
||||
.map(move |(k, v)| {
|
||||
let count = utils::u64_from_bytes(
|
||||
&k[prefix.len()..prefix.len() + mem::size_of::<u64>()],
|
||||
)
|
||||
.map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
|
||||
let user_id = UserId::parse(
|
||||
utils::string_from_bytes(&k[prefix.len() + mem::size_of::<u64>() + 1..])
|
||||
.map_err(|_| {
|
||||
Error::bad_database("Invalid readreceiptid userid bytes in db.")
|
||||
})?,
|
||||
)
|
||||
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
|
||||
|
||||
let mut json =
|
||||
serde_json::from_slice::<CanonicalJsonObject>(&v).map_err(|_| {
|
||||
Error::bad_database(
|
||||
"Read receipt in roomlatestid_roomlatest is invalid json.",
|
||||
)
|
||||
})?;
|
||||
json.remove("room_id");
|
||||
|
||||
Ok((
|
||||
user_id,
|
||||
count,
|
||||
Raw::from_json(
|
||||
serde_json::value::to_raw_value(&json)
|
||||
.expect("json is valid raw value"),
|
||||
),
|
||||
))
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
self.roomuserid_privateread
|
||||
.insert(&key, &count.to_be_bytes())?;
|
||||
|
||||
self.roomuserid_lastprivatereadupdate
|
||||
.insert(&key, &services().globals.next_count()?.to_be_bytes())
|
||||
}
|
||||
|
||||
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
self.roomuserid_privateread
|
||||
.get(&key)?
|
||||
.map_or(Ok(None), |v| {
|
||||
Ok(Some(utils::u64_from_bytes(&v).map_err(|_| {
|
||||
Error::bad_database("Invalid private read marker bytes")
|
||||
})?))
|
||||
})
|
||||
}
|
||||
|
||||
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(user_id.as_bytes());
|
||||
|
||||
Ok(self
|
||||
.roomuserid_lastprivatereadupdate
|
||||
.get(&key)?
|
||||
.map(|bytes| {
|
||||
utils::u64_from_bytes(&bytes).map_err(|_| {
|
||||
Error::bad_database("Count in roomuserid_lastprivatereadupdate is invalid.")
|
||||
})
|
||||
})
|
||||
.transpose()?
|
||||
.unwrap_or(0))
|
||||
}
|
||||
}
|
|
@ -1,11 +1,18 @@
|
|||
use std::{collections::hash_map, mem::size_of, sync::Arc};
|
||||
use std::{collections::hash_map, mem::size_of, sync::Arc, ops::DerefMut};
|
||||
|
||||
use ruma::{
|
||||
api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use tracing::error;
|
||||
|
||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
||||
use crate::{
|
||||
database::KeyValueDatabase,
|
||||
service::{self, globals::Data},
|
||||
services,
|
||||
utils::{self, p2::P2Rules},
|
||||
Error, PduEvent, Result,
|
||||
};
|
||||
|
||||
use service::rooms::timeline::PduCount;
|
||||
|
||||
|
@ -245,6 +252,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|||
pdu.remove_transaction_id()?;
|
||||
}
|
||||
pdu.add_age()?;
|
||||
pdu.p2_transpose_marks(&user_id)?;
|
||||
let count = pdu_count(&pdu_id)?;
|
||||
Ok((count, pdu))
|
||||
}),
|
||||
|
@ -272,6 +280,7 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|||
pdu.remove_transaction_id()?;
|
||||
}
|
||||
pdu.add_age()?;
|
||||
pdu.p2_transpose_marks(&user_id)?;
|
||||
let count = pdu_count(&pdu_id)?;
|
||||
Ok((count, pdu))
|
||||
}),
|
||||
|
@ -305,6 +314,124 @@ impl service::rooms::timeline::Data for KeyValueDatabase {
|
|||
.increment_batch(&mut highlights_batch.into_iter())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// FIXME: not transactional (can clobber user stuff), lots of tiny
|
||||
// but amplified writes, and a meanace to society in general
|
||||
fn p2_increment_counters(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
counters: Vec<(OwnedUserId, Vec<Box<str>>)>,
|
||||
) -> Result<()> {
|
||||
let mut keys = vec![];
|
||||
for (user_id, names) in counters {
|
||||
for name in names {
|
||||
let mut key = room_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(user_id.as_bytes());
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(name.as_bytes());
|
||||
keys.push(key);
|
||||
}
|
||||
}
|
||||
self.p2_userroomidcounter_count.increment_batch(&mut keys.into_iter())?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn p2_add_store(
|
||||
&self,
|
||||
event_id: &EventId,
|
||||
users: Vec<(OwnedUserId, Vec<Box<str>>)>,
|
||||
) -> Result<()> {
|
||||
let mut store_ids: Vec<Vec<u8>> = vec![];
|
||||
let count = self.next_count()?.to_be_bytes();
|
||||
|
||||
for (user_id, counters) in users {
|
||||
for counter in counters {
|
||||
let mut key = user_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(counter.as_bytes());
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(&count);
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(event_id.as_bytes());
|
||||
|
||||
store_ids.push(key);
|
||||
}
|
||||
}
|
||||
|
||||
let mut store_iter = store_ids.into_iter().zip(std::iter::repeat_with(Vec::new));
|
||||
self.p2_storeid.insert_batch(&mut store_iter)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn p2_get_rules(&self, room_id: &RoomId, user_id: &UserId) -> Result<Arc<P2Rules>> {
|
||||
let mut key = user_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(room_id.as_bytes());
|
||||
|
||||
if let Some(rules) = self
|
||||
.p2_userroomid_rules_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_mut(key.as_slice())
|
||||
{
|
||||
return Ok(rules.clone());
|
||||
}
|
||||
|
||||
#[derive(Default, Deserialize)]
|
||||
struct P2RulesWrapper {
|
||||
content: P2Rules,
|
||||
}
|
||||
|
||||
let room_rules = services()
|
||||
.account_data
|
||||
.get(Some(room_id), user_id, "p2.rules".into())?
|
||||
.map(|ev| serde_json::from_str::<P2RulesWrapper>(ev.get()).unwrap_or_default())
|
||||
.map(|wrap| wrap.content);
|
||||
|
||||
let global_rules = services()
|
||||
.account_data
|
||||
.get(None, user_id, "p2.rules".into())?
|
||||
.map(|ev| serde_json::from_str::<P2RulesWrapper>(ev.get()).unwrap_or_default())
|
||||
.map(|wrap| wrap.content);
|
||||
|
||||
let rules = Arc::new(match (room_rules, global_rules) {
|
||||
(Some(room), Some(global)) => room.merge(global),
|
||||
(None, Some(rules)) => rules,
|
||||
(Some(rules), None) => rules,
|
||||
(None, None) => P2Rules::default(),
|
||||
});
|
||||
|
||||
self.p2_userroomid_rules_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert(key.into_boxed_slice(), rules.clone());
|
||||
|
||||
Ok(rules)
|
||||
}
|
||||
|
||||
fn p2_uncache_rules(&self, room_id: Option<&RoomId>, user_id: &UserId) -> Result<()> {
|
||||
if let Some(room_id) = room_id {
|
||||
let mut key = user_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
key.extend_from_slice(room_id.as_bytes());
|
||||
|
||||
self.p2_userroomid_rules_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.remove(key.as_slice());
|
||||
} else {
|
||||
let mut key = user_id.as_bytes().to_vec();
|
||||
key.push(0xff);
|
||||
let mut cache = self.p2_userroomid_rules_cache.lock().unwrap();
|
||||
let mut tmp = lru_cache::LruCache::new(cache.capacity());
|
||||
std::mem::swap(cache.deref_mut(), &mut tmp);
|
||||
cache.extend(tmp.into_iter().filter(|(k, _)| !k.starts_with(&key)));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `count` of this pdu's id.
|
||||
|
|
|
@ -2,7 +2,7 @@ pub mod abstraction;
|
|||
pub mod key_value;
|
||||
|
||||
use crate::{
|
||||
service::rooms::timeline::PduCount, services, utils, Config, Error, PduEvent, Result, Services,
|
||||
service::rooms::timeline::PduCount, services, utils::{self, p2::P2Rules}, Config, Error, PduEvent, Result, Services,
|
||||
SERVICES,
|
||||
};
|
||||
use abstraction::{KeyValueDatabaseEngine, KvTree};
|
||||
|
@ -108,6 +108,9 @@ pub struct KeyValueDatabase {
|
|||
pub(super) userroomid_notificationcount: Arc<dyn KvTree>, // NotifyCount = u64
|
||||
pub(super) userroomid_highlightcount: Arc<dyn KvTree>, // HightlightCount = u64
|
||||
pub(super) roomuserid_lastnotificationread: Arc<dyn KvTree>, // LastNotificationRead = u64
|
||||
|
||||
pub(super) p2_userroomidcounter_count: Arc<dyn KvTree>,
|
||||
pub(super) p2_storeid: Arc<dyn KvTree>, // StoreId = UserId + Counter + Count + EventId
|
||||
|
||||
/// Remember the current state hash of a room.
|
||||
pub(super) roomid_shortstatehash: Arc<dyn KvTree>,
|
||||
|
@ -172,6 +175,7 @@ pub struct KeyValueDatabase {
|
|||
pub(super) our_real_users_cache: RwLock<HashMap<OwnedRoomId, Arc<HashSet<OwnedUserId>>>>,
|
||||
pub(super) appservice_in_room_cache: RwLock<HashMap<OwnedRoomId, HashMap<String, bool>>>,
|
||||
pub(super) lasttimelinecount_cache: Mutex<HashMap<OwnedRoomId, PduCount>>,
|
||||
pub(super) p2_userroomid_rules_cache: Mutex<LruCache<Box<[u8]>, Arc<P2Rules>>>,
|
||||
}
|
||||
|
||||
impl KeyValueDatabase {
|
||||
|
@ -336,6 +340,9 @@ impl KeyValueDatabase {
|
|||
userroomid_notificationcount: builder.open_tree("userroomid_notificationcount")?,
|
||||
userroomid_highlightcount: builder.open_tree("userroomid_highlightcount")?,
|
||||
roomuserid_lastnotificationread: builder.open_tree("userroomid_highlightcount")?,
|
||||
|
||||
p2_storeid: builder.open_tree("p2_storeid")?,
|
||||
p2_userroomidcounter_count: builder.open_tree("p2_userroomidcounter_count")?,
|
||||
|
||||
statekey_shortstatekey: builder.open_tree("statekey_shortstatekey")?,
|
||||
shortstatekey_statekey: builder.open_tree("shortstatekey_statekey")?,
|
||||
|
@ -397,6 +404,9 @@ impl KeyValueDatabase {
|
|||
our_real_users_cache: RwLock::new(HashMap::new()),
|
||||
appservice_in_room_cache: RwLock::new(HashMap::new()),
|
||||
lasttimelinecount_cache: Mutex::new(HashMap::new()),
|
||||
p2_userroomid_rules_cache: Mutex::new(LruCache::new(
|
||||
(100_000.0 * config.conduit_cache_capacity_modifier) as usize,
|
||||
)),
|
||||
});
|
||||
|
||||
let db = Box::leak(db_raw);
|
||||
|
|
|
@ -435,6 +435,10 @@ fn routes() -> Router {
|
|||
"/_matrix/client/v3/rooms/:room_id/initialSync",
|
||||
get(initial_sync),
|
||||
)
|
||||
// .route(
|
||||
// "/_matrix/client/p2.0/notifications",
|
||||
// get(client_server::p2_list_notifications_route),
|
||||
// )
|
||||
.route("/", get(it_works))
|
||||
.fallback(not_found)
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -14,7 +14,11 @@ use serde_json::{
|
|||
json,
|
||||
value::{to_raw_value, RawValue as RawJsonValue},
|
||||
};
|
||||
use std::{cmp::Ordering, collections::BTreeMap, sync::Arc};
|
||||
use std::{
|
||||
cmp::Ordering,
|
||||
collections::{BTreeMap, HashMap},
|
||||
sync::Arc,
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
/// Content hashes of a PDU.
|
||||
|
@ -116,6 +120,27 @@ impl PduEvent {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn p2_transpose_marks(&mut self, user_id: &UserId) -> crate::Result<()> {
|
||||
let mut unsigned: BTreeMap<String, Box<RawJsonValue>> = self
|
||||
.unsigned
|
||||
.as_ref()
|
||||
.map_or_else(|| Ok(BTreeMap::new()), |u| serde_json::from_str(u.get()))
|
||||
.map_err(|_| Error::bad_database("Invalid unsigned in pdu event"))?;
|
||||
|
||||
let Some(all_marks): Option<HashMap<OwnedUserId, Vec<Box<str>>>> = unsigned.remove("p2.marks.all_users")
|
||||
.map(|v| serde_json::from_str(v.get()).ok()).flatten() else {
|
||||
return Ok(())
|
||||
};
|
||||
|
||||
if let Some(marks) = all_marks.get(user_id) {
|
||||
unsigned.insert("p2.marks".to_owned(), to_raw_value(marks).unwrap());
|
||||
}
|
||||
|
||||
self.unsigned = Some(to_raw_value(&unsigned).expect("unsigned is valid"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn to_sync_room_event(&self) -> Raw<AnySyncTimelineEvent> {
|
||||
let mut json = json!({
|
||||
|
|
|
@ -16,4 +16,9 @@ pub trait Data: Send + Sync {
|
|||
&'a self,
|
||||
room_id: &RoomId,
|
||||
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a>;
|
||||
|
||||
/// Returns all local aliases on the server
|
||||
fn all_local_aliases<'a>(
|
||||
&'a self,
|
||||
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, String)>> + 'a>;
|
||||
}
|
||||
|
|
|
@ -32,4 +32,11 @@ impl Service {
|
|||
) -> Box<dyn Iterator<Item = Result<OwnedRoomAliasId>> + 'a> {
|
||||
self.db.local_aliases_for_room(room_id)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn all_local_aliases<'a>(
|
||||
&'a self,
|
||||
) -> Box<dyn Iterator<Item = Result<(OwnedRoomId, String)>> + 'a> {
|
||||
self.db.all_local_aliases()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
use crate::Result;
|
||||
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
|
||||
|
||||
pub trait Data: Send + Sync {
|
||||
/// Replaces the previous read receipt.
|
||||
fn readreceipt_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
event: ReceiptEvent,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
|
||||
fn readreceipts_since<'a>(
|
||||
&'a self,
|
||||
room_id: &RoomId,
|
||||
since: u64,
|
||||
) -> Box<
|
||||
dyn Iterator<
|
||||
Item = Result<(
|
||||
OwnedUserId,
|
||||
u64,
|
||||
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
|
||||
)>,
|
||||
> + 'a,
|
||||
>;
|
||||
|
||||
/// Sets a private read marker at `count`.
|
||||
fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()>;
|
||||
|
||||
/// Returns the private read marker.
|
||||
fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>>;
|
||||
|
||||
/// Returns the count of the last typing update in this room.
|
||||
fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;
|
||||
}
|
|
@ -1,55 +0,0 @@
|
|||
mod data;
|
||||
|
||||
pub use data::Data;
|
||||
|
||||
use crate::Result;
|
||||
use ruma::{events::receipt::ReceiptEvent, serde::Raw, OwnedUserId, RoomId, UserId};
|
||||
|
||||
pub struct Service {
|
||||
pub db: &'static dyn Data,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
/// Replaces the previous read receipt.
|
||||
pub fn readreceipt_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
event: ReceiptEvent,
|
||||
) -> Result<()> {
|
||||
self.db.readreceipt_update(user_id, room_id, event)
|
||||
}
|
||||
|
||||
/// Returns an iterator over the most recent read_receipts in a room that happened after the event with id `since`.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn readreceipts_since<'a>(
|
||||
&'a self,
|
||||
room_id: &RoomId,
|
||||
since: u64,
|
||||
) -> impl Iterator<
|
||||
Item = Result<(
|
||||
OwnedUserId,
|
||||
u64,
|
||||
Raw<ruma::events::AnySyncEphemeralRoomEvent>,
|
||||
)>,
|
||||
> + 'a {
|
||||
self.db.readreceipts_since(room_id, since)
|
||||
}
|
||||
|
||||
/// Sets a private read marker at `count`.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) -> Result<()> {
|
||||
self.db.private_read_set(room_id, user_id, count)
|
||||
}
|
||||
|
||||
/// Returns the private read marker.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn private_read_get(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<u64>> {
|
||||
self.db.private_read_get(room_id, user_id)
|
||||
}
|
||||
|
||||
/// Returns the count of the last typing update in this room.
|
||||
pub fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64> {
|
||||
self.db.last_privateread_update(user_id, room_id)
|
||||
}
|
||||
}
|
|
@ -2,7 +2,7 @@ use std::sync::Arc;
|
|||
|
||||
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId};
|
||||
|
||||
use crate::{PduEvent, Result};
|
||||
use crate::{PduEvent, Result, utils::p2::P2Rules};
|
||||
|
||||
use super::PduCount;
|
||||
|
||||
|
@ -88,4 +88,32 @@ pub trait Data: Send + Sync {
|
|||
notifies: Vec<OwnedUserId>,
|
||||
highlights: Vec<OwnedUserId>,
|
||||
) -> Result<()>;
|
||||
|
||||
// this should probably be split into its own service
|
||||
|
||||
/// increment counters for rooms and users
|
||||
fn p2_increment_counters(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
counters: Vec<(OwnedUserId, Vec<Box<str>>)>,
|
||||
) -> Result<()>;
|
||||
|
||||
/// store event ids for /notifications
|
||||
fn p2_add_store(
|
||||
&self,
|
||||
event_id: &EventId,
|
||||
users: Vec<(OwnedUserId, Vec<Box<str>>)>,
|
||||
) -> Result<()>;
|
||||
|
||||
fn p2_get_rules(
|
||||
&self,
|
||||
room_id: &RoomId,
|
||||
user_id: &UserId,
|
||||
) -> Result<Arc<P2Rules>>;
|
||||
|
||||
fn p2_uncache_rules(
|
||||
&self,
|
||||
room_id: Option<&RoomId>,
|
||||
user_id: &UserId,
|
||||
) -> Result<()>;
|
||||
}
|
||||
|
|
|
@ -38,7 +38,9 @@ use tracing::{error, info, warn};
|
|||
use crate::{
|
||||
api::server_server,
|
||||
service::pdu::{EventHash, PduBuilder},
|
||||
services, utils, Error, PduEvent, Result,
|
||||
services,
|
||||
utils::{self, p2::P2Action},
|
||||
Error, PduEvent, Result,
|
||||
};
|
||||
|
||||
use super::state_compressor::CompressedStateEvent;
|
||||
|
@ -270,39 +272,6 @@ impl Service {
|
|||
.state
|
||||
.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
|
||||
|
||||
let mutex_insert = Arc::clone(
|
||||
services()
|
||||
.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(pdu.room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
|
||||
let count1 = services().globals.next_count()?;
|
||||
// Mark as read first so the sending client doesn't get a notification even if appending
|
||||
// fails
|
||||
services()
|
||||
.rooms
|
||||
.edus
|
||||
.read_receipt
|
||||
.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
|
||||
services()
|
||||
.rooms
|
||||
.user
|
||||
.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
|
||||
|
||||
let count2 = services().globals.next_count()?;
|
||||
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
|
||||
pdu_id.extend_from_slice(&count2.to_be_bytes());
|
||||
|
||||
// Insert pdu
|
||||
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?;
|
||||
|
||||
drop(insert_lock);
|
||||
|
||||
// See if the event matches any known pushers
|
||||
let power_levels: RoomPowerLevelsEventContent = services()
|
||||
.rooms
|
||||
|
@ -317,6 +286,8 @@ impl Service {
|
|||
|
||||
let sync_pdu = pdu.to_sync_room_event();
|
||||
|
||||
// legacy notification/highlight system
|
||||
|
||||
let mut notifies = Vec::new();
|
||||
let mut highlights = Vec::new();
|
||||
|
||||
|
@ -372,15 +343,121 @@ impl Service {
|
|||
if highlight {
|
||||
highlights.push(user.clone());
|
||||
}
|
||||
|
||||
for push_key in services().pusher.get_pushkeys(user) {
|
||||
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
|
||||
}
|
||||
|
||||
// pushers are now handled by p2, no need to duplicate notifications
|
||||
}
|
||||
|
||||
self.db
|
||||
.increment_notification_counts(&pdu.room_id, notifies, highlights)?;
|
||||
|
||||
// shiny new p2 notification system
|
||||
|
||||
eprintln!("check p2 stuff for pdu {}", pdu.event_id);
|
||||
let mut p2_counters = Vec::new();
|
||||
let mut p2_store = Vec::new();
|
||||
let mut p2_marks = BTreeMap::new();
|
||||
let mut p2_notify = Vec::new();
|
||||
|
||||
for user in services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.get_our_real_users(&pdu.room_id)?
|
||||
.iter()
|
||||
{
|
||||
// Don't notify the user of their own events
|
||||
if user == &pdu.sender {
|
||||
continue;
|
||||
}
|
||||
|
||||
eprintln!("check p2 stuff for user {}", user);
|
||||
let rules = self.db.p2_get_rules(&pdu.room_id, user)?;
|
||||
let matched = rules.eval(
|
||||
&pdu.room_id,
|
||||
user,
|
||||
serde_json::to_value(&sync_pdu).expect("always valid"),
|
||||
);
|
||||
|
||||
let should_notify = matched
|
||||
.iter()
|
||||
.any(|(_, actions)| actions.contains(&P2Action::Notify));
|
||||
|
||||
let counters_to_mark: Vec<_> = matched
|
||||
.iter()
|
||||
.filter(|(_, actions)| actions.contains(&P2Action::Mark))
|
||||
.map(|(matched, _)| CanonicalJsonValue::String(matched.to_string()))
|
||||
.collect();
|
||||
|
||||
let counters_to_store: Vec<_> = matched
|
||||
.iter()
|
||||
.filter(|(_, actions)| actions.contains(&P2Action::Store))
|
||||
.map(|(matched, _)| matched.to_owned())
|
||||
.collect();
|
||||
|
||||
if !counters_to_store.is_empty() {
|
||||
p2_store.push((user.to_owned(), counters_to_store));
|
||||
}
|
||||
|
||||
if !counters_to_mark.is_empty() {
|
||||
p2_marks.insert(user.to_string(), CanonicalJsonValue::Array(counters_to_mark));
|
||||
}
|
||||
|
||||
if !matched.is_empty() {
|
||||
p2_counters.push((user.to_owned(), matched.into_keys().collect()));
|
||||
}
|
||||
|
||||
if should_notify {
|
||||
p2_notify.push(user.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
self.db.p2_add_store(&pdu.event_id, p2_store)?;
|
||||
self.db.p2_increment_counters(&pdu.room_id, p2_counters)?;
|
||||
|
||||
if let CanonicalJsonValue::Object(unsigned) = pdu_json
|
||||
.entry("unsigned".to_owned())
|
||||
.or_insert_with(|| CanonicalJsonValue::Object(Default::default())) {
|
||||
unsigned.insert("p2.marks.all_users".to_owned(), CanonicalJsonValue::Object(p2_marks));
|
||||
}
|
||||
|
||||
let mutex_insert = Arc::clone(
|
||||
services()
|
||||
.globals
|
||||
.roomid_mutex_insert
|
||||
.write()
|
||||
.unwrap()
|
||||
.entry(pdu.room_id.clone())
|
||||
.or_default(),
|
||||
);
|
||||
let insert_lock = mutex_insert.lock().unwrap();
|
||||
|
||||
let count1 = services().globals.next_count()?;
|
||||
// Mark as read first so the sending client doesn't get a notification even if appending
|
||||
// fails
|
||||
services()
|
||||
.rooms
|
||||
.edus
|
||||
.read_receipt
|
||||
.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
|
||||
services()
|
||||
.rooms
|
||||
.user
|
||||
.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
|
||||
|
||||
let count2 = services().globals.next_count()?;
|
||||
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
|
||||
pdu_id.extend_from_slice(&count2.to_be_bytes());
|
||||
|
||||
// Insert pdu
|
||||
self.db.append_pdu(&pdu_id, pdu, &pdu_json, count2)?;
|
||||
|
||||
drop(insert_lock);
|
||||
|
||||
for user_id in &p2_notify {
|
||||
for push_key in services().pusher.get_pushkeys(user_id) {
|
||||
services().sending.send_push_pdu(&pdu_id, user_id, push_key?)?;
|
||||
}
|
||||
}
|
||||
|
||||
match pdu.kind {
|
||||
TimelineEventType::RoomRedaction => {
|
||||
if let Some(redact_id) = &pdu.redacts {
|
||||
|
@ -466,7 +543,7 @@ impl Service {
|
|||
&& services().globals.emergency_password().is_none();
|
||||
|
||||
if to_conduit && !from_conduit && admin_room.as_ref() == Some(&pdu.room_id) {
|
||||
services().admin.process_message(body);
|
||||
services().admin.process_message(body, pdu.event_id.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1199,4 +1276,8 @@ impl Service {
|
|||
info!("Prepended backfill pdu");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn p2_uncache_rules(&self, room_id: Option<&RoomId>, user_id: &UserId) -> Result<()> {
|
||||
self.db.p2_uncache_rules(room_id, user_id)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -80,6 +80,8 @@ pub enum Error {
|
|||
#[cfg(feature = "conduit_bin")]
|
||||
#[error("{0}")]
|
||||
PathError(#[from] axum::extract::rejection::PathRejection),
|
||||
#[error("{0}")]
|
||||
P2Error(#[from] crate::utils::p2::error::P2RuleError),
|
||||
}
|
||||
|
||||
impl Error {
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
pub mod error;
|
||||
pub mod p2;
|
||||
|
||||
use argon2::{Config, Variant};
|
||||
use cmp::Ordering;
|
||||
|
|
22
src/utils/p2/error.rs
Normal file
22
src/utils/p2/error.rs
Normal file
|
@ -0,0 +1,22 @@
|
|||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum P2RuleError {
|
||||
#[error("invalid json: {0}")]
|
||||
InvalidJson(serde_json::Error),
|
||||
|
||||
#[error("invalid character at position {0}")]
|
||||
InvalidChar(usize),
|
||||
|
||||
#[error("found invalid token at position {0}")]
|
||||
InvalidToken(usize),
|
||||
|
||||
#[error("unexpected eof")]
|
||||
UnexpectedEOF,
|
||||
|
||||
#[error("unknown function name or bad arity (at `{0}(...)`)")]
|
||||
BadCall(String),
|
||||
|
||||
#[error("dependency doesn't exist or forms a cycle")]
|
||||
BadDeps,
|
||||
}
|
268
src/utils/p2/eval.rs
Normal file
268
src/utils/p2/eval.rs
Normal file
|
@ -0,0 +1,268 @@
|
|||
use std::collections::HashSet;
|
||||
use crate::{services, utils::p2::parser::{P2RuleData, P2RuleOp}};
|
||||
use ruma::{RoomId, UserId};
|
||||
use serde_json::Value as JVal;
|
||||
use super::parser::P2Rule;
|
||||
|
||||
impl P2Rule {
|
||||
/// evaluate a json event against this rule
|
||||
// TODO: clone less
|
||||
pub(super) fn eval(&self, room: &RoomId, user: &UserId, json: &JVal, matched: &HashSet<Box<str>>) -> bool {
|
||||
#[derive(Debug, Clone)]
|
||||
enum Value {
|
||||
Boolean(bool),
|
||||
Int(i64),
|
||||
Json(JVal),
|
||||
Set(Vec<Value>),
|
||||
AccountData,
|
||||
RoomState(Option<String>),
|
||||
}
|
||||
|
||||
impl Value {
|
||||
fn null() -> Value {
|
||||
Value::Json(JVal::Null)
|
||||
}
|
||||
|
||||
fn unbox_bool(&self) -> Option<bool> {
|
||||
match self {
|
||||
Self::Boolean(true) => Some(true),
|
||||
Self::Boolean(false) => Some(false),
|
||||
Self::Json(JVal::Null) => Some(false), // stop null from poisoning (otherwise `foo.bar || true` is false if foo.bar is null)
|
||||
Self::Json(JVal::Bool(b)) => Some(*b),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn unbox_int(&self) -> Option<i64> {
|
||||
match self {
|
||||
Self::Json(JVal::Number(i)) => {
|
||||
Some(i.as_i64().expect("canonical json enforces integers"))
|
||||
}
|
||||
Self::Int(i) => Some(*i),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn unbox_str(&self) -> Option<&str> {
|
||||
match self {
|
||||
Self::Json(JVal::String(s)) => Some(s),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Value {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
// FIXME: sets?
|
||||
match (self, other) {
|
||||
(Value::Json(a), Value::Json(b)) => a == b,
|
||||
(a, b) => {
|
||||
(a.unbox_bool() == b.unbox_bool() && a.unbox_bool().is_some())
|
||||
|| (a.unbox_int() == b.unbox_int() && a.unbox_int().is_some())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut stack: Vec<Value> = Vec::new();
|
||||
|
||||
macro_rules! unboxed {
|
||||
($unbox:ident, $expr:tt) => {{
|
||||
let a = stack.pop().unwrap().$unbox();
|
||||
let b = stack.pop().unwrap().$unbox();
|
||||
match (a, b) {
|
||||
(Some(a), Some(b)) => Value::Boolean(a $expr b),
|
||||
(_, _) => Value::Json(JVal::Null),
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
for op in &self.ops {
|
||||
let next = match op {
|
||||
P2RuleOp::Data(idx) => {
|
||||
match self.data.get(*idx).unwrap() {
|
||||
P2RuleData::String(s) => {
|
||||
let val = serde_json::Value::String(s.to_string());
|
||||
Value::Json(val)
|
||||
}
|
||||
P2RuleData::Integer(int) => Value::Int(*int),
|
||||
P2RuleData::Ident(ident) => match ident.as_ref() {
|
||||
"event" => Value::Json(json.clone()),
|
||||
"account_data" => Value::AccountData,
|
||||
"state" => Value::RoomState(None),
|
||||
"user_id" => Value::Json(JVal::String(user.to_string())),
|
||||
_ => Value::null(),
|
||||
}
|
||||
}
|
||||
}
|
||||
P2RuleOp::Get => {
|
||||
enum Get {
|
||||
Str(String),
|
||||
Int(i64),
|
||||
}
|
||||
|
||||
let g = match stack.pop().unwrap() {
|
||||
Value::Json(JVal::String(s)) => Get::Str(s),
|
||||
Value::Json(JVal::Number(i)) => {
|
||||
Get::Int(i.as_i64().expect("enforced by canonical json"))
|
||||
}
|
||||
Value::Int(i) => Get::Int(i),
|
||||
_ => {
|
||||
stack.push(Value::null());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match (g, stack.pop().unwrap()) {
|
||||
(Get::Str(s), Value::Json(JVal::Object(obj))) => {
|
||||
Value::Json(obj.get(&s).cloned().unwrap_or(JVal::Null))
|
||||
}
|
||||
(Get::Int(i), Value::Json(JVal::Array(arr))) => match i.try_into() {
|
||||
Ok(u) => Value::Json(arr.get::<usize>(u).cloned().unwrap_or(JVal::Null)),
|
||||
Err(_) => Value::null(),
|
||||
},
|
||||
(Get::Int(i), Value::Set(items)) => match i.try_into() {
|
||||
Ok(idx) => items.get::<usize>(idx).cloned().unwrap_or_else(Value::null),
|
||||
Err(_) => Value::null(),
|
||||
},
|
||||
(Get::Str(s), Value::AccountData) => {
|
||||
let data_type =
|
||||
ruma::events::RoomAccountDataEventType::from(s.to_string());
|
||||
let data = services()
|
||||
.account_data
|
||||
.get(Some(room), user, data_type.clone())
|
||||
.ok()
|
||||
.flatten()
|
||||
.or_else(|| {
|
||||
services()
|
||||
.account_data
|
||||
.get(None, user, data_type)
|
||||
.ok()
|
||||
.flatten()
|
||||
})
|
||||
.map(|v| {
|
||||
serde_json::from_str(v.get())
|
||||
.expect("database has invalid data")
|
||||
})
|
||||
.unwrap_or(JVal::Null);
|
||||
Value::Json(data)
|
||||
}
|
||||
(Get::Str(s), Value::RoomState(None)) => Value::RoomState(Some(s.to_string())),
|
||||
(Get::Str(s), Value::RoomState(Some(state_key))) => {
|
||||
let ev = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_get(room, &s.to_owned().into(), &state_key)
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|ev| serde_json::from_str(ev.content.get()).ok())
|
||||
.flatten()
|
||||
.unwrap_or(JVal::Null);
|
||||
Value::Json(ev)
|
||||
}
|
||||
(_, _) => Value::null(),
|
||||
}
|
||||
}
|
||||
P2RuleOp::Eq => Value::Boolean(stack.pop().unwrap() == stack.pop().unwrap()),
|
||||
P2RuleOp::Neq => Value::Boolean(stack.pop().unwrap() != stack.pop().unwrap()),
|
||||
P2RuleOp::Not => match stack.pop().unwrap().unbox_bool() {
|
||||
Some(b) => Value::Boolean(!b),
|
||||
None => Value::null(),
|
||||
},
|
||||
P2RuleOp::And => unboxed!(unbox_bool, &&),
|
||||
P2RuleOp::Or => unboxed!(unbox_bool, ||),
|
||||
P2RuleOp::Gt => unboxed!(unbox_int, >),
|
||||
P2RuleOp::Lt => unboxed!(unbox_int, <),
|
||||
P2RuleOp::Gte => unboxed!(unbox_int, >=),
|
||||
P2RuleOp::Lte => unboxed!(unbox_int, <=),
|
||||
// maybe, maybe not? regex would be better, but spec doesn't have regex
|
||||
// P2RuleOp::Like => {
|
||||
// let item = stack.pop().unwrap();
|
||||
// let pattern = stack.pop().unwrap();
|
||||
// match (item, pattern) {
|
||||
// (Value::Json(JVal::String(text)), Value::Json(JVal::String(pat))) => {
|
||||
// lazy_static! {
|
||||
// static ref GLOB: regex::Regex = rgx!(r"_\*");
|
||||
// }
|
||||
|
||||
// let reg = regex::escape(&pat)
|
||||
// .replace("*", ".*")
|
||||
// .replace("?", ".?");
|
||||
|
||||
// Value::Boolean(rgx!(®).is_match(text))
|
||||
// }
|
||||
// (_, _) => Value::Boolean(false),
|
||||
// }
|
||||
// },
|
||||
P2RuleOp::Coalesce => todo!(),
|
||||
// P2RuleOp::Coalesce(n) => stack
|
||||
// .drain(stack.len() - n..)
|
||||
// .reduce(|prev, val| match prev {
|
||||
// Value::Json(JVal::Null) => val,
|
||||
// not_null => not_null,
|
||||
// })
|
||||
// .unwrap(),
|
||||
P2RuleOp::CanNotify => {
|
||||
use ruma::events::{
|
||||
room::power_levels::RoomPowerLevelsEventContent, StateEventType,
|
||||
};
|
||||
|
||||
let key = stack.pop().unwrap();
|
||||
let key = match key.unbox_str() {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
stack.push(Value::Boolean(false));
|
||||
continue;
|
||||
},
|
||||
};
|
||||
|
||||
let power_levels: RoomPowerLevelsEventContent = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.room_state_get(room, &StateEventType::RoomPowerLevels, "")
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|ev| serde_json::from_str(ev.content.get()).ok())
|
||||
.flatten()
|
||||
.unwrap_or_default();
|
||||
|
||||
let user_level: i64 = (*power_levels.users.get(user)
|
||||
.unwrap_or_else(|| &power_levels.users_default))
|
||||
.into();
|
||||
|
||||
let notif_level: i64 = power_levels.notifications.get(key)
|
||||
.map(|&i| i.into())
|
||||
.unwrap_or(50);
|
||||
|
||||
Value::Boolean(user_level >= notif_level)
|
||||
}
|
||||
P2RuleOp::Dataset(count) => {
|
||||
let items: Vec<_> = stack.drain(stack.len() - count - 1..).collect();
|
||||
Value::Set(items)
|
||||
}
|
||||
// check if a property is in an object or an item is in an array/set
|
||||
P2RuleOp::In => match (stack.pop().unwrap(), stack.pop().unwrap()) {
|
||||
(Value::Json(JVal::Array(a)), Value::Json(JVal::String(s2))) => {
|
||||
Value::Boolean(a.iter().any(|v| v.as_str().is_some_and(|s1| s1 == s2)))
|
||||
}
|
||||
(int1, int2) if int1.unbox_int().is_some() && int2.unbox_int().is_some() => {
|
||||
Value::Boolean(int1.unbox_int().unwrap() == int2.unbox_int().unwrap())
|
||||
}
|
||||
(Value::Json(JVal::Object(o)), Value::Json(JVal::String(s))) => {
|
||||
Value::Boolean(o.contains_key(&s))
|
||||
}
|
||||
(_, _) => Value::Boolean(false),
|
||||
},
|
||||
P2RuleOp::Rule => {
|
||||
let rule = stack.pop().unwrap();
|
||||
let rule = rule.unbox_str().unwrap();
|
||||
Value::Boolean(matched.contains(rule))
|
||||
},
|
||||
};
|
||||
stack.push(next);
|
||||
}
|
||||
|
||||
// only things that are explicitly false count as false
|
||||
stack.pop().unwrap().unbox_bool().unwrap_or(true)
|
||||
}
|
||||
}
|
176
src/utils/p2/mod.rs
Normal file
176
src/utils/p2/mod.rs
Normal file
|
@ -0,0 +1,176 @@
|
|||
mod parser;
|
||||
mod eval;
|
||||
pub mod error;
|
||||
|
||||
use ruma::{RoomId, UserId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as JVal;
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use parser::P2Rule;
|
||||
use error::P2RuleError;
|
||||
|
||||
#[derive(Debug, Default, PartialEq, Eq)]
|
||||
pub struct P2Rules {
|
||||
entries: Vec<(Box<str>, P2Entry)>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, PartialEq, Eq, Clone)]
|
||||
pub struct P2Entry {
|
||||
rule: P2Rule,
|
||||
actions: Vec<P2Action>,
|
||||
enabled: bool,
|
||||
}
|
||||
|
||||
/*
|
||||
endpoints (prefix = /_matrix/client/unstable/p2)
|
||||
|
||||
GET /_matrix/client/unstable/p2/inbox :: get events in an inbox
|
||||
POST /_matrix/client/unstable/p2/inbox :: manually add an event to an inbox
|
||||
DELETE /_matrix/client/unstable/p2/inbox/:eventid :: delete an event from an inbox
|
||||
DELETE /_matrix/client/unstable/p2/inbox :: delete all events from an inbox
|
||||
POST /_matrix/client/unstable/rooms/:roomid/ack :: reset (or manually set) counts in a room
|
||||
POST /_matrix/client/unstable/ack_bulk :: reset (or manually set) counts in many rooms
|
||||
*/
|
||||
|
||||
#[derive(Debug, Clone, Serialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum P2Action {
|
||||
/// increment a counter associated with this rule
|
||||
Count,
|
||||
|
||||
/// send a push notification to the user
|
||||
Notify,
|
||||
|
||||
/// store the event for later enumeration via `/inbox`
|
||||
Store,
|
||||
|
||||
/// mark the event in unsigned.p2_matches
|
||||
Mark,
|
||||
|
||||
/// a custom action that won't be handled by the server
|
||||
Custom(Box<str>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
pub struct P2Counters {
|
||||
pub counters: HashMap<Box<str>, u64>,
|
||||
#[serde(flatten)]
|
||||
_other: HashMap<Box<str>, JVal>,
|
||||
}
|
||||
|
||||
fn topological_sort(vec: &mut Vec<(Box<str>, P2Entry)>) -> bool {
|
||||
let (mut queue, mut missing_deps): (VecDeque<_>, VecDeque<_>) = std::mem::take(vec)
|
||||
.into_iter()
|
||||
.partition(|(_, ent)| ent.rule.depends.is_empty());
|
||||
while let Some(node) = queue.pop_front() {
|
||||
vec.push(node);
|
||||
let (has_deps, new_missing_deps) = missing_deps.into_iter().partition(|(_, other)| {
|
||||
other.rule.depends.iter().all(|dep| vec.iter().any(|(name, _)| name == dep))
|
||||
});
|
||||
missing_deps = new_missing_deps;
|
||||
queue.extend(has_deps);
|
||||
}
|
||||
missing_deps.is_empty()
|
||||
}
|
||||
|
||||
impl P2Rules {
|
||||
pub fn compile_value(v: JVal) -> Result<P2Rules, P2RuleError> {
|
||||
let mut rules: P2Rules = serde_json::from_value(v).map_err(P2RuleError::InvalidJson)?;
|
||||
if !topological_sort(&mut rules.entries) {
|
||||
return Err(P2RuleError::BadDeps);
|
||||
}
|
||||
Ok(rules)
|
||||
}
|
||||
|
||||
pub fn eval(
|
||||
&self,
|
||||
room: &RoomId,
|
||||
user: &UserId,
|
||||
json: JVal,
|
||||
) -> HashMap<Box<str>, HashSet<P2Action>> {
|
||||
// TODO: don't use duplicate structures
|
||||
let mut matched = HashMap::new();
|
||||
let mut matched_set = HashSet::new();
|
||||
for (name, ent) in &self.entries {
|
||||
if ent.rule.eval(room, user, &json, &matched_set) {
|
||||
matched_set.insert(name.clone());
|
||||
if ent.actions.is_empty() {
|
||||
matched.insert(name.clone(), HashSet::new());
|
||||
} else {
|
||||
matched.insert(name.clone(), ent.actions.iter().cloned().collect());
|
||||
}
|
||||
}
|
||||
}
|
||||
matched
|
||||
}
|
||||
|
||||
/// "overlay" another set of rules on top of these rules. if there's a conflict, our rules win.
|
||||
pub fn merge(mut self, other: P2Rules) -> P2Rules {
|
||||
for (name, rule) in other.entries {
|
||||
if self.entries.iter().find(|(i, _)| *i == name).is_none() {
|
||||
self.entries.push((name, rule));
|
||||
}
|
||||
}
|
||||
|
||||
if !topological_sort(&mut self.entries) {
|
||||
// FIXME: global on its own is fine, room on its own is fine,
|
||||
// but it may be possible to get a cycle when they're merged
|
||||
}
|
||||
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for P2Rules {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
#[derive(Deserialize)]
|
||||
pub struct FakeP2Rules {
|
||||
rules: HashMap<Box<str>, P2Entry>,
|
||||
}
|
||||
|
||||
let fake = FakeP2Rules::deserialize(deserializer)?;
|
||||
let mut rules = P2Rules {
|
||||
entries: fake.rules.into_iter().collect(),
|
||||
};
|
||||
if !topological_sort(&mut rules.entries) {
|
||||
return Err(serde::de::Error::custom::<P2RuleError>(P2RuleError::BadDeps));
|
||||
}
|
||||
|
||||
Ok(rules)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for P2Action {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
// this is a hacky mess but until #912 is closed it's necessary
|
||||
// https://github.com/serde-rs/serde/issues/912
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum FakeP2Action {
|
||||
Notify,
|
||||
Store,
|
||||
Mark,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum FakeP2ActionWrapper {
|
||||
Known(FakeP2Action),
|
||||
Unknown(Box<str>),
|
||||
}
|
||||
|
||||
let fake = FakeP2ActionWrapper::deserialize(deserializer)?;
|
||||
Ok(match fake {
|
||||
FakeP2ActionWrapper::Known(FakeP2Action::Notify) => P2Action::Notify,
|
||||
FakeP2ActionWrapper::Known(FakeP2Action::Store) => P2Action::Store,
|
||||
FakeP2ActionWrapper::Known(FakeP2Action::Mark) => P2Action::Mark,
|
||||
FakeP2ActionWrapper::Unknown(val) => P2Action::Custom(val),
|
||||
})
|
||||
}
|
||||
}
|
279
src/utils/p2/parser.rs
Normal file
279
src/utils/p2/parser.rs
Normal file
|
@ -0,0 +1,279 @@
|
|||
use std::{iter::Peekable, vec::IntoIter};
|
||||
|
||||
// use crate::utils::p2::error::P2RuleError;
|
||||
use super::P2RuleError;
|
||||
use lazy_static::lazy_static;
|
||||
use serde::Deserialize;
|
||||
|
||||
macro_rules! rgx {
|
||||
($pat:expr) => {
|
||||
regex::RegexBuilder::new($pat)
|
||||
.case_insensitive(true)
|
||||
.dot_matches_new_line(true)
|
||||
.build()
|
||||
.expect("regex is always valid")
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub struct P2Rule {
|
||||
pub(super) ops: Vec<P2RuleOp>,
|
||||
pub(super) data: Vec<P2RuleData>,
|
||||
pub(super) depends: Vec<Box<str>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
#[rustfmt::skip_fmt]
|
||||
pub enum P2RuleOp {
|
||||
Data(usize),
|
||||
Dataset(usize),
|
||||
Get,
|
||||
Rule,
|
||||
And, Or, Not, Coalesce,
|
||||
Eq, Neq, Gt, Lt, Gte, Lte,
|
||||
// Like, // glob?
|
||||
// Match, // regex?
|
||||
In, CanNotify,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum P2RuleData {
|
||||
Ident(Box<str>),
|
||||
String(Box<str>),
|
||||
Integer(i64),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
enum P2TokenType {
|
||||
String,
|
||||
Integer,
|
||||
Operator,
|
||||
Rule,
|
||||
Ident,
|
||||
OpenParan,
|
||||
CloseParan,
|
||||
OpenBracket,
|
||||
CloseBracket,
|
||||
Space,
|
||||
}
|
||||
|
||||
type P2Token<'a> = (P2TokenType, Option<&'a str>, usize);
|
||||
|
||||
fn get_binding(op: &str) -> (u32, u32) {
|
||||
match op {
|
||||
"." => (10, 11),
|
||||
"!" => (8, 0),
|
||||
"&&" => (6, 7),
|
||||
"||" => (4, 5),
|
||||
"==" => (2, 3),
|
||||
"!=" => (2, 3),
|
||||
">" => (2, 3),
|
||||
"<" => (2, 3),
|
||||
">=" => (2, 3),
|
||||
"<=" => (2, 3),
|
||||
"in" => (0, 1),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_op(op: &str) -> P2RuleOp {
|
||||
match op {
|
||||
"." => P2RuleOp::Get,
|
||||
"!" => P2RuleOp::Not,
|
||||
"&&" => P2RuleOp::And,
|
||||
"||" => P2RuleOp::Or,
|
||||
"==" => P2RuleOp::Eq,
|
||||
"!=" => P2RuleOp::Neq,
|
||||
">" => P2RuleOp::Gt,
|
||||
"<" => P2RuleOp::Lt,
|
||||
">=" => P2RuleOp::Gte,
|
||||
"<=" => P2RuleOp::Lte,
|
||||
"in" => P2RuleOp::In,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
impl P2Rule {
|
||||
/// Take an expression and tokenize, parse, and generate bytecode
|
||||
/// for it
|
||||
pub fn parse(s: &str) -> Result<P2Rule, P2RuleError> {
|
||||
let mut tokens = P2Rule::tokenize(s)?.into_iter().peekable();
|
||||
let mut ops = vec![];
|
||||
let mut data = vec![];
|
||||
let mut depends = vec![];
|
||||
P2Rule::generate(&mut tokens, &mut ops, &mut data, &mut depends, 0)?;
|
||||
dbg!(&ops, &data, &depends);
|
||||
match tokens.next() {
|
||||
None => {},
|
||||
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
|
||||
};
|
||||
Ok(P2Rule { ops, data, depends })
|
||||
}
|
||||
|
||||
fn tokenize<'a>(s: &'a str) -> Result<Vec<P2Token<'a>>, P2RuleError> {
|
||||
lazy_static! {
|
||||
static ref REGEXES: Vec<(P2TokenType, regex::Regex)> = vec![
|
||||
(P2TokenType::Operator, rgx!(r"^(\.|&&|\|\||!|==|!=|>|<|>=|<=|in)")),
|
||||
(P2TokenType::String, rgx!(r#"^"((:?[^\\]|\\.)+?)""#)),
|
||||
(P2TokenType::String, rgx!(r"^'((:?[^\\]|\\.)+?)'")),
|
||||
(P2TokenType::Integer, rgx!(r"^(-?[0-9]+?)")),
|
||||
(P2TokenType::Ident, rgx!(r#"^([a-z0-9_-]+)"#)),
|
||||
(P2TokenType::Rule, rgx!(r"^@")),
|
||||
(P2TokenType::OpenParan, rgx!(r"^\(")),
|
||||
(P2TokenType::CloseParan, rgx!(r"^\)")),
|
||||
(P2TokenType::OpenBracket, rgx!(r"^\[")),
|
||||
(P2TokenType::CloseBracket, rgx!(r"^\]")),
|
||||
(P2TokenType::Space, rgx!(r"^\s+")),
|
||||
];
|
||||
}
|
||||
|
||||
let mut start = 0;
|
||||
let mut tokens = vec![];
|
||||
'search: while start < s.len() {
|
||||
for (kind, regex) in REGEXES.iter() {
|
||||
if let Some(found) = regex.captures(&s[start..]) {
|
||||
let full = found.get(0).unwrap().as_str();
|
||||
let capture = found.get(1).map(|c| c.as_str());
|
||||
if *kind != P2TokenType::Space {
|
||||
tokens.push((*kind, capture, start));
|
||||
}
|
||||
start += full.len();
|
||||
continue 'search;
|
||||
}
|
||||
}
|
||||
return Err(P2RuleError::InvalidChar(start));
|
||||
}
|
||||
|
||||
Ok(tokens)
|
||||
}
|
||||
|
||||
/// recursively walk through tokens, "unravelling" it into bytecode
|
||||
fn generate(
|
||||
tokens: &mut Peekable<IntoIter<P2Token<'_>>>,
|
||||
ops: &mut Vec<P2RuleOp>,
|
||||
data: &mut Vec<P2RuleData>,
|
||||
depends: &mut Vec<Box<str>>,
|
||||
parent_power: u32,
|
||||
) -> Result<(), P2RuleError> {
|
||||
match tokens.next() {
|
||||
Some((P2TokenType::Ident, Some(ident), _)) => {
|
||||
let idx = data.len();
|
||||
data.push(P2RuleData::Ident(ident.to_string().into_boxed_str()));
|
||||
ops.push(P2RuleOp::Data(idx));
|
||||
|
||||
// TODO: unhardcode
|
||||
if tokens.next_if(|t| matches!(t, (P2TokenType::OpenParan, _, _))).is_some() {
|
||||
if ident != "can_notify" {
|
||||
return Err(P2RuleError::BadCall(ident.to_string()));
|
||||
}
|
||||
match tokens.next() {
|
||||
Some((P2TokenType::CloseParan, _, _)) => {},
|
||||
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
|
||||
None => return Err(P2RuleError::UnexpectedEOF),
|
||||
}
|
||||
}
|
||||
}
|
||||
Some((P2TokenType::String, Some(string), _)) => {
|
||||
let idx = data.len();
|
||||
data.push(P2RuleData::String(string.to_string().into_boxed_str()));
|
||||
ops.push(P2RuleOp::Data(idx));
|
||||
}
|
||||
Some((P2TokenType::Integer, Some(int), _)) => {
|
||||
let idx = data.len();
|
||||
data.push(P2RuleData::Integer(int.parse().expect("regex only matches valid ints")));
|
||||
ops.push(P2RuleOp::Data(idx));
|
||||
}
|
||||
Some((P2TokenType::OpenParan, None, _)) => {
|
||||
P2Rule::generate(tokens, ops, data, depends, 0)?;
|
||||
match tokens.next() {
|
||||
Some((P2TokenType::CloseParan, _, _)) => {},
|
||||
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
|
||||
None => return Err(P2RuleError::UnexpectedEOF),
|
||||
}
|
||||
}
|
||||
Some((P2TokenType::OpenBracket, None, _)) => {
|
||||
let mut set_size = 0;
|
||||
loop {
|
||||
match tokens.next() {
|
||||
Some((P2TokenType::CloseBracket, None, _)) => break,
|
||||
Some((_, _, _)) => P2Rule::generate(tokens, ops, data, depends, 0)?,
|
||||
None => return Err(P2RuleError::UnexpectedEOF),
|
||||
};
|
||||
set_size += 1;
|
||||
match tokens.next() {
|
||||
Some((P2TokenType::CloseBracket, None, _)) => break,
|
||||
Some((_, Some(","), _)) => {},
|
||||
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
|
||||
None => return Err(P2RuleError::UnexpectedEOF),
|
||||
};
|
||||
}
|
||||
ops.push(P2RuleOp::Dataset(set_size))
|
||||
}
|
||||
Some((P2TokenType::Rule, None, _)) => {
|
||||
let rule = match tokens.next() {
|
||||
Some((P2TokenType::String, Some(s), _)) => s,
|
||||
Some((P2TokenType::Ident, Some(s), _)) => s,
|
||||
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
|
||||
None => return Err(P2RuleError::UnexpectedEOF),
|
||||
};
|
||||
let bx = rule.to_string().into_boxed_str();
|
||||
let idx = data.len();
|
||||
data.push(P2RuleData::String(bx.clone()));
|
||||
depends.push(bx);
|
||||
ops.push(P2RuleOp::Data(idx));
|
||||
ops.push(P2RuleOp::Rule);
|
||||
}
|
||||
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(pos)),
|
||||
None => return Err(P2RuleError::UnexpectedEOF),
|
||||
};
|
||||
|
||||
loop {
|
||||
match tokens.peek() {
|
||||
Some((P2TokenType::Operator, Some(op), _)) => {
|
||||
let (lhs_power, rhs_power) = get_binding(op);
|
||||
|
||||
// if the op on the left binds more powerfully than this op, they win
|
||||
if parent_power >= lhs_power {
|
||||
break;
|
||||
}
|
||||
|
||||
let op = op.clone();
|
||||
tokens.next();
|
||||
P2Rule::generate(tokens, ops, data, depends, rhs_power)?;
|
||||
ops.push(get_op(&op));
|
||||
}
|
||||
Some((P2TokenType::CloseParan | P2TokenType::CloseBracket, _, _)) | None => {},
|
||||
Some((_, _, pos)) => return Err(P2RuleError::InvalidToken(*pos)),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for P2Rule {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
struct V;
|
||||
|
||||
impl serde::de::Visitor<'_> for V {
|
||||
type Value = P2Rule;
|
||||
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(formatter, "valid push rule")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: serde::de::Error,
|
||||
{
|
||||
P2Rule::parse(v).map_err(serde::de::Error::custom)
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_str(V)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in a new issue