feat: Add federation backfill and event visibility
Co-authored-by: Nyaaori <+@nyaaori.cat>
This commit is contained in:
parent
c86313d4fa
commit
ed64b528c1
5 changed files with 150 additions and 11 deletions
|
@ -12,6 +12,7 @@ use ruma::{
|
|||
client::error::{Error as RumaError, ErrorKind},
|
||||
federation::{
|
||||
authorization::get_event_authorization,
|
||||
backfill::get_backfill,
|
||||
device::get_devices::{self, v1::UserDevice},
|
||||
directory::{get_public_rooms, get_public_rooms_filtered},
|
||||
discovery::{get_server_keys, get_server_version, ServerSigningKeys, VerifyKey},
|
||||
|
@ -43,11 +44,11 @@ use ruma::{
|
|||
serde::{Base64, JsonObject, Raw},
|
||||
to_device::DeviceIdOrAllDevices,
|
||||
CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId,
|
||||
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName,
|
||||
OwnedRoomId, OwnedServerName, OwnedServerSigningKeyId, OwnedUserId, RoomId, ServerName, UInt,
|
||||
};
|
||||
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
collections::{BTreeMap, HashSet},
|
||||
fmt::Debug,
|
||||
mem,
|
||||
net::{IpAddr, SocketAddr},
|
||||
|
@ -950,6 +951,53 @@ pub async fn get_event_route(
|
|||
})
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/federation/v1/backfill/<room_id>`
|
||||
///
|
||||
/// Retrieves events from before the sender joined the room, if the room's
|
||||
/// history visibility allows.
|
||||
pub async fn get_backfill_route(
|
||||
body: Ruma<get_backfill::v1::Request>,
|
||||
) -> Result<get_backfill::v1::Response> {
|
||||
if !services().globals.allow_federation() {
|
||||
return Err(Error::bad_config("Federation is disabled."));
|
||||
}
|
||||
|
||||
let sender_servername = body
|
||||
.sender_servername
|
||||
.as_ref()
|
||||
.expect("server is authenticated");
|
||||
|
||||
info!("Got backfill request from: {}", sender_servername);
|
||||
|
||||
if !services()
|
||||
.rooms
|
||||
.state_cache
|
||||
.server_in_room(sender_servername, &body.room_id)?
|
||||
{
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::Forbidden,
|
||||
"Server is not in room.",
|
||||
));
|
||||
}
|
||||
|
||||
let origin = services().globals.server_name().to_owned();
|
||||
let earliest_events = &[];
|
||||
|
||||
let events = get_missing_events(
|
||||
sender_servername,
|
||||
&body.room_id,
|
||||
earliest_events,
|
||||
&body.v,
|
||||
body.limit,
|
||||
)?;
|
||||
|
||||
Ok(get_backfill::v1::Response {
|
||||
origin,
|
||||
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||
pdus: events,
|
||||
})
|
||||
}
|
||||
|
||||
/// # `POST /_matrix/federation/v1/get_missing_events/{roomId}`
|
||||
///
|
||||
/// Retrieves events that the sender is missing.
|
||||
|
@ -981,11 +1029,43 @@ pub async fn get_missing_events_route(
|
|||
.event_handler
|
||||
.acl_check(sender_servername, &body.room_id)?;
|
||||
|
||||
let mut queued_events = body.latest_events.clone();
|
||||
let events = get_missing_events(
|
||||
sender_servername,
|
||||
&body.room_id,
|
||||
&body.earliest_events,
|
||||
&body.latest_events,
|
||||
body.limit,
|
||||
)?;
|
||||
|
||||
Ok(get_missing_events::v1::Response { events })
|
||||
}
|
||||
|
||||
// Recursively fetch events starting from `latest_events`, going backwards
|
||||
// through each event's `prev_events` until reaching the `earliest_events`.
|
||||
//
|
||||
// Used by the federation /backfill and /get_missing_events routes.
|
||||
fn get_missing_events(
|
||||
sender_servername: &ServerName,
|
||||
room_id: &RoomId,
|
||||
earliest_events: &[OwnedEventId],
|
||||
latest_events: &Vec<OwnedEventId>,
|
||||
limit: UInt,
|
||||
) -> Result<Vec<Box<RawJsonValue>>> {
|
||||
let limit = u64::from(limit) as usize;
|
||||
|
||||
let mut queued_events = latest_events.clone();
|
||||
let mut events = Vec::new();
|
||||
|
||||
let mut stop_at_events = HashSet::with_capacity(limit);
|
||||
stop_at_events.extend(earliest_events.iter().cloned());
|
||||
|
||||
let mut i = 0;
|
||||
while i < queued_events.len() && events.len() < u64::from(body.limit) as usize {
|
||||
while i < queued_events.len() && events.len() < limit {
|
||||
if stop_at_events.contains(&queued_events[i]) {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Some(pdu) = services().rooms.timeline.get_pdu_json(&queued_events[i])? {
|
||||
let room_id_str = pdu
|
||||
.get("room_id")
|
||||
|
@ -995,10 +1075,10 @@ pub async fn get_missing_events_route(
|
|||
let event_room_id = <&RoomId>::try_from(room_id_str)
|
||||
.map_err(|_| Error::bad_database("Invalid room id field in event in database"))?;
|
||||
|
||||
if event_room_id != body.room_id {
|
||||
if event_room_id != room_id {
|
||||
warn!(
|
||||
"Evil event detected: Event {} found while searching in room {}",
|
||||
queued_events[i], body.room_id
|
||||
queued_events[i], room_id
|
||||
);
|
||||
return Err(Error::BadRequest(
|
||||
ErrorKind::InvalidParam,
|
||||
|
@ -1006,10 +1086,20 @@ pub async fn get_missing_events_route(
|
|||
));
|
||||
}
|
||||
|
||||
if body.earliest_events.contains(&queued_events[i]) {
|
||||
let event_is_visible = services()
|
||||
.rooms
|
||||
.state_accessor
|
||||
.server_can_see_event(sender_servername, &queued_events[i])?;
|
||||
|
||||
if !event_is_visible {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Don't send this event again if it comes through some other
|
||||
// event's prev_events.
|
||||
stop_at_events.insert(queued_events[i].clone());
|
||||
|
||||
queued_events.extend_from_slice(
|
||||
&serde_json::from_value::<Vec<OwnedEventId>>(
|
||||
serde_json::to_value(pdu.get("prev_events").cloned().ok_or_else(|| {
|
||||
|
@ -1024,7 +1114,7 @@ pub async fn get_missing_events_route(
|
|||
i += 1;
|
||||
}
|
||||
|
||||
Ok(get_missing_events::v1::Response { events })
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
/// # `GET /_matrix/federation/v1/event_auth/{roomId}/{eventId}`
|
||||
|
|
|
@ -2,7 +2,13 @@ use std::{collections::HashMap, sync::Arc};
|
|||
|
||||
use crate::{database::KeyValueDatabase, service, services, utils, Error, PduEvent, Result};
|
||||
use async_trait::async_trait;
|
||||
use ruma::{events::StateEventType, EventId, RoomId};
|
||||
use ruma::{
|
||||
events::{
|
||||
room::history_visibility::{HistoryVisibility, RoomHistoryVisibilityEventContent},
|
||||
StateEventType,
|
||||
},
|
||||
EventId, RoomId, ServerName,
|
||||
};
|
||||
|
||||
#[async_trait]
|
||||
impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
||||
|
@ -138,6 +144,35 @@ impl service::rooms::state_accessor::Data for KeyValueDatabase {
|
|||
})
|
||||
}
|
||||
|
||||
/// Whether a server is allowed to see an event through federation, based on
|
||||
/// the room's history_visibility at that event's state.
|
||||
///
|
||||
/// Note: Joined/Invited history visibility not yet implemented.
|
||||
#[tracing::instrument(skip(self))]
|
||||
fn server_can_see_event(&self, _server_name: &ServerName, event_id: &EventId) -> Result<bool> {
|
||||
let shortstatehash = match self.pdu_shortstatehash(event_id) {
|
||||
Ok(Some(shortstatehash)) => shortstatehash,
|
||||
_ => return Ok(false),
|
||||
};
|
||||
|
||||
let history_visibility = self
|
||||
.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?
|
||||
.map(|event| serde_json::from_str(event.content.get()))
|
||||
.transpose()
|
||||
.map_err(|_| Error::bad_database("Invalid room history visibility event in database."))?
|
||||
.map(|content: RoomHistoryVisibilityEventContent| content.history_visibility);
|
||||
|
||||
Ok(match history_visibility {
|
||||
Some(HistoryVisibility::WorldReadable) => true,
|
||||
Some(HistoryVisibility::Shared) => true,
|
||||
// TODO: Check if any of the server's users were invited
|
||||
// at this point in time.
|
||||
Some(HistoryVisibility::Joined) => false,
|
||||
Some(HistoryVisibility::Invited) => false,
|
||||
_ => false,
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns the full room state.
|
||||
async fn room_state_full(
|
||||
&self,
|
||||
|
|
|
@ -391,6 +391,7 @@ fn routes() -> Router {
|
|||
.ruma_route(server_server::send_transaction_message_route)
|
||||
.ruma_route(server_server::get_event_route)
|
||||
.ruma_route(server_server::get_missing_events_route)
|
||||
.ruma_route(server_server::get_backfill_route)
|
||||
.ruma_route(server_server::get_event_authorization_route)
|
||||
.ruma_route(server_server::get_room_state_route)
|
||||
.ruma_route(server_server::get_room_state_ids_route)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use ruma::{events::StateEventType, EventId, RoomId};
|
||||
use ruma::{events::StateEventType, EventId, RoomId, ServerName};
|
||||
|
||||
use crate::{PduEvent, Result};
|
||||
|
||||
|
@ -35,6 +35,9 @@ pub trait Data: Send + Sync {
|
|||
/// Returns the state hash for this pdu.
|
||||
fn pdu_shortstatehash(&self, event_id: &EventId) -> Result<Option<u64>>;
|
||||
|
||||
/// Returns true if a server has permission to see an event
|
||||
fn server_can_see_event(&self, sever_name: &ServerName, event_id: &EventId) -> Result<bool>;
|
||||
|
||||
/// Returns the full room state.
|
||||
async fn room_state_full(
|
||||
&self,
|
||||
|
|
|
@ -2,7 +2,7 @@ mod data;
|
|||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
pub use data::Data;
|
||||
use ruma::{events::StateEventType, EventId, RoomId};
|
||||
use ruma::{events::StateEventType, EventId, RoomId, ServerName};
|
||||
|
||||
use crate::{PduEvent, Result};
|
||||
|
||||
|
@ -51,6 +51,16 @@ impl Service {
|
|||
self.db.pdu_shortstatehash(event_id)
|
||||
}
|
||||
|
||||
/// Returns true if a server has permission to see an event
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub fn server_can_see_event<'a>(
|
||||
&'a self,
|
||||
sever_name: &ServerName,
|
||||
event_id: &EventId,
|
||||
) -> Result<bool> {
|
||||
self.db.server_can_see_event(sever_name, event_id)
|
||||
}
|
||||
|
||||
/// Returns the full room state.
|
||||
#[tracing::instrument(skip(self))]
|
||||
pub async fn room_state_full(
|
||||
|
|
Loading…
Reference in a new issue