forked from mirror/grapevine

Compare commits: custom...benjamin/r (12 commits)

a53a3baaa9, 0e6cf1a90a, 383e439078, 567b59974a, a570a41463, 49cda4efdb,
e9b8202cf0, 346d388152, c70d65bd75, 1459d85f4f, 18deb1873c, 63b160f6bf
9 changed files with 594 additions and 136 deletions

@@ -168,6 +168,11 @@ This will be the first release of Grapevine since it was forked from Conduit
    ([!78](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/78))
15. Fix bug where expired keys may not be re-fetched in some scenarios.
    ([!78](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/78))
16. Remove buggy backoff implementation for remote device key queries that
    failed to reset the backoff delay after a successful request. This caused
    an increasing rate of key query failures (and therefore UTD messages) over
    time until a restart.
    ([!70](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/70))

### Added

@@ -217,3 +222,6 @@ This will be the first release of Grapevine since it was forked from Conduit
    [!84](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/84))
15. Added support for Authenticated Media ([MSC3916](https://github.com/matrix-org/matrix-spec-proposals/pull/3916)).
    ([!58](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/58))
16. Attempt to detect offline remote servers and back off all federation
    requests to them.
    ([!70](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/70))

@@ -1,6 +1,7 @@
use std::{
    collections::{hash_map, BTreeMap, HashMap, HashSet},
    time::{Duration, Instant},
    collections::{BTreeMap, HashMap, HashSet},
    future::IntoFuture,
    time::Duration,
};

use futures_util::{stream::FuturesUnordered, StreamExt};

@@ -385,47 +386,9 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(

    let mut failures = BTreeMap::new();

    let back_off = |id| async {
        match services().globals.bad_query_ratelimiter.write().await.entry(id) {
            hash_map::Entry::Vacant(e) => {
                e.insert((Instant::now(), 1));
            }
            hash_map::Entry::Occupied(mut e) => {
                *e.get_mut() = (Instant::now(), e.get().1 + 1);
            }
        }
    };

    let mut futures: FuturesUnordered<_> = get_over_federation
        .into_iter()
        .map(|(server, vec)| async move {
            if let Some((time, tries)) = services()
                .globals
                .bad_query_ratelimiter
                .read()
                .await
                .get(server)
            {
                // Exponential backoff
                let mut min_elapsed_duration =
                    Duration::from_secs(30) * (*tries) * (*tries);
                if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
                    min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
                }

                if let Some(remaining) =
                    min_elapsed_duration.checked_sub(time.elapsed())
                {
                    debug!(%server, %tries, ?remaining, "Backing off from server");
                    return (
                        server,
                        Err(Error::BadServerResponse(
                            "bad query, still backing off",
                        )),
                    );
                }
            }

            let mut device_keys_input_fed = BTreeMap::new();
            for (user_id, keys) in vec {
                device_keys_input_fed.insert(user_id.to_owned(), keys.clone());

@@ -436,12 +399,15 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
                server,
                tokio::time::timeout(
                    Duration::from_secs(25),
                    services().sending.send_federation_request(
                    services()
                        .sending
                        .send_federation_request(
                            server,
                            federation::keys::get_keys::v1::Request {
                                device_keys: device_keys_input_fed,
                            },
                        ),
                    )
                        .into_future(),
                )
                .await
                .map_err(|_e| Error::BadServerResponse("Query took too long"))

@@ -454,7 +420,6 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
            let response = match response {
                Ok(response) => response,
                Err(error) => {
                    back_off(server.to_owned()).await;
                    debug!(%server, %error, "remote device key query failed");
                    failures.insert(server.to_string(), json!({}));
                    continue;

@@ -7,7 +7,7 @@ use std::{

use once_cell::sync::Lazy;
use ruma::{OwnedServerName, RoomVersionId};
use serde::Deserialize;
use serde::{Deserialize, Deserializer};

use crate::error;

@@ -250,6 +250,7 @@ pub(crate) struct FederationConfig {
    pub(crate) trusted_servers: Vec<OwnedServerName>,
    pub(crate) max_fetch_prev_events: u16,
    pub(crate) max_concurrent_requests: u16,
    pub(crate) backoff: BackoffConfig,
}

impl Default for FederationConfig {

@@ -261,6 +262,44 @@ impl Default for FederationConfig {
            ],
            max_fetch_prev_events: 100,
            max_concurrent_requests: 100,
            backoff: BackoffConfig::default(),
        }
    }
}

#[derive(Debug, Deserialize)]
#[serde(default)]
pub(crate) struct BackoffConfig {
    /// Minimum number of consecutive failures for a server before starting to
    /// delay requests.
    pub(crate) failure_threshold: u8,

    /// Initial delay between requests in seconds, after the number of
    /// consecutive failures to a server first exceeds the threshold.
    pub(crate) base_delay: f64,

    /// Factor to increase delay by after each additional consecutive failure.
    pub(crate) multiplier: f64,

    /// Maximum delay between requests to a server in seconds.
    pub(crate) max_delay: f64,

    /// Range of random multipliers to request delay.
    #[serde(deserialize_with = "deserialize_jitter_range")]
    pub(crate) jitter_range: std::ops::Range<f64>,
}

impl Default for BackoffConfig {
    fn default() -> Self {
        // After the first 3 consecutive failed requests, increase delay
        // exponentially from 5s to 24h over the next 24 failures. It takes an
        // average of 4.3 days of failures to reach the maximum delay of 24h.
        Self {
            failure_threshold: 3,
            base_delay: 5.0,
            multiplier: 1.5,
            max_delay: 60.0 * 60.0 * 24.0,
            jitter_range: 0.5..1.5,
        }
    }
}
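To make the delay schedule in the comment above concrete, here is a hedged sketch (not part of the diff) that evaluates the same formula used by `BackoffState::remaining_delay` later in this changeset, `min(max_delay, base_delay * multiplier^(failures - failure_threshold))` before jitter, with the default values; the helper function and sample numbers are illustrative only.

```rust
/// Illustrative only: nominal (pre-jitter) delay after `failures` consecutive
/// failed requests, using the default BackoffConfig values above.
fn nominal_delay_secs(failures: u8) -> f64 {
    let (threshold, base, multiplier, max) = (3_u8, 5.0_f64, 1.5_f64, 86_400.0_f64);
    if failures <= threshold {
        // At or below the threshold, requests are not delayed at all.
        return 0.0;
    }
    max.min(base * multiplier.powi(i32::from(failures - threshold)))
}

fn main() {
    // 4 failures -> 7.5s, 10 -> ~85s, 20 -> ~1.4h, 28 and above -> capped at 24h.
    for failures in [4_u8, 10, 20, 28] {
        println!("{failures} failures -> {:.1}s", nominal_delay_secs(failures));
    }
}
```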

@@ -327,6 +366,24 @@ pub(crate) fn default_default_room_version() -> RoomVersionId {
    RoomVersionId::V10
}

fn deserialize_jitter_range<'de, D>(
    deserializer: D,
) -> Result<std::ops::Range<f64>, D::Error>
where
    D: Deserializer<'de>,
{
    let s = String::deserialize(deserializer)?;
    let Some((a, b)) = s.split_once("..") else {
        return Err(serde::de::Error::custom(crate::Error::bad_config(
            "invalid jitter range",
        )));
    };

    a.parse()
        .and_then(|a| b.parse().map(|b| a..b))
        .map_err(serde::de::Error::custom)
}

/// Search default locations for a configuration file
///
/// If one isn't found, the list of tried paths is returned.
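Note that `jitter_range` is written in the configuration as a string of the form `"a..b"` and parsed by `deserialize_jitter_range`, while the other backoff fields are plain numbers. A hedged test sketch of that round trip, assuming the configuration format is TOML and that a `toml` crate and this function are in scope for the test (neither is shown in this diff):

```rust
// Hypothetical test sketch, not part of the diff.
#[derive(serde::Deserialize)]
struct JitterOnly {
    #[serde(deserialize_with = "deserialize_jitter_range")]
    jitter_range: std::ops::Range<f64>,
}

#[test]
fn jitter_range_parses_from_string() {
    // Mirrors how an operator would write it in the config file,
    // e.g. jitter_range = "0.5..1.5" under the federation backoff table.
    let parsed: JitterOnly =
        toml::from_str(r#"jitter_range = "0.5..1.5""#).unwrap();
    assert_eq!(parsed.jitter_range, 0.5..1.5);
}
```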

@@ -274,6 +274,9 @@ pub(crate) struct Metrics {
    /// Number of entries in an
    /// [`OnDemandHashMap`](crate::utils::on_demand_hashmap::OnDemandHashMap)
    on_demand_hashmap_size: opentelemetry::metrics::Gauge<u64>,

    /// Number of known remote servers in each state (online or offline)
    remote_server_count: opentelemetry::metrics::Gauge<u64>,
}

impl Metrics {

@@ -329,11 +332,17 @@ impl Metrics {
            .with_description("Number of entries in OnDemandHashMap")
            .init();

        let remote_server_count = meter
            .u64_gauge("remote_server_count")
            .with_description("Number of known remote servers")
            .init();

        Metrics {
            otel_state: (registry, provider),
            http_requests_histogram,
            lookup,
            on_demand_hashmap_size,
            remote_server_count,
        }
    }

@@ -368,6 +377,18 @@ impl Metrics {
            &[KeyValue::new("name", name)],
        );
    }

    /// Record number of remote servers marked online or offline.
    pub(crate) fn record_remote_server_count(
        &self,
        online_count: u64,
        offline_count: u64,
    ) {
        self.remote_server_count
            .record(online_count, &[KeyValue::new("state", "online")]);
        self.remote_server_count
            .record(offline_count, &[KeyValue::new("state", "offline")]);
    }
}

/// Track HTTP metrics by converting this into an [`axum`] layer

@@ -18,6 +18,7 @@ pub(crate) mod pdu;
pub(crate) mod pusher;
pub(crate) mod rooms;
pub(crate) mod sending;
pub(crate) mod server_backoff;
pub(crate) mod transaction_ids;
pub(crate) mod uiaa;
pub(crate) mod users;

@@ -34,6 +35,7 @@ pub(crate) struct Services {
    pub(crate) globals: globals::Service,
    pub(crate) key_backups: key_backups::Service,
    pub(crate) media: media::Service,
    pub(crate) server_backoff: Arc<server_backoff::Service>,
    pub(crate) sending: Arc<sending::Service>,
}

@@ -148,6 +150,7 @@ impl Services {
            media: media::Service {
                db,
            },
            server_backoff: server_backoff::Service::build(),
            sending: sending::Service::build(db, &config),

            globals: globals::Service::load(db, config, reload_handles)?,

@@ -75,8 +75,6 @@ pub(crate) struct Service {
        Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
    pub(crate) bad_signature_ratelimiter:
        Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
    pub(crate) bad_query_ratelimiter:
        Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
    pub(crate) servername_ratelimiter:
        OnDemandHashMap<OwnedServerName, Semaphore>,
    pub(crate) roomid_mutex_insert: TokenSet<OwnedRoomId, marker::Insert>,

@@ -269,7 +267,6 @@ impl Service {
            admin_bot_room_alias_id,
            bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
            bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
            bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
            servername_ratelimiter: OnDemandHashMap::new(
                "servername_ratelimiter".to_owned(),
            ),

@@ -3,8 +3,10 @@ mod data;
use std::{
    collections::{BTreeMap, HashMap, HashSet},
    fmt::Debug,
    future::{Future, IntoFuture},
    pin::Pin,
    sync::Arc,
    time::{Duration, Instant},
    time::Duration,
};

use base64::{engine::general_purpose, Engine as _};

@@ -109,6 +111,30 @@ pub(crate) struct RequestData {
    requester_span: Span,
}

/// Which types of errors should cause us to back off requests to this server
/// globally.
///
/// The default is [`BackoffOn::AllExceptWellFormed`], which is conservative,
/// with a high false negative rate and low false positive rate. For endpoints
/// where we have additional information, we should pick a less conservative
/// setting.
#[derive(Copy, Clone, Debug)]
pub(crate) enum BackoffOn {
    /// All errors except for error responses that match the expected
    /// `{ "errcode": ... }` format for the matrix protocol.
    AllExceptWellFormed,

    /// All errors
    AllErrors,
}

pub(crate) struct SendFederationRequestBuilder<'a, T> {
    destination: &'a ServerName,
    request: T,
    log_errors: bool,
    backoff_on: BackoffOn,
}

pub(crate) struct Service {
    db: &'static dyn Data,

@@ -121,10 +147,7 @@ pub(crate) struct Service {
#[derive(Debug)]
enum TransactionStatus {
    Running,
    // number of times failed, time of last failure
    Failed(u32, Instant),
    // number of times failed
    Retrying(u32),
    Failed,
}

struct HandlerInputs {

@@ -259,19 +282,14 @@ impl Service {
        }

        if let Err(error) = result {
            // Logging transactions that fail due to backoff produces a lot of
            // clutter in the logs. This is expected behavior, and the
            // transaction will be retried later.
            if !matches!(error, Error::ServerBackoff { .. }) {
                warn!(%error, "Marking transaction as failed");
            current_transaction_status.entry(destination).and_modify(|e| {
                use TransactionStatus::{Failed, Retrying, Running};

                *e = match e {
                    Running => Failed(1, Instant::now()),
                    Retrying(n) => Failed(*n + 1, Instant::now()),
                    Failed(..) => {
                        error!("Request that was not even running failed?!");
                        return;
                    }
                }
            });
            current_transaction_status
                .insert(destination, TransactionStatus::Failed);
            return Ok(None);
        }

@@ -376,27 +394,13 @@ impl Service {

        entry
            .and_modify(|e| match e {
                TransactionStatus::Running | TransactionStatus::Retrying(_) => {
                TransactionStatus::Running => {
                    // already running
                    allow = false;
                }
                TransactionStatus::Failed(tries, time) => {
                    // Fail if a request has failed recently (exponential
                    // backoff)
                    let mut min_elapsed_duration =
                        Duration::from_secs(30) * (*tries) * (*tries);
                    if min_elapsed_duration > Duration::from_secs(60 * 60 * 24)
                    {
                        min_elapsed_duration =
                            Duration::from_secs(60 * 60 * 24);
                    }

                    if time.elapsed() < min_elapsed_duration {
                        allow = false;
                    } else {
                TransactionStatus::Failed => {
                    retry = true;
                        *e = TransactionStatus::Retrying(*tries);
                    }
                    *e = TransactionStatus::Running;
                }
            })
            .or_insert(TransactionStatus::Running);

@@ -662,30 +666,20 @@ impl Service {
        Ok(())
    }

    #[tracing::instrument(skip(self, request))]
    pub(crate) async fn send_federation_request<T>(
    // Allowed because `SendFederationRequestBuilder::into_future` uses
    // `services()`
    #[allow(clippy::unused_self)]
    pub(crate) fn send_federation_request<'a, T>(
        &self,
        destination: &ServerName,
        destination: &'a ServerName,
        request: T,
    ) -> Result<T::IncomingResponse>
    where
        T: OutgoingRequest + Debug,
    {
        debug!("Waiting for permit");
        let permit = self.maximum_requests.acquire().await;
        debug!("Got permit");
        let response = tokio::time::timeout(
            Duration::from_secs(2 * 60),
            server_server::send_request(destination, request, true),
        )
        .await
        .map_err(|_| {
            warn!("Timeout waiting for server response");
            Error::BadServerResponse("Timeout waiting for server response")
        })?;
        drop(permit);

        response
    ) -> SendFederationRequestBuilder<'a, T> {
        SendFederationRequestBuilder {
            destination,
            request,
            log_errors: true,
            backoff_on: BackoffOn::AllExceptWellFormed,
        }
    }

    /// Sends a request to an appservice

@@ -713,6 +707,88 @@ impl Service {
        }
    }

impl<T> SendFederationRequestBuilder<'_, T> {
    /// Enable or disable automatically logging any error making this request.
    ///
    /// This should be disabled if the error is going to be logged elsewhere,
    /// to avoid cluttering logs with duplicate error messages.
    pub(crate) fn log_errors(mut self, log_errors: bool) -> Self {
        self.log_errors = log_errors;
        self
    }

    /// Set the types of errors that will cause us to back off future requests to
    /// this server globally.
    pub(crate) fn backoff_on(mut self, backoff_on: BackoffOn) -> Self {
        self.backoff_on = backoff_on;
        self
    }
}
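With this change, `send_federation_request` returns the builder rather than a future; awaiting it directly uses the defaults set above (`log_errors: true`, `BackoffOn::AllExceptWellFormed`), and call sites can override either knob before the `.await`. A hedged sketch of both call shapes, with `server` and `request` standing in for real values (compare the transaction-sending call site later in this diff):

```rust
// Sketch only, assuming an async context inside the grapevine crate where
// `services()`, `BackoffOn`, and placeholder values `server` / `request` exist.

// Default behaviour: just await the builder. Errors are logged automatically,
// and only malformed or transport-level failures count toward backoff.
//     services().sending.send_federation_request(server, request).await?;

// Overriding the defaults, mirroring the transaction-sending path later in
// this diff: any error marks the destination as failing, and the builder's
// own logging is disabled because the caller logs the failure itself.
let response = services()
    .sending
    .send_federation_request(server, request)
    .backoff_on(BackoffOn::AllErrors)
    .log_errors(false)
    .await?;
```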

impl<'a, T> IntoFuture for SendFederationRequestBuilder<'a, T>
where
    T: OutgoingRequest + Send + Debug + 'a,
    T::IncomingResponse: Send,
{
    // TODO: get rid of the Box once impl_trait_in_assoc_type is stable
    // <https://github.com/rust-lang/rust/issues/63063>
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
    type Output = Result<T::IncomingResponse>;

    #[tracing::instrument(
        name = "send_federation_request",
        skip(self),
        fields(destination = %self.destination)
    )]
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            debug!("Waiting for permit");
            let permit = services().sending.maximum_requests.acquire().await;
            debug!("Got permit");

            let backoff_guard =
                services().server_backoff.server_ready(self.destination)?;

            let response = tokio::time::timeout(
                Duration::from_secs(2 * 60),
                server_server::send_request(
                    self.destination,
                    self.request,
                    self.log_errors,
                ),
            )
            .await
            .map_err(|_| {
                warn!("Timeout waiting for server response");
                Error::BadServerResponse("Timeout waiting for server response")
            })
            .and_then(|result| result);
            drop(permit);

            match &response {
                Err(Error::Federation(_, error)) => {
                    if error.error_kind().is_some() {
                        if let BackoffOn::AllExceptWellFormed = self.backoff_on
                        {
                            backoff_guard.soft_failure();
                        } else {
                            backoff_guard.hard_failure();
                        }
                    } else {
                        // The error wasn't in the expected format for matrix
                        // API responses.
                        backoff_guard.hard_failure();
                    }
                }
                Err(_) => backoff_guard.hard_failure(),
                Ok(_) => backoff_guard.success(),
            }

            response
        })
    }
}

#[tracing::instrument(skip(events))]
async fn handle_appservice_event(
    id: &str,

@@ -891,9 +967,9 @@ async fn handle_federation_event(
        }
    }

    let permit = services().sending.maximum_requests.acquire().await;

    let response = server_server::send_request(
    let response = services()
        .sending
        .send_federation_request(
            server,
            send_transaction_message::v1::Request {
                origin: services().globals.server_name().to_owned(),

@@ -912,8 +988,12 @@ async fn handle_federation_event(
            ))
            .into(),
        },
        false,
    )
        // The spec states that this endpoint should always return success, even
        // if individual PDUs fail. If we get an error, something is wrong.
        .backoff_on(BackoffOn::AllErrors)
        // The error will be logged in `handle_response`
        .log_errors(false)
        .await?;

    for pdu in response.pdus {

@@ -922,8 +1002,6 @@ async fn handle_federation_event(
        }
    }

    drop(permit);

    Ok(())
}

src/service/server_backoff.rs (new file, 322 lines)

@@ -0,0 +1,322 @@
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, RwLock},
    time::{Duration, Instant},
};

use rand::{thread_rng, Rng};
use ruma::{OwnedServerName, ServerName};
use tracing::{debug, error, info, instrument};

use crate::{observability::METRICS, services, Error, Result};

/// Service to handle backing off requests to offline servers.
///
/// Matrix is full of servers that are either temporarily or permanently
/// offline. It's important not to flood offline servers with federation
/// traffic, since this can consume resources on both ends.
///
/// To limit traffic to offline servers, we track a global exponential backoff
/// state for federation requests to each server name. This mechanism is *only*
/// intended to handle offline servers. Rate limiting and backoff retries for
/// specific requests have different considerations and need to be handled
/// elsewhere.
///
/// Exponential backoff is typically used in a retry loop for a single request.
/// Because the state of this backoff is global, and requests may be issued
/// concurrently, we do a couple of unusual things:
///
/// First, we wait for a certain number of consecutive failed requests before we
/// start delaying further requests. This is to avoid delaying requests to a
/// server that is not offline but fails on a small fraction of requests.
///
/// Second, we only increment the failure counter once for every batch of
/// concurrent requests, instead of on every failed request. This avoids rapidly
/// increasing the counter, proportional to the rate of outgoing requests, when
/// the server is only briefly offline.
pub(crate) struct Service {
    servers: RwLock<HashMap<OwnedServerName, Arc<RwLock<BackoffState>>>>,

    server_counts: Mutex<ServerCounts>,
}

/// Guard to record the result of an attempted request to a server.
///
/// If the request succeeds, call [`BackoffGuard::success`]. If the request
/// fails in a way that indicates the server is unavailable, call
/// [`BackoffGuard::hard_failure`]. If the request fails in a way that doesn't
/// necessarily indicate that the server is unavailable, call
/// [`BackoffGuard::soft_failure`]. Note that this choice is security-sensitive.
/// If an attacker is able to trigger hard failures for an online server, they
/// can cause us to incorrectly mark it as offline and block outgoing requests
/// to it.
#[must_use]
pub(crate) struct BackoffGuard {
    result_recorded: bool,
    backoff: Arc<RwLock<BackoffState>>,
    /// Store the last failure timestamp observed when this request started. If
    /// there was another failure recorded since the request started, do not
    /// increment the failure count. This ensures that only one failure will
    /// be recorded for every batch of concurrent requests, as discussed in
    /// the documentation of [`Service`].
    last_failure: Option<Instant>,
}

/// State of exponential backoff for a specific server.
#[derive(Clone, Debug)]
struct BackoffState {
    server_name: OwnedServerName,

    /// Count of consecutive failed requests to this server.
    failure_count: u8,
    /// Timestamp of the last failed request to this server.
    last_failure: Option<Instant>,
    /// Random multiplier to request delay.
    ///
    /// This is updated to a new random value after each batch of concurrent
    /// requests containing a failure.
    jitter_coeff: f64,
}

/// State transitions for a single server
#[derive(Debug, Copy, Clone)]
enum Transition {
    /// A new server, marked as online by default
    New,
    OnlineToOffline,
    OfflineToOnline,
}

/// Counts of known servers in each state, used for metrics
#[derive(Debug, Copy, Clone, Default)]
struct ServerCounts {
    online_count: u64,
    offline_count: u64,
}

impl Service {
    pub(crate) fn build() -> Arc<Service> {
        Arc::new(Service {
            servers: RwLock::default(),
            server_counts: Mutex::default(),
        })
    }

    /// If ready to attempt another request to a server, returns a guard to
    /// record the result.
    ///
    /// If still in the backoff period for this server, returns `Err`.
    #[instrument(skip(self))]
    pub(crate) fn server_ready(
        &self,
        server_name: &ServerName,
    ) -> Result<BackoffGuard> {
        let state = self.server_state(server_name);

        let last_failure = {
            let state_lock = state.read().unwrap();

            if let Some(remaining_delay) = state_lock.remaining_delay() {
                debug!(failures = %state_lock.failure_count, ?remaining_delay, "backing off from server");
                return Err(Error::ServerBackoff {
                    server: server_name.to_owned(),
                    remaining_delay,
                });
            }

            state_lock.last_failure
        };

        Ok(BackoffGuard {
            result_recorded: false,
            backoff: state,
            last_failure,
        })
    }

    fn record_transition(
        &self,
        server_name: &ServerName,
        transition: Transition,
    ) {
        let mut counts = self.server_counts.lock().unwrap();

        match transition {
            Transition::New => {
                info!(
                    %server_name,
                    "new remote server, marked as online by default"
                );
                counts.online_count += 1;
            }
            Transition::OnlineToOffline => {
                info!(
                    %server_name,
                    "remote server transitioned from online to offline"
                );
                counts.online_count -= 1;
                counts.offline_count += 1;
            }
            Transition::OfflineToOnline => {
                info!(
                    %server_name,
                    "remote server transitioned from offline to online"
                );
                counts.offline_count -= 1;
                counts.online_count += 1;
            }
        }

        METRICS.record_remote_server_count(
            counts.online_count,
            counts.offline_count,
        );
    }

    fn server_state(
        &self,
        server_name: &ServerName,
    ) -> Arc<RwLock<BackoffState>> {
        let servers = self.servers.read().unwrap();
        if let Some(state) = servers.get(server_name) {
            Arc::clone(state)
        } else {
            drop(servers);
            let mut servers = self.servers.write().unwrap();

            // We have to check again because it's possible for another thread
            // to write in between us dropping the read lock and taking the
            // write lock.
            if let Some(state) = servers.get(server_name) {
                Arc::clone(state)
            } else {
                let state = Arc::new(RwLock::new(BackoffState::new(
                    server_name.to_owned(),
                )));
                servers.insert(server_name.to_owned(), Arc::clone(&state));
                self.record_transition(server_name, Transition::New);
                state
            }
        }
    }
}

impl BackoffState {
    fn new(server_name: OwnedServerName) -> BackoffState {
        BackoffState {
            server_name,
            failure_count: 0,
            last_failure: None,
            jitter_coeff: 0.0,
        }
    }

    /// Returns the remaining time before ready to attempt another request to
    /// this server.
    fn remaining_delay(&self) -> Option<Duration> {
        let config = &services().globals.config.federation.backoff;

        let last_failure = self.last_failure?;
        if self.failure_count <= config.failure_threshold {
            return None;
        }

        let excess_failure_count =
            self.failure_count - config.failure_threshold;
        let delay_secs = config.max_delay.min(
            config.base_delay
                * config.multiplier.powi(i32::from(excess_failure_count)),
        ) * self.jitter_coeff;
        let delay = Duration::from_secs_f64(delay_secs);
        delay.checked_sub(last_failure.elapsed())
    }

    /// Returns whether this server is marked as online (no backoff delay).
    fn is_online(&self) -> bool {
        let config = &services().globals.config.federation.backoff;
        self.failure_count <= config.failure_threshold
    }
}

impl BackoffGuard {
    /// Record a successful request.
    #[instrument(skip(self))]
    pub(crate) fn success(mut self) {
        self.result_recorded = true;

        let mut state = self.backoff.write().unwrap();
        let was_online = state.is_online();

        if state.failure_count != 0 {
            debug!(
                server_name = %&state.server_name,
                "successful request to server, resetting failure count"
            );
        }

        state.failure_count = 0;

        // Server is always online after setting failure_count = 0
        if !was_online {
            services().server_backoff.record_transition(
                &state.server_name,
                Transition::OfflineToOnline,
            );
        }
    }

    /// Record a failed request indicating that the server may be unavailable.
    ///
    /// Examples of failures in this category are a timeout, a 500 status, or
    /// a 404 from an endpoint that is not specced to return 404.
    #[instrument(skip(self))]
    pub(crate) fn hard_failure(mut self) {
        self.result_recorded = true;

        let config = &services().globals.config.federation.backoff;

        let mut state = self.backoff.write().unwrap();
        let was_online = state.is_online();

        if state.last_failure == self.last_failure {
            state.failure_count = state.failure_count.saturating_add(1);
            state.jitter_coeff =
                thread_rng().gen_range(config.jitter_range.clone());
            state.last_failure = Some(Instant::now());

            debug!(
                server_name = %state.server_name,
                failure_count = state.failure_count,
                "hard failure sending request to server, incrementing failure count"
            );

            if state.is_online() != was_online {
                services().server_backoff.record_transition(
                    &state.server_name,
                    Transition::OnlineToOffline,
                );
            }
        }
    }

    /// Record a request that failed, but where the failure is likely to occur
    /// in normal operation even if the server is not unavailable.
    ///
    /// An example of a failure in this category is 404 from querying a user
    /// profile. This might occur if the server no longer exists, but will also
    /// occur if the userid doesn't exist.
    #[instrument(skip(self))]
    pub(crate) fn soft_failure(mut self) {
        self.result_recorded = true;
    }
}

impl Drop for BackoffGuard {
    fn drop(&mut self) {
        if !self.result_recorded {
            error!(
                "BackoffGuard dropped without recording result. This is a bug."
            );
        }
    }
}
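For orientation, this is how the guard is meant to be driven by a caller. The real call site is the `IntoFuture` impl for `SendFederationRequestBuilder` earlier in this diff, so the sketch below is illustrative only; `do_request` and `Response` are hypothetical placeholders.

```rust
// Hypothetical caller sketch: consult the backoff state, make the request,
// then record the outcome on the guard exactly once (its Drop impl logs an
// error if no outcome was recorded).
async fn send_with_backoff(destination: &ServerName) -> Result<Response> {
    let guard = services().server_backoff.server_ready(destination)?;
    match do_request(destination).await {
        Ok(response) => {
            guard.success();
            Ok(response)
        }
        Err(error) => {
            // This sketch treats every failure as a hard failure; real callers
            // use soft_failure() for well-formed Matrix error responses.
            guard.hard_failure();
            Err(error)
        }
    }
}
```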

@@ -1,4 +1,4 @@
use std::convert::Infallible;
use std::{convert::Infallible, time::Duration};

use http::StatusCode;
use ruma::{

@@ -82,6 +82,13 @@ pub(crate) enum Error {
    Redaction(OwnedServerName, ruma::canonical_json::RedactionError),
    #[error("{0} in {1}")]
    InconsistentRoomState(&'static str, ruma::OwnedRoomId),
    #[error(
        "backing off requests to {server} for the next {remaining_delay:?}"
    )]
    ServerBackoff {
        server: OwnedServerName,
        remaining_delay: Duration,
    },
}

impl Error {
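Since backoff rejections are now an ordinary `Error` variant, callers can single them out to keep their logs quiet, as the sending service does earlier in this diff. A small hedged sketch; `result` is a placeholder for any `Result` returned by a federation call:

```rust
// Sketch only: ServerBackoff is expected while a destination is marked
// offline, so don't treat it as noteworthy; everything else gets logged.
if let Err(error) = result {
    if matches!(error, Error::ServerBackoff { .. }) {
        // The request will be retried once remaining_delay has elapsed.
    } else {
        tracing::warn!(%error, "federation request failed");
    }
}
```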