Compare commits
16 commits
master
...
Nyaaori/fe
Author | SHA1 | Date | |
---|---|---|---|
|
c249fd7d4d | ||
|
330e68eb93 | ||
|
1fabc3cd69 | ||
|
b9c2bb38db | ||
|
d8ee8cd0ef | ||
|
2717bdcf2e | ||
|
98e24722ad | ||
|
e71ad56e68 | ||
|
5987452cfc | ||
|
e3dcb668cf | ||
|
f814e8e5cc | ||
|
898e4d24d9 | ||
|
5d7f4602b2 | ||
|
c4bd0a9f2c | ||
|
711e03b799 | ||
|
ed64b528c1 |
7 changed files with 630 additions and 54 deletions
|
@ -12,6 +12,7 @@ use ruma::{
|
|||
client::error::{Error as RumaError, ErrorKind},
|
||||
federation::{
|
||||
authorization::get_event_authorization,
|
||||
backfill::get_backfill,
|
||||
device::get_devices::{self, v1::UserDevice},
|
||||
directory::{get_public_rooms, get_public_rooms_filtered},
|
||||
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
|
||||
|
@ -43,11 +44,11 @@ use ruma::{
|
|||
serde::{Base64, JsonObject, Raw},
|
||||
to_device::DeviceIdOrAllDevices,
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
|
||||
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt,
|
||||
};
|
||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
collections::{BTreeMap, HashSet, VecDeque},
|
||||
fmt::Debug,
|
||||
mem,
|
||||
net::{IpAddr, SocketAddr},
|
||||
|
@ -950,6 +951,58 @@ pub async fn get_event_route(
|
|||
})
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
|
||||
///
|
||||
/// Retrieves events from before the sender joined the room, if the room's
|
||||
/// history visibility allows.
|
||||
pub async fn get_backfill_route(
|
||||
body: Ruma<get_backfill::v1::Request>,
|
||||
) -> Result<get_backfill::v1::Response> {
|
||||
if !services().globals.allow_federation() {
|
||||
return Err(Error::bad_config("Federation is disabled."));
|
||||
}
|
||||
|
||||
let sender_servername = body
|
||||
.sender_servername
|
||||
.as_ref()
|
||||
.expect("server is authenticated");
|
||||
|
||||
info!("Got backfill request from: {}", sender_servername);
|
||||
|
||||
if !services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(sender_servername, &body.room_id)?
|
||||
{
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::Forbidden,
|
||||
"Server is not in room.",
|
||||
));
|
||||
}
|
||||
|
||||
services()
|
||||
.rooms
|
||||
.event_handler
|
||||
.acl_check(sender_servername, &body.room_id)?;
|
||||
|
||||
let origin = services().globals.server_name().to_owned();
|
||||
let earliest_events = &[];
|
||||
|
||||
let events = get_missing_events(
|
||||
sender_servername,
|
||||
&body.room_id,
|
||||
earliest_events,
|
||||
&body.v,
|
||||
body.limit,
|
||||
)?;
|
||||
|
||||
Ok(get_backfill::v1::Response {
|
||||
origin,
|
||||
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||
pdus: events,
|
||||
})
|
||||
}
|
||||
|
||||
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
|
||||
///
|
||||
/// Retrieves events that the sender is missing.
|
||||
|
@ -981,52 +1034,197 @@ pub async fn get_missing_events_route(
|
|||
.event_handler
|
||||
.acl_check(sender_servername, &body.room_id)?;
|
||||
|
||||
let mut queued_events = body.latest_events.clone();
|
||||
let mut events = Vec::new();
|
||||
|
||||
let mut i = 0;
|
||||
while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
|
||||
if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
|
||||
let room_id_str = pdu
|
||||
.get("room_id")
|
||||
.and_then(|val| val.as_str())
|
||||
.ok_or_else(|| Error::bad_database("Invalid event in database"))?;
|
||||
|
||||
let event_room_id = <&RoomId>::try_from(room_id_str)
|
||||
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
|
||||
|
||||
if event_room_id != body.room_id {
|
||||
warn!(
|
||||
"Evil event detected: Event {} found while searching in room {}",
|
||||
queued_events[i], body.room_id
|
||||
);
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
"Evil event detected",
|
||||
));
|
||||
}
|
||||
|
||||
if body.earliest_events.contains(&queued_events[i]) {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
queued_events.extend_from_slice(
|
||||
&serde_json::from_value::<Vec<OwnedEventId>>(
|
||||
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| {
|
||||
Error::bad_database("Event in db has no prev_events field.")
|
||||
})?)
|
||||
.expect("canonical json is valid json value"),
|
||||
)
|
||||
.map_err(|_| Error::bad_database("Invalid prev_events content in pdu in db."))?,
|
||||
);
|
||||
events.push(PduEvent::convert_to_outgoing_federation_event(pdu));
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
let events = get_missing_events(
|
||||
sender_servername,
|
||||
&body.room_id,
|
||||
&body.earliest_events,
|
||||
&body.latest_events,
|
||||
body.limit,
|
||||
)?;
|
||||
|
||||
Ok(get_missing_events::v1::Response { events })
|
||||
}
|
||||
|
||||
/// Fetch events starting from `latest_events`, going backwards
|
||||
/// through each event's `prev_events` until reaching the `earliest_events`.
|
||||
///
|
||||
/// Used by the federation /backfill and /get_missing_events routes.
|
||||
fn get_missing_events(
|
||||
sender_servername: &ServerName,
|
||||
room_id: &RoomId,
|
||||
earliest_events: &[OwnedEventId],
|
||||
latest_events: &[OwnedEventId],
|
||||
limit: UInt,
|
||||
) -> Result<Vec<Box<RawJsonValue>>> {
|
||||
let (room_members, room_errors): (Vec<_>, Vec<_>) = services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.room_members(room_id)
|
||||
.partition(Result::is_ok);
|
||||
|
||||
// Just log errors and continue with correct users
|
||||
if !room_errors.is_empty() {
|
||||
warn!(?room_id, "Some errors occurred when fetching room members");
|
||||
}
|
||||
|
||||
let current_server_members: Vec<OwnedUserId> = room_members
|
||||
.into_iter()
|
||||
.map(Result::unwrap)
|
||||
.filter(|member| member.server_name() == sender_servername)
|
||||
.collect();
|
||||
|
||||
let event_filter = |event_id: &EventId| {
|
||||
services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.server_can_see_event(
|
||||
sender_servername,
|
||||
current_server_members.as_slice(),
|
||||
event_id,
|
||||
)
|
||||
.unwrap_or_default()
|
||||
};
|
||||
|
||||
let pdu_filter = |pdu: &CanonicalJsonObject| {
|
||||
let event_room_id = pdu
|
||||
.get("room_id")
|
||||
.and_then(|val| val.as_str())
|
||||
.and_then(|room_id_str| <&RoomId>::try_from(room_id_str).ok());
|
||||
|
||||
match event_room_id {
|
||||
Some(event_room_id) => {
|
||||
let valid_event = event_room_id == room_id;
|
||||
if !valid_event {
|
||||
error!(?room_id, ?event_room_id, "An evil event detected");
|
||||
}
|
||||
valid_event
|
||||
}
|
||||
None => {
|
||||
error!(?pdu, "Can't extract valid `room_id` from pdu");
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
#[inline]
|
||||
fn get_pdu(event: &EventId) -> Option<CanonicalJsonObject> {
|
||||
services()
|
||||
.rooms
|
||||
.timeline
|
||||
.get_pdu_json(event)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
let events = linearize_previous_events(
|
||||
latest_events.iter().cloned(),
|
||||
earliest_events.iter().cloned(),
|
||||
limit,
|
||||
get_pdu,
|
||||
event_filter,
|
||||
pdu_filter,
|
||||
);
|
||||
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
/// Unwinds previous events by doing a breadth-first walk from given roots
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `roots`: Starting point to unwind event history
|
||||
/// * `excluded`: Skipped events
|
||||
/// * `limit`: How many events to extract
|
||||
/// * `pdu_extractor`: Closure to extract PDU for given event_id, for example, from DB.
|
||||
/// * `event_filter`: Closure to filter event by it's visiblity. It may or may not hit DB.
|
||||
/// * `pdu_filter`: Closure to get basic validation against malformed PDUs.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// The previous events for given roots, without any `excluded` events, up to the provided `limit`.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// In matrix specification, «Server-Server API», paragraph 8 there is no mention of previous events for excluded events.
|
||||
/// Therefore, algorithm below excludes **only** events itself, but allows to process their history.
|
||||
fn linearize_previous_events<E, L, F, V, P>(
|
||||
roots: E,
|
||||
excluded: E,
|
||||
limit: L,
|
||||
pdu_extractor: P,
|
||||
event_filter: F,
|
||||
pdu_filter: V,
|
||||
) -> Vec<Box<RawJsonValue>>
|
||||
where
|
||||
E: IntoIterator<Item = OwnedEventId>,
|
||||
F: Fn(&EventId) -> bool,
|
||||
L: Into<u64>,
|
||||
V: Fn(&CanonicalJsonObject) -> bool,
|
||||
P: Fn(&EventId) -> Option<CanonicalJsonObject>,
|
||||
{
|
||||
let limit = limit.into() as usize;
|
||||
assert!(limit > 0, "Limit should be > 0");
|
||||
|
||||
#[inline]
|
||||
fn get_previous_events(pdu: &CanonicalJsonObject) -> Option<Vec<OwnedEventId>> {
|
||||
match pdu.get("prev_events") {
|
||||
None => {
|
||||
error!(?pdu, "A stored event has no 'prev_events' field");
|
||||
None
|
||||
}
|
||||
Some(prev_events) => {
|
||||
let val = prev_events.clone().into();
|
||||
let events = serde_json::from_value::<Vec<OwnedEventId>>(val);
|
||||
if let Err(error) = events {
|
||||
error!(?prev_events, ?error, "Broken 'prev_events' field");
|
||||
return None;
|
||||
}
|
||||
Some(events.unwrap_or_default())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut visited: HashSet<OwnedEventId> = Default::default();
|
||||
let mut history: Vec<Box<RawJsonValue>> = Default::default();
|
||||
let mut queue: VecDeque<OwnedEventId> = Default::default();
|
||||
let excluded: HashSet<_> = excluded.into_iter().collect();
|
||||
|
||||
// Add all roots into processing queue
|
||||
for root in roots {
|
||||
queue.push_back(root);
|
||||
}
|
||||
|
||||
while let Some(current_event) = queue.pop_front() {
|
||||
// Return all collected events if reached limit
|
||||
if history.len() >= limit {
|
||||
return history;
|
||||
}
|
||||
|
||||
// Skip an entire branch containing incorrect events
|
||||
if !event_filter(¤t_event) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Process PDU from a current event if it exists and valid
|
||||
if let Some(pdu) = pdu_extractor(¤t_event).filter(&pdu_filter) {
|
||||
if !&excluded.contains(¤t_event) {
|
||||
history.push(PduEvent::convert_to_outgoing_federation_event(pdu.clone()));
|
||||
}
|
||||
|
||||
// Fetch previous events, if they exists
|
||||
if let Some(previous_events) = get_previous_events(&pdu) {
|
||||
for previous_event in previous_events {
|
||||
if !visited.contains(&previous_event) {
|
||||
visited.insert(previous_event.clone());
|
||||
queue.push_back(previous_event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// All done, return collected events
|
||||
history
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
|
||||
///
|
||||
/// Retrieves the auth chain for a given event.
|
||||
|
@ -1779,7 +1977,11 @@ pub async fn claim_keys_route(
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{add_port_to_hostname, get_ip_with_port, FedDest};
|
||||
use super::{add_port_to_hostname, get_ip_with_port, linearize_previous_events, FedDest};
|
||||
use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{value::RawValue, Value};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[test]
|
||||
fn ips_get_default_ports() {
|
||||
|
@ -1820,4 +2022,227 @@ mod tests {
|
|||
FedDest::Named(String::from("example.com"), String::from(":1337"))
|
||||
)
|
||||
}
|
||||
|
||||
type PduStorage = HashMap<OwnedEventId, CanonicalJsonObject>;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct MockPDU {
|
||||
content: i32,
|
||||
prev_events: Vec<OwnedEventId>,
|
||||
}
|
||||
|
||||
fn mock_event_id(id: &i32) -> OwnedEventId {
|
||||
const DOMAIN: &str = "canterlot.eq";
|
||||
<OwnedEventId>::try_from(format!("${id}:{DOMAIN}")).unwrap()
|
||||
}
|
||||
|
||||
fn create_graph(data: Vec<(i32, Vec<i32>)>) -> PduStorage {
|
||||
data.iter()
|
||||
.map(|(head, tail)| {
|
||||
let key = mock_event_id(head);
|
||||
let pdu = MockPDU {
|
||||
content: *head,
|
||||
prev_events: tail.iter().map(mock_event_id).collect(),
|
||||
};
|
||||
let value = serde_json::to_value(pdu).unwrap();
|
||||
let value: CanonicalJsonValue = value.try_into().unwrap();
|
||||
(key, value.as_object().unwrap().to_owned())
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn mock_full_graph() -> PduStorage {
|
||||
/*
|
||||
(1)
|
||||
__________|___________
|
||||
/ / \ \
|
||||
(2) (3) (10) (11)
|
||||
/ \ / \ | |
|
||||
(4) (5) (6) (7) (12) (13)
|
||||
| | |
|
||||
(8) (9) (14)
|
||||
\ /
|
||||
(15)
|
||||
|
|
||||
(16)
|
||||
*/
|
||||
create_graph(vec![
|
||||
(1, vec![2, 3, 10, 11]),
|
||||
(2, vec![4, 5]),
|
||||
(3, vec![6, 7]),
|
||||
(4, vec![]),
|
||||
(5, vec![8]),
|
||||
(6, vec![9]),
|
||||
(7, vec![]),
|
||||
(8, vec![15]),
|
||||
(9, vec![15]),
|
||||
(10, vec![12]),
|
||||
(11, vec![13]),
|
||||
(12, vec![]),
|
||||
(13, vec![14]),
|
||||
(14, vec![]),
|
||||
(15, vec![16]),
|
||||
(16, vec![16]),
|
||||
])
|
||||
}
|
||||
|
||||
fn extract_events_payload(events: Vec<Box<RawValue>>) -> Vec<i32> {
|
||||
events
|
||||
.iter()
|
||||
.map(|e| serde_json::from_str(e.get()).unwrap())
|
||||
.map(|p: MockPDU| p.content)
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backfill_empty() {
|
||||
let events = linearize_previous_events(
|
||||
vec![],
|
||||
vec![],
|
||||
16u64,
|
||||
|_| unreachable!(),
|
||||
|_| true,
|
||||
|_| true,
|
||||
);
|
||||
assert!(events.is_empty());
|
||||
}
|
||||
#[test]
|
||||
fn backfill_limit() {
|
||||
/*
|
||||
(5) → (4) → (3) → (2) → (1) → ×
|
||||
*/
|
||||
let events = create_graph(vec![
|
||||
(1, vec![]),
|
||||
(2, vec![1]),
|
||||
(3, vec![2]),
|
||||
(4, vec![3]),
|
||||
(5, vec![4]),
|
||||
]);
|
||||
let roots = vec![mock_event_id(&5)];
|
||||
let result = linearize_previous_events(
|
||||
roots,
|
||||
vec![],
|
||||
3u64,
|
||||
|e| events.get(e).cloned(),
|
||||
|_| true,
|
||||
|_| true,
|
||||
);
|
||||
|
||||
assert_eq!(extract_events_payload(result), vec![5, 4, 3])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backfill_bfs() {
|
||||
let events = mock_full_graph();
|
||||
let roots = vec![mock_event_id(&1)];
|
||||
let result = linearize_previous_events(
|
||||
roots,
|
||||
vec![],
|
||||
100u64,
|
||||
|e| events.get(e).cloned(),
|
||||
|_| true,
|
||||
|_| true,
|
||||
);
|
||||
assert_eq!(
|
||||
extract_events_payload(result),
|
||||
vec![1, 2, 3, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9, 14, 15, 16]
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backfill_subgraph() {
|
||||
let events = mock_full_graph();
|
||||
let roots = vec![mock_event_id(&3)];
|
||||
let result = linearize_previous_events(
|
||||
roots,
|
||||
vec![],
|
||||
100u64,
|
||||
|e| events.get(e).cloned(),
|
||||
|_| true,
|
||||
|_| true,
|
||||
);
|
||||
assert_eq!(extract_events_payload(result), vec![3, 6, 7, 9, 15, 16])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backfill_two_roots() {
|
||||
let events = mock_full_graph();
|
||||
let roots = vec![mock_event_id(&3), mock_event_id(&11)];
|
||||
let result = linearize_previous_events(
|
||||
roots,
|
||||
vec![],
|
||||
100u64,
|
||||
|e| events.get(e).cloned(),
|
||||
|_| true,
|
||||
|_| true,
|
||||
);
|
||||
assert_eq!(
|
||||
extract_events_payload(result),
|
||||
vec![3, 11, 6, 7, 13, 9, 14, 15, 16]
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backfill_exclude_events() {
|
||||
let events = mock_full_graph();
|
||||
let roots = vec![mock_event_id(&1)];
|
||||
let excluded_events = vec![
|
||||
mock_event_id(&14),
|
||||
mock_event_id(&15),
|
||||
mock_event_id(&16),
|
||||
mock_event_id(&3),
|
||||
];
|
||||
let result = linearize_previous_events(
|
||||
roots,
|
||||
excluded_events,
|
||||
100u64,
|
||||
|e| events.get(e).cloned(),
|
||||
|_| true,
|
||||
|_| true,
|
||||
);
|
||||
assert_eq!(
|
||||
extract_events_payload(result),
|
||||
vec![1, 2, 10, 11, 4, 5, 6, 7, 12, 13, 8, 9]
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backfill_exclude_branch_with_evil_event() {
|
||||
let events = mock_full_graph();
|
||||
let roots = vec![mock_event_id(&1)];
|
||||
let result = linearize_previous_events(
|
||||
roots,
|
||||
vec![],
|
||||
100u64,
|
||||
|e| events.get(e).cloned(),
|
||||
|_| true,
|
||||
|e| {
|
||||
let value: Value = CanonicalJsonValue::Object(e.clone()).into();
|
||||
let pdu: MockPDU = serde_json::from_value(value).unwrap();
|
||||
pdu.content != 3
|
||||
},
|
||||
);
|
||||
assert_eq!(
|
||||
extract_events_payload(result),
|
||||
vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16]
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backfill_exclude_branch_with_inaccessible_event() {
|
||||
let events = mock_full_graph();
|
||||
let roots = vec![mock_event_id(&1)];
|
||||
let result = linearize_previous_events(
|
||||
roots,
|
||||
vec![],
|
||||
100u64,
|
||||
|e| events.get(e).cloned(),
|
||||
|e| e != mock_event_id(&3),
|
||||
|_| true,
|
||||
);
|
||||
assert_eq!(
|
||||
extract_events_payload(result),
|
||||
vec![1, 2, 10, 11, 4, 5, 12, 13, 8, 14, 15, 16]
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,11 @@ use std::{collections::HashMap, sync::Arc};
|
|||
|
||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
||||
use async_trait::async_trait;
|
||||
use ruma::{events::StateEventType, EventId, RoomId};
|
||||
use ruma::{
|
||||
events::{room::member::MembershipState, StateEventType},
|
||||
EventId, RoomId, UserId,
|
||||
};
|
||||
use serde_json::Value;
|
||||
|
||||
#[async_trait]
|
||||
impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||
|
@ -120,6 +124,21 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
|||
})
|
||||
}
|
||||
|
||||
fn state_get_content(
|
||||
&self,
|
||||
shortstatehash: u64,
|
||||
event_type: &StateEventType,
|
||||
state_key: &str,
|
||||
) -> Result<Option<Value>> {
|
||||
let content = self
|
||||
.state_get(shortstatehash, event_type, state_key)?
|
||||
.map(|event| serde_json::from_str(event.content.get()))
|
||||
.transpose()
|
||||
.map_err(|_| Error::bad_database("Invalid event in database"))?;
|
||||
|
||||
Ok(content)
|
||||
}
|
||||
|
||||
/// Returns the state hash for this pdu.
|
||||
fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
|
||||
self.eventid_shorteventid
|
||||
|
@ -138,6 +157,23 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
|||
})
|
||||
}
|
||||
|
||||
/// Get membership for given user in state
|
||||
fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result<MembershipState> {
|
||||
self.state_get_content(
|
||||
shortstatehash,
|
||||
&StateEventType::RoomMember,
|
||||
user_id.as_str(),
|
||||
)?
|
||||
.map(|content| match content.get("membership") {
|
||||
Some(Value::String(membership)) => Ok(MembershipState::from(membership.as_str())),
|
||||
None => Ok(MembershipState::Leave),
|
||||
_ => Err(Error::bad_database(
|
||||
"Malformed membership, expected Value::String",
|
||||
)),
|
||||
})
|
||||
.unwrap_or(Ok(MembershipState::Leave))
|
||||
}
|
||||
|
||||
/// Returns the full room state.
|
||||
async fn room_state_full(
|
||||
&self,
|
||||
|
|
|
@ -391,6 +391,7 @@ fn routes() -> Router {
|
|||
.ruma_route(server_server::send_transaction_message_route)
|
||||
.ruma_route(server_server::get_event_route)
|
||||
.ruma_route(server_server::get_missing_events_route)
|
||||
.ruma_route(server_server::get_backfill_route)
|
||||
.ruma_route(server_server::get_event_authorization_route)
|
||||
.ruma_route(server_server::get_room_state_route)
|
||||
.ruma_route(server_server::get_room_state_ids_route)
|
||||
|
|
|
@ -77,7 +77,12 @@ impl Services {
|
|||
search: rooms::search::Service { db },
|
||||
short: rooms::short::Service { db },
|
||||
state: rooms::state::Service { db },
|
||||
state_accessor: rooms::state_accessor::Service { db },
|
||||
state_accessor: rooms::state_accessor::Service {
|
||||
db,
|
||||
server_visibility_cache: Mutex::new(LruCache::new(
|
||||
(100.0 * config.conduit_cache_capacity_modifier) as usize,
|
||||
)),
|
||||
},
|
||||
state_cache: rooms::state_cache::Service { db },
|
||||
state_compressor: rooms::state_compressor::Service {
|
||||
db,
|
||||
|
|
|
@ -281,6 +281,10 @@ impl state_res::Event for PduEvent {
|
|||
&self.sender
|
||||
}
|
||||
|
||||
fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
|
||||
MilliSecondsSinceUnixEpoch(self.origin_server_ts)
|
||||
}
|
||||
|
||||
fn event_type(&self) -> &RoomEventType {
|
||||
&self.kind
|
||||
}
|
||||
|
@ -289,10 +293,6 @@ impl state_res::Event for PduEvent {
|
|||
&self.content
|
||||
}
|
||||
|
||||
fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch {
|
||||
MilliSecondsSinceUnixEpoch(self.origin_server_ts)
|
||||
}
|
||||
|
||||
fn state_key(&self) -> Option<&str> {
|
||||
self.state_key.as_deref()
|
||||
}
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use ruma::{events::StateEventType, EventId, RoomId};
|
||||
use ruma::{
|
||||
events::{room::member::MembershipState, StateEventType},
|
||||
EventId, RoomId, UserId,
|
||||
};
|
||||
|
||||
use crate::{PduEvent, Result};
|
||||
|
||||
|
@ -32,9 +35,19 @@ pub trait Data: Send + Sync {
|
|||
state_key: &str,
|
||||
) -> Result<Option<Arc<PduEvent>>>;
|
||||
|
||||
fn state_get_content(
|
||||
&self,
|
||||
shortstatehash: u64,
|
||||
event_type: &StateEventType,
|
||||
state_key: &str,
|
||||
) -> Result<Option<serde_json::Value>>;
|
||||
|
||||
/// Returns the state hash for this pdu.
|
||||
fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>>;
|
||||
|
||||
/// Get membership for given user in state
|
||||
fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result<MembershipState>;
|
||||
|
||||
/// Returns the full room state.
|
||||
async fn room_state_full(
|
||||
&self,
|
||||
|
|
|
@ -1,13 +1,25 @@
|
|||
mod data;
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
pub use data::Data;
|
||||
use ruma::{events::StateEventType, EventId, RoomId};
|
||||
use lru_cache::LruCache;
|
||||
use ruma::{
|
||||
events::{
|
||||
room::{history_visibility::HistoryVisibility, member::MembershipState},
|
||||
StateEventType,
|
||||
},
|
||||
EventId, OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::{PduEvent, Result};
|
||||
|
||||
pub struct Service {
|
||||
pub db: &'static dyn Data,
|
||||
pub server_visibility_cache: Mutex<LruCache<(OwnedServerName, u64), bool>>,
|
||||
}
|
||||
|
||||
impl Service {
|
||||
|
@ -46,11 +58,95 @@ impl Service {
|
|||
self.db.state_get(shortstatehash, event_type, state_key)
|
||||
}
|
||||
|
||||
pub fn state_get_content(
|
||||
&self,
|
||||
shortstatehash: u64,
|
||||
event_type: &StateEventType,
|
||||
state_key: &str,
|
||||
) -> Result<Option<serde_json::Value>> {
|
||||
self.db
|
||||
.state_get_content(shortstatehash, event_type, state_key)
|
||||
}
|
||||
|
||||
/// Returns the state hash for this pdu.
|
||||
pub fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>> {
|
||||
self.db.pdu_shortstatehash(event_id)
|
||||
}
|
||||
|
||||
/// Whether a server is allowed to see an event through federation, based on
|
||||
/// the room's history_visibility at that event's state.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn server_can_see_event(
|
||||
&self,
|
||||
server_name: &ServerName,
|
||||
current_server_members: &[OwnedUserId],
|
||||
event_id: &EventId,
|
||||
) -> Result<bool> {
|
||||
let shortstatehash = match self.pdu_shortstatehash(event_id) {
|
||||
Ok(Some(shortstatehash)) => shortstatehash,
|
||||
_ => return Ok(false),
|
||||
};
|
||||
|
||||
if let Some(visibility) = self
|
||||
.server_visibility_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_mut(&(server_name.to_owned(), shortstatehash))
|
||||
{
|
||||
return Ok(*visibility);
|
||||
}
|
||||
|
||||
let history_visibility = self
|
||||
.state_get_content(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?
|
||||
.map(|content| match content.get("history_visibility") {
|
||||
Some(visibility) => HistoryVisibility::from(visibility.as_str().unwrap_or("")),
|
||||
None => HistoryVisibility::Shared,
|
||||
});
|
||||
|
||||
let visibility = match history_visibility {
|
||||
Some(HistoryVisibility::WorldReadable) => {
|
||||
// Allow if event was sent while world readable
|
||||
true
|
||||
}
|
||||
Some(HistoryVisibility::Invited) => {
|
||||
// Allow if any member on requesting server was AT LEAST invited, else deny
|
||||
current_server_members
|
||||
.iter()
|
||||
.any(|member| self.user_was_invited(shortstatehash, member))
|
||||
}
|
||||
_ => {
|
||||
// Allow if any member on requested server was joined, else deny
|
||||
current_server_members
|
||||
.iter()
|
||||
.any(|member| self.user_was_joined(shortstatehash, member))
|
||||
}
|
||||
};
|
||||
|
||||
self.server_visibility_cache
|
||||
.lock()
|
||||
.unwrap()
|
||||
.insert((server_name.to_owned(), shortstatehash), visibility);
|
||||
|
||||
Ok(visibility)
|
||||
}
|
||||
|
||||
/// The user was a joined member at this state (potentially in the past)
|
||||
fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool {
|
||||
self.db
|
||||
.user_membership(shortstatehash, user_id)
|
||||
.map(|s| s == MembershipState::Join)
|
||||
.unwrap_or_default() // Return sensible default, i.e. false
|
||||
}
|
||||
|
||||
/// The user was an invited or joined room member at this state (potentially
|
||||
/// in the past)
|
||||
fn user_was_invited(&self, shortstatehash: u64, user_id: &UserId) -> bool {
|
||||
self.db
|
||||
.user_membership(shortstatehash, user_id)
|
||||
.map(|s| s == MembershipState::Join || s == MembershipState::Invite)
|
||||
.unwrap_or_default() // Return sensible default, i.e. false
|
||||
}
|
||||
|
||||
/// Returns the full room state.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub async fn room_state_full(
|
||||
|
|
Loading…
Reference in a new issue