Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 26 additions & 75 deletions pkg/connector/chatsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ import (
"context"
"encoding/base64"
"strings"
"time"

"github.com/rs/zerolog"
"go.mau.fi/util/ptr"
"maunium.net/go/mautrix/bridgev2"
"maunium.net/go/mautrix/bridgev2/database"
"maunium.net/go/mautrix/bridgev2/simplevent"

"go.mau.fi/mautrix-twitter/pkg/twittermeow"
"go.mau.fi/mautrix-twitter/pkg/twittermeow/crypto"
Expand Down Expand Up @@ -61,7 +59,7 @@ func shouldEmitChatInfoUpdate(chatInfo *bridgev2.ChatInfo, portalRoomType databa
}

// syncXChatChannel syncs a single conversation from XChat inbox data.
// Creates the portal synchronously if it doesn't exist.
// It queues a portal resync and lets bridgev2 create the room if needed.
func (tc *TwitterClient) syncXChatChannel(ctx context.Context, item *response.XChatInboxItem, users map[string]*types.User) {
log := zerolog.Ctx(ctx)

Expand Down Expand Up @@ -94,54 +92,23 @@ func (tc *TwitterClient) syncXChatChannel(ctx context.Context, item *response.XC
}
}

// Ensure a backfill task exists even if we don't end up emitting a ChatInfoChange.
// Beeper scrollback relies on the backfill task existing for the portal.
if portal.MXID != "" {
if chatInfo.CanBackfill {
// FIXME this is wrong, backfill tasks are created automatically based on chat resyncs
if err := tc.connector.br.DB.BackfillTask.EnsureExists(ctx, portal.PortalKey, tc.userLogin.ID); err != nil {
log.Warn().Err(err).
Str("conversation_id", conv.ConversationID).
Msg("Failed to ensure backfill task exists")
} else {
tc.connector.br.WakeupBackfillQueue()
}
}
}

// Create Matrix room if it doesn't exist
// Queue a ChatResync so bridgev2 owns room creation and backfill task registration.
if portal.MXID == "" {
// FIXME this is wrong, CreateMatrixRoom should not be called manually
err = portal.CreateMatrixRoom(ctx, tc.userLogin, chatInfo)
if err != nil {
log.Warn().Err(err).
resync := tc.queueChatResyncNow(chatResyncCreate, portal.PortalKey, chatInfo)
if !resync.Success {
log.Warn().
Str("conversation_id", conv.ConversationID).
Msg("Failed to create Matrix room")
Err(resync.Error).
Msg("Failed to queue ChatResync for XChat conversation")
return
}
// Register backfill task for the newly created room
if chatInfo.CanBackfill {
// FIXME this is wrong, backfill tasks are created automatically based on chat resyncs
if err := tc.connector.br.DB.BackfillTask.EnsureExists(ctx, portal.PortalKey, tc.userLogin.ID); err != nil {
log.Warn().Err(err).
Str("conversation_id", conv.ConversationID).
Msg("Failed to ensure backfill task exists for new room")
} else {
tc.connector.br.WakeupBackfillQueue()
}
}
} else {
if shouldEmitChatInfoUpdate(chatInfo, portal.RoomType) {
tc.userLogin.QueueRemoteEvent(&simplevent.ChatInfoChange{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatInfoChange,
PortalKey: portal.PortalKey,
Timestamp: time.Now(),
},
ChatInfoChange: &bridgev2.ChatInfoChange{
ChatInfo: chatInfo,
},
})
} else if chatInfo.CanBackfill || shouldEmitChatInfoUpdate(chatInfo, portal.RoomType) {
resync := tc.queueChatResyncNow(chatResyncUpdate, portal.PortalKey, chatInfo)
if !resync.Success {
log.Warn().
Str("conversation_id", conv.ConversationID).
Err(resync.Error).
Msg("Failed to queue ChatResync for existing XChat conversation")
}
}

Expand Down Expand Up @@ -519,39 +486,23 @@ func (tc *TwitterClient) syncUntrustedConversation(ctx context.Context, conv *ty

chatInfo := tc.conversationToChatInfo(ctx, conv, inbox)

// Create Matrix room if it doesn't exist
// Queue a ChatResync so bridgev2 owns room creation and backfill task registration.
if portal.MXID == "" {
// FIXME this is wrong, CreateMatrixRoom should not be called manually
err = portal.CreateMatrixRoom(ctx, tc.userLogin, chatInfo)
if err != nil {
log.Warn().Err(err).
resync := tc.queueChatResyncNow(chatResyncCreate, portal.PortalKey, chatInfo)
if !resync.Success {
log.Warn().
Str("conversation_id", conv.ConversationID).
Msg("Failed to create Matrix room for untrusted conversation")
Err(resync.Error).
Msg("Failed to queue ChatResync for untrusted conversation")
return
}
} else {
// Room already exists - update MessageRequest status via ChatInfoChange
tc.userLogin.QueueRemoteEvent(&simplevent.ChatInfoChange{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventChatInfoChange,
PortalKey: portal.PortalKey,
Timestamp: time.Now(),
},
ChatInfoChange: &bridgev2.ChatInfoChange{
ChatInfo: chatInfo,
},
})
}

// Ensure untrusted conversations also have a queue backfill task once a room exists.
if portal.MXID != "" && chatInfo.CanBackfill {
// FIXME this is wrong, backfill tasks are created automatically based on chat resyncs
if err := tc.connector.br.DB.BackfillTask.EnsureExists(ctx, portal.PortalKey, tc.userLogin.ID); err != nil {
log.Warn().Err(err).
resync := tc.queueChatResyncNow(chatResyncUpdate, portal.PortalKey, chatInfo)
if !resync.Success {
log.Warn().
Str("conversation_id", conv.ConversationID).
Msg("Failed to ensure backfill task exists for untrusted conversation")
} else {
tc.connector.br.WakeupBackfillQueue()
Err(resync.Error).
Msg("Failed to queue ChatResync for existing untrusted conversation")
}
}

Expand Down Expand Up @@ -586,7 +537,7 @@ func (tc *TwitterClient) processUntrustedMessages(ctx context.Context, conversat
}

// Queue the message event
tc.HandlePollingEvent(msg, inbox)
tc.HandlePollingEvent(ctx, msg, inbox)
}
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ func NewTwitterClient(login *bridgev2.UserLogin, connector *TwitterConnector, cl
if !ok {
return displayname
}
ghost, err := tc.connector.br.GetGhostByID(context.TODO(), userID)
if ctx.Ctx == nil {
return displayname
}
ghost, err := tc.connector.br.GetGhostByID(ctx.Ctx, userID)
if err != nil || len(ghost.Identifiers) < 1 {
return displayname
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/connector/conversationdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (tc *TwitterClient) ensurePortalForConversation(ctx context.Context, conver
log.Warn().Err(err).Msg("Failed to process key change events for fetched conversation data")
}

// Sync channel (creates portal if needed)
// Sync channel and queue a resync if the portal still needs a room.
tc.syncXChatChannel(ctx, item, users)

// Process messages/read events to backfill and register any keys embedded there
Expand Down
8 changes: 6 additions & 2 deletions pkg/connector/handlematrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,11 @@ func (tc *TwitterClient) doHandleMatrixReaction(ctx context.Context, remove bool

// XChat reactions are sent as encrypted MessageCreateEvents (reaction_add/reaction_remove).
xchatConvID := NormalizeConversationID(conversationID)
_, err := tc.client.SendEncryptedReaction(ctx, xchatConvID, messageID, emoji, remove)
action := twittermeow.SendEncryptedReactionAdd
if remove {
action = twittermeow.SendEncryptedReactionRemove
}
_, err := tc.client.SendEncryptedReaction(ctx, xchatConvID, messageID, emoji, action)
return err
}

Expand Down Expand Up @@ -781,7 +785,7 @@ func (tc *TwitterClient) HandleMatrixViewingChat(ctx context.Context, chat *brid
if chat.Portal != nil {
conversationID = ParsePortalID(chat.Portal.ID)
}
tc.client.SetActiveConversation(ConvertConversationIDToREST(conversationID))
tc.client.SetActiveConversation(context.WithoutCancel(ctx), ConvertConversationIDToREST(conversationID))
return nil
}

Expand Down
Loading
Loading