From fbe4d690b9f74a0cc34d74140b68483596bea5a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Thu, 26 Feb 2026 09:36:26 +0100 Subject: [PATCH 1/6] chore: remove repos from integrations.settings since we have them in repositories table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- ...2043927__removeReposFromGithubSettings.sql | 2 + ...2043927__removeReposFromGithubSettings.sql | 24 ++++ .../repositories/integrationRepository.ts | 56 +++++++- backend/src/services/collectionService.ts | 30 ++-- backend/src/services/integrationService.ts | 135 ++++++++---------- .../src/jobs/nangoMonitoring.job.ts | 33 ++--- .../src/bin/auto-heal-github-integration.ts | 7 +- .../src/bin/process-repo.ts | 7 +- .../src/service/integrationRunService.ts | 14 ++ .../src/service/integrationStreamService.ts | 28 +++- .../src/activities/nangoActivities.ts | 54 +++---- .../src/bin/check-nango-mapping.ts | 53 ++++--- .../src/integrations/index.ts | 22 +-- .../src/repositories/index.ts | 72 ++++++++++ .../libs/types/src/integrationSettings.ts | 6 +- 15 files changed, 341 insertions(+), 202 deletions(-) create mode 100644 backend/src/database/migrations/U1772043927__removeReposFromGithubSettings.sql create mode 100644 backend/src/database/migrations/V1772043927__removeReposFromGithubSettings.sql diff --git a/backend/src/database/migrations/U1772043927__removeReposFromGithubSettings.sql b/backend/src/database/migrations/U1772043927__removeReposFromGithubSettings.sql new file mode 100644 index 0000000000..f49a6e8cf8 --- /dev/null +++ b/backend/src/database/migrations/U1772043927__removeReposFromGithubSettings.sql @@ -0,0 +1,2 @@ +-- Undo: repos data was backed up in integration.integrations_backup_02_24_2026 +-- and cannot be automatically restored into settings.orgs[].repos diff --git a/backend/src/database/migrations/V1772043927__removeReposFromGithubSettings.sql b/backend/src/database/migrations/V1772043927__removeReposFromGithubSettings.sql new file mode 100644 index 0000000000..ceac9b0e91 --- /dev/null +++ b/backend/src/database/migrations/V1772043927__removeReposFromGithubSettings.sql @@ -0,0 +1,24 @@ +-- Strip repos from orgs in settings for github and github-nango integrations +-- Repos now live in public.repositories table and are populated into API responses +-- via the compatibility layer in integrationRepository._populateRelations +UPDATE integrations +SET settings = jsonb_set( + settings, + '{orgs}', + ( + SELECT coalesce(jsonb_agg( + org - 'repos' + ), '[]'::jsonb) + FROM jsonb_array_elements(settings->'orgs') org + ) +) +WHERE platform IN ('github', 'github-nango') + AND settings->'orgs' IS NOT NULL + AND "deletedAt" IS NULL; + +-- Also clean up top-level repos/unavailableRepos if present +UPDATE integrations +SET settings = settings - 'repos' - 'unavailableRepos' +WHERE platform IN ('github', 'github-nango') + AND (settings ? 'repos' OR settings ? 'unavailableRepos') + AND "deletedAt" IS NULL; diff --git a/backend/src/database/repositories/integrationRepository.ts b/backend/src/database/repositories/integrationRepository.ts index a70499a700..065feb93c7 100644 --- a/backend/src/database/repositories/integrationRepository.ts +++ b/backend/src/database/repositories/integrationRepository.ts @@ -585,10 +585,9 @@ class IntegrationRepository { const output = record.get({ plain: true }) - // For github-nango integrations, populate settings.nangoMapping from the dedicated table - // so the API contract remains unchanged for frontend consumers + // For github-nango integrations, populate settings.nangoMapping from dedicated table if (output.platform === PlatformType.GITHUB_NANGO) { - const rows = await record.sequelize.query( + const nangoRows = await record.sequelize.query( `SELECT "connectionId", owner, "repoName" FROM integration.nango_mapping WHERE "integrationId" = :integrationId`, { replacements: { integrationId: output.id }, @@ -596,15 +595,62 @@ class IntegrationRepository { }, ) - if (rows.length > 0) { + if (nangoRows.length > 0) { const nangoMapping: Record = {} - for (const row of rows as { connectionId: string; owner: string; repoName: string }[]) { + for (const row of nangoRows as { + connectionId: string + owner: string + repoName: string + }[]) { nangoMapping[row.connectionId] = { owner: row.owner, repoName: row.repoName } } output.settings = { ...output.settings, nangoMapping } } } + // For both github and github-nango, populate orgs[].repos from repositories table + if ( + (output.platform === PlatformType.GITHUB || output.platform === PlatformType.GITHUB_NANGO) && + output.settings?.orgs?.length > 0 + ) { + const repoRows = (await record.sequelize.query( + `SELECT url, split_part(url, '/', -1) as name, split_part(url, '/', -2) as owner, "forkedFrom", "updatedAt" + FROM public.repositories + WHERE "sourceIntegrationId" = :integrationId AND "deletedAt" IS NULL + ORDER BY url`, + { + replacements: { integrationId: output.id }, + type: QueryTypes.SELECT, + }, + )) as { + url: string + name: string + owner: string + forkedFrom: string | null + updatedAt: string + }[] + + // Group repos by owner (org name) + const reposByOwner: Record = {} + for (const repo of repoRows) { + if (!reposByOwner[repo.owner]) reposByOwner[repo.owner] = [] + reposByOwner[repo.owner].push(repo) + } + + output.settings = { + ...output.settings, + orgs: output.settings.orgs.map((org) => ({ + ...org, + repos: (reposByOwner[org.name] || []).map((r) => ({ + url: r.url, + name: r.name, + forkedFrom: r.forkedFrom, + updatedAt: r.updatedAt, + })), + })), + } + } + return output } } diff --git a/backend/src/services/collectionService.ts b/backend/src/services/collectionService.ts index 51be9693d1..74bfd1d84e 100644 --- a/backend/src/services/collectionService.ts +++ b/backend/src/services/collectionService.ts @@ -27,6 +27,7 @@ import { } from '@crowd/data-access-layer/src/collections' import { fetchIntegrationsForSegment } from '@crowd/data-access-layer/src/integrations' import { QueryFilter } from '@crowd/data-access-layer/src/query' +import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories' import { ICreateRepositoryGroup, IRepositoryGroup, @@ -533,13 +534,8 @@ export class CollectionService extends LoggerBase { return listRepositoryGroups(qx, { insightsProjectId }) } - static isSingleRepoOrg(orgs: GithubIntegrationSettings['orgs']): boolean { - return ( - Array.isArray(orgs) && - orgs.length === 1 && - Array.isArray(orgs[0]?.repos) && - orgs[0].repos.length === 1 - ) + static isSingleRepoOrg(orgs: GithubIntegrationSettings['orgs'], repoCount: number): boolean { + return Array.isArray(orgs) && orgs.length === 1 && repoCount === 1 } /** @@ -613,13 +609,12 @@ export class CollectionService extends LoggerBase { const settings = githubIntegration.settings as GithubIntegrationSettings // The orgs must have at least one repo - if ( - !settings?.orgs || - !Array.isArray(settings.orgs) || - settings.orgs.length === 0 || - !Array.isArray(settings.orgs[0].repos) || - settings.orgs[0].repos.length === 0 - ) { + if (!settings?.orgs || !Array.isArray(settings.orgs) || settings.orgs.length === 0) { + return null + } + + const repos = await getReposForGithubIntegration(qx, githubIntegration.id) + if (repos.length === 0) { return null } @@ -633,11 +628,8 @@ export class CollectionService extends LoggerBase { return null } - const details = CollectionService.isSingleRepoOrg(settings.orgs) - ? await GithubIntegrationService.findRepoDetails( - mainOrg.name, - settings.orgs[0].repos[0].name, - ) + const details = CollectionService.isSingleRepoOrg(settings.orgs, repos.length) + ? await GithubIntegrationService.findRepoDetails(mainOrg.name, repos[0].name) : { ...(await GithubIntegrationService.findOrgDetails(mainOrg.name)), topics: mainOrg.topics, diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index 570a36bd13..ce86c3bfcd 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -21,6 +21,7 @@ import { IRepository, IRepositoryMapping, getIntegrationReposMapping, + getReposForGithubIntegration, getRepositoriesBySourceIntegrationId, getRepositoriesByUrl, insertRepositories, @@ -801,6 +802,24 @@ export default class IntegrationService { const txService = new IntegrationService(txOptions) try { + // Extract repos from orgs and build forkedFrom map, then store settings without repos + const forkedFromMap = new Map() + if (settings?.orgs) { + for (const org of settings.orgs) { + for (const repo of org.repos || []) { + if (repo.url) { + forkedFromMap.set(repo.url, repo.forkedFrom || null) + } + } + } + } + + // Strip repos from orgs before storing in settings + const settingsToStore = { + ...settings, + orgs: (settings?.orgs || []).map(({ repos: _repos, ...org }) => org), + } + let integration if (!integrationId) { @@ -808,26 +827,26 @@ export default class IntegrationService { integration = await txService.createOrUpdate( { platform: PlatformType.GITHUB_NANGO, - settings, + settings: settingsToStore, status: 'done', }, transaction, ) // create github mapping - this also creates git integration - await txService.mapGithubRepos(integration.id, mapping, false) + await txService.mapGithubRepos(integration.id, mapping, false, forkedFromMap) } else { // update existing integration integration = await txService.findById(integrationId) // create github mapping - this also creates git integration - await txService.mapGithubRepos(integrationId, mapping, false) + await txService.mapGithubRepos(integrationId, mapping, false, forkedFromMap) integration = await txService.createOrUpdate( { id: integrationId, platform: PlatformType.GITHUB_NANGO, - settings, + settings: settingsToStore, }, transaction, ) @@ -858,7 +877,12 @@ export default class IntegrationService { } } - async mapGithubRepos(integrationId, mapping, fireOnboarding = true) { + async mapGithubRepos( + integrationId, + mapping, + fireOnboarding = true, + forkedFromMap?: Map, + ) { this.options.log.info(`Mapping GitHub repos for integration ${integrationId}!`) const transaction = await SequelizeRepository.createTransaction(this.options) @@ -880,9 +904,17 @@ export default class IntegrationService { ) // Note: Repos are synced to public.repositories via mapUnifiedRepositories at the end of this method - // Get integration settings to access forkedFrom data from all orgs const integration = await IntegrationRepository.findById(integrationId, txOptions) - const allReposInSettings = integration.settings?.orgs?.flatMap((org) => org.repos || []) || [] + + // Build forkedFrom map from repositories table if not provided + if (!forkedFromMap) { + forkedFromMap = new Map() + const qx = SequelizeRepository.getQueryExecutor(txOptions) + const existingRepos = await getReposForGithubIntegration(qx, integrationId) + for (const repo of existingRepos) { + forkedFromMap.set(repo.url, repo.forkedFrom) + } + } for (const [segmentId, urls] of Object.entries(repos)) { let isGitintegrationConfigured @@ -904,6 +936,9 @@ export default class IntegrationService { isGitintegrationConfigured = false } + const buildRemotes = (urlList: string[]) => + urlList.map((url) => ({ url, forkedFrom: forkedFromMap.get(url) || null })) + if (isGitintegrationConfigured) { this.options.log.info(`Finding Git integration for segment ${segmentId}!`) const gitInfo = await this.gitGetRemotes(segmentOptions) @@ -911,24 +946,14 @@ export default class IntegrationService { const allUrls = Array.from(new Set([...gitRemotes, ...urls])) this.options.log.info(`Updating Git integration for segment ${segmentId}!`) await this.gitConnectOrUpdate( - { - remotes: allUrls.map((url) => { - const repoInSettings = allReposInSettings.find((r) => r.url === url) - return { url, forkedFrom: repoInSettings?.forkedFrom || null } - }), - }, + { remotes: buildRemotes(allUrls) }, segmentOptions, PlatformType.GITHUB, ) } else { this.options.log.info(`Updating Git integration for segment ${segmentId}!`) await this.gitConnectOrUpdate( - { - remotes: urls.map((url) => { - const repoInSettings = allReposInSettings.find((r) => r.url === url) - return { url, forkedFrom: repoInSettings?.forkedFrom || null } - }), - }, + { remotes: buildRemotes(urls) }, segmentOptions, PlatformType.GITHUB, ) @@ -2388,13 +2413,8 @@ export default class IntegrationService { const githubToken = await getGithubInstallationToken() - const repos = integration.settings.orgs.flatMap((org) => org.repos) as { - url: string - name: string - updatedAt: string - }[] - const qx = SequelizeRepository.getQueryExecutor(this.options) + const repos = await getReposForGithubIntegration(qx, integrationId) const githubRepos = await getRepositoriesBySourceIntegrationId(qx, integrationId) const mappedSegments = githubRepos.map((repo) => repo.segmentId) @@ -2841,46 +2861,13 @@ export default class IntegrationService { const repos = await getInstalledRepositories(installToken) this.options.log.info(`Fetched ${repos.length} installed repositories`) - // Update integration settings - const currentSettings: { - orgs: Array<{ - name: string - logo: string - url: string - fullSync: boolean - updatedAt: string - repos: Array<{ - name: string - url: string - updatedAt: string - }> - }> - } = integration.settings || { orgs: [] } - - if (currentSettings.orgs.length !== 1) { - throw new Error('Integration settings must have exactly one organization') - } - - const currentRepos = currentSettings.orgs[0].repos || [] - const newRepos = repos.filter((repo) => !currentRepos.some((r) => r.url === repo.url)) - this.options.log.info(`Found ${newRepos.length} new repositories`) + // Get current repos from repositories table + const qx = SequelizeRepository.getQueryExecutor(this.options) + const currentRepoRows = await getReposForGithubIntegration(qx, integration.id) + const currentRepoUrls = new Set(currentRepoRows.map((r) => r.url)) - const updatedSettings = { - ...currentSettings, - orgs: [ - { - ...currentSettings.orgs[0], - repos: [ - ...currentRepos, - ...newRepos.map((repo) => ({ - name: repo.name, - url: repo.url, - updatedAt: repo.updatedAt || new Date().toISOString(), - })), - ], - }, - ], - } + const newRepos = repos.filter((repo) => !currentRepoUrls.has(repo.url)) + this.options.log.info(`Found ${newRepos.length} new repositories`) this.options = { ...this.options, @@ -2891,12 +2878,7 @@ export default class IntegrationService { ], } - // Update the integration with new settings - await this.update(integration.id, { settings: updatedSettings }) - - this.options.log.info(`Updated integration settings for integration id: ${integration.id}`) - - // Update GitHub repos mapping + // Update GitHub repos mapping — new repos are added to repositories table via mapGithubRepos const defaultSegmentId = integration.segmentId const mapping = {} for (const repo of newRepos) { @@ -3045,18 +3027,15 @@ export default class IntegrationService { } } - // Build forkedFrom map from integration settings (for GITHUB repositories) + // Build forkedFrom map from existing repositories (for GITHUB platforms) const forkedFromMap = new Map() const isGitHubPlatform = [PlatformType.GITHUB, PlatformType.GITHUB_NANGO].includes( sourcePlatform, ) - const sourceIntegration = isGitHubPlatform - ? await IntegrationRepository.findById(sourceIntegrationId, txOptions) - : null - if (sourceIntegration?.settings?.orgs) { - const allRepos = sourceIntegration.settings.orgs.flatMap((org: any) => org.repos || []) - for (const repo of allRepos) { - if (repo.url && repo.forkedFrom) { + if (isGitHubPlatform) { + const existingRepos = await getReposForGithubIntegration(qx, sourceIntegrationId) + for (const repo of existingRepos) { + if (repo.forkedFrom) { forkedFromMap.set(repo.url, repo.forkedFrom) } } diff --git a/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts b/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts index 6c93069cf9..28ab30aacd 100644 --- a/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts +++ b/services/apps/cron_service/src/jobs/nangoMonitoring.job.ts @@ -15,6 +15,7 @@ import { getNangoMappingsForIntegration, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories' import { ALL_NANGO_INTEGRATIONS, NangoIntegration, @@ -74,27 +75,23 @@ const job: IJobDefinition = { // Fetch nango mappings from the dedicated table const nangoMapping = await getNangoMappingsForIntegration(pgpQx(dbConnection), int.id) - // first go through all orgs and repos and check if they are connected to nango - for (const org of int.settings.orgs) { - const orgName = org.name - for (const repo of org.repos) { - const repoName = repo.name - totalRepos++ + // Check which repos are connected to nango by comparing repositories table vs nango_mapping + const repoRows = await getReposForGithubIntegration(pgpQx(dbConnection), int.id) + for (const repo of repoRows) { + totalRepos++ - let found = false - - for (const mapping of Object.values(nangoMapping)) { - if (mapping.owner === orgName && mapping.repoName === repoName) { - found = true - } + let found = false + for (const mapping of Object.values(nangoMapping)) { + if (mapping.owner === repo.owner && mapping.repoName === repo.name) { + found = true } + } - if (!found) { - if (ghNotConnectedToNangoYet.has(int.id)) { - ghNotConnectedToNangoYet.set(int.id, ghNotConnectedToNangoYet.get(int.id) + 1) - } else { - ghNotConnectedToNangoYet.set(int.id, 1) - } + if (!found) { + if (ghNotConnectedToNangoYet.has(int.id)) { + ghNotConnectedToNangoYet.set(int.id, ghNotConnectedToNangoYet.get(int.id) + 1) + } else { + ghNotConnectedToNangoYet.set(int.id, 1) } } } diff --git a/services/apps/integration_run_worker/src/bin/auto-heal-github-integration.ts b/services/apps/integration_run_worker/src/bin/auto-heal-github-integration.ts index 0498684aaf..ef086aba3f 100644 --- a/services/apps/integration_run_worker/src/bin/auto-heal-github-integration.ts +++ b/services/apps/integration_run_worker/src/bin/auto-heal-github-integration.ts @@ -3,6 +3,8 @@ import axios from 'axios' import { IntegrationRunWorkerEmitter } from '@crowd/common_services' import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' import IntegrationRunRepository from '@crowd/data-access-layer/src/old/apps/integration_run_worker/integrationRun.repo' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' +import { populateGithubSettingsWithRepos } from '@crowd/data-access-layer/src/repositories' import { GithubIntegrationSettings, GithubManualIntegrationSettings, @@ -171,8 +173,11 @@ setImmediate(async () => { log.info(`Triggering integration run for ${integrationId}!`) // let's get current settings from integration - const currentSettings = (await repo.getIntegrationSettings( + const rawSettings = await repo.getIntegrationSettings(integrationId) + const currentSettings = (await populateGithubSettingsWithRepos( + dbStoreQx(store), integrationId, + rawSettings, )) as GithubIntegrationSettings const repos: Repo[] = [] diff --git a/services/apps/integration_run_worker/src/bin/process-repo.ts b/services/apps/integration_run_worker/src/bin/process-repo.ts index 812e1cef0a..647324cd6a 100644 --- a/services/apps/integration_run_worker/src/bin/process-repo.ts +++ b/services/apps/integration_run_worker/src/bin/process-repo.ts @@ -1,6 +1,8 @@ import { IntegrationRunWorkerEmitter } from '@crowd/common_services' import { DbStore, getDbConnection } from '@crowd/data-access-layer/src/database' import IntegrationRunRepository from '@crowd/data-access-layer/src/old/apps/integration_run_worker/integrationRun.repo' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' +import { populateGithubSettingsWithRepos } from '@crowd/data-access-layer/src/repositories' import { GithubIntegrationSettings, GithubManualIntegrationSettings, @@ -97,8 +99,11 @@ setImmediate(async () => { log.info(`Triggering integration run for ${integrationId}!`) // let's get current settings from integration - const currentSettings = (await repo.getIntegrationSettings( + const rawSettings = await repo.getIntegrationSettings(integrationId) + const currentSettings = (await populateGithubSettingsWithRepos( + dbStoreQx(store), integrationId, + rawSettings, )) as GithubIntegrationSettings let repos = [] diff --git a/services/apps/integration_run_worker/src/service/integrationRunService.ts b/services/apps/integration_run_worker/src/service/integrationRunService.ts index 3923729799..f6b6917ae0 100644 --- a/services/apps/integration_run_worker/src/service/integrationRunService.ts +++ b/services/apps/integration_run_worker/src/service/integrationRunService.ts @@ -7,6 +7,8 @@ import { import { DbStore } from '@crowd/data-access-layer/src/database' import IntegrationRunRepository from '@crowd/data-access-layer/src/old/apps/integration_run_worker/integrationRun.repo' import MemberAttributeSettingsRepository from '@crowd/data-access-layer/src/old/apps/integration_run_worker/memberAttributeSettings.repo' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' +import { populateGithubSettingsWithRepos } from '@crowd/data-access-layer/src/repositories' import { IGenerateStreamsContext, INTEGRATION_SERVICES } from '@crowd/integrations' import { Logger, LoggerBase, getChildLogger } from '@crowd/logging' import { ApiPubSubEmitter, RedisCache, RedisClient } from '@crowd/redis' @@ -223,6 +225,18 @@ export default class IntegrationRunService extends LoggerBase { return } + // Populate orgs[].repos from repositories table for github integrations + if ( + runInfo.integrationType === PlatformType.GITHUB || + runInfo.integrationType === PlatformType.GITHUB_NANGO + ) { + runInfo.integrationSettings = await populateGithubSettingsWithRepos( + dbStoreQx(this.store), + runInfo.integrationId, + runInfo.integrationSettings, + ) + } + // we can do this because this service instance is only used for one run this.log = getChildLogger('run-processor', this.log, { runId, diff --git a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts index a2f321d52b..d9028e20a9 100644 --- a/services/apps/integration_stream_worker/src/service/integrationStreamService.ts +++ b/services/apps/integration_stream_worker/src/service/integrationStreamService.ts @@ -8,6 +8,8 @@ import { DbConnection, DbStore, DbTransaction } from '@crowd/data-access-layer/s import IncomingWebhookRepository from '@crowd/data-access-layer/src/old/apps/integration_stream_worker/incomingWebhook.repo' import { IStreamData } from '@crowd/data-access-layer/src/old/apps/integration_stream_worker/integrationStream.data' import IntegrationStreamRepository from '@crowd/data-access-layer/src/old/apps/integration_stream_worker/integrationStream.repo' +import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' +import { populateGithubSettingsWithRepos } from '@crowd/data-access-layer/src/repositories' import { INTEGRATION_SERVICES, IProcessStreamContext, @@ -38,7 +40,7 @@ export default class IntegrationStreamService extends LoggerBase { private readonly runWorkerEmitter: IntegrationRunWorkerEmitter, private readonly streamWorkerEmitter: IntegrationStreamWorkerEmitter, dataSinkWorkerEmitter: DataSinkWorkerEmitter, - store: DbStore, + private readonly store: DbStore, parentLog: Logger, ) { super(parentLog) @@ -222,6 +224,18 @@ export default class IntegrationStreamService extends LoggerBase { return false } + // Populate orgs[].repos from repositories table for github integrations + if ( + streamInfo.integrationType === PlatformType.GITHUB || + streamInfo.integrationType === PlatformType.GITHUB_NANGO + ) { + streamInfo.integrationSettings = await populateGithubSettingsWithRepos( + dbStoreQx(this.store), + streamInfo.integrationId, + streamInfo.integrationSettings, + ) + } + if (streamInfo.runId) { this.log.warn({ streamId }, 'Stream is a regular stream! Processing as such!') return await this.processStream(streamId) @@ -390,6 +404,18 @@ export default class IntegrationStreamService extends LoggerBase { return false } + // Populate orgs[].repos from repositories table for github integrations + if ( + streamInfo.integrationType === PlatformType.GITHUB || + streamInfo.integrationType === PlatformType.GITHUB_NANGO + ) { + streamInfo.integrationSettings = await populateGithubSettingsWithRepos( + dbStoreQx(this.store), + streamInfo.integrationId, + streamInfo.integrationSettings, + ) + } + if (streamInfo.webhookId) { this.log.warn({ streamId }, 'Stream is a webhook stream! Processing as such!') return await this.processWebhookStream(streamInfo.webhookId) diff --git a/services/apps/nango_worker/src/activities/nangoActivities.ts b/services/apps/nango_worker/src/activities/nangoActivities.ts index a2da5126a9..3687e18fbc 100644 --- a/services/apps/nango_worker/src/activities/nangoActivities.ts +++ b/services/apps/nango_worker/src/activities/nangoActivities.ts @@ -11,13 +11,16 @@ import { linkNangoMappingToRepository, removeGithubNangoConnection, removeNangoCursorsByConnection, - setGithubIntegrationSettingsOrgs, setNangoCursor, updateNangoCursorLastCheckedAt, } from '@crowd/data-access-layer/src/integrations' import IntegrationStreamRepository from '@crowd/data-access-layer/src/old/apps/integration_stream_worker/integrationStream.repo' import { dbStoreQx } from '@crowd/data-access-layer/src/queryExecutor' -import { softDeleteRepositories, upsertRepository } from '@crowd/data-access-layer/src/repositories' +import { + getReposForGithubIntegration, + softDeleteRepositories, + upsertRepository, +} from '@crowd/data-access-layer/src/repositories' import { getChildLogger } from '@crowd/logging' import { ALL_NANGO_INTEGRATIONS, @@ -294,46 +297,33 @@ export async function analyzeGithubIntegration( if (integration) { if (integration.platform === PlatformType.GITHUB_NANGO) { const settings = integration.settings + const qx = dbStoreQx(svc.postgres.writer) + + // Build desired repos list from repositories table + const repoRows = await getReposForGithubIntegration(qx, integrationId) + const repoSet = new Map() + for (const row of repoRows) { + repoSet.set(`${row.owner}/${row.name}`, { owner: row.owner, repoName: row.name }) + } - // check if we need to sync org repos - let added = 0 + // For fullSync orgs, discover new repos from GitHub API + // New repos not yet in the repositories table will be added to the sync list + // (syncGithubRepo workflow will create the repositories row) for (const org of settings.orgs) { if (org.fullSync) { const results = await GithubIntegrationService.getOrgRepos(org.name) for (const result of results) { - // we didn't find the repo so we add it - if (!org.repos.some((r) => r.url === result.url)) { - org.repos.push(result) - added++ + const parsed = parseGithubUrl(result.url) + const key = `${parsed.owner}/${parsed.repoName}` + if (!repoSet.has(key)) { + repoSet.set(key, parsed) + svc.log.info(`fullSync: discovered new repo ${result.url} for org ${org.name}`) } } } } - if (added > 0) { - // we need to update the integration settings in the database - await setGithubIntegrationSettingsOrgs( - dbStoreQx(svc.postgres.writer), - integrationId, - settings.orgs, - ) - } - - const repos = new Set() - if (settings.orgs) { - for (const org of settings.orgs) { - for (const repo of org.repos) { - repos.add(parseGithubUrl(repo.url)) - } - } - } - if (settings.repos) { - for (const repo of settings.repos) { - repos.add(parseGithubUrl(repo.url)) - } - } - - const finalRepos = Array.from(repos) + const finalRepos = Array.from(repoSet.values()) // fetch nango mappings from the dedicated table const nangoMapping = await getNangoMappingsForIntegration( diff --git a/services/apps/nango_worker/src/bin/check-nango-mapping.ts b/services/apps/nango_worker/src/bin/check-nango-mapping.ts index 008a77cd5f..d60bec840d 100644 --- a/services/apps/nango_worker/src/bin/check-nango-mapping.ts +++ b/services/apps/nango_worker/src/bin/check-nango-mapping.ts @@ -5,6 +5,7 @@ import { getNangoMappingsForIntegration, } from '@crowd/data-access-layer/src/integrations' import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { getReposForGithubIntegration } from '@crowd/data-access-layer/src/repositories' import { getServiceLogger } from '@crowd/logging' import { PlatformType } from '@crowd/types' @@ -60,37 +61,31 @@ async function collectStats(): Promise { } } - // Loop through orgs in settings - if (integration.settings?.orgs) { - for (const org of integration.settings.orgs) { - // Loop through repos in each org - if (org.repos) { - for (const repo of org.repos) { - totalRepos++ - - if (connectionIds.length === 0) { - missingConnectionCount++ - integrationsWithoutConnections.add(integration.id) - continue - } - - let found = false - for (const mapping of Object.values(nangoMapping)) { - if (mapping.owner === org.name && mapping.repoName === repo.name) { - found = true - break - } - } - - if (!found) { - missingConnectionCount++ - integrationsWithoutConnections.add(integration.id) - } else { - connectedRepos++ - } - } + // Get repos from repositories table + const repos = await getReposForGithubIntegration(qx, integration.id) + for (const repo of repos) { + totalRepos++ + + if (connectionIds.length === 0) { + missingConnectionCount++ + integrationsWithoutConnections.add(integration.id) + continue + } + + let found = false + for (const mapping of Object.values(nangoMapping)) { + if (mapping.owner === repo.owner && mapping.repoName === repo.name) { + found = true + break } } + + if (!found) { + missingConnectionCount++ + integrationsWithoutConnections.add(integration.id) + } else { + connectedRepos++ + } } } diff --git a/services/libs/data-access-layer/src/integrations/index.ts b/services/libs/data-access-layer/src/integrations/index.ts index 734543b757..bdf6f57a74 100644 --- a/services/libs/data-access-layer/src/integrations/index.ts +++ b/services/libs/data-access-layer/src/integrations/index.ts @@ -2,6 +2,7 @@ import { getServiceChildLogger } from '@crowd/logging' import { IIntegration, PlatformType } from '@crowd/types' import { QueryExecutor } from '../queryExecutor' +import { getReposForGithubIntegration } from '../repositories' import { getReposBySegmentGroupedByPlatform } from '../segments' const log = getServiceChildLogger('db.integrations') @@ -668,24 +669,11 @@ export async function findNangoRepositoriesToBeRemoved( return [] } - const repoSlugs = new Set() - const settings = integration.settings as any + // Get desired repos from repositories table + const repos = await getReposForGithubIntegration(qx, integrationId) + const repoSlugs = new Set(repos.map((r) => `${r.owner}/${r.name}`)) - if (settings.orgs) { - for (const org of settings.orgs) { - for (const repo of org.repos ?? []) { - repoSlugs.add(extractGithubRepoSlug(repo.url)) - } - } - } - - if (settings.repos) { - for (const repo of settings.repos) { - repoSlugs.add(extractGithubRepoSlug(repo.url)) - } - } - - // determine which connections to delete if needed + // Find nango mappings that aren't in the desired set const reposToBeRemoved: string[] = [] for (const mappedRepo of Object.values(nangoMappings)) { if (!repoSlugs.has(`${mappedRepo.owner}/${mappedRepo.repoName}`)) { diff --git a/services/libs/data-access-layer/src/repositories/index.ts b/services/libs/data-access-layer/src/repositories/index.ts index 1608c3db28..5c6903a87b 100644 --- a/services/libs/data-access-layer/src/repositories/index.ts +++ b/services/libs/data-access-layer/src/repositories/index.ts @@ -557,6 +557,78 @@ export async function findSegmentsForRepos( return results } +export interface IGithubRepoForIntegration { + url: string + name: string + owner: string + forkedFrom: string | null + updatedAt: string +} + +export async function getReposForGithubIntegration( + qx: QueryExecutor, + integrationId: string, +): Promise { + return qx.select( + ` + SELECT + r.url, + split_part(r.url, '/', -1) as name, + split_part(r.url, '/', -2) as owner, + r."forkedFrom", + r."updatedAt" + FROM public.repositories r + WHERE r."sourceIntegrationId" = $(integrationId) + AND r."deletedAt" IS NULL + ORDER BY r.url + `, + { integrationId }, + ) +} + +export async function getReposGroupedByOrg( + qx: QueryExecutor, + integrationId: string, +): Promise> { + const repos = await getReposForGithubIntegration(qx, integrationId) + const grouped: Record = {} + for (const repo of repos) { + if (!grouped[repo.owner]) grouped[repo.owner] = [] + grouped[repo.owner].push(repo) + } + return grouped +} + +/** + * Populates `settings.orgs[].repos` from the repositories table for github/github-nango integrations. + * Used by worker services to inject repo data into settings before passing to stream processors. + */ +export async function populateGithubSettingsWithRepos( + qx: QueryExecutor, + integrationId: string, + settings: unknown, +): Promise { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const s = settings as any + if (!s?.orgs || !Array.isArray(s.orgs)) return settings + + const reposByOrg = await getReposGroupedByOrg(qx, integrationId) + + return { + ...s, + orgs: s.orgs.map((org: { name: string; [key: string]: unknown }) => ({ + ...org, + repos: (reposByOrg[org.name] || []).map((r) => ({ + url: r.url, + name: r.name, + owner: r.owner, + createdAt: r.updatedAt, + forkedFrom: r.forkedFrom, + })), + })), + } +} + /** * Updates repositories.enabled based on the provided list of enabled URLs. * Called when user toggles repository enabled status in the UI. diff --git a/services/libs/types/src/integrationSettings.ts b/services/libs/types/src/integrationSettings.ts index 65fc1ef89d..fc20feb386 100644 --- a/services/libs/types/src/integrationSettings.ts +++ b/services/libs/types/src/integrationSettings.ts @@ -19,12 +19,16 @@ export interface IGithubOrg { url: string fullSync: boolean updatedAt: string - repos: IGithubRepo[] + /** @deprecated Repos are stored in public.repositories table for github-nango. + * Only used by the legacy github platform. */ + repos?: IGithubRepo[] } export interface IGithubIntegrationSettings { orgs: IGithubOrg[] + /** @deprecated Only used by legacy github platform (processStream.ts) */ repos?: IGithubRepo[] + /** @deprecated Only used by legacy github platform (processStream.ts) */ unavailableRepos?: IGithubRepo[] updateMemberAttributes?: boolean } From 2fe0a786a839c8ed77d3b38f94d63c1c446818c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uro=C5=A1=20Marolt?= Date: Fri, 27 Feb 2026 15:01:18 +0100 Subject: [PATCH 2/6] fix: fixed removal of repos from settings for old github integration and some old bugs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Uroš Marolt --- .../repositories/integrationRepository.ts | 37 ++++++++++-------- backend/src/services/integrationService.ts | 21 ++++++++-- .../pages/integration-list.page.vue | 38 +++++++++---------- .../src/repositories/index.ts | 35 ++++++++++++++++- 4 files changed, 89 insertions(+), 42 deletions(-) diff --git a/backend/src/database/repositories/integrationRepository.ts b/backend/src/database/repositories/integrationRepository.ts index 065feb93c7..323efbedd3 100644 --- a/backend/src/database/repositories/integrationRepository.ts +++ b/backend/src/database/repositories/integrationRepository.ts @@ -630,24 +630,29 @@ class IntegrationRepository { updatedAt: string }[] - // Group repos by owner (org name) - const reposByOwner: Record = {} - for (const repo of repoRows) { - if (!reposByOwner[repo.owner]) reposByOwner[repo.owner] = [] - reposByOwner[repo.owner].push(repo) - } + // Only overwrite orgs[].repos from the repositories table if there are rows. + // During the 'mapping' phase (legacy github connect), repos live in settings + // before being written to the repositories table via mapGithubRepos. + if (repoRows.length > 0) { + const reposByOwner: Record = {} + for (const repo of repoRows) { + if (!reposByOwner[repo.owner]) reposByOwner[repo.owner] = [] + reposByOwner[repo.owner].push(repo) + } - output.settings = { - ...output.settings, - orgs: output.settings.orgs.map((org) => ({ - ...org, - repos: (reposByOwner[org.name] || []).map((r) => ({ - url: r.url, - name: r.name, - forkedFrom: r.forkedFrom, - updatedAt: r.updatedAt, + output.settings = { + ...output.settings, + orgs: output.settings.orgs.map((org) => ({ + ...org, + repos: (reposByOwner[org.name] || []).map((r) => ({ + url: r.url, + name: r.name, + owner: r.owner, + forkedFrom: r.forkedFrom, + updatedAt: r.updatedAt, + })), })), - })), + } } } diff --git a/backend/src/services/integrationService.ts b/backend/src/services/integrationService.ts index ce86c3bfcd..a51b62050f 100644 --- a/backend/src/services/integrationService.ts +++ b/backend/src/services/integrationService.ts @@ -27,6 +27,7 @@ import { insertRepositories, restoreRepositories, softDeleteRepositories, + stripReposFromGithubSettings, } from '@crowd/data-access-layer/src/repositories' import { getMappedAllWithSegmentName, @@ -964,20 +965,32 @@ export default class IntegrationService { const txService = new IntegrationService(txOptions) await txService.mapUnifiedRepositories(integration.platform, integrationId, mapping) + // Now that repos are in the repositories table, strip them from settings + const qxTx = SequelizeRepository.getQueryExecutor(txOptions) + await stripReposFromGithubSettings(qxTx, integrationId) + + let onboardingIntegration if (fireOnboarding) { this.options.log.info('Updating integration status to in-progress!') - const integration = await IntegrationRepository.update( + onboardingIntegration = await IntegrationRepository.update( integrationId, { status: 'in-progress' }, txOptions, ) + } + + await SequelizeRepository.commitTransaction(transaction) + // Trigger the run AFTER commit so the worker can see the repositories rows + if (onboardingIntegration) { this.options.log.info('Sending GitHub message to int-run-worker!') const emitter = await getIntegrationRunWorkerEmitter() - await emitter.triggerIntegrationRun(integration.platform, integration.id, true) + await emitter.triggerIntegrationRun( + onboardingIntegration.platform, + onboardingIntegration.id, + true, + ) } - - await SequelizeRepository.commitTransaction(transaction) } catch (err) { this.options.log.error(err, 'Error while mapping GitHub repos!') try { diff --git a/frontend/src/modules/admin/modules/integration/pages/integration-list.page.vue b/frontend/src/modules/admin/modules/integration/pages/integration-list.page.vue index bfb16905a9..efb44fb303 100644 --- a/frontend/src/modules/admin/modules/integration/pages/integration-list.page.vue +++ b/frontend/src/modules/admin/modules/integration/pages/integration-list.page.vue @@ -128,7 +128,22 @@ const { array, loadingFetch } = mapGetters('integration'); const { id, grandparentId } = route.params; -const useGitHubNango = ref(false); // true for v2, false for v1 +const authStore = useAuthStore(); +const userId = computed(() => authStore.user?.id); +const teamUserIds = computed(() => config.permissions.teamUserIds); +const env = computed(() => config.env); + +const isTeamUser = computed(() => env.value !== 'production' || teamUserIds.value?.includes(userId.value)); + +const useGitHubNango = computed(() => { + const githubIntegration = array.value.find( + (integration: any) => integration.platform === 'github', + ); + if (githubIntegration) { + return !!githubIntegration.isNango; + } + return !!isTeamUser.value; +}); const subproject = ref(); @@ -153,34 +168,15 @@ const platformsByStatus = computed(() => { return all.filter((platform) => matching.includes(platform)); }); -const authStore = useAuthStore(); -const userId = computed(() => authStore.user?.id); -const teamUserIds = computed(() => config.permissions.teamUserIds); -const env = computed(() => config.env); - -const isTeamUser = computed(() => env.value !== 'production' || teamUserIds.value?.includes(userId.value)); - onMounted(() => { localStorage.setItem('segmentId', id as string); localStorage.setItem('segmentGrandparentId', grandparentId as string); - doFetch().then(() => { - useGitHubNango.value = updateGithubVersion(); - }); + doFetch(); findSubProject(id).then((res) => { subproject.value = res; }); }); - -const updateGithubVersion = () => { - const githubIntegration = array.value.find( - (integration: any) => integration.platform === 'github', - ); - if (githubIntegration) { - return !!githubIntegration.isNango; - } - return !!isTeamUser.value; -};