Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/api/src/app/auth/ee.auth.module.config.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { MiddlewareConsumer, ModuleMetadata } from '@nestjs/common';
import { cacheService, PlatformException } from '@novu/application-generic';
import { cacheService, featureFlagsService, InMemoryLRUCacheService, PlatformException } from '@novu/application-generic';
import { RootEnvironmentGuard } from './framework/root-environment-guard.service';
import { AuthService } from './services/auth.service';
import { ApiKeyStrategy } from './services/passport/apikey.strategy';
Expand All @@ -23,6 +23,8 @@ export function getEEModuleConfig(): ModuleMetadata {
JwtSubscriberStrategy,
AuthService,
cacheService,
featureFlagsService,
InMemoryLRUCacheService,
RootEnvironmentGuard,
],
exports: [...eeAuthModule.exports, RootEnvironmentGuard, AuthService],
Expand Down
69 changes: 18 additions & 51 deletions apps/api/src/app/auth/services/passport/apikey.strategy.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,22 @@
import { Injectable, ServiceUnavailableException } from '@nestjs/common';
import { PassportStrategy } from '@nestjs/passport';
import { FeatureFlagsService, HttpRequestHeaderKeysEnum } from '@novu/application-generic';
import {
FeatureFlagsService,
HttpRequestHeaderKeysEnum,
InMemoryLRUCacheService,
InMemoryLRUCacheStore,
} from '@novu/application-generic';
import { ApiAuthSchemeEnum, FeatureFlagsKeysEnum, UserSessionData } from '@novu/shared';
import { createHash } from 'crypto';
import { LRUCache } from 'lru-cache';
import { HeaderAPIKeyStrategy } from 'passport-headerapikey';
import { AuthService } from '../auth.service';

const apiKeyUserCache = new LRUCache<string, UserSessionData>({
max: 1000,
ttl: 1000 * 60,
});

const apiKeyInflightRequests = new Map<string, Promise<UserSessionData | null>>();

@Injectable()
export class ApiKeyStrategy extends PassportStrategy(HeaderAPIKeyStrategy) {
constructor(
private readonly authService: AuthService,
private readonly featureFlagsService: FeatureFlagsService
private readonly featureFlagsService: FeatureFlagsService,
private readonly inMemoryLRUCacheService: InMemoryLRUCacheService
) {
super(
{ header: HttpRequestHeaderKeysEnum.AUTHORIZATION, prefix: `${ApiAuthSchemeEnum.API_KEY} ` },
Expand All @@ -42,51 +40,20 @@ export class ApiKeyStrategy extends PassportStrategy(HeaderAPIKeyStrategy) {
private async validateApiKey(apiKey: string): Promise<UserSessionData | null> {
const hashedApiKey = createHash('sha256').update(apiKey).digest('hex');

const isLruCacheEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_LRU_CACHE_ENABLED,
defaultValue: false,
environment: { _id: 'system' },
component: 'api-key-auth',
});

if (isLruCacheEnabled) {
const cached = apiKeyUserCache.get(hashedApiKey);
if (cached) {
await this.checkKillSwitch(cached);

return cached;
}

const inflightRequest = apiKeyInflightRequests.get(hashedApiKey);
if (inflightRequest) {
return inflightRequest;
const user = await this.inMemoryLRUCacheService.get(
InMemoryLRUCacheStore.API_KEY_USER,
hashedApiKey,
() => this.authService.getUserByApiKey(apiKey),
{
environmentId: 'system',
}
}

const fetchPromise = this.authService
.getUserByApiKey(apiKey)
.then(async (user) => {
if (user && isLruCacheEnabled) {
apiKeyUserCache.set(hashedApiKey, user);
}

if (user) {
await this.checkKillSwitch(user);
}

return user;
})
.finally(() => {
if (isLruCacheEnabled) {
apiKeyInflightRequests.delete(hashedApiKey);
}
});
);

if (isLruCacheEnabled) {
apiKeyInflightRequests.set(hashedApiKey, fetchPromise);
if (user) {
await this.checkKillSwitch(user);
}

return fetchPromise;
return user;
}

private async checkKillSwitch(user: UserSessionData): Promise<void> {
Expand Down
2 changes: 2 additions & 0 deletions apps/api/src/app/environments-v1/novu-bridge.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
FeatureFlagsService,
GetDecryptedSecretKey,
GetLayoutUseCase as GetLayoutUseCaseV1,
InMemoryLRUCacheService,
TraceLogRepository,
} from '@novu/application-generic';

Expand Down Expand Up @@ -84,6 +85,7 @@ export const featureFlagsService = {
ClickHouseService,
CreateExecutionDetails,
featureFlagsService,
InMemoryLRUCacheService,
],
})
export class NovuBridgeModule {}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { Injectable, InternalServerErrorException } from '@nestjs/common';
import {
emailControlSchema,
FeatureFlagsService,
InMemoryLRUCacheService,
InMemoryLRUCacheStore,
Instrument,
InstrumentUsecase,
PinoLogger,
Expand All @@ -16,16 +18,9 @@ import {
} from '@novu/dal';
import { workflow } from '@novu/framework/express';
import { ActionStep, ChannelStep, PostActionEnum, Schema, Step, StepOutput, Workflow } from '@novu/framework/internal';
import {
EnvironmentTypeEnum,
FeatureFlagsKeysEnum,
LAYOUT_PREVIEW_EMAIL_STEP,
LAYOUT_PREVIEW_WORKFLOW_ID,
StepTypeEnum,
} from '@novu/shared';
import { EnvironmentTypeEnum, LAYOUT_PREVIEW_EMAIL_STEP, LAYOUT_PREVIEW_WORKFLOW_ID, StepTypeEnum } from '@novu/shared';
import { AdditionalOperation, RulesLogic } from 'json-logic-js';
import _ from 'lodash';
import { LRUCache } from 'lru-cache';
import { evaluateRules } from '../../../shared/services/query-parser/query-parser.service';
import { isMatchingJsonSchema } from '../../../workflows-v2/util/jsonToSchema';
import {
Expand All @@ -43,19 +38,6 @@ import { ConstructFrameworkWorkflowCommand } from './construct-framework-workflo

const LOG_CONTEXT = 'ConstructFrameworkWorkflow';

const workflowCache = new LRUCache<string, NotificationTemplateEntity>({
max: 1000,
ttl: 1000 * 60,
});

const organizationCache = new LRUCache<string, OrganizationEntity>({
max: 500,
ttl: 1000 * 60,
});

const workflowInflightRequests = new Map<string, Promise<NotificationTemplateEntity>>();
const organizationInflightRequests = new Map<string, Promise<OrganizationEntity | undefined>>();

@Injectable()
export class ConstructFrameworkWorkflow {
constructor(
Expand All @@ -71,7 +53,8 @@ export class ConstructFrameworkWorkflow {
private delayOutputRendererUseCase: DelayOutputRendererUsecase,
private digestOutputRendererUseCase: DigestOutputRendererUsecase,
private throttleOutputRendererUseCase: ThrottleOutputRendererUsecase,
private featureFlagsService: FeatureFlagsService
private featureFlagsService: FeatureFlagsService,
private inMemoryLRUCacheService: InMemoryLRUCacheService
) {}

@InstrumentUsecase()
Expand Down Expand Up @@ -391,102 +374,52 @@ export class ConstructFrameworkWorkflow {
workflowId: string,
shouldUseCache: boolean
): Promise<NotificationTemplateEntity> {
const cacheKey = `${environmentId}:${workflowId}`;

const isFeatureEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_LRU_CACHE_ENABLED,
defaultValue: false,
environment: { _id: environmentId },
component: 'bridge-workflow',
});

const useCache = shouldUseCache && isFeatureEnabled;

if (useCache) {
const cached = workflowCache.get(cacheKey);
if (cached) {
return cached;
}

const inflightRequest = workflowInflightRequests.get(cacheKey);
if (inflightRequest) {
return inflightRequest;
}
}

const fetchPromise = this.workflowsRepository
.findByTriggerIdentifier(environmentId, workflowId, null, false)
.then((foundWorkflow) => {
const workflow = await this.inMemoryLRUCacheService.get(
InMemoryLRUCacheStore.WORKFLOW,
`${environmentId}:${workflowId}`,
async () => {
const foundWorkflow = await this.workflowsRepository.findByTriggerIdentifier(
environmentId,
workflowId,
null,
false
);
if (!foundWorkflow) {
throw new InternalServerErrorException(`Workflow ${workflowId} not found`);
}

if (useCache) {
workflowCache.set(cacheKey, foundWorkflow);
}

return foundWorkflow;
})
.finally(() => {
if (useCache) {
workflowInflightRequests.delete(cacheKey);
}
});
},
{
environmentId,
skipCache: !shouldUseCache,
}
);

if (useCache) {
workflowInflightRequests.set(cacheKey, fetchPromise);
if (!workflow) {
throw new InternalServerErrorException(`Workflow ${workflowId} not found`);
}

return fetchPromise;
return workflow;
}

private async getOrganization(
organizationId: string,
shouldUseCache: boolean,
environmentId: string
): Promise<OrganizationEntity | undefined> {
const isFeatureEnabled = await this.featureFlagsService.getFlag({
key: FeatureFlagsKeysEnum.IS_LRU_CACHE_ENABLED,
defaultValue: false,
environment: { _id: environmentId },
organization: { _id: organizationId },
component: 'bridge-org',
});

const useCache = shouldUseCache && isFeatureEnabled;

if (useCache) {
const cached = organizationCache.get(organizationId);
if (cached) {
return cached;
}

const inflightRequest = organizationInflightRequests.get(organizationId);
if (inflightRequest) {
return inflightRequest;
const organization = await this.inMemoryLRUCacheService.get(
InMemoryLRUCacheStore.ORGANIZATION,
organizationId,
() => this.communityOrganizationRepository.findById(organizationId),
{
environmentId,
organizationId,
skipCache: !shouldUseCache,
}
}

const fetchPromise = this.communityOrganizationRepository
.findById(organizationId)
.then((organization) => {
if (organization && useCache) {
organizationCache.set(organizationId, organization);
}

return organization || undefined;
})
.finally(() => {
if (useCache) {
organizationInflightRequests.delete(organizationId);
}
});

if (useCache) {
organizationInflightRequests.set(organizationId, fetchPromise);
}
);

return fetchPromise;
return organization || undefined;
}

private async processSkipOption(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
ExecuteBridgeRequestCommand,
ExecuteBridgeRequestDto,
FeatureFlagsService,
InMemoryLRUCacheService,
InMemoryLRUCacheStore,
Instrument,
InstrumentUsecase,
IWorkflowDataDto,
Expand Down Expand Up @@ -35,7 +37,6 @@ import {
} from '@novu/shared';
import Ajv, { ValidateFunction } from 'ajv';
import addFormats from 'ajv-formats';
import { LRUCache } from 'lru-cache';
import { generateTransactionId } from '../../../shared/helpers/generate-transaction-id';
import { PayloadValidationException } from '../../exceptions/payload-validation-exception';
import { RecipientSchema, RecipientsSchema } from '../../utils/trigger-recipient-validation';
Expand All @@ -51,27 +52,10 @@ const ajv = new Ajv({
});
addFormats(ajv);

const validatorCache = new LRUCache<string, ValidateFunction>({
max: 5000,
ttl: 1000 * 60 * 60,
});

function getSchemaHash(schema: object): string {
return createHash('sha256').update(JSON.stringify(schema)).digest('hex');
}

function getCompiledValidator(schema: object): ValidateFunction {
const hash = getSchemaHash(schema);
let validate = validatorCache.get(hash);

if (!validate) {
validate = ajv.compile(schema);
validatorCache.set(hash, validate);
}

return validate;
}

@Injectable()
export class ParseEventRequest {
constructor(
Expand All @@ -84,7 +68,8 @@ export class ParseEventRequest {
private logger: PinoLogger,
private featureFlagService: FeatureFlagsService,
private traceLogRepository: TraceLogRepository,
protected moduleRef: ModuleRef
protected moduleRef: ModuleRef,
private inMemoryLRUCacheService: InMemoryLRUCacheService
) {
this.logger.setContext(this.constructor.name);
}
Expand Down Expand Up @@ -489,7 +474,7 @@ export class ParseEventRequest {

@Instrument()
private validateAndApplyPayloadDefaults(payload: Record<string, unknown>, schema: object): Record<string, unknown> {
const validate = getCompiledValidator(schema);
const validate = this.getCompiledValidator(schema);
const payloadWithDefaults = JSON.parse(JSON.stringify(payload));
const valid = validate(payloadWithDefaults);

Expand All @@ -499,4 +484,16 @@ export class ParseEventRequest {

return payloadWithDefaults;
}

private getCompiledValidator(schema: object): ValidateFunction {
const hash = getSchemaHash(schema);
let validate = this.inMemoryLRUCacheService.getIfCached(InMemoryLRUCacheStore.VALIDATOR, hash) as ValidateFunction;

if (!validate) {
validate = ajv.compile(schema);
this.inMemoryLRUCacheService.set(InMemoryLRUCacheStore.VALIDATOR, hash, validate);
}

return validate;
}
}
Loading
Loading