Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions apps/api/src/app/routes/route.middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -361,9 +361,6 @@ export async function getOrgForRequest(
// Refresh event will be fired when renewed access token
// to store it in your storage for next request
try {
if (!refreshToken) {
return;
}
await salesforceOrgsDb.updateAccessToken_UNSAFE({ accessToken, refreshToken, org, userId: user.id });
} catch (ex) {
logger.error({ requestId, ...getExceptionLog(ex) }, '[ORG][REFRESH] Error saving refresh token');
Expand All @@ -378,6 +375,30 @@ export async function getOrgForRequest(
}
};

// Re-reads current tokens from DB so concurrent workers that lose the refresh token rotation race
// can retry with the tokens written by the worker that won.
const getFreshTokens = async () => {
try {
const freshOrg = await salesforceOrgsDb.findByUniqueId_UNSAFE(user.id, uniqueId);
if (!freshOrg) {
return null;
}
const [freshAccessToken, freshRefreshToken] = await sfdcEncService.decryptAccessToken({
encryptedAccessToken: freshOrg.accessToken,
userId: user.id,
});
// Decryption failed - treat as "no fresh tokens" so the caller falls through to onConnectionError
// and the org gets flagged, instead of retrying with the literal sentinel as the bearer token.
if (freshAccessToken === sfdcEncService.DUMMY_INVALID_ENCRYPTED_TOKEN) {
return null;
}
return { accessToken: freshAccessToken, refreshToken: freshRefreshToken };
} catch (ex) {
logger.error({ requestId, ...getExceptionLog(ex) }, '[ORG][REFRESH] Error fetching fresh tokens for race condition check');
return null;
}
};

const jetstreamConn = new ApiConnection(
{
apiRequestAdapter: getApiRequestFactoryFn(fetch),
Expand All @@ -392,6 +413,7 @@ export async function getOrgForRequest(
logger,
sfdcClientId: ENV.SFDC_CONSUMER_KEY,
sfdcClientSecret: ENV.SFDC_CONSUMER_SECRET,
getFreshTokens,
},
handleRefresh,
handleConnectionError,
Expand Down Expand Up @@ -574,7 +596,6 @@ export function setPermissionPolicy(_req: express.Request, res: express.Response
next();
}


export function setCacheControlForApiRoutes(_req: express.Request, res: express.Response, next: express.NextFunction) {
res.setHeader('Cache-Control', 'no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0');
next();
Expand Down
25 changes: 22 additions & 3 deletions apps/jetstream-desktop/src/utils/route.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,6 @@ export function initApiConnection(
// Refresh event will be fired when renewed access token
// to store it in your storage for next request
try {
if (!refreshToken) {
return;
}
await updateAccessTokens(org.uniqueId, { accessToken, refreshToken });
} catch (ex) {
logger.error('[ORG][REFRESH] Error saving refresh token', getErrorMessage(ex));
Expand All @@ -221,6 +218,27 @@ export function initApiConnection(
}
};

// Re-reads current tokens from the in-memory store so concurrent requests that lose the
// refresh token rotation race can retry with the tokens written by the request that won.
const getFreshTokens = async () => {
try {
const freshOrg = getSalesforceOrgById(org.uniqueId);
if (!freshOrg) {
return null;
}
const plaintext = decryptTokenPortable(freshOrg.accessToken);
const spaceIndex = plaintext.indexOf(' ');
if (spaceIndex === -1) {
Comment thread
paustint marked this conversation as resolved.
logger.warn('[ORG][REFRESH] Fresh token payload is malformed and missing expected separator');
return null;
}
return { accessToken: plaintext.slice(0, spaceIndex), refreshToken: plaintext.slice(spaceIndex + 1) };
} catch (ex) {
logger.error('[ORG][REFRESH] Error fetching fresh tokens for race condition check', getErrorMessage(ex));
return null;
}
};

const jetstreamConn = new ApiConnection(
{
apiRequestAdapter: getApiRequestFactoryFn(fetch),
Expand All @@ -234,6 +252,7 @@ export function initApiConnection(
logger: logger as any,
enableLogging: false,
sfdcClientId: ENV.DESKTOP_SFDC_CLIENT_ID,
getFreshTokens,
},
handleRefresh,
handleConnectionError,
Expand Down
57 changes: 48 additions & 9 deletions libs/salesforce-api/src/lib/callout-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ERROR_MESSAGES, HTTP } from '@jetstream/shared/constants';
import { getErrorMessageAndStackObj } from '@jetstream/shared/utils';
import { parse } from '@jetstreamapp/simple-xml';
import isObject from 'lodash/isObject';
import { ApiRequestOptions, ApiRequestOutputType, BulkXmlErrorResponse, FetchFn, FetchResponse, Logger, SoapErrorResponse } from './types';
Expand Down Expand Up @@ -46,9 +47,14 @@ function parseXml(value: string) {
export function getApiRequestFactoryFn(fetch: FetchFn) {
return (
logger: Logger,
onRefresh?: (accessToken: string) => void,
onConnectionError?: (accessToken: string) => void,
onRefresh?: (accessToken: string, refreshToken?: string, skipPersistence?: boolean) => Promise<void> | void,
onConnectionError?: (error: string) => void,
/**
* Enable logging only applies to request/response data
* other logging for refresh flow and logic errors will still be logged
*/
enableLogging?: boolean,
getFreshTokens?: () => Promise<{ accessToken: string; refreshToken: string } | null>,
) => {
const apiRequest = async <Response = unknown>(options: ApiRequestOptions, attemptRefresh = true): Promise<Response> => {
// eslint-disable-next-line prefer-const
Expand Down Expand Up @@ -125,18 +131,48 @@ export function getApiRequestFactoryFn(fetch: FetchFn) {
sessionInfo.refreshToken
) {
try {
// if 401 and we have a refresh token, then attempt to refresh the token
const { access_token: newAccessToken } = await exchangeRefreshToken(fetch, sessionInfo);
onRefresh?.(newAccessToken);
logger.debug({ url, method, status: response.status }, '[TOKEN REFRESH] Attempting token refresh');
const { access_token: newAccessToken, refresh_token: newRefreshToken } = await exchangeRefreshToken(fetch, sessionInfo);
logger.debug({ url, method, tokenRotated: !!newRefreshToken }, '[TOKEN REFRESH] Token refresh successful');
await onRefresh?.(newAccessToken, newRefreshToken);
// replace token in body
if (typeof options.body === 'string' && options.body.includes(accessToken)) {
// if the response is soap, we need to return the response as is
options.body = options.body.replace(accessToken, newAccessToken);
}

return apiRequest({ ...options, sessionInfo: { ...sessionInfo, accessToken: newAccessToken } }, false);
} catch {
logger.warn('Unable to refresh accessToken');
} catch (ex) {
Comment thread
paustint marked this conversation as resolved.
logger.warn({ url, method, ...getErrorMessageAndStackObj(ex) }, '[TOKEN REFRESH] Unable to refresh accessToken');

// Check if another worker already refreshed (race condition on token rotation).
// If the DB has a different access token, a concurrent request won the race — retry with fresh tokens.
// `return await` (not bare `return`) is deliberate: it keeps the recursive retry inside this
// try/catch so a second failure still falls through to onConnectionError below.
if (getFreshTokens) {
try {
const freshTokens = await getFreshTokens();
if (freshTokens && freshTokens.accessToken !== accessToken) {
logger.info({ url, method }, '[TOKEN REFRESH] Concurrent refresh detected — retrying with tokens from another worker');
Comment thread
paustint marked this conversation as resolved.
// Sync the connection's canonical session state without re-persisting (DB already has these
// tokens — that's how we got them). Also mutates the shared sessionInfo reference so
// subsequent requests on this connection stop hitting the stale token path.
await onRefresh?.(freshTokens.accessToken, freshTokens.refreshToken, true);
// replace token in body
if (typeof options.body === 'string' && options.body.includes(accessToken)) {
// if the response is soap, we need to return the response as is
options.body = options.body.replace(accessToken, freshTokens.accessToken);
}
return await apiRequest({ ...options, sessionInfo: { ...sessionInfo, ...freshTokens } }, false);
}
} catch (freshEx) {
logger.warn(
{ url, method, ...getErrorMessageAndStackObj(freshEx) },
'[TOKEN REFRESH] Failed to retrieve fresh tokens for race condition check',
);
}
}

responseText = ERROR_MESSAGES.SFDC_EXPIRED_TOKEN;
onConnectionError?.(ERROR_MESSAGES.SFDC_EXPIRED_TOKEN);
}
Expand Down Expand Up @@ -192,7 +228,10 @@ function handleSalesforceApiError(outputType: ApiRequestOutputType, responseText
return output;
}

function exchangeRefreshToken(fetch: FetchFn, sessionInfo: ApiRequestOptions['sessionInfo']): Promise<{ access_token: string }> {
function exchangeRefreshToken(
fetch: FetchFn,
sessionInfo: ApiRequestOptions['sessionInfo'],
): Promise<{ access_token: string; refresh_token?: string }> {
const searchParams = new URLSearchParams({
grant_type: 'refresh_token',
});
Comment thread
paustint marked this conversation as resolved.
Expand Down Expand Up @@ -221,6 +260,6 @@ function exchangeRefreshToken(fetch: FetchFn, sessionInfo: ApiRequestOptions['se
})
.then((response) => response.json())
.then((response) => {
return response as { access_token: string };
return response as { access_token: string; refresh_token?: string };
});
}
32 changes: 27 additions & 5 deletions libs/salesforce-api/src/lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ export interface ApiConnectionOptions {
sfdcClientId?: string;
sfdcClientSecret?: string;
logger: Logger;
/** Re-reads current tokens from the source of truth (e.g. DB) to handle concurrent refresh token rotation across workers */
getFreshTokens?: () => Promise<{ accessToken: string; refreshToken: string } | null>;
}

export class ApiConnection {
logger: Logger;
sessionInfo: SessionInfo;
apiRequest: ReturnType<ReturnType<typeof getApiRequestFactoryFn>>;
refreshCallback: ((accessToken: string, refreshToken: string) => void) | undefined;
refreshCallback: ((accessToken: string, refreshToken: string) => Promise<void> | void) | undefined;
onConnectionError: ((error: string) => void) | undefined;

org: ApiOrg;
Expand All @@ -56,12 +58,19 @@ export class ApiConnection {
sfdcClientId,
sfdcClientSecret,
logger,
getFreshTokens,
}: ApiConnectionOptions,
refreshCallback?: (accessToken: string, refreshToken: string) => void,
refreshCallback?: (accessToken: string, refreshToken: string) => Promise<void> | void,
onConnectionError?: (error: string) => void,
) {
this.logger = logger;
this.apiRequest = apiRequestAdapter(logger, this.handleRefresh.bind(this), this.handleConnectionError.bind(this), enableLogging);
this.apiRequest = apiRequestAdapter(
logger,
this.handleRefresh.bind(this),
this.handleConnectionError.bind(this),
enableLogging,
getFreshTokens,
);
this.refreshCallback = refreshCallback;
this.onConnectionError = onConnectionError;
this.sessionInfo = {
Expand Down Expand Up @@ -122,10 +131,23 @@ export class ApiConnection {
this.sessionInfo.userId = userId ?? this.sessionInfo.userId;
}

public handleRefresh(accessToken: string) {
/**
* Updates in-memory session state with freshly rotated tokens.
*
* When skipPersistence is true, the refreshCallback is NOT invoked. This is used by the
* race-condition fallback in callout-adapter, which retrieves tokens that are ALREADY persisted
* by the worker that won the rotation race — re-persisting them would be a wasted encrypt + DB write.
*/
public async handleRefresh(accessToken: string, newRefreshToken?: string, skipPersistence = false) {
this.sessionInfo.accessToken = accessToken;
if (newRefreshToken) {
this.sessionInfo.refreshToken = newRefreshToken;
}
if (skipPersistence) {
return;
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.refreshCallback?.(accessToken, this.sessionInfo.refreshToken!);
await this.refreshCallback?.(accessToken, this.sessionInfo.refreshToken!);
}

public handleConnectionError(error: string) {
Expand Down
Loading