Compare commits

...

2 commits

Author SHA1 Message Date
Joonas Myhrberg
1b7f2f61ea Sleep between backfills 2023-08-03 11:58:57 +03:00
Joonas Myhrberg
e3340d61ec Add global backfill lock
Ensures only a single channel is backfilled at once.
2023-08-02 22:03:16 +03:00
4 changed files with 32 additions and 1 deletions

View file

@ -5,7 +5,10 @@ import (
"crypto/sha256"
"encoding/base64"
"fmt"
"math/rand"
"sort"
"sync"
"time"
"github.com/bwmarrin/discordgo"
"github.com/rs/zerolog"
@ -16,6 +19,8 @@ import (
"go.mau.fi/mautrix-discord/database"
)
var globalBackfillLock sync.Mutex
func (portal *Portal) forwardBackfillInitial(source *User, thread *Thread) {
log := portal.log
defer func() {
@ -27,6 +32,9 @@ func (portal *Portal) forwardBackfillInitial(source *User, thread *Thread) {
panic("forwardBackfillInitial() called without locking forwardBackfillLock")
}
globalBackfillLock.Lock()
defer globalBackfillLock.Unlock()
limit := portal.bridge.Config.Bridge.Backfill.Limits.Initial.Channel
if portal.GuildID == "" {
limit = portal.bridge.Config.Bridge.Backfill.Limits.Initial.DM
@ -78,6 +86,9 @@ func (portal *Portal) ForwardBackfillMissed(source *User, serverLastMessageID st
portal.forwardBackfillLock.Lock()
defer portal.forwardBackfillLock.Unlock()
globalBackfillLock.Lock()
defer globalBackfillLock.Unlock()
var lastMessage *database.Message
if thread != nil {
lastMessage = portal.bridge.DB.Message.GetLastInThread(portal.Key, thread.ID)
@ -116,6 +127,13 @@ func (portal *Portal) collectBackfillMessages(log zerolog.Logger, source *User,
protoChannelID = thread.ID
}
for {
sleepMin := portal.bridge.Config.Bridge.Backfill.SleepMin
sleepMax := portal.bridge.Config.Bridge.Backfill.SleepMax
if sleepMin > 0 && sleepMax > 0 {
sleep := time.Duration(sleepMin+(sleepMax-sleepMin)*rand.Float64()) * time.Second
log.Debug().Dur("sleep", sleep).Msg("Sleeping before fetching more messages")
time.Sleep(sleep)
}
log.Debug().Str("before_id", before).Msg("Fetching messages for backfill")
newMessages, err := source.Session.ChannelMessages(protoChannelID, messageFetchChunkSize, before, "", "")
if err != nil {
@ -179,6 +197,13 @@ func (portal *Portal) backfillUnlimitedMissed(log zerolog.Logger, source *User,
protoChannelID = thread.ID
}
for {
sleepMin := portal.bridge.Config.Bridge.Backfill.SleepMin
sleepMax := portal.bridge.Config.Bridge.Backfill.SleepMax
if sleepMin > 0 && sleepMax > 0 {
sleep := time.Duration(sleepMin+(sleepMax-sleepMin)*rand.Float64()) * time.Second
log.Debug().Dur("sleep", sleep).Msg("Sleeping before fetching more messages")
time.Sleep(sleep)
}
log.Debug().Str("after_id", after).Msg("Fetching chunk of messages to backfill")
messages, err := source.Session.ChannelMessages(protoChannelID, messageFetchChunkSize, "", after, "")
if err != nil {

View file

@ -75,7 +75,9 @@ type BridgeConfig struct {
ManagementRoomText bridgeconfig.ManagementRoomTexts `yaml:"management_room_text"`
Backfill struct {
Limits struct {
SleepMin float64 `yaml:"sleep_min"`
SleepMax float64 `yaml:"sleep_max"`
Limits struct {
Initial BackfillLimitPart `yaml:"initial"`
Missed BackfillLimitPart `yaml:"missed"`
} `yaml:"forward_limits"`

View file

@ -77,6 +77,8 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Str, "bridge", "management_room_text", "welcome_unconnected")
helper.Copy(up.Str|up.Null, "bridge", "management_room_text", "additional_help")
helper.Copy(up.Bool, "bridge", "backfill", "enabled")
helper.Copy(up.Float, "bridge", "backfill", "sleep_min")
helper.Copy(up.Float, "bridge", "backfill", "sleep_max")
helper.Copy(up.Int, "bridge", "backfill", "forward_limits", "initial", "dm")
helper.Copy(up.Int, "bridge", "backfill", "forward_limits", "initial", "channel")
helper.Copy(up.Int, "bridge", "backfill", "forward_limits", "initial", "thread")

View file

@ -225,6 +225,8 @@ bridge:
# Settings for backfilling messages.
backfill:
sleep_min: 0.5
sleep_max: 6.0
# Limits for forward backfilling.
forward_limits:
# Initial backfill (when creating portal). 0 means backfill is disabled.