diff --git a/client/components/Editor/Editor.stories.tsx b/client/components/Editor/Editor.stories.tsx index a3db8ac34a..6cca88ce7d 100644 --- a/client/components/Editor/Editor.stories.tsx +++ b/client/components/Editor/Editor.stories.tsx @@ -162,6 +162,7 @@ storiesOf('Editor', module) updatechangeObject(evt); }} collaborativeOptions={{ + pubId: 'storybook-pub-id', firebaseRef: draftRef as any, clientData, initialDocKey: -1, @@ -237,6 +238,7 @@ storiesOf('Editor', module) } }} collaborativeOptions={{ + pubId: 'storybook-pub-id', firebaseRef: draftRef as any, clientData, initialDocKey: -1, diff --git a/client/components/Editor/plugins/collaborative/document.ts b/client/components/Editor/plugins/collaborative/document.ts index 306a610244..8590dd18f1 100644 --- a/client/components/Editor/plugins/collaborative/document.ts +++ b/client/components/Editor/plugins/collaborative/document.ts @@ -45,6 +45,7 @@ export default ( ) => { const { collaborativeOptions, isReadOnly, onError = noop } = options; const { + pubId, firebaseRef: ref, onStatusChange = noop, onUpdateLatestKey = noop, @@ -100,7 +101,7 @@ export default ( /* If multiple of saveEveryNSteps, update checkpoint */ const saveEveryNSteps = 100; if (snapshot.key && snapshot.key % saveEveryNSteps === 0) { - storeCheckpoint(ref, newState.doc, snapshot.key); + storeCheckpoint(pubId, newState.doc, snapshot.key); } } diff --git a/client/components/Editor/types.ts b/client/components/Editor/types.ts index 35622891b0..2a0f0bf04e 100644 --- a/client/components/Editor/types.ts +++ b/client/components/Editor/types.ts @@ -42,6 +42,7 @@ export type CollaborativeOptions = { clientData: { id: null | string; }; + pubId: string; firebaseRef: firebase.database.Reference; initialDocKey: number; onStatusChange?: (status: CollaborativeEditorStatus) => unknown; diff --git a/client/components/Editor/utils/firebase.ts b/client/components/Editor/utils/firebase.ts index 6958a4ea99..f693f25bfa 100644 --- 
a/client/components/Editor/utils/firebase.ts +++ b/client/components/Editor/utils/firebase.ts @@ -4,26 +4,30 @@ import type { Step } from 'prosemirror-transform'; import type { CompressedChange, CompressedKeyable } from '../types'; -import { compressStateJSON, compressStepJSON } from 'prosemirror-compress-pubpub'; +import { compressStepJSON } from 'prosemirror-compress-pubpub'; import uuid from 'uuid'; +import { apiFetch } from 'client/utils/apiFetch'; + export const firebaseTimestamp = { '.sv': 'timestamp' }; -export const storeCheckpoint = async ( - firebaseRef: firebase.database.Reference, - doc: Node, - keyNumber: number, -) => { - const checkpoint = { - d: compressStateJSON({ doc: doc.toJSON() }).d, - k: keyNumber, - t: firebaseTimestamp, - }; - await Promise.all([ - firebaseRef.child(`checkpoints/${keyNumber}`).set(checkpoint), - firebaseRef.child('checkpoint').set(checkpoint), - firebaseRef.child(`checkpointMap/${keyNumber}`).set(firebaseTimestamp), - ]); +/** + * Store a checkpoint by writing the doc to Postgres via the server API. + * Firebase checkpoints are no longer written — Postgres is the single + * source of truth for checkpoints. + */ +export const storeCheckpoint = async (pubId: string, doc: Node, keyNumber: number) => { + try { + await apiFetch.post('/api/draftCheckpoint', { + pubId, + historyKey: keyNumber, + doc: doc.toJSON(), + }); + } catch (err) { + // Non-fatal: the checkpoint is an optimization, not required for correctness. + // The next checkpoint attempt (100 steps later) will try again. 
+ console.error('Failed to store checkpoint:', err); + } }; export const flattenKeyables = ( diff --git a/client/containers/Pub/PubDocument/PubBody.tsx b/client/containers/Pub/PubDocument/PubBody.tsx index 171b4f1229..ae7da374b4 100644 --- a/client/containers/Pub/PubDocument/PubBody.tsx +++ b/client/containers/Pub/PubDocument/PubBody.tsx @@ -33,6 +33,7 @@ const markSentryError = (err: Error) => { const PubBody = (props: Props) => { const { editorWrapperRef } = props; const { + pubData, noteManager, updateCollabData, historyData: { setLatestHistoryKey }, @@ -84,6 +85,7 @@ const PubBody = (props: Props) => { const collaborativeOptions = includeCollabPlugin && !!firebaseDraftRef && { + pubId: pubData.id, initialDocKey: initialHistoryKey, firebaseRef: firebaseDraftRef, clientData: localCollabUser, diff --git a/server/apiRoutes.ts b/server/apiRoutes.ts index 772b4cec09..6d3cec868d 100644 --- a/server/apiRoutes.ts +++ b/server/apiRoutes.ts @@ -12,6 +12,7 @@ import { router as customScriptRouter } from './customScript/api'; import { router as devApiRouter } from './dev/api'; import { router as discussionRouter } from './discussion/api'; import { router as doiRouter } from './doi/api'; +import { router as draftCheckpointRouter } from './draftCheckpoint/api'; import { router as editorRouter } from './editor/api'; import { router as integrationDataOAuth1Router } from './integrationDataOAuth1/api'; import { router as landingPageFeatureRouter } from './landingPageFeature/api'; @@ -46,6 +47,7 @@ const apiRouter = Router() .use(customScriptRouter) .use(discussionRouter) .use(doiRouter) + .use(draftCheckpointRouter) .use(editorRouter) .use(integrationDataOAuth1Router) .use(landingPageFeatureRouter) diff --git a/server/draft/model.ts b/server/draft/model.ts index ec3ef0be17..90571b5f15 100644 --- a/server/draft/model.ts +++ b/server/draft/model.ts @@ -10,7 +10,6 @@ import { Model, PrimaryKey, Table, - // HasOne, } from 'sequelize-typescript'; // import { Pub } from '../models'; @@ 
-30,6 +29,8 @@ export class Draft extends Model, InferCreationAttributes @Column(DataType.STRING) declare firebasePath: string; - // @HasOne(() => Pub, { as: 'pub', foreignKey: 'draftId' }) - // pub?: Pub; + // UUID of the DraftCheckpoint row in Postgres, if one exists. + // When set, the checkpoint is loaded from Postgres rather than Firebase. + @Column(DataType.UUID) + declare coldCheckpointId: string | null; } diff --git a/server/draftCheckpoint/api.ts b/server/draftCheckpoint/api.ts new file mode 100644 index 0000000000..317345b302 --- /dev/null +++ b/server/draftCheckpoint/api.ts @@ -0,0 +1,39 @@ +import { Router } from 'express'; + +import { Draft, Pub } from 'server/models'; +import { wrap } from 'server/wrap'; +import { expect } from 'utils/assert'; + +import { upsertDraftCheckpoint } from './queries'; + +export const router = Router(); + +router.post( + '/api/draftCheckpoint', + wrap(async (req, res) => { + const userId = req.user?.id; + if (!userId) { + return res.status(401).json({}); + } + + const { pubId, historyKey: rawHistoryKey, doc } = req.body; + const historyKey = + typeof rawHistoryKey === 'string' ? 
parseInt(rawHistoryKey, 10) : rawHistoryKey; + if (!pubId || typeof historyKey !== 'number' || Number.isNaN(historyKey) || !doc) { + return res.status(400).json({ error: 'Missing pubId, historyKey, or doc' }); + } + + // Look up the draft for this pub + const pub = await Pub.findOne({ + where: { id: pubId }, + include: [{ model: Draft, as: 'draft' }], + }); + if (!pub?.draft) { + return res.status(404).json({ error: 'Pub or draft not found' }); + } + + const checkpoint = await upsertDraftCheckpoint(pub.draft.id, historyKey, doc, Date.now()); + + return res.status(200).json({ id: checkpoint.id }); + }), +); diff --git a/server/draftCheckpoint/model.ts b/server/draftCheckpoint/model.ts new file mode 100644 index 0000000000..a6039d91cb --- /dev/null +++ b/server/draftCheckpoint/model.ts @@ -0,0 +1,73 @@ +import type { CreationOptional, InferAttributes, InferCreationAttributes } from 'sequelize'; + +import type { SerializedModel } from 'types'; + +import { + AllowNull, + BelongsTo, + Column, + DataType, + Default, + Index, + Model, + PrimaryKey, + Table, +} from 'sequelize-typescript'; + +import { Draft } from '../models'; + +@Table +export class DraftCheckpoint extends Model< + InferAttributes, + InferCreationAttributes +> { + public declare toJSON: (this: M) => SerializedModel; + + @Default(DataType.UUIDV4) + @PrimaryKey + @Column(DataType.UUID) + declare id: CreationOptional; + + @AllowNull(false) + @Index + @Column(DataType.UUID) + declare draftId: string; + + // The history key this checkpoint represents (i.e. 
the doc state after applying + // all changes up to and including this key) + @AllowNull(false) + @Column(DataType.INTEGER) + declare historyKey: number; + + // The compressed doc JSON (same shape as Doc.content — a ProseMirror doc JSON) + @AllowNull(false) + @Column(DataType.JSONB) + declare doc: Record; + + // Timestamp of the change at this history key + @Column(DataType.BIGINT) + declare timestamp: number | null; + + // Firebase discussion positions at the time of cold storage, keyed by discussion ID. + // Stored so they can be "thawed" back into Firebase when the draft is next loaded. + @Column(DataType.JSONB) + declare discussions: Record | null; + + // Cumulative StepMap ranges from the latest release historyKey to this checkpoint's + // historyKey. Used to map discussion anchors during release creation when the + // original steps are no longer available in Firebase. + // Shape: Array — each inner array is a StepMap.ranges (triples of + // [oldStart, oldSize, newSize]). + @Column(DataType.JSONB) + declare stepMaps: number[][] | null; + + // The history key that stepMaps cover up to. After cold storage thaw + editing, + // the checkpoint's historyKey advances but stepMaps still only cover up to this key. + // At release time, Firebase changes from stepMapToKey+1 → currentKey are composed + // with the stored stepMaps. + @Column(DataType.INTEGER) + declare stepMapToKey: number | null; + + @BelongsTo(() => Draft, { as: 'draft', foreignKey: 'draftId' }) + declare draft?: Draft; +} diff --git a/server/draftCheckpoint/queries.ts b/server/draftCheckpoint/queries.ts new file mode 100644 index 0000000000..989e29c1ca --- /dev/null +++ b/server/draftCheckpoint/queries.ts @@ -0,0 +1,61 @@ +import type { DocJson } from 'types'; + +import { DraftCheckpoint } from 'server/models'; + +/** + * Create or update the checkpoint for a draft. + * Each draft has at most one checkpoint — an upsert on draftId. 
+ */ +export const upsertDraftCheckpoint = async ( + draftId: string, + historyKey: number, + doc: DocJson, + timestamp: number | null = null, + sequelizeTransaction: any = null, + options: { + discussions?: Record | null; + stepMaps?: number[][] | null; + stepMapToKey?: number | null; + } = {}, +) => { + // Only include optional fields in the update if they were explicitly provided. + // This prevents normal checkpoint writes (from the client API) from clobbering + // stepMaps/discussions that were set during cold storage. + const optionalFields: Partial< + Pick + > = {}; + if ('discussions' in options) optionalFields.discussions = options.discussions ?? null; + if ('stepMaps' in options) optionalFields.stepMaps = options.stepMaps ?? null; + if ('stepMapToKey' in options) optionalFields.stepMapToKey = options.stepMapToKey ?? null; + + const existing = await DraftCheckpoint.findOne({ + where: { draftId }, + transaction: sequelizeTransaction, + }); + + if (existing) { + // Only update if the new key is more recent + if (historyKey > existing.historyKey) { + await existing.update( + { historyKey, doc, timestamp, ...optionalFields }, + { transaction: sequelizeTransaction }, + ); + } + return existing; + } + + return DraftCheckpoint.create( + { draftId, historyKey, doc, timestamp, ...optionalFields }, + { transaction: sequelizeTransaction }, + ); +}; + +/** + * Get the checkpoint for a draft, if one exists. 
+ */ +export const getDraftCheckpoint = async (draftId: string, sequelizeTransaction: any = null) => { + return DraftCheckpoint.findOne({ + where: { draftId }, + transaction: sequelizeTransaction, + }); +}; diff --git a/server/models.ts b/server/models.ts index 925e1b1b75..54148ea0b2 100644 --- a/server/models.ts +++ b/server/models.ts @@ -18,6 +18,7 @@ import { Discussion } from './discussion/model'; import { DiscussionAnchor } from './discussionAnchor/model'; import { Doc } from './doc/model'; import { Draft } from './draft/model'; +import { DraftCheckpoint } from './draftCheckpoint/model'; import { EmailChangeToken } from './emailChangeToken/model'; import { Export } from './export/model'; import { ExternalPublication } from './externalPublication/model'; @@ -78,6 +79,7 @@ sequelize.addModels([ DiscussionAnchor, Doc, Draft, + DraftCheckpoint, EmailChangeToken, Export, ExternalPublication, @@ -174,6 +176,7 @@ export { EmailChangeToken, Doc, Draft, + DraftCheckpoint, Export, ExternalPublication, FeatureFlag, diff --git a/server/pubHistory/queries.ts b/server/pubHistory/queries.ts index 277d49200a..a5edbd3763 100644 --- a/server/pubHistory/queries.ts +++ b/server/pubHistory/queries.ts @@ -14,7 +14,7 @@ export const restorePubDraftToHistoryKey = async (options: RestorePubOptions) => const { pubId, userId, historyKey } = options; assert(typeof historyKey === 'number' && historyKey >= 0); const pubDraftRef = await getPubDraftRef(pubId); - const { doc } = await getPubDraftDoc(pubDraftRef, historyKey); + const { doc } = await getPubDraftDoc(pubId, historyKey); const editor = await editFirebaseDraftByRef(pubDraftRef, userId); editor.transform((tr, schema) => { diff --git a/server/release/queries.ts b/server/release/queries.ts index ad5861bf7d..13c3bd500e 100644 --- a/server/release/queries.ts +++ b/server/release/queries.ts @@ -2,14 +2,16 @@ import type firebase from 'firebase'; import type { DefinitelyHas, DocJson, Maybe, Release as ReleaseType } from 'types'; +import { 
StepMap } from 'prosemirror-transform'; import { Op } from 'sequelize'; import { editorSchema, getStepsInChangeRange } from 'components/Editor'; import { createPubReleasedActivityItem } from 'server/activityItem/queries'; import { createUpdatedDiscussionAnchorForNewSteps } from 'server/discussionAnchor/queries'; import { createDoc } from 'server/doc/queries'; +import { getDraftCheckpoint } from 'server/draftCheckpoint/queries'; import { createLatestPubExports } from 'server/export/queries'; -import { Discussion, DiscussionAnchor, Doc, Release } from 'server/models'; +import { Discussion, DiscussionAnchor, Doc, Draft, Pub, Release } from 'server/models'; import { sequelize } from 'server/sequelize'; import { defer } from 'server/utils/deferred'; import { getPubDraftDoc, getPubDraftRef } from 'server/utils/firebaseAdmin'; @@ -40,28 +42,57 @@ const getStepsSinceLastRelease = async ( return []; }; +/** + * Map a discussion selection through an array of StepMap ranges (from DraftCheckpoint.stepMaps). + * Returns the new selection, or null if it was deleted. 
+ */ +const mapSelectionThroughStoredStepMaps = ( + selection: { anchor: number; head: number } | null, + stepMapRanges: number[][], +) => { + if (!selection || selection.anchor === selection.head) return null; + let from = Math.min(selection.anchor, selection.head); + let to = Math.max(selection.anchor, selection.head); + + for (const ranges of stepMapRanges) { + const map = new StepMap(ranges); + from = map.map(from, 1); + to = map.map(to, -1); + if (from >= to || from === 0) return null; + } + + return { type: 'text' as const, anchor: from, head: to }; +}; + const createDiscussionAnchorsForRelease = async ( pubId: string, previousRelease: Maybe>, currentHistoryKey: number, sequelizeTransaction: any, ) => { + if (!previousRelease) return; + const draftRef = await getPubDraftRef(pubId, sequelizeTransaction); - if (previousRelease) { - const steps = await getStepsSinceLastRelease(draftRef, previousRelease, currentHistoryKey); - const flatSteps = steps.reduce((a, b) => [...a, ...b], []); - const discussions = await Discussion.findAll({ - where: { pubId }, - attributes: ['id'], - transaction: sequelizeTransaction, - }); - const existingAnchors = await DiscussionAnchor.findAll({ - where: { - discussionId: { [Op.in]: discussions.map((d) => d.id) }, - historyKey: previousRelease.historyKey, - }, - transaction: sequelizeTransaction, - }); + const steps = await getStepsSinceLastRelease(draftRef, previousRelease, currentHistoryKey); + const flatSteps = steps.reduce((a, b) => [...a, ...b], []); + + const discussions = await Discussion.findAll({ + where: { pubId }, + attributes: ['id'], + transaction: sequelizeTransaction, + }); + const existingAnchors = await DiscussionAnchor.findAll({ + where: { + discussionId: { [Op.in]: discussions.map((d) => d.id) }, + historyKey: previousRelease.historyKey, + }, + transaction: sequelizeTransaction, + }); + + if (existingAnchors.length === 0) return; + + // If we got steps from Firebase, use them directly + if (flatSteps.length > 0) { 
await Promise.all( existingAnchors.map((anchor) => createUpdatedDiscussionAnchorForNewSteps( @@ -72,7 +103,76 @@ const createDiscussionAnchorsForRelease = async ( ).catch((err) => console.error('Failed to create updated discussion anchor', err)), ), ); + return; } + + // No steps from Firebase covering the full range — try stored stepMaps from + // the DraftCheckpoint (cold-stored draft) and compose with any new Firebase + // changes that happened after the stepMaps were captured. + const pub = await Pub.findOne({ + where: { id: pubId }, + include: [{ model: Draft, as: 'draft' }], + transaction: sequelizeTransaction, + }); + if (!pub?.draft) return; + + const pgCheckpoint = await getDraftCheckpoint(pub.draft.id, sequelizeTransaction); + if (!pgCheckpoint?.stepMaps?.length || pgCheckpoint.stepMapToKey == null) { + console.warn( + `[release] No steps or stepMaps available for pub ${pubId}, skipping anchor mapping`, + ); + return; + } + + // Compose stored stepMaps (release→stepMapToKey) with any new Firebase + // steps (stepMapToKey+1→currentHistoryKey) that happened after thaw. 
+ let allStepMapRanges = pgCheckpoint.stepMaps!; + if (pgCheckpoint.stepMapToKey < currentHistoryKey) { + try { + const newStepsByChange = await getStepsInChangeRange( + draftRef, + editorSchema, + pgCheckpoint.stepMapToKey + 1, + currentHistoryKey, + ); + const newFlatSteps = newStepsByChange.reduce((a, b) => [...a, ...b], []); + const newRanges = newFlatSteps.map((s) => + Array.from((s.getMap() as any).ranges as number[]), + ); + allStepMapRanges = [...allStepMapRanges, ...newRanges]; + } catch (err) { + console.warn( + `[release] Could not get Firebase steps ${pgCheckpoint.stepMapToKey + 1}→${currentHistoryKey}, using stored stepMaps only`, + err, + ); + } + } + + // Use composed stepMaps to map anchors + await Promise.all( + existingAnchors.map(async (anchor) => { + try { + const nextSelection = mapSelectionThroughStoredStepMaps( + anchor.selection, + allStepMapRanges, + ); + await DiscussionAnchor.create( + { + historyKey: currentHistoryKey, + discussionId: anchor.discussionId, + originalText: anchor.originalText, + originalTextPrefix: anchor.originalTextPrefix, + originalTextSuffix: anchor.originalTextSuffix, + selection: nextSelection, + isOriginal: false, + }, + { transaction: sequelizeTransaction }, + ); + } catch (err) { + console.error('Failed to create discussion anchor from stepMaps', err); + } + }), + ); }; export const createRelease = async ({ diff --git a/server/utils/firebaseAdmin.ts b/server/utils/firebaseAdmin.ts index 3265d737b6..f64bf8f7db 100644 --- a/server/utils/firebaseAdmin.ts +++ b/server/utils/firebaseAdmin.ts @@ -4,7 +4,9 @@ import type { Schema } from 'prosemirror-model'; import type { DocJson, PubDraftInfo } from 'types'; import firebaseAdmin from 'firebase-admin'; -import { type Step, Transform } from 'prosemirror-transform'; +import { uncompressStepJSON } from 'prosemirror-compress-pubpub'; +import { Node } from 'prosemirror-model'; +import { Step, Transform } from 'prosemirror-transform'; import { editorSchema, @@ -12,7 +14,8 @@ 
import { getFirstKeyAndTimestamp, getLatestKeyAndTimestamp, } from 'components/Editor'; -import { createFirebaseChange, storeCheckpoint } from 'components/Editor/utils'; +import { createFirebaseChange, flattenKeyables } from 'components/Editor/utils'; +import { getDraftCheckpoint } from 'server/draftCheckpoint/queries'; import { Draft, Pub } from 'server/models'; import { expect } from 'utils/assert'; import { getFirebaseConfig } from 'utils/editor/firebaseConfig'; @@ -59,6 +62,18 @@ export const getPubDraftRef = async (pubId: string, sequelizeTransaction: any = return getDatabaseRef(expect(pub.draft).firebasePath); }; +export const getPubDraft = async (pubId: string, sequelizeTransaction: any = null) => { + const pub = expect( + await Pub.findOne({ + where: { id: pubId }, + include: [{ model: Draft, as: 'draft' }], + transaction: sequelizeTransaction, + }), + ); + const draft = expect(pub.draft); + return { draft, draftRef: getDatabaseRef(draft.firebasePath) }; +}; + const maybeAddKeyTimestampPair = (key, timestamp) => { if (typeof key === 'number' && key >= 0) { return { [key]: timestamp }; @@ -66,14 +81,165 @@ const maybeAddKeyTimestampPair = (key, timestamp) => { return null; }; +/** + * Apply Firebase changes on top of a checkpoint doc to produce the current document. + * Used when loading from a Postgres checkpoint with Firebase changes layered on top. + */ +const applyFirebaseChangesOnDoc = async ( + draftRef: firebase.database.Reference, + checkpointDoc: DocJson, + checkpointKey: number, + checkpointTimestamp: number | null, + historyKey: null | number, +) => { + const versionBound = historyKey ?? 
Infinity; + + const getChanges = draftRef + .child('changes') + .orderByKey() + .startAt(String(checkpointKey + 1)) + .endAt(String(versionBound)) + .once('value'); + + const getMerges = draftRef + .child('merges') + .orderByKey() + .startAt(String(checkpointKey + 1)) + .endAt(String(versionBound)) + .once('value'); + + const [changesSnapshot, mergesSnapshot] = await Promise.all([getChanges, getMerges]); + + const allKeyables = { + ...changesSnapshot.val(), + ...mergesSnapshot.val(), + }; + + const flattenedChanges = flattenKeyables(allKeyables); + const stepsJson = flattenedChanges.flatMap((change) => change.s.map(uncompressStepJSON)); + + const keys = Object.keys(allKeyables); + const currentKey = keys.length + ? keys.map((k) => parseInt(k, 10)).reduce((a, b) => Math.max(a, b)) + : checkpointKey; + + const currentTimestamp = + flattenedChanges.length > 0 + ? flattenedChanges[flattenedChanges.length - 1].t + : checkpointTimestamp; + + let doc = Node.fromJSON(editorSchema, checkpointDoc); + for (const stepJson of stepsJson) { + const step = Step.fromJSON(editorSchema, stepJson); + const { failed, doc: nextDoc } = step.apply(doc); + if (failed) { + console.error(`Failed with: ${failed}`); + } else if (nextDoc) { + doc = nextDoc; + } + } + + return { + doc, + key: currentKey, + timestamp: currentTimestamp as number, + hasFirebaseChanges: stepsJson.length > 0, + }; +}; + export const getPubDraftDoc = async ( pubIdOrRef: string | firebase.database.Reference, historyKey: null | number = null, - createMissingCheckpoints = false, +): Promise => { + // If called with a raw ref (no pub context), fall back to Firebase-only path + if (typeof pubIdOrRef !== 'string') { + return getPubDraftDocFromFirebase(pubIdOrRef, historyKey); + } + + const pubId = pubIdOrRef; + const { draft, draftRef } = await getPubDraft(pubId); + + // Always try Postgres checkpoint first — but only if the requested historyKey + // is at or after the checkpoint. 
If the user is browsing history before the + // checkpoint, we need the Firebase path which has older changes/checkpoints. + const pgCheckpoint = await getDraftCheckpoint(draft.id); + if (pgCheckpoint && (historyKey === null || historyKey >= pgCheckpoint.historyKey)) { + // Sequelize returns BIGINT as string — coerce to number for Date use + const pgTimestamp = pgCheckpoint.timestamp ? Number(pgCheckpoint.timestamp) : null; + const { + doc, + key: currentKey, + timestamp: currentTimestamp, + } = await applyFirebaseChangesOnDoc( + draftRef, + pgCheckpoint.doc as DocJson, + pgCheckpoint.historyKey, + pgTimestamp, + historyKey, + ); + + // If this draft was cold-stored and has frozen discussions, thaw them + // back into Firebase so the collaborative discussions plugin works. + if (draft.coldCheckpointId && pgCheckpoint.discussions) { + const existingDiscussions = await draftRef.child('discussions').once('value'); + if (!existingDiscussions.val()) { + await draftRef.child('discussions').set(pgCheckpoint.discussions); + } + } + + // Gather timestamps for history UI + const [ + { timestamp: firstTimestamp, key: firstKey }, + { timestamp: latestTimestamp, key: latestKey }, + ] = await Promise.all([ + getFirstKeyAndTimestamp(draftRef).catch(() => ({ + timestamp: currentTimestamp, + key: currentKey, + })), + getLatestKeyAndTimestamp(draftRef).catch(() => ({ + timestamp: currentTimestamp, + key: currentKey, + })), + ]); + + // Use the Postgres checkpoint key as the "first" if Firebase has nothing earlier + const effectiveFirstKey = firstKey >= 0 ? firstKey : pgCheckpoint.historyKey; + const effectiveFirstTimestamp = firstKey >= 0 ? firstTimestamp : pgCheckpoint.timestamp; + const effectiveLatestKey = latestKey >= 0 ? latestKey : currentKey; + const effectiveLatestTimestamp = latestKey >= 0 ? 
latestTimestamp : currentTimestamp; + + return { + doc: doc.toJSON() as DocJson, + size: doc.content.size, + mostRecentRemoteKey: currentKey, + firstTimestamp: effectiveFirstTimestamp as number, + latestTimestamp: effectiveLatestTimestamp as number, + historyData: { + timestamps: { + ...maybeAddKeyTimestampPair(effectiveFirstKey, effectiveFirstTimestamp), + ...maybeAddKeyTimestampPair(currentKey, currentTimestamp), + ...maybeAddKeyTimestampPair(effectiveLatestKey, effectiveLatestTimestamp), + }, + currentKey, + latestKey: effectiveLatestKey, + }, + }; + } + + // No PG checkpoint — fall back to Firebase-only path (legacy drafts) + return getPubDraftDocFromFirebase(draftRef, historyKey); +}; + +/** + * Original Firebase-only path for loading a draft doc. + */ +const getPubDraftDocFromFirebase = async ( + pubIdOrRef: string | firebase.database.Reference, + historyKey: null | number = null, ): Promise => { const draftRef = typeof pubIdOrRef === 'string' ? await getPubDraftRef(pubIdOrRef) : pubIdOrRef; const [ - { doc, docIsFromCheckpoint, key: currentKey, timestamp: currentTimestamp, checkpointMap }, + { doc, key: currentKey, timestamp: currentTimestamp, checkpointMap }, { timestamp: firstTimestamp, key: firstKey }, { timestamp: latestTimestamp, key: latestKey }, ] = await Promise.all([ @@ -82,10 +248,6 @@ export const getPubDraftDoc = async ( getLatestKeyAndTimestamp(draftRef), ]); - if (!docIsFromCheckpoint && createMissingCheckpoints && currentKey === latestKey) { - storeCheckpoint(draftRef, doc, latestKey); - } - return { doc: doc.toJSON() as DocJson, size: doc.content.size, diff --git a/server/utils/queryHelpers/pubEnrich.ts b/server/utils/queryHelpers/pubEnrich.ts index fa3930e5d2..b4ed64039f 100644 --- a/server/utils/queryHelpers/pubEnrich.ts +++ b/server/utils/queryHelpers/pubEnrich.ts @@ -27,10 +27,11 @@ export const getPubFirebaseDraft = async ( historyKey, ); if (latestTimestamp && pubData.draft) { - await Draft.update( - { latestKeyAt: new 
Date(latestTimestamp) }, - { where: { id: pubData.draft.id } }, - ); + const keyDate = new Date(latestTimestamp); + // Guard against invalid dates (e.g. BIGINT-as-string from Sequelize) + if (!Number.isNaN(keyDate.getTime())) { + await Draft.update({ latestKeyAt: keyDate }, { where: { id: pubData.draft.id } }); + } } return { diff --git a/tools/bootstrapCheckpoints.ts b/tools/bootstrapCheckpoints.ts new file mode 100644 index 0000000000..509dd22fa4 --- /dev/null +++ b/tools/bootstrapCheckpoints.ts @@ -0,0 +1,627 @@ +/** + * Bootstrap Draft Checkpoints + * + * A one-time migration that: + * 1. Normalizes legacy Firebase paths (pub-{id}/branch-{id} → drafts/draft-{id}) + * 2. Copies the latest checkpoint from Firebase into the DraftCheckpoints Postgres table + * + * After this script runs, every draft with Firebase data will have a Postgres + * checkpoint. New checkpoints are written directly to Postgres by the client, + * so this script only needs to run once. + * + * Safety: + * - Dry run by default (use --execute to actually write) + * - Single draft with --draftId= + * - Concurrency control with --concurrency=N (default 20, for path normalization) + * - Separate --extractConcurrency=N for checkpoint extraction (default 20) + * + * Usage: + * pnpm run tools bootstrapCheckpoints # Dry run + * pnpm run tools bootstrapCheckpoints --execute # Actually migrate + * pnpm run tools bootstrapCheckpoints --draftId= # Single draft + * pnpm run tools bootstrapCheckpoints --execute --prod # Run against prod + * pnpm run tools bootstrapCheckpoints --execute --extractConcurrency=3 # Slower but safer + */ + +import type firebase from 'firebase'; + +import firebaseAdmin from 'firebase-admin'; +import { Op, QueryTypes } from 'sequelize'; + +import { editorSchema, getFirebaseDoc, getStepsInChangeRange } from 'components/Editor'; +import { Draft, DraftCheckpoint, Pub, Release } from 'server/models'; +import { sequelize } from 'server/sequelize'; +import { getDatabaseRef } from 
'server/utils/firebaseAdmin';
import { getFirebaseConfig } from 'utils/editor/firebaseConfig';

const {
	argv: {
		execute,
		draftId: specificDraftId,
		concurrency: concurrencyArg = 20,
		extractConcurrency: extractConcurrencyArg = 20,
		verbose: verboseFlag,
		skipPathNormalization,
	},
} = require('yargs');

const isDryRun = !execute;
// Path normalization copies entire Firebase trees (all checkpoints, changes,
// discussions) so even this needs conservative concurrency to avoid
// overwhelming Firebase or OOMing the container.
const CONCURRENCY = Number(concurrencyArg);
const EXTRACT_CONCURRENCY = Number(extractConcurrencyArg);

// biome-ignore lint/suspicious/noConsole: CLI tool output
const log = (msg: string) => console.log(`[bootstrap] ${new Date().toISOString()} ${msg}`);
const verbose = (msg: string) => verboseFlag && log(msg);

// --- Stats ---

interface BootstrapStats {
	pathsNormalized: number;
	pathsSkipped: number;
	pathsFailed: number;
	checkpointsCreated: number;
	checkpointsSkippedEmpty: number;
	checkpointsSkippedExisting: number;
	checkpointsFailed: number;
	totalDrafts: number;
}

const stats: BootstrapStats = {
	pathsNormalized: 0,
	pathsSkipped: 0,
	pathsFailed: 0,
	checkpointsCreated: 0,
	checkpointsSkippedEmpty: 0,
	checkpointsSkippedExisting: 0,
	checkpointsFailed: 0,
	totalDrafts: 0,
};

// --- Concurrency helper ---

/**
 * Run `tasks` through a fixed-size worker pool, preserving result order.
 * A failed task is logged and its slot resolves to `undefined` rather than
 * aborting the whole run. A 30s heartbeat logs progress so hangs are visible.
 */
const runWithConcurrency = async <T>(
	tasks: (() => Promise<T>)[],
	concurrency: number,
	progressLabel?: string,
): Promise<T[]> => {
	const results: T[] = [];
	let index = 0;
	// Local (not module-level) so two concurrent invocations of this helper
	// cannot corrupt each other's progress reporting.
	let totalCompleted = 0;
	const total = tasks.length;
	const inFlight = new Set<number>();

	// Periodically log progress so we can detect hangs
	const heartbeat = setInterval(() => {
		if (inFlight.size > 0) {
			log(
				`  [${progressLabel ?? '?'}] heartbeat: ${totalCompleted}/${total} done, ${inFlight.size} in flight`,
			);
		}
	}, 30_000);

	const worker = async (): Promise<void> => {
		while (index < tasks.length) {
			const currentIndex = index++;
			inFlight.add(currentIndex);
			try {
				// biome-ignore lint/performance/noAwaitInLoops: worker pool pattern
				results[currentIndex] = await tasks[currentIndex]();
			} catch (err: any) {
				log(`  [${progressLabel ?? '?'}] task ${currentIndex} failed: ${err.message}`);
				results[currentIndex] = undefined as any;
			}
			inFlight.delete(currentIndex);
			totalCompleted++;
			if (progressLabel && totalCompleted % 500 === 0) {
				log(`  [${progressLabel}] ${totalCompleted}/${total} done`);
			}
		}
	};
	try {
		await Promise.all(
			Array.from({ length: Math.min(concurrency, tasks.length) }, () => worker()),
		);
	} finally {
		clearInterval(heartbeat);
	}
	return results;
};

// --- Firebase REST helpers (avoid SDK WebSocket throttling) ---
// All Phase 1 operations use REST to avoid the SDK's shared persistent
// WebSocket connection, which Firebase silently throttles under load.

let cachedAccessToken: { token: string; expiresAt: number } | null = null;

/**
 * Mint (and cache) a Google OAuth access token from the service-account
 * credentials in FIREBASE_SERVICE_ACCOUNT_BASE64. Refreshed when within
 * 60s of expiry.
 */
const getAccessToken = async (): Promise<string> => {
	const now = Date.now();
	if (cachedAccessToken && cachedAccessToken.expiresAt > now + 60_000) {
		return cachedAccessToken.token;
	}
	const credential = firebaseAdmin.credential.cert(
		JSON.parse(
			Buffer.from(process.env.FIREBASE_SERVICE_ACCOUNT_BASE64 as string, 'base64').toString(),
		),
	);
	const tokenResult = await credential.getAccessToken();
	cachedAccessToken = {
		token: tokenResult.access_token,
		expiresAt: now + (tokenResult.expires_in ?? 3600) * 1000,
	};
	return cachedAccessToken.token;
};

const REST_TIMEOUT_MS = 60_000; // 60s for REST (longer than SDK since these actually complete)
const REST_MAX_RETRIES = 5;

/**
 * General-purpose Firebase REST API helper.
 * Each call is an independent HTTP request — no shared WebSocket.
 */
const firebaseRest = async <T>(
	method: 'GET' | 'PUT' | 'PATCH' | 'DELETE',
	path: string,
	body?: any,
	queryParams?: Record<string, string>,
): Promise<T> => {
	const databaseURL = getFirebaseConfig().databaseURL;

	for (let attempt = 1; attempt <= REST_MAX_RETRIES; attempt++) {
		// biome-ignore lint/performance/noAwaitInLoops: retry loop
		const accessToken = await getAccessToken();
		const params = new URLSearchParams({ access_token: accessToken, ...queryParams });
		const url = `${databaseURL}/${path}.json?${params}`;

		try {
			const options: RequestInit = {
				method,
				signal: AbortSignal.timeout(REST_TIMEOUT_MS),
			};
			if (body !== undefined) {
				options.headers = { 'Content-Type': 'application/json' };
				options.body = JSON.stringify(body);
			}
			const response = await fetch(url, options);
			if (!response.ok) {
				const text = await response.text();
				throw new Error(`Firebase REST ${method} ${response.status}: ${text}`);
			}
			if (method === 'DELETE') return null as T;
			return (await response.json()) as T;
		} catch (error: any) {
			// Don't retry deterministic errors like WRITE_TOO_BIG
			const errMsg = error?.message || String(error);
			if (
				errMsg.includes('Data to write exceeds') ||
				errMsg.includes('WRITE_TOO_BIG') ||
				errMsg.includes('write_too_big')
			) {
				throw error;
			}
			if (attempt === REST_MAX_RETRIES) throw error;
			const delay = Math.min(2000 * 2 ** attempt, 30_000);
			log(
				`  [rest] ${method} ${path}: attempt ${attempt} failed, retrying in ${delay / 1000}s (${errMsg})`,
			);
			await new Promise((r) => setTimeout(r, delay));
		}
	}
	throw new Error('unreachable');
};

/** Shallow-list child keys at `path` without downloading their contents. */
const getShallowKeys = async (path: string): Promise<string[]> => {
	const data = await firebaseRest<Record<string, unknown> | null>('GET', path, undefined, {
		shallow: 'true',
	});
	if (!data || typeof data !== 'object') return [];
	return Object.keys(data);
};

// --- Phase 1: Path normalization ---

const isLegacyPath = (path: string): boolean => {
	return /^pub-[^/]+\/branch-[^/]+$/.test(path);
};

const getModernPath = (draftId: string): string => {
	return `drafts/draft-${draftId}`;
};

/**
 * Copy Firebase data from source to dest, child-by-child.
 * Uses shallow key listing so we never load the entire tree into memory.
 * For children that are themselves large key-value maps (changes, merges,
 * checkpoints), we copy in paginated batches.
 */
const LARGE_CHILDREN = new Set(['changes', 'merges', 'checkpoints', 'checkpointMap']);
const COPY_BATCH_SIZE = 500;

const copyFirebaseChild = async (
	sourcePath: string,
	destPath: string,
	childKey: string,
): Promise<void> => {
	const srcChildPath = `${sourcePath}/${childKey}`;
	const dstChildPath = `${destPath}/${childKey}`;

	if (!LARGE_CHILDREN.has(childKey)) {
		// Small child — copy in one shot via REST
		const data = await firebaseRest<any>('GET', srcChildPath);
		if (data != null) {
			await firebaseRest('PUT', dstChildPath, data);
		}
		return;
	}

	// Large child — copy in paginated batches via REST
	const keys = await getShallowKeys(srcChildPath);
	if (keys.length === 0) return;

	// Sort keys numerically
	const sortedKeys = keys
		.map((k) => parseInt(k, 10))
		.filter((k) => !Number.isNaN(k))
		.sort((a, b) => a - b);

	if (sortedKeys.length > COPY_BATCH_SIZE) {
		verbose(
			`    ${childKey}: ${sortedKeys.length} entries, copying in batches of ${COPY_BATCH_SIZE}`,
		);
	}

	for (let i = 0; i < sortedKeys.length; i += COPY_BATCH_SIZE) {
		const batchStart = sortedKeys[i];
		const batchEnd = sortedKeys[Math.min(i + COPY_BATCH_SIZE - 1, sortedKeys.length - 1)];

		// REST read with orderBy/startAt/endAt query params
		// biome-ignore lint/performance/noAwaitInLoops: sequential batch copy
		const data = await firebaseRest<Record<string, any> | any[] | null>(
			'GET',
			srcChildPath,
			undefined,
			{
				orderBy: '"$key"',
				startAt: `"${batchStart}"`,
				endAt: `"${batchEnd}"`,
			},
		);

		// REST may return a sparse array for consecutive integer keys.
		// Normalize to a plain object so PATCH doesn't choke on undefined holes.
		let updates: Record<string, any> = {};
		if (Array.isArray(data)) {
			for (let idx = 0; idx < data.length; idx++) {
				if (data[idx] !== undefined && data[idx] !== null) {
					updates[String(idx)] = data[idx];
				}
			}
		} else if (data && typeof data === 'object') {
			updates = data;
		}

		if (Object.keys(updates).length > 0) {
			try {
				// PATCH = multi-path update
				await firebaseRest('PATCH', dstChildPath, updates);
			} catch (patchErr: any) {
				const patchMsg = patchErr?.message || String(patchErr);
				if (
					patchMsg.includes('Data to write exceeds') ||
					patchMsg.includes('WRITE_TOO_BIG') ||
					patchMsg.includes('write_too_big')
				) {
					// Batch too large — fall back to writing each key individually via PUT
					verbose(
						`    ${childKey}: batch ${batchStart}-${batchEnd} too large, copying individually`,
					);
					for (const [key, value] of Object.entries(updates)) {
						// biome-ignore lint/performance/noAwaitInLoops: sequential fallback
						await firebaseRest('PUT', `${dstChildPath}/${key}`, value);
					}
				} else {
					throw patchErr;
				}
			}
		}
		if (sortedKeys.length > COPY_BATCH_SIZE) {
			verbose(
				`    ${childKey}: copied ${Math.min(i + COPY_BATCH_SIZE, sortedKeys.length)}/${sortedKeys.length}`,
			);
		}
	}
};

/** Copy every child of `sourcePath` to `destPath`. Returns false if the source is empty. */
const copyFirebaseData = async (sourcePath: string, destPath: string): Promise<boolean> => {
	const topKeys = await getShallowKeys(sourcePath);
	if (topKeys.length === 0) return false;

	for (const childKey of topKeys) {
		// biome-ignore lint/performance/noAwaitInLoops: sequential copy of children
		await copyFirebaseChild(sourcePath, destPath, childKey);
	}
	return true;
};

/**
 * Recursively delete a Firebase path, handling WRITE_TOO_BIG by shallowly
 * listing children and deleting them individually (or recursing further).
 * For large key-value children, uses batch multi-path updates.
 */
const DELETE_BATCH_SIZE = 2500;

const deleteFirebasePath = async (path: string): Promise<void> => {
	try {
		await firebaseRest('DELETE', path);
	} catch (err: any) {
		const msg = err?.message || String(err);
		if (!msg.includes('WRITE_TOO_BIG') && !msg.includes('write_too_big')) throw err;

		verbose(`    ${path} too large, deleting in batches`);
		const childKeys = await getShallowKeys(path);

		// Try batch multi-path PATCH with nulls (much faster than individual deletes)
		for (let i = 0; i < childKeys.length; i += DELETE_BATCH_SIZE) {
			const batch = childKeys.slice(i, i + DELETE_BATCH_SIZE);
			const updates: Record<string, null> = {};
			for (const key of batch) {
				updates[key] = null;
			}
			try {
				// biome-ignore lint/performance/noAwaitInLoops: batched deletion
				await firebaseRest('PATCH', path, updates);
			} catch (batchErr: any) {
				const batchMsg = batchErr?.message || String(batchErr);
				if (batchMsg.includes('WRITE_TOO_BIG') || batchMsg.includes('write_too_big')) {
					// Batch too large, fall back to individual recursive deletes
					for (const key of batch) {
						// biome-ignore lint/performance/noAwaitInLoops: sequential fallback
						await deleteFirebasePath(`${path}/${key}`);
					}
				} else {
					throw batchErr;
				}
			}
		}

		// Delete the now-empty parent
		await firebaseRest('DELETE', path);
	}
};

/**
 * Normalize one draft's legacy `pub-*/branch-*` Firebase path to the modern
 * `drafts/draft-<id>` layout: copy data, update the Draft row, then delete
 * the legacy tree. Dry-run mode only counts what would change.
 */
const normalizePath = async (draft: Draft): Promise<void> => {
	const { id, firebasePath } = draft;
	if (!isLegacyPath(firebasePath)) {
		stats.pathsSkipped++;
		return;
	}

	const modernPath = getModernPath(id);

	if (isDryRun) {
		verbose(`  [path] Would normalize ${id}: ${firebasePath} → ${modernPath}`);
		stats.pathsNormalized++;
		return;
	}

	try {
		const hasData = await copyFirebaseData(firebasePath, modernPath);
		if (!hasData) {
			verbose(`  [path] ${id}: no data at ${firebasePath}, updating path only`);
		}

		// Update the DB row only after the copy succeeded, so a failed copy
		// leaves the draft pointing at its still-intact legacy tree.
		await draft.update({ firebasePath: modernPath });

		if (hasData) {
			await deleteFirebasePath(firebasePath);
		}

		stats.pathsNormalized++;
		verbose(`  [path] Normalized ${id}`);
	} catch (err: any) {
		log(`  [path] ERROR ${id}: ${err.message}`);
		stats.pathsFailed++;
	}
};

// --- Phase 2: Checkpoint extraction ---

/**
 * Build a Postgres DraftCheckpoint for one draft from its current Firebase
 * state (checkpoint + changes). Skips drafts that already have a PG
 * checkpoint or have no Firebase data/history.
 */
const extractCheckpoint = async (draft: Draft): Promise<void> => {
	const { id: draftId, firebasePath } = draft;

	// Skip if a checkpoint already exists
	const existing = await DraftCheckpoint.findOne({ where: { draftId } });
	if (existing) {
		verbose(`  [ckpt] ${draftId}: already has PG checkpoint at key ${existing.historyKey}`);
		stats.checkpointsSkippedExisting++;
		return;
	}

	// Quick REST check: if the Firebase path has no data at all, skip.
	// This avoids the expensive SDK getFirebaseDoc call on re-runs
	// for drafts we've already determined are empty.
	const topKeys = await getShallowKeys(firebasePath);
	if (topKeys.length === 0) {
		verbose(`  [ckpt] ${draftId}: no data in Firebase, skipping`);
		stats.checkpointsSkippedEmpty++;
		return;
	}

	try {
		const draftRef = getDatabaseRef(firebasePath);

		// Read the full current doc from Firebase (checkpoint + changes)
		const {
			doc,
			key: currentKey,
			timestamp: currentTimestamp,
		} = await getFirebaseDoc(draftRef, editorSchema);

		if (currentKey < 0) {
			verbose(`  [ckpt] ${draftId}: no history in Firebase`);
			stats.checkpointsSkippedEmpty++;
			return;
		}

		const docJson = doc.toJSON();

		// Compute stepMaps from latest release if applicable
		let stepMaps: number[][] | null = null;
		const pub = await Pub.findOne({
			where: { draftId },
			attributes: ['id'],
		});
		if (pub) {
			const latestRelease = await Release.findOne({
				where: { pubId: pub.id },
				attributes: ['historyKey'],
				order: [['historyKey', 'DESC']],
			});
			if (latestRelease && latestRelease.historyKey < currentKey) {
				try {
					const stepsByChange = await getStepsInChangeRange(
						draftRef,
						editorSchema,
						latestRelease.historyKey + 1,
						currentKey,
					);
					const allSteps = stepsByChange.reduce((a, b) => [...a, ...b], []);
					if (allSteps.length > 0) {
						stepMaps = allSteps.map((step) =>
							Array.from((step.getMap() as any).ranges as number[]),
						);
					}
				} catch {
					verbose(`  [ckpt] ${draftId}: could not compute stepMaps (ok, non-fatal)`);
				}
			}
		}

		if (isDryRun) {
			verbose(
				`  [ckpt] Would create checkpoint for ${draftId}: key=${currentKey}, size=${JSON.stringify(docJson).length}B, stepMaps=${stepMaps?.length ?? 0}`,
			);
			stats.checkpointsCreated++;
			return;
		}

		await DraftCheckpoint.create({
			draftId,
			historyKey: currentKey,
			doc: docJson,
			timestamp: currentTimestamp,
			stepMaps,
			stepMapToKey: stepMaps ? currentKey : null,
		});

		// Backfill latestKeyAt if it's null — prevents cold storage from treating
		// this draft as "never tracked" and freezing it immediately.
		if (!draft.latestKeyAt && currentTimestamp) {
			await draft.update({ latestKeyAt: new Date(currentTimestamp) });
		}

		stats.checkpointsCreated++;
		verbose(`  [ckpt] Created checkpoint for ${draftId} at key ${currentKey}`);
	} catch (err: any) {
		log(`  [ckpt] ERROR ${draftId}: ${err.message}`);
		stats.checkpointsFailed++;
	}
};

// --- Main ---

const main = async () => {
	log('Bootstrap Draft Checkpoints');
	log(`Mode: ${isDryRun ? 'DRY RUN' : 'EXECUTE'}`);
	log(
		`Concurrency: ${CONCURRENCY} (path normalization), ${EXTRACT_CONCURRENCY} (checkpoint extraction)`,
	);
	log('');

	// Load all drafts
	const whereClause: any = {};
	if (specificDraftId) {
		whereClause.id = specificDraftId;
	}

	const drafts = await Draft.findAll({
		where: whereClause,
		order: [['id', 'ASC']],
	});

	stats.totalDrafts = drafts.length;
	log(`Found ${drafts.length} drafts`);

	// Phase 1: Normalize legacy paths (rolling concurrency)
	if (!skipPathNormalization) {
		const legacyDrafts = drafts.filter((d) => isLegacyPath(d.firebasePath));
		log('');
		log(
			`Phase 1: Path normalization (${legacyDrafts.length} legacy paths, concurrency=${CONCURRENCY})`,
		);

		await runWithConcurrency(
			legacyDrafts.map((draft) => () => normalizePath(draft)),
			CONCURRENCY,
			'path',
		);
		log(
			`  Normalized: ${stats.pathsNormalized}, Skipped: ${stats.pathsSkipped}, Failed: ${stats.pathsFailed}`,
		);
	} else {
		log('Phase 1: Skipped (--skipPathNormalization)');
	}

	// Phase 2: Extract checkpoints (rolling concurrency)
	log('');

	// Reload drafts to get updated paths after normalization
	const updatedDrafts = await Draft.findAll({
		where: whereClause,
		order: [['id', 'ASC']],
	});

	log(
		`Phase 2: Extract checkpoints to Postgres (${updatedDrafts.length} drafts, concurrency=${EXTRACT_CONCURRENCY})`,
	);

	await runWithConcurrency(
		updatedDrafts.map((draft) => () => extractCheckpoint(draft)),
		EXTRACT_CONCURRENCY,
		'ckpt',
	);

	// Summary
	log('');
	log('='.repeat(60));
	log('Summary');
	log('='.repeat(60));
	log(`Total drafts: ${stats.totalDrafts}`);
	log('');
	log('Path normalization:');
	log(`  Normalized: ${stats.pathsNormalized}`);
	log(`  Skipped (modern): ${stats.pathsSkipped}`);
	log(`  Failed: ${stats.pathsFailed}`);
	log('');
	log('Checkpoint extraction:');
	log(`  Created: ${stats.checkpointsCreated}`);
	log(`  Skipped (existing): 
${stats.checkpointsSkippedExisting}`); + log(` Skipped (empty): ${stats.checkpointsSkippedEmpty}`); + log(` Failed: ${stats.checkpointsFailed}`); + + if (isDryRun) { + log(''); + log('This was a DRY RUN. Use --execute to apply changes.'); + } + + process.exit(0); +}; + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/tools/cleanupFirebase.ts b/tools/cleanupFirebase.ts index 035704bbdd..9016046784 100644 --- a/tools/cleanupFirebase.ts +++ b/tools/cleanupFirebase.ts @@ -30,8 +30,9 @@ import { } from 'prosemirror-compress-pubpub'; import { QueryTypes } from 'sequelize'; -import { editorSchema, getFirebaseDoc } from 'components/Editor'; +import { editorSchema, getFirebaseDoc, getStepsInChangeRange } from 'components/Editor'; import { createFastForwarder } from 'components/Editor/plugins/discussions/fastForward'; +import { getDraftCheckpoint } from 'server/draftCheckpoint/queries'; import { Doc, Draft, Pub, Release } from 'server/models'; import { sequelize } from 'server/sequelize'; import { getDatabaseRef } from 'server/utils/firebaseAdmin'; @@ -512,10 +513,96 @@ const pruneDraft = async ( preloadedReleaseKey: number | null = null, label: string = '', localStats: CleanupStats = stats, + draftId: string | null = null, ): Promise => { const prefix = label ? `[${label}] ` : ' '; const draftRef = getDatabaseRef(firebasePath); + // --- PG-checkpoint-aware fast path --- + // If a PG checkpoint exists, it is the source of truth for the doc. + // We can safely prune all Firebase changes/checkpoints before the PG key. + // Before pruning, capture stepMaps for the range we're about to delete. + if (draftId) { + const pgCheckpoint = await getDraftCheckpoint(draftId); + if (pgCheckpoint) { + const pgKey = pgCheckpoint.historyKey; + verbose(`${prefix}PG checkpoint at key ${pgKey}, using PG-aware prune`); + + // Before pruning, ensure stepMaps cover the range we're about to delete. + // If there's a release, we need stepMaps from release→pgKey. 
+ const latestReleaseKey = + preloadedReleaseKey ?? (pubId ? await getLatestReleaseHistoryKey(pubId) : null); + + if (latestReleaseKey !== null && latestReleaseKey < pgKey) { + const existingToKey = pgCheckpoint.stepMapToKey; + // Only capture new stepMaps if there's a gap + const captureFrom = + existingToKey != null ? existingToKey + 1 : latestReleaseKey + 1; + + if (captureFrom <= pgKey) { + try { + const stepsByChange = await getStepsInChangeRange( + draftRef, + editorSchema, + captureFrom, + pgKey, + ); + const allSteps = stepsByChange.reduce((a, b) => [...a, ...b], []); + if (allSteps.length > 0) { + const newMaps = allSteps.map((step) => + Array.from((step.getMap() as any).ranges as number[]), + ); + const composedMaps = [...(pgCheckpoint.stepMaps ?? []), ...newMaps]; + verbose( + `${prefix}Capturing ${newMaps.length} stepMaps (${captureFrom}→${pgKey}) before prune`, + ); + + if (!isDryRun) { + await pgCheckpoint.update({ + stepMaps: composedMaps, + stepMapToKey: pgKey, + }); + } + } + } catch (err: any) { + log( + `${prefix}Warning: could not capture stepMaps before prune: ${err.message}`, + ); + } + } + } + + // Prune changes, merges, and old Firebase checkpoints before the PG key + const [changesDeleted, mergesDeleted, checkpointsDeleted] = await Promise.all([ + pruneKeysBefore(draftRef, 'changes', pgKey), + pruneKeysBefore(draftRef, 'merges', pgKey), + pruneKeysBefore(draftRef, 'checkpoints', pgKey + 1), // remove ALL Firebase checkpoints at or below pgKey + ]); + localStats.changesDeleted += changesDeleted; + localStats.mergesDeleted += mergesDeleted; + localStats.checkpointsDeleted += checkpointsDeleted; + + // Clean up Firebase checkpointMap entries and deprecated singular checkpoint + if (!isDryRun) { + const checkpointKeys = await getCheckpointKeys(draftRef); + if (checkpointKeys.length > 0) { + const updates: Record = {}; + for (const k of checkpointKeys) { + updates[String(k)] = null; + } + await draftRef.child('checkpointMap').update(updates); + 
} + await draftRef.child('checkpoint').remove(); + } + + verbose( + `${prefix}PG-aware prune: deleted ${changesDeleted} changes, ${mergesDeleted} merges, ${checkpointsDeleted} checkpoints`, + ); + return; + } + } + + // --- Legacy path: no PG checkpoint, use Firebase checkpoints --- const latestCheckpointKey = await getLatestCheckpointKey(draftRef); if (latestCheckpointKey === null) { @@ -524,7 +611,7 @@ const pruneDraft = async ( return; } - verbose(`${prefix}Latest checkpoint key: ${latestCheckpointKey}`); + verbose(`${prefix}Latest Firebase checkpoint key: ${latestCheckpointKey}`); // Determine safe prune threshold: min(latestCheckpointKey, latestReleaseHistoryKey) // - We need changes from latestCheckpointKey onwards to reconstruct the doc @@ -919,7 +1006,7 @@ const processPubDraft = async (pubId: string): Promise => { verbose(`[${pubLabel}] Draft path: ${pub.draft.firebasePath}`); try { - await pruneDraft(pub.draft.firebasePath, pubId, null, pubLabel); + await pruneDraft(pub.draft.firebasePath, pubId, null, pubLabel, stats, pub.draft.id); await cleanupOrphanedBranchesForPub(pubId, pub.draft.firebasePath); stats.draftsProcessed++; } catch (err) { @@ -947,7 +1034,7 @@ const processDraftById = async (draftId: string): Promise => { verbose(`[${draftLabel}] Draft path: ${draft.firebasePath}`); try { - await pruneDraft(draft.firebasePath, pub?.id ?? null, null, draftLabel); + await pruneDraft(draft.firebasePath, pub?.id ?? null, null, draftLabel, stats, draft.id); stats.draftsProcessed++; } catch (err) { log(` Error processing draft: ${(err as Error).message}`); @@ -1072,7 +1159,14 @@ const processAllDrafts = async (): Promise => { const pubLabel = pub.slug || pub.id.slice(0, 8); verbose(`[W${workerId}:${pubLabel}] Processing...`); const releaseKey = releaseKeyCache.get(pub.id) ?? 
null; - await pruneDraft(pub.draft!.firebasePath, pub.id, releaseKey, pubLabel, localStats); + await pruneDraft( + pub.draft!.firebasePath, + pub.id, + releaseKey, + pubLabel, + localStats, + pub.draft!.id, + ); await cleanupOrphanedBranchesForPub(pub.id, pub.draft!.firebasePath, localStats); localStats.draftsProcessed++; totalProcessed++; diff --git a/tools/coldStorage.ts b/tools/coldStorage.ts new file mode 100644 index 0000000000..59be390246 --- /dev/null +++ b/tools/coldStorage.ts @@ -0,0 +1,580 @@ +/** + * Cold Storage Tool + * + * Moves inactive drafts from Firebase to Postgres by: + * 1. Finding drafts not edited within a threshold (default: 30 days) + * 2. Building a checkpoint from the current Firebase state (checkpoint + changes) + * 3. Storing that checkpoint in the DraftCheckpoints Postgres table + * 4. Wiping all data from the Firebase path (changes, checkpoints, merges, etc.) + * 5. Setting coldCheckpointId on the Draft row + * + * When a user next opens a cold-stored draft, the server loads the checkpoint + * from Postgres and the client connects to an empty Firebase ref — ready for + * new edits. A future run of this tool will re-checkpoint those new edits. + * + * This tool is safe to run repeatedly. Drafts already cold-stored (with an + * empty Firebase ref) are skipped. 
+ * + * Usage: + * pnpm run tools coldStorage # Dry run, all stale drafts + * pnpm run tools coldStorage --execute # Actually migrate + wipe + * pnpm run tools coldStorage --daysOld=60 # Custom threshold + * pnpm run tools coldStorage --pubId= # Single pub + * pnpm run tools coldStorage --execute --prod # Run against prod + */ + +import type firebase from 'firebase'; + +import firebaseAdmin from 'firebase-admin'; +import { uncompressSelectionJSON } from 'prosemirror-compress-pubpub'; +import { Op, QueryTypes } from 'sequelize'; + +import { editorSchema, getFirebaseDoc, getStepsInChangeRange } from 'components/Editor'; +import { getDraftCheckpoint } from 'server/draftCheckpoint/queries'; +import { Draft, DraftCheckpoint, Pub, Release } from 'server/models'; +import { sequelize } from 'server/sequelize'; +import { getDatabaseRef, getPubDraftDoc } from 'server/utils/firebaseAdmin'; +import { getFirebaseConfig } from 'utils/editor/firebaseConfig'; + +const { + argv: { + execute, + pubId: specificPubId, + daysOld: daysOldArg = 30, + batchSize: batchSizeArg = 100, + concurrency: concurrencyArg = 10, + verbose: verboseFlag, + }, +} = require('yargs'); + +const isDryRun = !execute; +const DAYS_OLD = Number(daysOldArg); +const BATCH_SIZE = Number(batchSizeArg); +const CONCURRENCY = Number(concurrencyArg); + +// biome-ignore lint/suspicious/noConsole: CLI tool output +const log = (msg: string) => console.log(`[cold-storage] ${new Date().toISOString()} ${msg}`); +const verbose = (msg: string) => verboseFlag && log(msg); + +// --- Firebase REST helpers (avoid SDK WebSocket throttling) --- + +let cachedAccessToken: { token: string; expiresAt: number } | null = null; + +const getAccessToken = async (): Promise => { + const now = Date.now(); + if (cachedAccessToken && cachedAccessToken.expiresAt > now + 60_000) { + return cachedAccessToken.token; + } + const credential = firebaseAdmin.credential.cert( + JSON.parse( + Buffer.from(process.env.FIREBASE_SERVICE_ACCOUNT_BASE64 as 
string, 'base64').toString(), + ), + ); + const tokenResult = await credential.getAccessToken(); + cachedAccessToken = { + token: tokenResult.access_token, + expiresAt: now + (tokenResult.expires_in ?? 3600) * 1000, + }; + return cachedAccessToken.token; +}; + +const REST_TIMEOUT_MS = 60_000; +const REST_MAX_RETRIES = 5; + +/** + * General-purpose Firebase REST API helper. + * Each call is an independent HTTP request — no shared WebSocket. + */ +const firebaseRest = async ( + method: 'GET' | 'PUT' | 'PATCH' | 'DELETE', + path: string, + body?: any, + queryParams?: Record, +): Promise => { + const databaseURL = getFirebaseConfig().databaseURL; + + for (let attempt = 1; attempt <= REST_MAX_RETRIES; attempt++) { + // biome-ignore lint/performance/noAwaitInLoops: retry loop + const accessToken = await getAccessToken(); + const params = new URLSearchParams({ access_token: accessToken, ...queryParams }); + const url = `${databaseURL}/${path}.json?${params}`; + + try { + const options: RequestInit = { + method, + signal: AbortSignal.timeout(REST_TIMEOUT_MS), + }; + if (body !== undefined) { + options.headers = { 'Content-Type': 'application/json' }; + options.body = JSON.stringify(body); + } + const response = await fetch(url, options); + if (!response.ok) { + const text = await response.text(); + throw new Error(`Firebase REST ${method} ${response.status}: ${text}`); + } + if (method === 'DELETE') return null as T; + return (await response.json()) as T; + } catch (error: any) { + if (attempt === REST_MAX_RETRIES) throw error; + const delay = Math.min(2000 * 2 ** attempt, 30_000); + log( + ` [rest] ${method} ${path}: attempt ${attempt} failed, retrying in ${delay / 1000}s (${error.message})`, + ); + await new Promise((r) => setTimeout(r, delay)); + } + } + throw new Error('unreachable'); +}; + +/** + * List child keys at a Firebase path using REST API with ?shallow=true. + * Never downloads the actual content, so safe for huge nodes. 
+ */ +const getShallowKeys = async (ref: firebase.database.Reference): Promise => { + const refPath = ref.toString().replace(/^https:\/\/[^/]+\//, ''); + const data = await firebaseRest | null>('GET', refPath, undefined, { + shallow: 'true', + }); + if (!data || typeof data !== 'object') return []; + return Object.keys(data); +}; + +/** + * Recursively delete a Firebase path via REST, handling WRITE_TOO_BIG errors + * by listing children shallowly and batch-deleting with multi-path PATCH. + */ +const DELETE_BATCH_SIZE = 2500; + +const deleteFirebasePath = async (path: string): Promise => { + try { + await firebaseRest('DELETE', path); + } catch (error: any) { + const msg = error?.message || String(error); + if (!msg.includes('WRITE_TOO_BIG') && !msg.includes('write_too_big')) throw error; + + verbose(` ${path} too large, deleting in batches`); + const childKeys = await firebaseRest | null>('GET', path, undefined, { + shallow: 'true', + }); + if (!childKeys || typeof childKeys !== 'object') return; + const keys = Object.keys(childKeys); + + for (let i = 0; i < keys.length; i += DELETE_BATCH_SIZE) { + const batch = keys.slice(i, i + DELETE_BATCH_SIZE); + const updates: Record = {}; + for (const key of batch) { + updates[key] = null; + } + try { + // biome-ignore lint/performance/noAwaitInLoops: batched deletion + await firebaseRest('PATCH', path, updates); + } catch (batchErr: any) { + const batchMsg = batchErr?.message || String(batchErr); + if (batchMsg.includes('WRITE_TOO_BIG') || batchMsg.includes('write_too_big')) { + for (const key of batch) { + // biome-ignore lint/performance/noAwaitInLoops: sequential fallback + await deleteFirebasePath(`${path}/${key}`); + } + } else { + throw batchErr; + } + } + } + + // Delete the now-empty parent + await firebaseRest('DELETE', path); + } +}; + +// --- Concurrency helper --- + +const runWithConcurrency = async ( + tasks: (() => Promise)[], + concurrency: number, +): Promise => { + const results: T[] = []; + let index = 0; 
+ const worker = async (): Promise => { + while (index < tasks.length) { + const currentIndex = index++; + // biome-ignore lint/performance/noAwaitInLoops: worker pool pattern + results[currentIndex] = await tasks[currentIndex](); + } + }; + await Promise.all(Array.from({ length: Math.min(concurrency, tasks.length) }, () => worker())); + return results; +}; + +// --- Stats --- + +interface ColdStorageStats { + draftsScanned: number; + draftsAlreadyCold: number; + draftsEmpty: number; + draftsFrozen: number; + draftsSkippedError: number; + bytesFreed: number; +} + +const stats: ColdStorageStats = { + draftsScanned: 0, + draftsAlreadyCold: 0, + draftsEmpty: 0, + draftsFrozen: 0, + draftsSkippedError: 0, + bytesFreed: 0, +}; + +const formatBytes = (bytes: number): string => { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`; + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GB`; +}; + +// --- Core logic --- + +/** + * Freeze a single draft: build checkpoint from Firebase, store in Postgres, wipe Firebase. + * + * Steps: + * 1. Build the current doc from Firebase (checkpoint + changes) + * 2. Collect Firebase discussions and fast-forward them to the current key + * 3. Compute cumulative StepMaps from latest release to current key + * 4. Store checkpoint, discussions, and stepMaps in Postgres + * 5. Update latestKeyAt on the Draft + * 6. 
Wipe all Firebase data for this draft + */ +const freezeDraft = async (draft: Draft, pubId: string): Promise => { + const { id: draftId, firebasePath } = draft; + const prefix = `[${draftId.slice(0, 8)}]`; + + try { + const draftRef = getDatabaseRef(firebasePath); + + // Check if Firebase has any data at all + const topLevelKeys = await getShallowKeys(draftRef); + if (topLevelKeys.length === 0) { + verbose(`${prefix} Firebase path empty, marking as already cold`); + stats.draftsAlreadyCold++; + return; + } + + // Build the current doc using PG-first logic (handles previously cold-stored drafts + // where Firebase only has changes since the last thaw, not the full history). + const draftDocInfo = await getPubDraftDoc(pubId); + const currentKey = draftDocInfo.mostRecentRemoteKey; + const currentTimestamp = draftDocInfo.latestTimestamp; + + if (currentKey < 0) { + verbose(`${prefix} No history found (empty doc, no changes)`); + stats.draftsEmpty++; + return; + } + + const docJson = draftDocInfo.doc; + const docSize = JSON.stringify(docJson).length; + + verbose(`${prefix} Doc at key ${currentKey}, size ${formatBytes(docSize)}`); + + // Collect and fast-forward Firebase discussions (via REST to avoid WebSocket throttling) + let frozenDiscussions: Record | null = null; + const rawDiscussions = await firebaseRest | null>( + 'GET', + `${firebasePath}/discussions`, + ); + if (rawDiscussions && typeof rawDiscussions === 'object') { + // Fast-forward outdated discussions to currentKey using step maps. + // Discussions whose currentKey matches are already at the latest position. + const outdatedIds = Object.entries(rawDiscussions).filter( + ([_, d]: [string, any]) => + d && typeof d.currentKey === 'number' && d.currentKey < currentKey, + ); + + if (outdatedIds.length > 0) { + // Only fetch steps that are actually available in Firebase. + // After a previous cold storage, early changes are gone — the PG + // checkpoint key is the earliest we can fetch from. 
+ const existingCheckpoint = await getDraftCheckpoint(draftId); + const earliestFirebaseKey = existingCheckpoint ? existingCheckpoint.historyKey : 0; + const mostOutdatedKey = Math.max( + earliestFirebaseKey, + Math.min(...outdatedIds.map(([_, d]: [string, any]) => d.currentKey)), + ); + verbose( + `${prefix} Fast-forwarding ${outdatedIds.length} discussions from key ${mostOutdatedKey}`, + ); + + try { + const stepsByChange = await getStepsInChangeRange( + draftRef, + editorSchema, + mostOutdatedKey + 1, + currentKey, + ); + const allSteps = stepsByChange.reduce((a, b) => [...a, ...b], []); + + for (const [id, discussion] of outdatedIds) { + const disc = discussion as any; + if (disc.selection) { + const sel = + disc.selection.a !== undefined + ? uncompressSelectionJSON(disc.selection) + : disc.selection; + + let from = Math.min(sel.anchor, sel.head); + let to = Math.max(sel.anchor, sel.head); + const stepsToApply = allSteps.slice(disc.currentKey - mostOutdatedKey); + + for (const step of stepsToApply) { + const map = step.getMap(); + from = map.map(from, 1); + to = map.map(to, -1); + } + + if (from < to && from > 0) { + rawDiscussions[id] = { + ...disc, + selection: { type: 'text', a: from, h: to }, + currentKey, + }; + } else { + rawDiscussions[id] = { ...disc, selection: null, currentKey }; + } + } else { + rawDiscussions[id] = { ...disc, currentKey }; + } + } + } catch (err: any) { + verbose( + `${prefix} Warning: could not fast-forward discussions: ${err.message}`, + ); + } + } + + frozenDiscussions = rawDiscussions; + verbose(`${prefix} Freezing ${Object.keys(rawDiscussions).length} discussions`); + } + + // Compute stepMaps from latest release to this checkpoint for discussion anchor mapping. + // If we already have stored stepMaps (from a previous cold storage), compose them + // with the new Firebase-only changes rather than trying to fetch wiped history. 
+ let stepMaps: number[][] | null = null; + let stepMapToKey: number | null = null; + const existingPgCheckpoint = await getDraftCheckpoint(draftId); + const latestRelease = await Release.findOne({ + where: { pubId }, + attributes: ['historyKey'], + order: [['historyKey', 'DESC']], + }); + + if (latestRelease && latestRelease.historyKey < currentKey) { + try { + // Start from existing stored stepMaps if available (covers wiped range) + const existingMaps = existingPgCheckpoint?.stepMaps ?? []; + const existingToKey = existingPgCheckpoint?.stepMapToKey ?? null; + + // Determine what range of new Firebase steps we need + const newStepsStartKey = + existingToKey != null ? existingToKey + 1 : latestRelease.historyKey + 1; + + let newMaps: number[][] = []; + if (newStepsStartKey <= currentKey) { + const stepsByChange = await getStepsInChangeRange( + draftRef, + editorSchema, + newStepsStartKey, + currentKey, + ); + const allSteps = stepsByChange.reduce((a, b) => [...a, ...b], []); + newMaps = allSteps.map((step) => + Array.from((step.getMap() as any).ranges as number[]), + ); + } + + stepMaps = [...existingMaps, ...newMaps]; + stepMapToKey = currentKey; + verbose( + `${prefix} Stored ${stepMaps.length} stepMaps (${existingMaps.length} existing + ${newMaps.length} new, up to key ${currentKey}) from release key ${latestRelease.historyKey}`, + ); + } catch (err: any) { + verbose(`${prefix} Warning: could not compute stepMaps: ${err.message}`); + } + } + + if (isDryRun) { + log( + `${prefix} Would freeze: key=${currentKey} docSize=${formatBytes(docSize)} discussions=${frozenDiscussions ? Object.keys(frozenDiscussions).length : 0} stepMaps=${stepMaps?.length ?? 
0} stepMapToKey=${stepMapToKey}`, + ); + stats.draftsFrozen++; + stats.bytesFreed += docSize; + return; + } + + // Store checkpoint in Postgres (upsert) + await sequelize.transaction(async (txn) => { + const existing = await DraftCheckpoint.findOne({ + where: { draftId }, + transaction: txn, + }); + + let checkpointId: string; + if (existing) { + await existing.update( + { + historyKey: currentKey, + doc: docJson, + timestamp: currentTimestamp, + discussions: frozenDiscussions, + stepMaps, + stepMapToKey, + }, + { transaction: txn }, + ); + checkpointId = existing.id; + } else { + const created = await DraftCheckpoint.create( + { + draftId, + historyKey: currentKey, + doc: docJson, + timestamp: currentTimestamp, + discussions: frozenDiscussions, + stepMaps, + stepMapToKey, + }, + { transaction: txn }, + ); + checkpointId = created.id; + } + + // Set coldCheckpointId on the Draft and update latestKeyAt + await Draft.update( + { + coldCheckpointId: checkpointId, + ...(currentTimestamp ? { latestKeyAt: new Date(currentTimestamp) } : {}), + }, + { where: { id: draftId }, transaction: txn }, + ); + }); + + // Wipe Firebase data + await deleteFirebasePath(firebasePath); + + verbose(`${prefix} Frozen successfully`); + stats.draftsFrozen++; + } catch (err: any) { + log(`${prefix} Error: ${err.message}`); + stats.draftsSkippedError++; + } + + stats.draftsScanned++; +}; + +// --- Main --- + +const main = async () => { + log('Firebase Cold Storage Tool'); + log(`Mode: ${isDryRun ? 
'DRY RUN' : 'EXECUTE'}`); + log(`Threshold: ${DAYS_OLD} days old`); + log(`Batch size: ${BATCH_SIZE}`); + log(`Concurrency: ${CONCURRENCY}`); + log(''); + + const cutoffDate = new Date(); + cutoffDate.setDate(cutoffDate.getDate() - DAYS_OLD); + log(`Cutoff date: ${cutoffDate.toISOString()}`); + log(''); + + // Find stale drafts + let draftsWithPubs: { draft: Draft; pubId: string }[]; + + if (specificPubId) { + const pub = await Pub.findOne({ + where: { id: specificPubId }, + include: [{ model: Draft, as: 'draft' }], + }); + if (!pub?.draft) { + log(`No draft found for pub ${specificPubId}`); + process.exit(1); + } + draftsWithPubs = [{ draft: pub.draft, pubId: pub.id }]; + log(`Processing single pub: ${specificPubId}`); + } else { + // Find drafts where latestKeyAt is older than the cutoff, or null (never tracked) + const results = await sequelize.query<{ draftId: string; pubId: string }>( + ` + SELECT d.id as "draftId", p.id as "pubId" + FROM "Drafts" d + INNER JOIN "Pubs" p ON p."draftId" = d.id + WHERE (d."latestKeyAt" IS NULL OR d."latestKeyAt" < :cutoff) + ORDER BY d."latestKeyAt" ASC NULLS FIRST + `, + { + replacements: { cutoff: cutoffDate.toISOString() }, + type: QueryTypes.SELECT, + }, + ); + + // Load draft models + const draftIds = results.map((r) => r.draftId); + const pubIdByDraftId = new Map(results.map((r) => [r.draftId, r.pubId])); + + const drafts = await Draft.findAll({ + where: { id: { [Op.in]: draftIds } }, + }); + + draftsWithPubs = drafts.map((d) => ({ + draft: d, + pubId: pubIdByDraftId.get(d.id)!, + })); + + log(`Found ${draftsWithPubs.length} stale drafts (older than ${DAYS_OLD} days)`); + } + + log(''); + + // Process in batches + for (let i = 0; i < draftsWithPubs.length; i += BATCH_SIZE) { + const batch = draftsWithPubs.slice(i, i + BATCH_SIZE); + const batchNum = Math.floor(i / BATCH_SIZE) + 1; + const totalBatches = Math.ceil(draftsWithPubs.length / BATCH_SIZE); + + log(`Batch ${batchNum}/${totalBatches} (${batch.length} drafts)`); + + 
// biome-ignore lint/performance/noAwaitInLoops: batched processing + await runWithConcurrency( + batch.map( + ({ draft, pubId }) => + () => + freezeDraft(draft, pubId), + ), + CONCURRENCY, + ); + + log(` Frozen so far: ${stats.draftsFrozen}, Errors: ${stats.draftsSkippedError}`); + } + + log(''); + log('=== RESULTS ==='); + log(`Drafts scanned: ${stats.draftsScanned}`); + log(`Drafts frozen: ${stats.draftsFrozen}`); + log(`Already cold/empty: ${stats.draftsAlreadyCold + stats.draftsEmpty}`); + log(`Errors: ${stats.draftsSkippedError}`); + if (isDryRun) { + log(`Est. data to free: ${formatBytes(stats.bytesFreed)}`); + } + log(''); + + if (isDryRun) { + log('This was a DRY RUN. Re-run with --execute to apply changes.'); + } + + process.exit(0); +}; + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/tools/cron.ts b/tools/cron.ts index 883c20a264..ebea8d55c7 100644 --- a/tools/cron.ts +++ b/tools/cron.ts @@ -30,6 +30,10 @@ if (process.env.PUBPUB_PRODUCTION === 'true') { timezone: 'UTC', }); + cron.schedule('0 5 * * 6', () => run('Cold Storage', 'tools-prod coldStorage --execute'), { + timezone: 'UTC', + }); // Weekly on Saturday at 5 AM UTC (day before cleanup) + cron.schedule( '0 5 * * 0', () => run('Firebase Cleanup', 'tools-prod cleanupFirebase --execute'), diff --git a/tools/index.js b/tools/index.js index d48eb6c1d6..c3cfd19dea 100644 --- a/tools/index.js +++ b/tools/index.js @@ -34,10 +34,14 @@ const commandFiles = { backfillDepositTargets: "./backfillDepositTargets", backup: "./backup/backup", backupDb: "./backup-db", + bootstrapCheckpoints: "./bootstrapCheckpoints", branchMaintenance: "./branchMaintenance", bulkimport: "../workers/tasks/import/bulk/cli", checkpointBackfill: "./dashboardMigrations/backfillCheckpoints", cleanupFirebase: "./cleanupFirebase", + coldStorage: "./coldStorage", + measureFirebaseBreakdown: "./measureFirebaseBreakdown", + measureNonCheckpointSize: "./measureNonCheckpointSize", clone: "./clone", 
devshell: "./devshell", depositCollectionPubs: "./depositCollectionPubs", diff --git a/tools/measureFirebaseBreakdown.ts b/tools/measureFirebaseBreakdown.ts new file mode 100644 index 0000000000..053abdf7e7 --- /dev/null +++ b/tools/measureFirebaseBreakdown.ts @@ -0,0 +1,480 @@ +/** + * Firebase Storage Breakdown Measurement Tool + * + * Samples drafts from Firebase to measure the proportion of storage used by + * checkpoints vs. other data (changes, discussions, merges, etc.). + * + * This tool does NOT require a Postgres connection — it discovers draft paths + * directly from Firebase using the REST API. + * + * Usage: + * pnpm run tools measureFirebaseBreakdown + * pnpm run tools measureFirebaseBreakdown --sampleSize=200 + * pnpm run tools measureFirebaseBreakdown --draftPath=drafts/draft- + */ + +// Prevent Sequelize from crashing when DATABASE_URL points to an unreachable host +// (e.g. Docker-only hostname). This script doesn't use Postgres at all. +if (!process.env.DATABASE_URL || process.env.DATABASE_URL.includes('@db:')) { + process.env.DATABASE_URL = 'postgres://localhost:5432/unused'; +} + +import firebaseAdmin from 'firebase-admin'; + +import { getFirebaseConfig } from 'utils/editor/firebaseConfig'; + +const { + argv: { sampleSize: sampleSizeArg = 100, draftPath: specificDraftPath, prod }, +} = require('yargs'); + +const sampleSize = Number(sampleSizeArg); + +// Allow overriding to prod Firebase +const getDatabaseURL = (): string => { + if (prod) { + return 'https://pubpub-v6-prod.firebaseio.com'; + } + return getFirebaseConfig().databaseURL; +}; + +const log = (msg: string) => console.log(`[measure] ${new Date().toISOString()} ${msg}`); + +// --- Firebase Auth --- + +let cachedAccessToken: { token: string; expiresAt: number } | null = null; + +const getAccessToken = async (): Promise => { + const now = Date.now(); + if (cachedAccessToken && cachedAccessToken.expiresAt > now + 60000) { + return cachedAccessToken.token; + } + const credential = 
firebaseAdmin.credential.cert( + JSON.parse( + Buffer.from(process.env.FIREBASE_SERVICE_ACCOUNT_BASE64 as string, 'base64').toString(), + ), + ); + const tokenResult = await credential.getAccessToken(); + cachedAccessToken = { + token: tokenResult.access_token, + expiresAt: now + (tokenResult.expires_in ?? 3600) * 1000, + }; + return cachedAccessToken.token; +}; + +// --- Firebase REST helpers --- + +/** + * Get the byte size of a Firebase path by fetching its JSON content. + * Returns { bytes, keyCount } or null if the path doesn't exist. + */ +const getPathSize = async ( + path: string, + retries = 3, +): Promise<{ bytes: number; keyCount: number } | null> => { + const databaseURL = getDatabaseURL(); + const accessToken = await getAccessToken(); + const url = `${databaseURL}/${path}.json?access_token=${accessToken}`; + + for (let attempt = 1; attempt <= retries; attempt++) { + try { + // biome-ignore lint/performance/noAwaitInLoops: shhhhhh + const response = await fetch(url); + if (!response.ok) { + throw new Error(`HTTP ${response.status} ${response.statusText}`); + } + const text = await response.text(); + if (text === 'null') return null; + const data = JSON.parse(text); + const keyCount = data && typeof data === 'object' ? 
Object.keys(data).length : 1; + // Use the raw JSON text length as the byte measurement — + // this closely matches how Firebase RTDB stores/bills data + return { bytes: Buffer.byteLength(text, 'utf-8'), keyCount }; + } catch (error: any) { + if (attempt === retries) throw error; + const delay = Math.min(1000 * 2 ** attempt, 10000); + await new Promise((r) => setTimeout(r, delay)); + } + } + return null; +}; + +/** + * Get shallow keys at a path (doesn't download content) + */ +const getShallowKeys = async (path: string, retries = 3): Promise => { + const databaseURL = getDatabaseURL(); + const accessToken = await getAccessToken(); + const url = `${databaseURL}/${path}.json?shallow=true&access_token=${accessToken}`; + + for (let attempt = 1; attempt <= retries; attempt++) { + try { + // biome-ignore lint/performance/noAwaitInLoops: shhhhhh + const response = await fetch(url); + if (!response.ok) throw new Error(`HTTP ${response.status}`); + const data = await response.json(); + if (!data || typeof data !== 'object') return []; + return Object.keys(data); + } catch (error: any) { + if (attempt === retries) throw error; + const delay = Math.min(1000 * 2 ** attempt, 10000); + await new Promise((r) => setTimeout(r, delay)); + } + } + return []; +}; + +// --- Concurrency helper --- + +const runWithConcurrency = async ( + tasks: (() => Promise)[], + concurrency: number, +): Promise => { + const results: T[] = []; + let index = 0; + const worker = async (): Promise => { + while (index < tasks.length) { + const currentIndex = index++; + // biome-ignore lint/performance/noAwaitInLoops: shhhhhh + results[currentIndex] = await tasks[currentIndex](); + } + }; + await Promise.all(Array.from({ length: Math.min(concurrency, tasks.length) }, () => worker())); + return results; +}; + +// --- Types --- + +interface DraftBreakdown { + firebasePath: string; + totalBytes: number; + checkpointsBytes: number; + checkpointMapBytes: number; + deprecatedCheckpointBytes: number; + 
changesBytes: number; + mergesBytes: number; + discussionsBytes: number; + cursorsBytes: number; + otherBytes: number; + changeCount: number; + checkpointCount: number; +} + +// --- Main measurement --- + +const measureDraft = async (firebasePath: string): Promise => { + try { + const childKeys = await getShallowKeys(firebasePath); + if (childKeys.length === 0) return null; + + const breakdown: DraftBreakdown = { + firebasePath, + totalBytes: 0, + checkpointsBytes: 0, + checkpointMapBytes: 0, + deprecatedCheckpointBytes: 0, + changesBytes: 0, + mergesBytes: 0, + discussionsBytes: 0, + cursorsBytes: 0, + otherBytes: 0, + changeCount: 0, + checkpointCount: 0, + }; + + // Measure each child in parallel + const measurements = await Promise.all( + childKeys.map(async (child) => { + const result = await getPathSize(`${firebasePath}/${child}`); + return { child, result }; + }), + ); + + for (const { child, result } of measurements) { + if (!result) continue; + const { bytes, keyCount } = result; + breakdown.totalBytes += bytes; + + switch (child) { + case 'checkpoints': + breakdown.checkpointsBytes = bytes; + breakdown.checkpointCount = keyCount; + break; + case 'checkpointMap': + breakdown.checkpointMapBytes = bytes; + break; + case 'checkpoint': + breakdown.deprecatedCheckpointBytes = bytes; + break; + case 'changes': + breakdown.changesBytes = bytes; + breakdown.changeCount = keyCount; + break; + case 'merges': + breakdown.mergesBytes = bytes; + break; + case 'discussions': + breakdown.discussionsBytes = bytes; + break; + case 'cursors': + breakdown.cursorsBytes = bytes; + break; + default: + breakdown.otherBytes += bytes; + break; + } + } + + return breakdown; + } catch (err: any) { + log(` Error measuring ${firebasePath}: ${err.message}`); + return null; + } +}; + +const formatBytes = (bytes: number): string => { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`; + if (bytes < 1024 * 1024 * 1024) return 
`${(bytes / (1024 * 1024)).toFixed(1)} MB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GB`; +}; + +const formatPct = (part: number, total: number): string => { + if (total === 0) return '0%'; + return `${((part / total) * 100).toFixed(1)}%`; +}; + +// Discover all draft paths from Firebase itself. +// Handles both drafts/draft-{id} and legacy pub-{id}/branch-{id} formats. +const discoverDraftPaths = async (): Promise => { + const paths: string[] = []; + + // Modern format: drafts/draft-{id} + log('Discovering draft paths from Firebase...'); + const draftKeys = await getShallowKeys('drafts'); + for (const key of draftKeys) { + paths.push(`drafts/${key}`); + } + log(` Found ${draftKeys.length} paths under drafts/`); + + // Legacy format: pub-{id}/branch-{id} + const rootKeys = await getShallowKeys(''); + const legacyPubKeys = rootKeys.filter((key) => key.startsWith('pub-')); + if (legacyPubKeys.length > 0) { + log(` Found ${legacyPubKeys.length} legacy pub-* paths, scanning branches...`); + const branchResults = await runWithConcurrency( + legacyPubKeys.map((pubKey) => async () => { + try { + const children = await getShallowKeys(pubKey); + return children + .filter((c) => c.startsWith('branch-')) + .map((c) => `${pubKey}/${c}`); + } catch { + return []; + } + }), + 10, + ); + for (const branches of branchResults) { + paths.push(...branches); + } + log(` Found ${paths.length - draftKeys.length} legacy branch paths`); + } + + log(` Total draft paths: ${paths.length}`); + return paths; +}; + +/** + * Randomly sample n items from an array using Fisher-Yates partial shuffle + */ +const randomSample = (arr: T[], n: number): T[] => { + const copy = [...arr]; + const count = Math.min(n, copy.length); + for (let i = 0; i < count; i++) { + const j = i + Math.floor(Math.random() * (copy.length - i)); + [copy[i], copy[j]] = [copy[j], copy[i]]; + } + return copy.slice(0, count); +}; + +const main = async () => { + log('Firebase Storage Breakdown Measurement'); + 
log(`Database: ${getDatabaseURL()}`); + log(''); + + let draftPaths: string[]; + let totalDraftCount: number; + + if (specificDraftPath) { + draftPaths = [specificDraftPath as string]; + totalDraftCount = 1; + } else { + const allPaths = await discoverDraftPaths(); + totalDraftCount = allPaths.length; + draftPaths = randomSample(allPaths, sampleSize); + log(`Sampling ${draftPaths.length} of ${totalDraftCount} drafts...`); + } + + log(''); + + // Measure drafts with concurrency + let completed = 0; + const results = await runWithConcurrency( + draftPaths.map((path) => async () => { + const result = await measureDraft(path); + completed++; + if (completed % 25 === 0) { + log(` Progress: ${completed}/${draftPaths.length}`); + } + return result; + }), + 10, // 10 concurrent measurements + ); + + const measured = results.filter((r): r is DraftBreakdown => r !== null); + log(`\nSuccessfully measured ${measured.length} drafts\n`); + + if (measured.length === 0) { + log('No data found.'); + process.exit(0); + } + + // --- Aggregate stats --- + const totals = measured.reduce( + (acc, d) => ({ + totalBytes: acc.totalBytes + d.totalBytes, + checkpointsBytes: acc.checkpointsBytes + d.checkpointsBytes, + checkpointMapBytes: acc.checkpointMapBytes + d.checkpointMapBytes, + deprecatedCheckpointBytes: acc.deprecatedCheckpointBytes + d.deprecatedCheckpointBytes, + changesBytes: acc.changesBytes + d.changesBytes, + mergesBytes: acc.mergesBytes + d.mergesBytes, + discussionsBytes: acc.discussionsBytes + d.discussionsBytes, + cursorsBytes: acc.cursorsBytes + d.cursorsBytes, + otherBytes: acc.otherBytes + d.otherBytes, + totalChanges: acc.totalChanges + d.changeCount, + totalCheckpoints: acc.totalCheckpoints + d.checkpointCount, + }), + { + totalBytes: 0, + checkpointsBytes: 0, + checkpointMapBytes: 0, + deprecatedCheckpointBytes: 0, + changesBytes: 0, + mergesBytes: 0, + discussionsBytes: 0, + cursorsBytes: 0, + otherBytes: 0, + totalChanges: 0, + totalCheckpoints: 0, + }, + ); + + 
const allCheckpointBytes = + totals.checkpointsBytes + totals.checkpointMapBytes + totals.deprecatedCheckpointBytes; + + // --- Print results --- + log('=== SAMPLE BREAKDOWN ==='); + log(`Drafts measured: ${measured.length}`); + log(`Total sampled size: ${formatBytes(totals.totalBytes)}`); + log(''); + log('--- By category ---'); + log( + `checkpoints/ ${formatBytes(totals.checkpointsBytes).padStart(12)} ${formatPct(totals.checkpointsBytes, totals.totalBytes).padStart(7)} (${totals.totalCheckpoints} checkpoints)`, + ); + log( + `checkpoint (dep.) ${formatBytes(totals.deprecatedCheckpointBytes).padStart(12)} ${formatPct(totals.deprecatedCheckpointBytes, totals.totalBytes).padStart(7)}`, + ); + log( + `checkpointMap/ ${formatBytes(totals.checkpointMapBytes).padStart(12)} ${formatPct(totals.checkpointMapBytes, totals.totalBytes).padStart(7)}`, + ); + log( + `changes/ ${formatBytes(totals.changesBytes).padStart(12)} ${formatPct(totals.changesBytes, totals.totalBytes).padStart(7)} (${totals.totalChanges} changes)`, + ); + log( + `merges/ ${formatBytes(totals.mergesBytes).padStart(12)} ${formatPct(totals.mergesBytes, totals.totalBytes).padStart(7)}`, + ); + log( + `discussions/ ${formatBytes(totals.discussionsBytes).padStart(12)} ${formatPct(totals.discussionsBytes, totals.totalBytes).padStart(7)}`, + ); + log( + `cursors/ ${formatBytes(totals.cursorsBytes).padStart(12)} ${formatPct(totals.cursorsBytes, totals.totalBytes).padStart(7)}`, + ); + log( + `other ${formatBytes(totals.otherBytes).padStart(12)} ${formatPct(totals.otherBytes, totals.totalBytes).padStart(7)}`, + ); + log(''); + log( + `ALL CHECKPOINT DATA ${formatBytes(allCheckpointBytes).padStart(10)} ${formatPct(allCheckpointBytes, totals.totalBytes).padStart(7)}`, + ); + log( + `EVERYTHING ELSE ${formatBytes(totals.totalBytes - allCheckpointBytes).padStart(10)} ${formatPct(totals.totalBytes - allCheckpointBytes, totals.totalBytes).padStart(7)}`, + ); + log(''); + + // --- Extrapolation --- + const 
avgBytesPerDraft = totals.totalBytes / measured.length; + const avgCheckpointBytesPerDraft = allCheckpointBytes / measured.length; + const avgNonCheckpointBytesPerDraft = avgBytesPerDraft - avgCheckpointBytesPerDraft; + + const estimatedTotalFirebase = avgBytesPerDraft * totalDraftCount; + const estimatedCheckpointTotal = avgCheckpointBytesPerDraft * totalDraftCount; + const estimatedNonCheckpointTotal = avgNonCheckpointBytesPerDraft * totalDraftCount; + + log('=== EXTRAPOLATION TO ALL DRAFTS ==='); + log(`Total drafts: ${totalDraftCount}`); + log(`Avg bytes per draft: ${formatBytes(avgBytesPerDraft)}`); + log(`Avg checkpoint bytes/draft: ${formatBytes(avgCheckpointBytesPerDraft)}`); + log(''); + log(`Estimated total Firebase: ${formatBytes(estimatedTotalFirebase)}`); + log( + `Estimated checkpoint data: ${formatBytes(estimatedCheckpointTotal)} (${formatPct(estimatedCheckpointTotal, estimatedTotalFirebase)})`, + ); + log( + `Estimated non-checkpoint data: ${formatBytes(estimatedNonCheckpointTotal)} (${formatPct(estimatedNonCheckpointTotal, estimatedTotalFirebase)})`, + ); + log(''); + log(`Reported Firebase size: 6 GB`); + log( + `If checkpoints removed: ~${formatBytes(6 * 1024 * 1024 * 1024 * (1 - allCheckpointBytes / totals.totalBytes))}`, + ); + log(`Free tier threshold: 1 GB`); + log(''); + + // --- Top 10 largest drafts in sample --- + const sorted = [...measured].sort((a, b) => b.totalBytes - a.totalBytes); + log('=== TOP 10 LARGEST DRAFTS IN SAMPLE ==='); + for (const d of sorted.slice(0, 10)) { + const ckptBytes = d.checkpointsBytes + d.checkpointMapBytes + d.deprecatedCheckpointBytes; + log( + ` ${d.firebasePath.padEnd(50)} total=${formatBytes(d.totalBytes).padStart(10)} ckpt=${formatBytes(ckptBytes).padStart(10)} (${formatPct(ckptBytes, d.totalBytes)}) changes=${d.changeCount}`, + ); + } + + // --- Distribution of checkpoint ratios --- + const ratios = measured + .filter((d) => d.totalBytes > 0) + .map((d) => { + const ckptBytes = + d.checkpointsBytes + 
d.checkpointMapBytes + d.deprecatedCheckpointBytes; + return ckptBytes / d.totalBytes; + }) + .sort((a, b) => a - b); + + if (ratios.length > 0) { + log(''); + log('=== CHECKPOINT RATIO DISTRIBUTION ==='); + log(` Min: ${(ratios[0] * 100).toFixed(1)}%`); + log(` P25: ${(ratios[Math.floor(ratios.length * 0.25)] * 100).toFixed(1)}%`); + log(` Median: ${(ratios[Math.floor(ratios.length * 0.5)] * 100).toFixed(1)}%`); + log(` P75: ${(ratios[Math.floor(ratios.length * 0.75)] * 100).toFixed(1)}%`); + log(` Max: ${(ratios[ratios.length - 1] * 100).toFixed(1)}%`); + } + + log('\nDone.'); + process.exit(0); +}; + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/tools/measureNonCheckpointSize.ts b/tools/measureNonCheckpointSize.ts new file mode 100644 index 0000000000..854c486931 --- /dev/null +++ b/tools/measureNonCheckpointSize.ts @@ -0,0 +1,271 @@ +/** + * Firebase Non-Checkpoint Size Measurement + * + * Downloads every non-checkpoint child from every draft in Firebase and writes + * them to a local temp file. The resulting file size is an exact measure of + * what Firebase would contain if all checkpoint data were removed. + * + * This does NOT modify Firebase in any way — it only reads. 
+ * + * Usage: + * pnpm run tools measureNonCheckpointSize # dev Firebase + * pnpm run tools measureNonCheckpointSize --prod # prod Firebase + * pnpm run tools measureNonCheckpointSize --prod --concurrency=20 + */ + +// Prevent Sequelize from crashing when DATABASE_URL points to an unreachable host +if (!process.env.DATABASE_URL || process.env.DATABASE_URL.includes('@db:')) { + process.env.DATABASE_URL = 'postgres://localhost:5432/unused'; +} + +import firebaseAdmin from 'firebase-admin'; +import * as fs from 'fs'; +import * as os from 'os'; +import * as path from 'path'; + +import { getFirebaseConfig } from 'utils/editor/firebaseConfig'; + +const { + argv: { prod, concurrency: concurrencyArg = 50 }, +} = require('yargs'); + +const CONCURRENCY = Number(concurrencyArg); + +// Children to skip — these are checkpoint data +const CHECKPOINT_CHILDREN = new Set(['checkpoints', 'checkpointMap', 'checkpoint']); + +const getDatabaseURL = (): string => { + if (prod) return 'https://pubpub-v6-prod.firebaseio.com'; + return getFirebaseConfig().databaseURL; +}; + +const log = (msg: string) => console.log(`[measure] ${new Date().toISOString()} ${msg}`); + +// --- Firebase Auth --- + +let cachedAccessToken: { token: string; expiresAt: number } | null = null; + +const getAccessToken = async (): Promise => { + const now = Date.now(); + if (cachedAccessToken && cachedAccessToken.expiresAt > now + 60000) { + return cachedAccessToken.token; + } + const credential = firebaseAdmin.credential.cert( + JSON.parse( + Buffer.from(process.env.FIREBASE_SERVICE_ACCOUNT_BASE64 as string, 'base64').toString(), + ), + ); + const tokenResult = await credential.getAccessToken(); + cachedAccessToken = { + token: tokenResult.access_token, + expiresAt: now + (tokenResult.expires_in ?? 
3600) * 1000, + }; + return cachedAccessToken.token; +}; + +// --- Firebase REST helpers --- + +const fetchJson = async (urlPath: string, retries = 3): Promise => { + const databaseURL = getDatabaseURL(); + const accessToken = await getAccessToken(); + const url = `${databaseURL}/${urlPath}.json?access_token=${accessToken}`; + + for (let attempt = 1; attempt <= retries; attempt++) { + try { + // biome-ignore lint/performance/noAwaitInLoops: shhhhhh + const response = await fetch(url); + if (!response.ok) throw new Error(`HTTP ${response.status} ${response.statusText}`); + const text = await response.text(); + if (text === 'null') return null; + return text; + } catch (error: any) { + if (attempt === retries) throw error; + const delay = Math.min(1000 * 2 ** attempt, 10000); + await new Promise((r) => setTimeout(r, delay)); + } + } + return null; +}; + +const getShallowKeys = async (urlPath: string, retries = 3): Promise => { + const databaseURL = getDatabaseURL(); + const accessToken = await getAccessToken(); + const url = `${databaseURL}/${urlPath}.json?shallow=true&access_token=${accessToken}`; + + for (let attempt = 1; attempt <= retries; attempt++) { + try { + // biome-ignore lint/performance/noAwaitInLoops: shhhhhh + const response = await fetch(url); + if (!response.ok) throw new Error(`HTTP ${response.status}`); + const data = await response.json(); + if (!data || typeof data !== 'object') return []; + return Object.keys(data); + } catch (error: any) { + if (attempt === retries) throw error; + const delay = Math.min(1000 * 2 ** attempt, 10000); + await new Promise((r) => setTimeout(r, delay)); + } + } + return []; +}; + +// --- Concurrency helper --- + +const runWithConcurrency = async ( + tasks: (() => Promise)[], + concurrency: number, +): Promise => { + const results: T[] = []; + let index = 0; + const worker = async (): Promise => { + while (index < tasks.length) { + const currentIndex = index++; + // biome-ignore lint/performance/noAwaitInLoops: shhhhhh 
+ results[currentIndex] = await tasks[currentIndex](); + } + }; + await Promise.all(Array.from({ length: Math.min(concurrency, tasks.length) }, () => worker())); + return results; +}; + +const formatBytes = (bytes: number): string => { + if (bytes < 1024) return `${bytes} B`; + if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(1)} KB`; + if (bytes < 1024 * 1024 * 1024) return `${(bytes / (1024 * 1024)).toFixed(1)} MB`; + return `${(bytes / (1024 * 1024 * 1024)).toFixed(2)} GB`; +}; + +// --- Main --- + +const main = async () => { + const databaseURL = getDatabaseURL(); + log(`Firebase Non-Checkpoint Size Measurement`); + log(`Database: ${databaseURL}`); + log(`Concurrency: ${CONCURRENCY}`); + log(''); + + // Output file + const outFile = path.join(os.tmpdir(), `firebase-non-checkpoint-${Date.now()}.jsonl`); + const writeStream = fs.createWriteStream(outFile); + log(`Writing non-checkpoint data to: ${outFile}`); + log(''); + + // --- Discover all draft paths --- + log('Discovering draft paths...'); + + // Modern: drafts/draft-{id} + const draftKeys = await getShallowKeys('drafts'); + const modernPaths = draftKeys.map((k) => `drafts/${k}`); + log(` ${modernPaths.length} paths under drafts/`); + + // Legacy: pub-{id}/branch-{id} + const rootKeys = await getShallowKeys(''); + const legacyPubKeys = rootKeys.filter((k) => k.startsWith('pub-')); + log(` ${legacyPubKeys.length} legacy pub-* roots, scanning branches...`); + + const legacyBranches = await runWithConcurrency( + legacyPubKeys.map((pubKey) => async () => { + try { + const children = await getShallowKeys(pubKey); + return children.filter((c) => c.startsWith('branch-')).map((c) => `${pubKey}/${c}`); + } catch { + return []; + } + }), + 10, + ); + const legacyPaths = legacyBranches.flat(); + log(` ${legacyPaths.length} legacy branch paths`); + + const allPaths = [...modernPaths, ...legacyPaths]; + log(` Total: ${allPaths.length} draft paths`); + log(''); + + // --- Process each draft: download 
non-checkpoint children, write to file --- + let processed = 0; + let errors = 0; + let totalNonCheckpointBytes = 0; + let totalDraftsWithData = 0; + + const processDraft = async (draftPath: string): Promise => { + try { + const childKeys = await getShallowKeys(draftPath); + const nonCheckpointKeys = childKeys.filter((k) => !CHECKPOINT_CHILDREN.has(k)); + + if (nonCheckpointKeys.length === 0) { + processed++; + return; + } + + // Download each non-checkpoint child and write to the file + for (const childKey of nonCheckpointKeys) { + // biome-ignore lint/performance/noAwaitInLoops: shhhhhh + const json = await fetchJson(`${draftPath}/${childKey}`); + if (json) { + const line = `${draftPath}/${childKey}\t${json}\n`; + const _lineBytes = Buffer.byteLength(line, 'utf-8'); + totalNonCheckpointBytes += Buffer.byteLength(json, 'utf-8'); + writeStream.write(line); + } + } + + totalDraftsWithData++; + } catch (_err: any) { + errors++; + } + + processed++; + if (processed % 500 === 0) { + log( + ` Progress: ${processed}/${allPaths.length} (${formatBytes(totalNonCheckpointBytes)} so far, ${errors} errors)`, + ); + } + }; + + await runWithConcurrency( + allPaths.map((p) => () => processDraft(p)), + CONCURRENCY, + ); + + // Close the write stream + await new Promise((resolve) => writeStream.end(resolve)); + + // Get actual file size on disk + const fileStat = fs.statSync(outFile); + + log(''); + log('=== RESULTS ==='); + log(`Drafts scanned: ${allPaths.length}`); + log(`Drafts with data: ${totalDraftsWithData}`); + log(`Errors: ${errors}`); + log(''); + log(`Non-checkpoint JSON bytes: ${formatBytes(totalNonCheckpointBytes)}`); + log( + `Output file size on disk: ${formatBytes(fileStat.size)} (includes path keys + tabs + newlines)`, + ); + log(`Output file: ${outFile}`); + log(''); + log('--- Projection ---'); + log(`Reported Firebase total: 6 GB`); + log(`Non-checkpoint data: ${formatBytes(totalNonCheckpointBytes)}`); + log( + `Checkpoint data (inferred): ${formatBytes(6 * 1024 
* 1024 * 1024 - totalNonCheckpointBytes)}`, + ); + log( + `Checkpoint % of total: ${(((6 * 1024 * 1024 * 1024 - totalNonCheckpointBytes) / (6 * 1024 * 1024 * 1024)) * 100).toFixed(1)}%`, + ); + log(`Free tier threshold: 1 GB`); + log( + `Under free tier? ${totalNonCheckpointBytes < 1024 * 1024 * 1024 ? 'YES' : 'NO'}`, + ); + log(''); + log('Done. You can inspect or delete the output file at:'); + log(` ${outFile}`); + + process.exit(0); +}; + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/tools/migrations/2026_04_02_addColdCheckpointIdToDrafts.js b/tools/migrations/2026_04_02_addColdCheckpointIdToDrafts.js new file mode 100644 index 0000000000..ed9efc655a --- /dev/null +++ b/tools/migrations/2026_04_02_addColdCheckpointIdToDrafts.js @@ -0,0 +1,26 @@ +export const up = async ({ Sequelize, sequelize }) => { + const draftsDesc = await sequelize.queryInterface.describeTable('Drafts'); + if (!draftsDesc.coldCheckpointId) { + await sequelize.queryInterface.addColumn('Drafts', 'coldCheckpointId', { + type: Sequelize.UUID, + allowNull: true, + }); + } + // DraftCheckpoints table is created by sync(), but stepMapToKey is a new + // column that may need to be added if the table already exists. 
+ const tableDesc = await sequelize.queryInterface.describeTable('DraftCheckpoints').catch(() => null); + if (tableDesc && !tableDesc.stepMapToKey) { + await sequelize.queryInterface.addColumn('DraftCheckpoints', 'stepMapToKey', { + type: Sequelize.INTEGER, + allowNull: true, + }); + } +}; + +export const down = async ({ sequelize }) => { + await sequelize.queryInterface.removeColumn('Drafts', 'coldCheckpointId'); + const tableDesc = await sequelize.queryInterface.describeTable('DraftCheckpoints').catch(() => null); + if (tableDesc?.stepMapToKey) { + await sequelize.queryInterface.removeColumn('DraftCheckpoints', 'stepMapToKey'); + } +}; diff --git a/workers/tasks/archive.tsx b/workers/tasks/archive.tsx index 0c789dc7d2..74a37990de 100644 --- a/workers/tasks/archive.tsx +++ b/workers/tasks/archive.tsx @@ -225,7 +225,6 @@ const createPubStream = async (pubs: Pub[], batchSize = 100) => { const draftDocPromise = getPubDraftDoc( getDatabaseRef(firebasePath), null, - false, ).then((d) => d.doc); const [draftDoc, facets] = await Promise.all([