Compare commits

...

5 commits

Author SHA1 Message Date
Tulir Asokan
d9fe6c1b6b Fix some logs 2022-07-12 23:17:25 +03:00
Tulir Asokan
dfe5e34969 Send marker event after backfilling 2022-07-12 22:51:58 +03:00
Tulir Asokan
3a92a46bee Add basic command to backfill with batch sending 2022-07-12 19:31:19 +03:00
Tulir Asokan
2f567f39dc Update mautrix-go 2022-07-12 17:56:58 +03:00
Tulir Asokan
e0378cf19d Refactor message handling to convert before sending 2022-07-12 17:53:15 +03:00
13 changed files with 375 additions and 96 deletions

View file

@ -33,6 +33,9 @@
* [ ] Auto-joining threads
* [ ] Backfilling threads after joining
* [x] Custom emojis
* [ ] Message history
* [x] Manually with `!discord backfill`
* [ ] Automatically in background
* [x] Message deletions
* [x] Reactions
* [x] Unicode emojis

View file

@ -49,6 +49,7 @@ func (br *DiscordBridge) RegisterCommands() {
cmdGuilds,
cmdRejoinSpace,
cmdDeleteAllPortals,
cmdBackfill,
)
}
@ -372,3 +373,28 @@ func fnDeleteAllPortals(ce *WrappedCommandEvent) {
ce.Reply("Finished background cleanup of deleted portal rooms.")
}()
}
var cmdBackfill = &commands.FullHandler{
Func: wrapCommand(fnBackfill),
Name: "backfill",
Help: commands.HelpMeta{
Section: commands.HelpSectionUnclassified,
Description: "Backfill messages in the current portal.",
Args: "<_number of pages_>",
},
RequiresPortal: true,
RequiresLogin: true,
}
func fnBackfill(ce *WrappedCommandEvent) {
if len(ce.Args) == 0 {
ce.Reply("**Usage**: `$cmdprefix backfill <number of pages>`")
return
}
pages, err := strconv.Atoi(ce.Args[0])
if err != nil || pages < 1 {
ce.Reply("**Usage**: `$cmdprefix backfill <number of pages>`")
return
}
ce.Portal.BackfillHistoryChunks(ce.User, pages)
}

View file

@ -37,6 +37,10 @@ type BridgeConfig struct {
PortalMessageBuffer int `yaml:"portal_message_buffer"`
Backfill struct {
Enable bool `yaml:"enable"`
UseDoublePuppet bool `yaml:"use_double_puppet"`
} `yaml:"backfill"`
DeliveryReceipts bool `yaml:"delivery_receipts"`
MessageStatusEvents bool `yaml:"message_status_events"`
MessageErrorNotices bool `yaml:"message_error_notices"`

View file

@ -33,3 +33,15 @@ func (config *Config) CanAutoDoublePuppet(userID id.UserID) bool {
return hasSecret
}
func (config *Config) CanDoublePuppetBackfill(userID id.UserID) bool {
if !config.Bridge.Backfill.UseDoublePuppet {
return false
}
_, homeserver, _ := userID.Parse()
// Batch sending can only use local users, so don't allow double puppets on other servers.
if homeserver != config.Homeserver.Domain {
return false
}
return true
}

View file

@ -31,6 +31,8 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Str, "bridge", "guild_name_template")
helper.Copy(up.Bool, "bridge", "private_chat_portal_meta")
helper.Copy(up.Int, "bridge", "startup_private_channel_create_limit")
helper.Copy(up.Bool, "bridge", "backfill", "enable")
helper.Copy(up.Bool, "bridge", "backfill", "use_double_puppet")
helper.Copy(up.Int, "bridge", "portal_message_buffer")
helper.Copy(up.Bool, "bridge", "delivery_receipts")
helper.Copy(up.Bool, "bridge", "message_status_events")

View file

@ -61,6 +61,11 @@ func (mq *MessageQuery) GetLastByDiscordID(key PortalKey, discordID string) *Mes
return mq.New().Scan(mq.db.QueryRow(query, key.ChannelID, key.Receiver, discordID))
}
func (mq *MessageQuery) GetFirstInChat(key PortalKey) *Message {
query := messageSelect + " WHERE dc_chan_id=$1 AND dc_chan_receiver=$2 AND dc_thread_id='' AND dc_edit_index=0 ORDER BY dcid ASC, dc_attachment_id ASC LIMIT 1"
return mq.New().Scan(mq.db.QueryRow(query, key.ChannelID, key.Receiver))
}
func (mq *MessageQuery) GetClosestBefore(key PortalKey, ts time.Time) *Message {
query := messageSelect + " WHERE dc_chan_id=$1 AND dc_chan_receiver=$2 AND timestamp<=$3 ORDER BY timestamp DESC, dc_attachment_id DESC LIMIT 1"
return mq.New().Scan(mq.db.QueryRow(query, key.ChannelID, key.Receiver, ts.UnixMilli()))
@ -179,6 +184,49 @@ func (m *Message) MassInsert(msgs []MessagePart) {
}
}
type PartialMessage struct {
DiscordID string
AttachmentID string
EditIndex int
SenderID string
Timestamp time.Time
MXID id.EventID
}
func (mq *MessageQuery) MassInsert(portal PortalKey, threadID string, msgs []PartialMessage) {
if len(msgs) == 0 {
return
}
for len(msgs) > 100 {
mq.MassInsert(portal, threadID, msgs[:100])
msgs = msgs[100:]
}
valueStringFormat := "($%d, $%d, $%d, $1, $2, $%d, $%d, $3, $%d)"
if mq.db.Dialect == dbutil.SQLite {
valueStringFormat = strings.ReplaceAll(valueStringFormat, "$", "?")
}
params := make([]interface{}, 3+len(msgs)*6)
placeholders := make([]string, len(msgs))
params[0] = portal.ChannelID
params[1] = portal.Receiver
params[2] = threadID
for i, msg := range msgs {
offset := 3 + i*6
params[offset] = msg.DiscordID
params[offset+1] = msg.AttachmentID
params[offset+2] = msg.EditIndex
params[offset+3] = msg.SenderID
params[offset+4] = msg.Timestamp.UnixMilli()
params[offset+5] = msg.MXID
placeholders[i] = fmt.Sprintf(valueStringFormat, offset+1, offset+2, offset+3, offset+4, offset+5, offset+6)
}
_, err := mq.db.Exec(fmt.Sprintf(messageMassInsertTemplate, strings.Join(placeholders, ", ")), params...)
if err != nil {
mq.log.Warnfln("Failed to insert %d messages in %s/%s: %v", len(msgs), portal, threadID, err)
panic(err)
}
}
func (m *Message) Insert() {
_, err := m.db.Exec(messageInsertQuery,
m.DiscordID, m.AttachmentID, m.EditIndex, m.Channel.ChannelID, m.Channel.Receiver, m.SenderID,

View file

@ -16,7 +16,7 @@ const (
portalSelect = `
SELECT dcid, receiver, type, other_user_id, dc_guild_id, dc_parent_id, mxid,
plain_name, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set,
encrypted, in_space, first_event_id
encrypted, in_space, first_event_id, batch_id, insertion_id, has_more_history
FROM portal
`
)
@ -116,17 +116,20 @@ type Portal struct {
Encrypted bool
InSpace id.RoomID
FirstEventID id.EventID
FirstEventID id.EventID
BatchID id.BatchID
InsertionID id.EventID
HasMoreHistory bool
}
func (p *Portal) Scan(row dbutil.Scannable) *Portal {
var otherUserID, guildID, parentID, mxid, firstEventID sql.NullString
var otherUserID, guildID, parentID, mxid sql.NullString
var chanType int32
var avatarURL string
err := row.Scan(&p.Key.ChannelID, &p.Key.Receiver, &chanType, &otherUserID, &guildID, &parentID,
&mxid, &p.PlainName, &p.Name, &p.NameSet, &p.Topic, &p.TopicSet, &p.Avatar, &avatarURL, &p.AvatarSet,
&p.Encrypted, &p.InSpace, &firstEventID)
&p.Encrypted, &p.InSpace, &p.FirstEventID, &p.BatchID, &p.InsertionID, &p.HasMoreHistory)
if err != nil {
if err != sql.ErrNoRows {
@ -142,7 +145,6 @@ func (p *Portal) Scan(row dbutil.Scannable) *Portal {
p.GuildID = guildID.String
p.ParentID = parentID.String
p.Type = discordgo.ChannelType(chanType)
p.FirstEventID = id.EventID(firstEventID.String)
p.AvatarURL, _ = id.ParseContentURI(avatarURL)
return p
@ -152,13 +154,13 @@ func (p *Portal) Insert() {
query := `
INSERT INTO portal (dcid, receiver, type, other_user_id, dc_guild_id, dc_parent_id, mxid,
plain_name, name, name_set, topic, topic_set, avatar, avatar_url, avatar_set,
encrypted, in_space, first_event_id)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
encrypted, in_space, first_event_id, batch_id, insertion_id, has_more_history)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21)
`
_, err := p.db.Exec(query, p.Key.ChannelID, p.Key.Receiver, p.Type,
strPtr(p.OtherUserID), strPtr(p.GuildID), strPtr(p.ParentID), strPtr(string(p.MXID)),
p.PlainName, p.Name, p.NameSet, p.Topic, p.TopicSet, p.Avatar, p.AvatarURL.String(), p.AvatarSet,
p.Encrypted, p.InSpace, p.FirstEventID.String())
p.Encrypted, p.InSpace, p.FirstEventID.String(), p.BatchID.String(), p.InsertionID.String(), p.HasMoreHistory)
if err != nil {
p.log.Warnfln("Failed to insert %s: %v", p.Key, err)
@ -171,13 +173,13 @@ func (p *Portal) Update() {
UPDATE portal
SET type=$1, other_user_id=$2, dc_guild_id=$3, dc_parent_id=$4, mxid=$5,
plain_name=$6, name=$7, name_set=$8, topic=$9, topic_set=$10, avatar=$11, avatar_url=$12, avatar_set=$13,
encrypted=$14, in_space=$15, first_event_id=$16
WHERE dcid=$17 AND receiver=$18
encrypted=$14, in_space=$15, first_event_id=$16, batch_id=$17, insertion_id=$18, has_more_history=$19
WHERE dcid=$20 AND receiver=$21
`
_, err := p.db.Exec(query,
p.Type, strPtr(p.OtherUserID), strPtr(p.GuildID), strPtr(p.ParentID), strPtr(string(p.MXID)),
p.PlainName, p.Name, p.NameSet, p.Topic, p.TopicSet, p.Avatar, p.AvatarURL.String(), p.AvatarSet,
p.Encrypted, p.InSpace, p.FirstEventID.String(),
p.Encrypted, p.InSpace, p.FirstEventID.String(), p.BatchID.String(), p.InsertionID.String(), p.HasMoreHistory,
p.Key.ChannelID, p.Key.Receiver)
if err != nil {

View file

@ -1,4 +1,4 @@
-- v0 -> v8: Latest revision
-- v0 -> v9: Latest revision
CREATE TABLE guild (
dcid TEXT PRIMARY KEY,
@ -37,7 +37,10 @@ CREATE TABLE portal (
encrypted BOOLEAN NOT NULL,
in_space TEXT NOT NULL,
first_event_id TEXT NOT NULL,
first_event_id TEXT NOT NULL,
batch_id TEXT NOT NULL,
insertion_id TEXT NOT NULL,
has_more_history BOOLEAN NOT NULL,
PRIMARY KEY (dcid, receiver),
CONSTRAINT portal_parent_fkey FOREIGN KEY (dc_parent_id, dc_parent_receiver) REFERENCES portal (dcid, receiver) ON DELETE CASCADE,

View file

@ -0,0 +1,8 @@
-- v9: Add backfill batch ID for portals
ALTER TABLE portal ADD COLUMN batch_id TEXT DEFAULT '';
ALTER TABLE portal ADD COLUMN insertion_id TEXT DEFAULT '';
ALTER TABLE portal ADD COLUMN has_more_history BOOLEAN NOT NULL DEFAULT true;
-- only: postgres for next 3 lines
ALTER TABLE portal ALTER COLUMN batch_id DROP DEFAULT;
ALTER TABLE portal ALTER COLUMN insertion_id DROP DEFAULT;
ALTER TABLE portal ALTER COLUMN has_more_history DROP DEFAULT;

View file

@ -96,6 +96,16 @@ bridge:
# Number of private channel portals to create on bridge startup.
# Other portals will be created when receiving messages.
startup_private_channel_create_limit: 5
backfill:
# Enable backfilling messages from Discord using batch sending?
# This requires a server with MSC2716 support, which is currently an experimental feature in synapse.
# It can be enabled by setting experimental_features -> msc2716_enabled to true in homeserver.yaml.
enable: false
# Use double puppets for backfilling?
# In order to use this, the double puppets must be in the appservice's user ID namespace
# (because the bridge can't use the double puppet access token with batch sending).
# This only affects double puppets on the local server, double puppets on other servers will never be used.
use_double_puppet: false
# Should the bridge send a read receipt from the bridge bot when a message has been sent to Discord?
delivery_receipts: false
# Whether the bridge should send the message status as a custom com.beeper.message_send_status event.

2
go.mod
View file

@ -11,7 +11,7 @@ require (
github.com/skip2/go-qrcode v0.0.0-20200617195104-da1b6568686e
github.com/yuin/goldmark v1.4.12
maunium.net/go/maulogger/v2 v2.3.2
maunium.net/go/mautrix v0.11.1-0.20220708142125-1f795238d635
maunium.net/go/mautrix v0.11.1-0.20220712195105-1f3cdabc5e4c
)
require (

4
go.sum
View file

@ -59,5 +59,5 @@ maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/maulogger/v2 v2.3.2 h1:1XmIYmMd3PoQfp9J+PaHhpt80zpfmMqaShzUTC7FwY0=
maunium.net/go/maulogger/v2 v2.3.2/go.mod h1:TYWy7wKwz/tIXTpsx8G3mZseIRiC5DoMxSZazOHy68A=
maunium.net/go/mautrix v0.11.1-0.20220708142125-1f795238d635 h1:/kbw/ZW+MzViciUoTmhJoasmZcqHhcKSPws+FfKwb0s=
maunium.net/go/mautrix v0.11.1-0.20220708142125-1f795238d635/go.mod h1:85mjebfgKX7jjca7XNKTt7lHueX3YQsFUU+5o/FxpTw=
maunium.net/go/mautrix v0.11.1-0.20220712195105-1f3cdabc5e4c h1:sj8XwVEfKc1R4voWvw4awZTupM0Ag0wztmKqRtSb0G0=
maunium.net/go/mautrix v0.11.1-0.20220712195105-1f3cdabc5e4c/go.mod h1:85mjebfgKX7jjca7XNKTt7lHueX3YQsFUU+5o/FxpTw=

321
portal.go
View file

@ -493,34 +493,36 @@ func (portal *Portal) markMessageHandled(discordID string, editIndex int, author
msg.MassInsert(parts)
}
func (portal *Portal) sendMediaFailedMessage(intent *appservice.IntentAPI, bridgeErr error) {
content := &event.MessageEventContent{
func (portal *Portal) makeMediaFailedMessage(bridgeErr error) event.MessageEventContent {
return event.MessageEventContent{
Body: fmt.Sprintf("Failed to bridge media: %v", bridgeErr),
MsgType: event.MsgNotice,
}
_, err := portal.sendMatrixMessage(intent, event.EventMessage, content, nil, 0)
if err != nil {
portal.log.Warnfln("Failed to send media error message to matrix: %v", err)
}
}
const DiscordStickerSize = 160
func (portal *Portal) handleDiscordFile(typeName string, intent *appservice.IntentAPI, id, url string, content *event.MessageEventContent, ts time.Time, threadRelation *event.RelatesTo) *database.MessagePart {
type ConvertedMessagePart struct {
Type event.Type
Content *event.MessageEventContent
AttachmentID string
}
func (portal *Portal) convertDiscordFile(typeName string, intent *appservice.IntentAPI, url string, content *event.MessageEventContent) {
data, err := portal.downloadDiscordAttachment(url)
if err != nil {
portal.sendMediaFailedMessage(intent, err)
return nil
*content = portal.makeMediaFailedMessage(err)
portal.log.Warnfln("Failed to download %s: %v", url, err)
return
}
err = portal.uploadMatrixAttachment(intent, data, content)
if err != nil {
portal.sendMediaFailedMessage(intent, err)
return nil
*content = portal.makeMediaFailedMessage(err)
portal.log.Warnfln("Failed to reupload %s: %v", url, err)
return
}
evtType := event.EventMessage
if typeName == "sticker" && (content.Info.Width > DiscordStickerSize || content.Info.Height > DiscordStickerSize) {
if content.Info.Width > content.Info.Height {
content.Info.Height /= content.Info.Width / DiscordStickerSize
@ -532,25 +534,10 @@ func (portal *Portal) handleDiscordFile(typeName string, intent *appservice.Inte
content.Info.Width = DiscordStickerSize
content.Info.Height = DiscordStickerSize
}
evtType = event.EventSticker
}
resp, err := portal.sendMatrixMessage(intent, evtType, content, nil, ts.UnixMilli())
if err != nil {
portal.log.Warnfln("Failed to send %s to Matrix: %v", typeName, err)
return nil
}
// Update the fallback reply event for the next attachment
if threadRelation != nil {
threadRelation.InReplyTo.EventID = resp.EventID
}
return &database.MessagePart{
AttachmentID: id,
MXID: resp.EventID,
}
}
func (portal *Portal) handleDiscordSticker(intent *appservice.IntentAPI, sticker *discordgo.Sticker, ts time.Time, threadRelation *event.RelatesTo) *database.MessagePart {
func (portal *Portal) convertDiscordSticker(intent *appservice.IntentAPI, sticker *discordgo.Sticker) ConvertedMessagePart {
var mime string
switch sticker.FormatType {
case discordgo.StickerFormatTypePNG:
@ -565,12 +552,16 @@ func (portal *Portal) handleDiscordSticker(intent *appservice.IntentAPI, sticker
Info: &event.FileInfo{
MimeType: mime,
},
RelatesTo: threadRelation,
}
return portal.handleDiscordFile("sticker", intent, sticker.ID, sticker.URL(), content, ts, threadRelation)
portal.convertDiscordFile("sticker", intent, sticker.URL(), content)
return ConvertedMessagePart{
Type: event.EventSticker,
Content: content,
AttachmentID: sticker.ID,
}
}
func (portal *Portal) handleDiscordAttachment(intent *appservice.IntentAPI, att *discordgo.MessageAttachment, ts time.Time, threadRelation *event.RelatesTo) *database.MessagePart {
func (portal *Portal) convertDiscordAttachment(intent *appservice.IntentAPI, att *discordgo.MessageAttachment) ConvertedMessagePart {
// var captionContent *event.MessageEventContent
// if att.Description != "" {
@ -591,7 +582,6 @@ func (portal *Portal) handleDiscordAttachment(intent *appservice.IntentAPI, att
// This gets overwritten later after the file is uploaded to the homeserver
Size: att.Size,
},
RelatesTo: threadRelation,
}
switch strings.ToLower(strings.Split(att.ContentType, "/")[0]) {
@ -604,7 +594,12 @@ func (portal *Portal) handleDiscordAttachment(intent *appservice.IntentAPI, att
default:
content.MsgType = event.MsgFile
}
return portal.handleDiscordFile("attachment", intent, att.ID, att.URL, content, ts, threadRelation)
portal.convertDiscordFile("attachment", intent, att.URL, content)
return ConvertedMessagePart{
Type: event.EventMessage,
Content: content,
AttachmentID: att.ID,
}
}
func (portal *Portal) handleDiscordMessageCreate(user *User, msg *discordgo.Message, thread *Thread) {
@ -648,23 +643,53 @@ func (portal *Portal) handleDiscordMessageCreate(user *User, msg *discordgo.Mess
puppet.UpdateInfo(user, msg.Author)
intent := puppet.IntentFor(portal)
var threadRelation *event.RelatesTo
var threadID string
var threadRootID, lastInThreadID id.EventID
if thread != nil {
threadID = thread.ID
lastEventID := thread.RootMXID
threadRootID = thread.RootMXID
lastInThread := portal.bridge.DB.Message.GetLastInThread(portal.Key, thread.ID)
if lastInThread != nil {
lastEventID = lastInThread.MXID
lastInThreadID = lastInThread.MXID
} else {
lastInThreadID = thread.RootMXID
}
threadRelation = (&event.RelatesTo{}).SetThread(thread.RootMXID, lastEventID)
}
var parts []database.MessagePart
parts := portal.convertDiscordMessage(intent, msg)
if len(parts) == 0 {
portal.log.Warnfln("Unhandled message %s", msg.ID)
return
}
ts, _ := discordgo.SnowflakeTimestamp(msg.ID)
dbParts := make([]database.MessagePart, 0, len(parts))
for _, part := range parts {
if threadRootID != "" {
part.Content.RelatesTo.SetThread(threadRootID, lastInThreadID)
}
resp, err := portal.sendMatrixMessage(intent, part.Type, part.Content, nil, ts.UnixMilli())
if err != nil {
portal.log.Errorln("Failed to send part %q of %s: %v", part.AttachmentID, msg.ID, err)
} else {
lastInThreadID = resp.EventID
dbParts = append(dbParts, database.MessagePart{
AttachmentID: part.AttachmentID,
MXID: resp.EventID,
})
}
}
if len(dbParts) > 0 {
go portal.sendDeliveryReceipt(dbParts[len(dbParts)-1].MXID)
portal.markMessageHandled(msg.ID, 0, msg.Author.ID, ts, threadID, dbParts)
} else {
portal.log.Warnfln("All parts of %s failed to send", msg.ID)
}
}
func (portal *Portal) convertDiscordMessage(intent *appservice.IntentAPI, msg *discordgo.Message) []ConvertedMessagePart {
var parts []ConvertedMessagePart
if msg.Content != "" {
content := portal.renderDiscordMarkdown(msg.Content)
content.RelatesTo = threadRelation.Copy()
if msg.MessageReference != nil {
//key := database.PortalKey{msg.MessageReference.ChannelID, user.ID}
@ -677,36 +702,18 @@ func (portal *Portal) handleDiscordMessageCreate(user *User, msg *discordgo.Mess
}
}
resp, err := portal.sendMatrixMessage(intent, event.EventMessage, &content, nil, ts.UnixMilli())
if err != nil {
portal.log.Warnfln("Failed to send message %s to matrix: %v", msg.ID, err)
return
}
parts = append(parts, database.MessagePart{MXID: resp.EventID})
// Update the fallback reply event for attachments
if threadRelation != nil {
threadRelation.InReplyTo.EventID = resp.EventID
}
go portal.sendDeliveryReceipt(resp.EventID)
parts = append(parts, ConvertedMessagePart{
Type: event.EventMessage,
Content: &content,
})
}
for _, att := range msg.Attachments {
part := portal.handleDiscordAttachment(intent, att, ts, threadRelation)
if part != nil {
parts = append(parts, *part)
}
parts = append(parts, portal.convertDiscordAttachment(intent, att))
}
for _, sticker := range msg.StickerItems {
part := portal.handleDiscordSticker(intent, sticker, ts, threadRelation)
if part != nil {
parts = append(parts, *part)
}
}
if len(parts) == 0 {
portal.log.Warnfln("Unhandled message %s", msg.ID)
} else {
portal.markMessageHandled(msg.ID, 0, msg.Author.ID, ts, threadID, parts)
parts = append(parts, portal.convertDiscordSticker(intent, sticker))
}
return parts
}
func (portal *Portal) handleDiscordMessageUpdate(user *User, msg *discordgo.Message) {
@ -818,6 +825,153 @@ func (portal *Portal) handleDiscordMessageDelete(user *User, msg *discordgo.Mess
}
}
func (portal *Portal) BackfillHistoryChunks(user *User, number int) {
if !portal.HasMoreHistory {
return
}
firstMessage := portal.bridge.DB.Message.GetFirstInChat(portal.Key)
var beforeMsgID string
const limit = 50
if firstMessage != nil {
beforeMsgID = firstMessage.DiscordID
}
// This list is newest to oldest
var messages []*discordgo.Message
for i := 0; i < number; i++ {
resp, err := user.Session.ChannelMessages(portal.Key.ChannelID, limit, beforeMsgID, "", "")
if err != nil {
portal.log.Errorfln("Failed to fetch messages before %q through %s for backfill: %v", beforeMsgID, user.MXID, err)
return
}
messages = append(messages, resp...)
beforeMsgID = resp[len(resp)-1].ID
if len(resp) < limit {
portal.HasMoreHistory = false
break
}
}
portal.log.Debugfln("Got %d messages through %s for backfilling", len(messages), user.MXID)
portal.batchSend(user, messages, portal.FirstEventID, false)
}
func (portal *Portal) batchSend(user *User, messages []*discordgo.Message, prevEventID id.EventID, forward bool) {
req := mautrix.ReqBatchSend{
PrevEventID: prevEventID,
BeeperNewMessages: forward,
}
if !forward {
req.BatchID = portal.BatchID
}
req.StateEventsAtStart = make([]*event.Event, 0)
firstMessageTS, _ := discordgo.SnowflakeTimestamp(messages[len(messages)-1].ID)
beforeFirstMessageTimestampMillis := firstMessageTS.UnixMilli() - 1
addedMembers := make(map[id.UserID]struct{})
addMember := func(mxid id.UserID, name string, avatar id.ContentURI) {
if _, alreadyAdded := addedMembers[mxid]; alreadyAdded {
return
}
content := event.MemberEventContent{
Membership: event.MembershipJoin,
Displayname: name,
AvatarURL: avatar.CUString(),
}
inviteContent := content
inviteContent.Membership = event.MembershipInvite
stateKey := mxid.String()
req.StateEventsAtStart = append(req.StateEventsAtStart, &event.Event{
Type: event.StateMember,
Sender: portal.MainIntent().UserID,
StateKey: &stateKey,
Timestamp: beforeFirstMessageTimestampMillis,
Content: event.Content{Parsed: &inviteContent},
}, &event.Event{
Type: event.StateMember,
Sender: mxid,
StateKey: &stateKey,
Timestamp: beforeFirstMessageTimestampMillis,
Content: event.Content{Parsed: &content},
})
addedMembers[mxid] = struct{}{}
}
dbMessages := make([]database.PartialMessage, 0, len(messages))
req.Events = make([]*event.Event, 0, len(messages))
for i := len(messages) - 1; i >= 0; i-- {
msg := messages[i]
ts, _ := discordgo.SnowflakeTimestamp(msg.ID)
puppet := portal.bridge.GetPuppetByID(msg.Author.ID)
puppet.UpdateInfo(user, msg.Author)
intent := puppet.IntentFor(portal)
if intent.IsCustomPuppet && !portal.bridge.Config.CanDoublePuppetBackfill(intent.UserID) {
intent = puppet.DefaultIntent()
}
addMember(intent.UserID, puppet.Name, puppet.AvatarURL)
for _, part := range portal.convertDiscordMessage(intent, msg) {
wrappedContent := event.Content{
Parsed: part.Content,
}
eventType, err := portal.encrypt(intent, &wrappedContent, part.Type)
if err != nil {
portal.log.Errorfln("Error encrypting %s/%s while backfilling: %v", msg.ID, part.AttachmentID)
continue
}
intent.AddDoublePuppetValue(&wrappedContent)
req.Events = append(req.Events, &event.Event{
Sender: intent.UserID,
Type: eventType,
Timestamp: ts.UnixMilli(),
Content: wrappedContent,
})
dbMessages = append(dbMessages, database.PartialMessage{
DiscordID: msg.ID,
AttachmentID: part.AttachmentID,
SenderID: msg.Author.ID,
Timestamp: ts,
})
}
}
if len(req.Events) != len(dbMessages) {
panic(fmt.Errorf("backfill data length mismatch: %d != %d", len(req.Events), len(dbMessages)))
} else if len(req.Events) == 0 {
portal.log.Warnfln("Didn't get any Matrix events to batch send out of %d messages", len(dbMessages))
return
}
resp, err := portal.MainIntent().BatchSend(portal.MXID, &req)
if err != nil {
portal.log.Errorfln("Failed to batch send %d events: %v", len(dbMessages), err)
return
} else if len(resp.EventIDs) != len(dbMessages) {
portal.log.Errorfln("Unexpected batch send response: got %d event IDs, even though we sent %d messages", len(resp.EventIDs), len(dbMessages))
return
}
insertionID := resp.BaseInsertionEventID
if !forward {
if resp.NextBatchID != "" {
portal.BatchID = resp.NextBatchID
}
if resp.BaseInsertionEventID != "" {
portal.InsertionID = resp.BaseInsertionEventID
} else {
insertionID = portal.InsertionID
}
}
for i, evtID := range resp.EventIDs {
dbMessages[i].MXID = evtID
}
portal.bridge.DB.Message.MassInsert(portal.Key, "", dbMessages)
portal.Update()
portal.log.Infofln("Successfully batch sent %d events", len(dbMessages))
_, err = portal.MainIntent().SendStateEvent(portal.MXID, event.StateInsertionMarker, insertionID.String(), &event.InsertionMarkerContent{
InsertionID: insertionID,
Timestamp: time.Now().UnixMilli(),
})
if err != nil {
portal.log.Warnfln("Failed to send marker event after batch send: %v", err)
}
}
func (portal *Portal) syncParticipants(source *User, participants []*discordgo.User) {
for _, participant := range participants {
puppet := portal.bridge.GetPuppetByID(participant.ID)
@ -984,7 +1138,7 @@ var (
errUnknownEmoji = errors.New("unknown emoji")
)
func errorToStatusReason(err error) (reason event.MessageStatusReason, isCertain, canRetry, sendNotice bool) {
func errorToStatusReason(err error) (reason event.MessageStatusReason, status event.MessageStatus, isCertain, sendNotice bool, message string) {
switch {
case errors.Is(err, errUnknownMsgType),
errors.Is(err, errUnknownRelationType),
@ -993,19 +1147,19 @@ func errorToStatusReason(err error) (reason event.MessageStatusReason, isCertain
errors.Is(err, id.InvalidContentURI),
errors.Is(err, attachment.UnsupportedVersion),
errors.Is(err, attachment.UnsupportedAlgorithm):
return event.MessageStatusUnsupported, true, false, true
return event.MessageStatusUnsupported, event.MessageStatusFail, true, true, ""
case errors.Is(err, attachment.HashMismatch),
errors.Is(err, attachment.InvalidKey),
errors.Is(err, attachment.InvalidInitVector):
return event.MessageStatusUndecryptable, true, false, true
return event.MessageStatusUndecryptable, event.MessageStatusFail, true, true, ""
case errors.Is(err, errUserNotReceiver):
return event.MessageStatusNoPermission, true, false, false
return event.MessageStatusNoPermission, event.MessageStatusFail, true, false, ""
case errors.Is(err, errUnknownEditTarget):
return event.MessageStatusGenericError, true, false, false
return event.MessageStatusGenericError, event.MessageStatusFail, true, false, ""
case errors.Is(err, errTargetNotFound):
return event.MessageStatusGenericError, true, false, true
return event.MessageStatusGenericError, event.MessageStatusFail, true, true, ""
default:
return event.MessageStatusGenericError, false, true, true
return event.MessageStatusGenericError, event.MessageStatusRetriable, false, true, ""
}
}
@ -1025,15 +1179,14 @@ func (portal *Portal) sendStatusEvent(evtID id.EventID, err error) {
Type: event.RelReference,
EventID: evtID,
},
Success: err == nil,
}
if !content.Success {
reason, isCertain, canRetry, _ := errorToStatusReason(err)
content.Reason = reason
content.IsCertain = &isCertain
content.CanRetry = &canRetry
if err == nil {
content.Status = event.MessageStatusSuccess
} else {
content.Reason, content.Status, _, _, content.Message = errorToStatusReason(err)
content.Error = err.Error()
}
content.FillLegacyBooleans()
_, err = intent.SendMessageEvent(portal.MXID, event.BeeperMessageStatus, &content)
if err != nil {
portal.log.Warnln("Failed to send message status event:", err)
@ -1062,8 +1215,8 @@ func (portal *Portal) sendMessageMetrics(evt *event.Event, err error, part strin
level = log.LevelDebug
}
portal.log.Logfln(level, "%s %s %s from %s: %v", part, msgType, evtDescription, evt.Sender, err)
reason, isCertain, _, sendNotice := errorToStatusReason(err)
status := bridge.ReasonToCheckpointStatus(reason)
reason, msgStatus, isCertain, sendNotice, _ := errorToStatusReason(err)
status := bridge.ReasonToCheckpointStatus(reason, msgStatus)
portal.bridge.SendMessageCheckpoint(evt, bridge.MsgStepRemote, err, status, 0)
if sendNotice {
portal.sendErrorMessage(msgType, err.Error(), isCertain)
@ -1239,6 +1392,14 @@ func (portal *Portal) RemoveMXID() {
}
delete(portal.bridge.portalsByMXID, portal.MXID)
portal.MXID = ""
portal.InSpace = ""
portal.BatchID = ""
portal.InsertionID = ""
portal.FirstEventID = ""
portal.HasMoreHistory = true
portal.AvatarSet = false
portal.NameSet = false
portal.Encrypted = false
portal.Update()
portal.bridge.DB.Message.DeleteAll(portal.Key)
}