1
0
Fork 0
forked from mirror/grapevine

Compare commits

...

12 commits

Author SHA1 Message Date
Benjamin Lee
a53a3baaa9
add changelog entry for global offline server backoff 2024-09-05 14:12:07 -07:00
Benjamin Lee
0e6cf1a90a
replace outgoing transaction backoff with global offline server backoff
The global backoff code is in `send_federation_transaction`, so we had
to switch to using this function instead of
`server_server::send_request` directly. This has the side effect of
introducing a timeout, which we previously didn't have for transactions.

TODO: confirm that we aren't getting any new timeouts on
computer.surgery because of this
2024-09-05 00:06:32 -07:00
Benjamin Lee
383e439078
add option for which errors will cause a backoff for outgoing requests 2024-09-05 00:06:32 -07:00
Benjamin Lee
567b59974a
add log_errors parameter to send_federation_request builder
This allows us to use send_federation_request for federation
transactions.
2024-09-05 00:06:32 -07:00
Benjamin Lee
a570a41463
add builder struct for sending::send_federation_request
This will allow us to add optional parameters for things like error
logging and timeouts without cluttering call sites that use the
default.
2024-09-05 00:06:32 -07:00
Benjamin Lee
49cda4efdb
log message when BackoffGuard is dropped without recording result 2024-09-05 00:06:31 -07:00
Benjamin Lee
e9b8202cf0
metrics for online and offline remote server count 2024-09-05 00:06:31 -07:00
Benjamin Lee
346d388152
log when servers switch between online and offline 2024-09-05 00:06:31 -07:00
avdb13
c70d65bd75
feat: configurable federation backoff 2024-09-05 00:06:31 -07:00
Benjamin Lee
1459d85f4f
remove remote device key query backoff
This is handled by the server_backoff service now.

The previous implementation of backoff for remote device key queries
that we are removing had a bug where the failure counter was never reset
after a success. This caused grapevine to accumulate a larger error rate
for remote device key queries until it is restarted. This bug is not
present in the new global backoff implementation.
2024-09-05 00:06:31 -07:00
Benjamin Lee
18deb1873c
backoff from offline servers in all outgoing federation requests
Only marking M_UNKNOWN errors as a hard failure if they are in the
standard error format is conservative, and might cause us to miss some
offline servers. For example, a server might configure a load balancer
to send a standard-looking { errcode: ..., ... } response when the
backend is down, with a custom errcode. We wouldn't catch this.

TODO: evaluate whether this comes up in practice by running the changes
on computer.surgery
2024-09-05 00:06:31 -07:00
Benjamin Lee
63b160f6bf
add service for tracking backoffs to offline servers
Currently we have some exponential backoff logic scattered in different
locations, with multiple distinct bad implementations. We should
centralize backoff logic in one place and actually do it correctly.

This backoff logic is similar to synapse's implementation[1], with a
couple fixes:

 - we wait until we observe 5 consecutive failures before we start
   delaying requests, to avoid being sensitive to a small fraction of
   failed requests on an otherwise healthy server.
 - synapse's implementation is kinda similar to our "only increment the
   failure count once per batch of concurrent requests" behavoir, where
   they base the retry state written to the store on the state observed
   at the beginning of the request, rather on the state observed at the
   end of the request. Their implementation has a bug, where a success
   will be ignored if a failure occurs in the same batch. We do not
   replicate this bug.

Our parameter choices are significantly less aggressive than synapse[2], which
starts at 10m delay, has a multiplier of 2, and saturates at 4d delay.

[1]: 70b0e38603/synapse/util/retryutils.py
[2]: 70b0e38603/synapse/config/federation.py (L83)
2024-09-05 00:06:31 -07:00
9 changed files with 594 additions and 136 deletions

View file

@ -168,6 +168,11 @@ This will be the first release of Grapevine since it was forked from Conduit
([!78](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/78))
15. Fix bug where expired keys may not be re-fetched in some scenarios.
([!78](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/78))
16. Remove buggy backoff implementation for remote device key queries that
failed to reset the backoff delay after a successful request. This caused
an increasing rate of key query failures (and therefore UTD messages) over
time until a restart.
([!70](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/70))
### Added
@ -217,3 +222,6 @@ This will be the first release of Grapevine since it was forked from Conduit
[!84](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/84))
15. Added support for Authenticated Media ([MSC3916](https://github.com/matrix-org/matrix-spec-proposals/pull/3916)).
([!58](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/58))
16. Attempt to detect offline remote servers and back off all federation
requests to them.
([!70](https://gitlab.computer.surgery/matrix/grapevine-fork/-/merge_requests/70))

View file

@ -1,6 +1,7 @@
use std::{
collections::{hash_map, BTreeMap, HashMap, HashSet},
time::{Duration, Instant},
collections::{BTreeMap, HashMap, HashSet},
future::IntoFuture,
time::Duration,
};
use futures_util::{stream::FuturesUnordered, StreamExt};
@ -385,47 +386,9 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
let mut failures = BTreeMap::new();
let back_off = |id| async {
match services().globals.bad_query_ratelimiter.write().await.entry(id) {
hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
}
hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1 + 1);
}
}
};
let mut futures: FuturesUnordered<_> = get_over_federation
.into_iter()
.map(|(server, vec)| async move {
if let Some((time, tries)) = services()
.globals
.bad_query_ratelimiter
.read()
.await
.get(server)
{
// Exponential backoff
let mut min_elapsed_duration =
Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
min_elapsed_duration = Duration::from_secs(60 * 60 * 24);
}
if let Some(remaining) =
min_elapsed_duration.checked_sub(time.elapsed())
{
debug!(%server, %tries, ?remaining, "Backing off from server");
return (
server,
Err(Error::BadServerResponse(
"bad query, still backing off",
)),
);
}
}
let mut device_keys_input_fed = BTreeMap::new();
for (user_id, keys) in vec {
device_keys_input_fed.insert(user_id.to_owned(), keys.clone());
@ -436,12 +399,15 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
server,
tokio::time::timeout(
Duration::from_secs(25),
services().sending.send_federation_request(
server,
federation::keys::get_keys::v1::Request {
device_keys: device_keys_input_fed,
},
),
services()
.sending
.send_federation_request(
server,
federation::keys::get_keys::v1::Request {
device_keys: device_keys_input_fed,
},
)
.into_future(),
)
.await
.map_err(|_e| Error::BadServerResponse("Query took too long"))
@ -454,7 +420,6 @@ pub(crate) async fn get_keys_helper<F: Fn(&UserId) -> bool>(
let response = match response {
Ok(response) => response,
Err(error) => {
back_off(server.to_owned()).await;
debug!(%server, %error, "remote device key query failed");
failures.insert(server.to_string(), json!({}));
continue;

View file

@ -7,7 +7,7 @@ use std::{
use once_cell::sync::Lazy;
use ruma::{OwnedServerName, RoomVersionId};
use serde::Deserialize;
use serde::{Deserialize, Deserializer};
use crate::error;
@ -250,6 +250,7 @@ pub(crate) struct FederationConfig {
pub(crate) trusted_servers: Vec<OwnedServerName>,
pub(crate) max_fetch_prev_events: u16,
pub(crate) max_concurrent_requests: u16,
pub(crate) backoff: BackoffConfig,
}
impl Default for FederationConfig {
@ -261,6 +262,44 @@ impl Default for FederationConfig {
],
max_fetch_prev_events: 100,
max_concurrent_requests: 100,
backoff: BackoffConfig::default(),
}
}
}
#[derive(Debug, Deserialize)]
#[serde(default)]
pub(crate) struct BackoffConfig {
/// Minimum number of consecutive failures for a server before starting to
/// delay requests.
pub(crate) failure_threshold: u8,
/// Initial delay between requests in seconds, after the number of
/// consecutive failures to a server first exceeds the threshold.
pub(crate) base_delay: f64,
/// Factor to increase delay by after each additional consecutive failure.
pub(crate) multiplier: f64,
/// Maximum delay between requests to a server in seconds.
pub(crate) max_delay: f64,
/// Range of random multipliers to request delay.
#[serde(deserialize_with = "deserialize_jitter_range")]
pub(crate) jitter_range: std::ops::Range<f64>,
}
impl Default for BackoffConfig {
fn default() -> Self {
// After the first 3 consecutive failed requests, increase delay
// exponentially from 5s to 24h over the next 24 failures. It takes an
// average of 4.3 days of failures to reach the maximum delay of 24h.
Self {
failure_threshold: 3,
base_delay: 5.0,
multiplier: 1.5,
max_delay: 60.0 * 60.0 * 24.0,
jitter_range: 0.5..1.5,
}
}
}
@ -327,6 +366,24 @@ pub(crate) fn default_default_room_version() -> RoomVersionId {
RoomVersionId::V10
}
fn deserialize_jitter_range<'de, D>(
deserializer: D,
) -> Result<std::ops::Range<f64>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
let Some((a, b)) = s.split_once("..") else {
return Err(serde::de::Error::custom(crate::Error::bad_config(
"invalid jitter range",
)));
};
a.parse()
.and_then(|a| b.parse().map(|b| a..b))
.map_err(serde::de::Error::custom)
}
/// Search default locations for a configuration file
///
/// If one isn't found, the list of tried paths is returned.

View file

@ -274,6 +274,9 @@ pub(crate) struct Metrics {
/// Number of entries in an
/// [`OnDemandHashMap`](crate::utils::on_demand_hashmap::OnDemandHashMap)
on_demand_hashmap_size: opentelemetry::metrics::Gauge<u64>,
/// Number of known remote servers in each state (online or offline)
remote_server_count: opentelemetry::metrics::Gauge<u64>,
}
impl Metrics {
@ -329,11 +332,17 @@ impl Metrics {
.with_description("Number of entries in OnDemandHashMap")
.init();
let remote_server_count = meter
.u64_gauge("remote_server_count")
.with_description("Number of known remote servers")
.init();
Metrics {
otel_state: (registry, provider),
http_requests_histogram,
lookup,
on_demand_hashmap_size,
remote_server_count,
}
}
@ -368,6 +377,18 @@ impl Metrics {
&[KeyValue::new("name", name)],
);
}
/// Record number of remote servers marked online or offline.
pub(crate) fn record_remote_server_count(
&self,
online_count: u64,
offline_count: u64,
) {
self.remote_server_count
.record(online_count, &[KeyValue::new("state", "online")]);
self.remote_server_count
.record(offline_count, &[KeyValue::new("state", "offline")]);
}
}
/// Track HTTP metrics by converting this into an [`axum`] layer

View file

@ -18,6 +18,7 @@ pub(crate) mod pdu;
pub(crate) mod pusher;
pub(crate) mod rooms;
pub(crate) mod sending;
pub(crate) mod server_backoff;
pub(crate) mod transaction_ids;
pub(crate) mod uiaa;
pub(crate) mod users;
@ -34,6 +35,7 @@ pub(crate) struct Services {
pub(crate) globals: globals::Service,
pub(crate) key_backups: key_backups::Service,
pub(crate) media: media::Service,
pub(crate) server_backoff: Arc<server_backoff::Service>,
pub(crate) sending: Arc<sending::Service>,
}
@ -148,6 +150,7 @@ impl Services {
media: media::Service {
db,
},
server_backoff: server_backoff::Service::build(),
sending: sending::Service::build(db, &config),
globals: globals::Service::load(db, config, reload_handles)?,

View file

@ -75,8 +75,6 @@ pub(crate) struct Service {
Arc<RwLock<HashMap<OwnedEventId, RateLimitState>>>,
pub(crate) bad_signature_ratelimiter:
Arc<RwLock<HashMap<Vec<String>, RateLimitState>>>,
pub(crate) bad_query_ratelimiter:
Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub(crate) servername_ratelimiter:
OnDemandHashMap<OwnedServerName, Semaphore>,
pub(crate) roomid_mutex_insert: TokenSet<OwnedRoomId, marker::Insert>,
@ -269,7 +267,6 @@ impl Service {
admin_bot_room_alias_id,
bad_event_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_signature_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
bad_query_ratelimiter: Arc::new(RwLock::new(HashMap::new())),
servername_ratelimiter: OnDemandHashMap::new(
"servername_ratelimiter".to_owned(),
),

View file

@ -3,8 +3,10 @@ mod data;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
future::{Future, IntoFuture},
pin::Pin,
sync::Arc,
time::{Duration, Instant},
time::Duration,
};
use base64::{engine::general_purpose, Engine as _};
@ -109,6 +111,30 @@ pub(crate) struct RequestData {
requester_span: Span,
}
/// Which types of errors should cause us to backoff requests to this server
/// globally.
///
/// The default is [`BackoffOn::AllExceptWellFormed`], which is conservative,
/// with a high false negative rate and low false positive rate. For endpoints
/// where we have additional information, we should pick a less conservative
/// setting.
#[derive(Copy, Clone, Debug)]
pub(crate) enum BackoffOn {
/// All errors except for error responses that match the expected
/// `{ "errcode": ... }` format for the matrix protocol.
AllExceptWellFormed,
/// All errors
AllErrors,
}
pub(crate) struct SendFederationRequestBuilder<'a, T> {
destination: &'a ServerName,
request: T,
log_errors: bool,
backoff_on: BackoffOn,
}
pub(crate) struct Service {
db: &'static dyn Data,
@ -121,10 +147,7 @@ pub(crate) struct Service {
#[derive(Debug)]
enum TransactionStatus {
Running,
// number of times failed, time of last failure
Failed(u32, Instant),
// number of times failed
Retrying(u32),
Failed,
}
struct HandlerInputs {
@ -259,19 +282,14 @@ impl Service {
}
if let Err(error) = result {
warn!(%error, "Marking transaction as failed");
current_transaction_status.entry(destination).and_modify(|e| {
use TransactionStatus::{Failed, Retrying, Running};
*e = match e {
Running => Failed(1, Instant::now()),
Retrying(n) => Failed(*n + 1, Instant::now()),
Failed(..) => {
error!("Request that was not even running failed?!");
return;
}
}
});
// Logging transactions that fail due to backoff produces a lot of
// clutter in the logs. This is expected behavior, and the
// transaction will be retried later.
if !matches!(error, Error::ServerBackoff { .. }) {
warn!(%error, "Marking transaction as failed");
}
current_transaction_status
.insert(destination, TransactionStatus::Failed);
return Ok(None);
}
@ -376,27 +394,13 @@ impl Service {
entry
.and_modify(|e| match e {
TransactionStatus::Running | TransactionStatus::Retrying(_) => {
TransactionStatus::Running => {
// already running
allow = false;
}
TransactionStatus::Failed(tries, time) => {
// Fail if a request has failed recently (exponential
// backoff)
let mut min_elapsed_duration =
Duration::from_secs(30) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24)
{
min_elapsed_duration =
Duration::from_secs(60 * 60 * 24);
}
if time.elapsed() < min_elapsed_duration {
allow = false;
} else {
retry = true;
*e = TransactionStatus::Retrying(*tries);
}
TransactionStatus::Failed => {
retry = true;
*e = TransactionStatus::Running;
}
})
.or_insert(TransactionStatus::Running);
@ -662,30 +666,20 @@ impl Service {
Ok(())
}
#[tracing::instrument(skip(self, request))]
pub(crate) async fn send_federation_request<T>(
// Allowed because `SendFederationRequestBuilder::into_future` uses
// `services()`
#[allow(clippy::unused_self)]
pub(crate) fn send_federation_request<'a, T>(
&self,
destination: &ServerName,
destination: &'a ServerName,
request: T,
) -> Result<T::IncomingResponse>
where
T: OutgoingRequest + Debug,
{
debug!("Waiting for permit");
let permit = self.maximum_requests.acquire().await;
debug!("Got permit");
let response = tokio::time::timeout(
Duration::from_secs(2 * 60),
server_server::send_request(destination, request, true),
)
.await
.map_err(|_| {
warn!("Timeout waiting for server response");
Error::BadServerResponse("Timeout waiting for server response")
})?;
drop(permit);
response
) -> SendFederationRequestBuilder<'a, T> {
SendFederationRequestBuilder {
destination,
request,
log_errors: true,
backoff_on: BackoffOn::AllExceptWellFormed,
}
}
/// Sends a request to an appservice
@ -713,6 +707,88 @@ impl Service {
}
}
impl<T> SendFederationRequestBuilder<'_, T> {
/// Enable or disable automatically logging any error making this request.
///
/// This should be disabled if the error is going to be logged elsewhere,
/// to avoid cluttering logs with duplicate error messages.
pub(crate) fn log_errors(mut self, log_errors: bool) -> Self {
self.log_errors = log_errors;
self
}
/// Set the types of errors that will cause us to backoff future requests to
/// this server globally.
pub(crate) fn backoff_on(mut self, backoff_on: BackoffOn) -> Self {
self.backoff_on = backoff_on;
self
}
}
impl<'a, T> IntoFuture for SendFederationRequestBuilder<'a, T>
where
T: OutgoingRequest + Send + Debug + 'a,
T::IncomingResponse: Send,
{
// TODO: get rid of the Box once impl_trait_in_assoc_type is stable
// <https://github.com/rust-lang/rust/issues/63063>
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
type Output = Result<T::IncomingResponse>;
#[tracing::instrument(
name = "send_federation_request",
skip(self),
fields(destination = %self.destination)
)]
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
debug!("Waiting for permit");
let permit = services().sending.maximum_requests.acquire().await;
debug!("Got permit");
let backoff_guard =
services().server_backoff.server_ready(self.destination)?;
let response = tokio::time::timeout(
Duration::from_secs(2 * 60),
server_server::send_request(
self.destination,
self.request,
self.log_errors,
),
)
.await
.map_err(|_| {
warn!("Timeout waiting for server response");
Error::BadServerResponse("Timeout waiting for server response")
})
.and_then(|result| result);
drop(permit);
match &response {
Err(Error::Federation(_, error)) => {
if error.error_kind().is_some() {
if let BackoffOn::AllExceptWellFormed = self.backoff_on
{
backoff_guard.soft_failure();
} else {
backoff_guard.hard_failure();
}
} else {
// The error wasn't in the expected format for matrix
// API responses.
backoff_guard.hard_failure();
}
}
Err(_) => backoff_guard.hard_failure(),
Ok(_) => backoff_guard.success(),
}
response
})
}
}
#[tracing::instrument(skip(events))]
async fn handle_appservice_event(
id: &str,
@ -891,30 +967,34 @@ async fn handle_federation_event(
}
}
let permit = services().sending.maximum_requests.acquire().await;
let response = server_server::send_request(
server,
send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
false,
)
.await?;
let response = services()
.sending
.send_federation_request(
server,
send_transaction_message::v1::Request {
origin: services().globals.server_name().to_owned(),
pdus: pdu_jsons,
edus: edu_jsons,
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
transaction_id: general_purpose::URL_SAFE_NO_PAD
.encode(calculate_hash(
&events
.iter()
.map(|e| match e {
SendingEventType::Edu(b)
| SendingEventType::Pdu(b) => &**b,
})
.collect::<Vec<_>>(),
))
.into(),
},
)
// The spec states that this endpoint should always return success, even
// if individual PDUs fail. If we get an error, something is wrong.
.backoff_on(BackoffOn::AllErrors)
// The error will be logged in `handle_response`
.log_errors(false)
.await?;
for pdu in response.pdus {
if let (event_id, Err(error)) = pdu {
@ -922,8 +1002,6 @@ async fn handle_federation_event(
}
}
drop(permit);
Ok(())
}

View file

@ -0,0 +1,322 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex, RwLock},
time::{Duration, Instant},
};
use rand::{thread_rng, Rng};
use ruma::{OwnedServerName, ServerName};
use tracing::{debug, error, info, instrument};
use crate::{observability::METRICS, services, Error, Result};
/// Service to handle backing off requests to offline servers.
///
/// Matrix is full of servers that are either temporarily or permanently
/// offline. It's important not to flood offline servers with federation
/// traffic, since this can consume resources on both ends.
///
/// To limit traffic to offline servers, we track a global exponential backoff
/// state for federation requests to each server name. This mechanism is *only*
/// intended to handle offline servers. Rate limiting and backoff retries for
/// specific requests have different considerations and need to be handled
/// elsewhere.
///
/// Exponential backoff is typically used in a retry loop for a single request.
/// Because the state of this backoff is global, and requests may be issued
/// concurrently, we do a couple of unusual things:
///
/// First, we wait for a certain number of consecutive failed requests before we
/// start delaying further requests. This is to avoid delaying requests to a
/// server that is not offline but fails on a small fraction of requests.
///
/// Second, we only increment the failure counter once for every batch of
/// concurrent requests, instead of on every failed request. This avoids rapidly
/// increasing the counter, proportional to the rate of outgoing requests, when
/// the server is only briefly offline.
pub(crate) struct Service {
servers: RwLock<HashMap<OwnedServerName, Arc<RwLock<BackoffState>>>>,
server_counts: Mutex<ServerCounts>,
}
/// Guard to record the result of an attempted request to a server.
///
/// If the request succeeds, call [`BackoffGuard::success`]. If the request
/// fails in a way that indicates the server is unavailble, call
/// [`BackoffGuard::hard_failure`]. If the request fails in a way that doesn't
/// necessarily indicate that the server is unavailable, call
/// [`BackoffGuard::soft_failure`]. Note that this choice is security-sensitive.
/// If an attacker is able to trigger hard failures for an online server, they
/// can cause us to incorrectly mark it as offline and block outgoing requests
/// to it.
#[must_use]
pub(crate) struct BackoffGuard {
result_recorded: bool,
backoff: Arc<RwLock<BackoffState>>,
/// Store the last failure timestamp observed when this request started. If
/// there was another failure recorded since the request started, do not
/// increment the failure count. This ensures that only one failure will
/// be recorded for every batch of concurrent requests, as discussed in
/// the documentation of [`Service`].
last_failure: Option<Instant>,
}
/// State of exponential backoff for a specific server.
#[derive(Clone, Debug)]
struct BackoffState {
server_name: OwnedServerName,
/// Count of consecutive failed requests to this server.
failure_count: u8,
/// Timestamp of the last failed request to this server.
last_failure: Option<Instant>,
/// Random multiplier to request delay.
///
/// This is updated to a new random value after each batch of concurrent
/// requests containing a failure.
jitter_coeff: f64,
}
/// State transitions for a single server
#[derive(Debug, Copy, Clone)]
enum Transition {
/// A new server, marked as online by default
New,
OnlineToOffline,
OfflineToOnline,
}
/// Counts of known servers in each state, used for metrics
#[derive(Debug, Copy, Clone, Default)]
struct ServerCounts {
online_count: u64,
offline_count: u64,
}
impl Service {
pub(crate) fn build() -> Arc<Service> {
Arc::new(Service {
servers: RwLock::default(),
server_counts: Mutex::default(),
})
}
/// If ready to attempt another request to a server, returns a guard to
/// record the result.
///
/// If still in the backoff period for this server, returns `Err`.
#[instrument(skip(self))]
pub(crate) fn server_ready(
&self,
server_name: &ServerName,
) -> Result<BackoffGuard> {
let state = self.server_state(server_name);
let last_failure = {
let state_lock = state.read().unwrap();
if let Some(remaining_delay) = state_lock.remaining_delay() {
debug!(failures = %state_lock.failure_count, ?remaining_delay, "backing off from server");
return Err(Error::ServerBackoff {
server: server_name.to_owned(),
remaining_delay,
});
}
state_lock.last_failure
};
Ok(BackoffGuard {
result_recorded: false,
backoff: state,
last_failure,
})
}
fn record_transition(
&self,
server_name: &ServerName,
transition: Transition,
) {
let mut counts = self.server_counts.lock().unwrap();
match transition {
Transition::New => {
info!(
%server_name,
"new remote server, marked as online by default"
);
counts.online_count += 1;
}
Transition::OnlineToOffline => {
info!(
%server_name,
"remote server transitioned from online to offline"
);
counts.online_count -= 1;
counts.offline_count += 1;
}
Transition::OfflineToOnline => {
info!(
%server_name,
"remote server transitioned from offline to online"
);
counts.offline_count -= 1;
counts.online_count += 1;
}
}
METRICS.record_remote_server_count(
counts.online_count,
counts.offline_count,
);
}
fn server_state(
&self,
server_name: &ServerName,
) -> Arc<RwLock<BackoffState>> {
let servers = self.servers.read().unwrap();
if let Some(state) = servers.get(server_name) {
Arc::clone(state)
} else {
drop(servers);
let mut servers = self.servers.write().unwrap();
// We have to check again because it's possible for another thread
// to write in between us dropping the read lock and taking the
// write lock.
if let Some(state) = servers.get(server_name) {
Arc::clone(state)
} else {
let state = Arc::new(RwLock::new(BackoffState::new(
server_name.to_owned(),
)));
servers.insert(server_name.to_owned(), Arc::clone(&state));
self.record_transition(server_name, Transition::New);
state
}
}
}
}
impl BackoffState {
fn new(server_name: OwnedServerName) -> BackoffState {
BackoffState {
server_name,
failure_count: 0,
last_failure: None,
jitter_coeff: 0.0,
}
}
/// Returns the remaining time before ready to attempt another request to
/// this server.
fn remaining_delay(&self) -> Option<Duration> {
let config = &services().globals.config.federation.backoff;
let last_failure = self.last_failure?;
if self.failure_count <= config.failure_threshold {
return None;
}
let excess_failure_count =
self.failure_count - config.failure_threshold;
let delay_secs = config.max_delay.min(
config.base_delay
* config.multiplier.powi(i32::from(excess_failure_count)),
) * self.jitter_coeff;
let delay = Duration::from_secs_f64(delay_secs);
delay.checked_sub(last_failure.elapsed())
}
/// Returns whether this server is marked as online (no backoff delay).
fn is_online(&self) -> bool {
let config = &services().globals.config.federation.backoff;
self.failure_count <= config.failure_threshold
}
}
impl BackoffGuard {
/// Record a successful request.
#[instrument(skip(self))]
pub(crate) fn success(mut self) {
self.result_recorded = true;
let mut state = self.backoff.write().unwrap();
let was_online = state.is_online();
if state.failure_count != 0 {
debug!(
server_name = %&state.server_name,
"successful request to server, resetting failure count"
);
}
state.failure_count = 0;
// Server is always online after setting failure_count = 0
if !was_online {
services().server_backoff.record_transition(
&state.server_name,
Transition::OfflineToOnline,
);
}
}
/// Record a failed request indicating that the server may be unavailable.
///
/// Examples of failures in this category are a timeout, a 500 status, or
/// a 404 from an endpoint that is not specced to return 404.
#[instrument(skip(self))]
pub(crate) fn hard_failure(mut self) {
self.result_recorded = true;
let config = &services().globals.config.federation.backoff;
let mut state = self.backoff.write().unwrap();
let was_online = state.is_online();
if state.last_failure == self.last_failure {
state.failure_count = state.failure_count.saturating_add(1);
state.jitter_coeff =
thread_rng().gen_range(config.jitter_range.clone());
state.last_failure = Some(Instant::now());
debug!(
server_name = %state.server_name,
failure_count = state.failure_count,
"hard failure sending request to server, incrementing failure count"
);
if state.is_online() != was_online {
services().server_backoff.record_transition(
&state.server_name,
Transition::OnlineToOffline,
);
}
}
}
/// Record a request that failed, but where the failure is likely to occur
/// in normal operation even if the server is not unavailable.
///
/// An example of a failure in this category is 404 from querying a user
/// profile. This might occur if the server no longer exists, but will also
/// occur if the userid doesn't exist.
#[instrument(skip(self))]
pub(crate) fn soft_failure(mut self) {
self.result_recorded = true;
}
}
impl Drop for BackoffGuard {
fn drop(&mut self) {
if !self.result_recorded {
error!(
"BackoffGuard dropped without recording result. This is a bug."
);
}
}
}

View file

@ -1,4 +1,4 @@
use std::convert::Infallible;
use std::{convert::Infallible, time::Duration};
use http::StatusCode;
use ruma::{
@ -82,6 +82,13 @@ pub(crate) enum Error {
Redaction(OwnedServerName, ruma::canonical_json::RedactionError),
#[error("{0} in {1}")]
InconsistentRoomState(&'static str, ruma::OwnedRoomId),
#[error(
"backing off requests to {server} for the next {remaining_delay:?}"
)]
ServerBackoff {
server: OwnedServerName,
remaining_delay: Duration,
},
}
impl Error {