Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

73 changes: 73 additions & 0 deletions services/apps/automatic_projects_discovery_worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Automatic Projects Discovery Worker

Temporal worker that discovers open-source projects from external data sources and writes them to the `projectCatalog` table.

## Architecture

### Source abstraction

Every data source implements the `IDiscoverySource` interface (`src/sources/types.ts`):

| Method | Purpose |
| ----------------------------- | --------------------------------------------------------------------------- |
| `listAvailableDatasets()` | Returns available dataset snapshots, sorted newest-first |
| `fetchDatasetStream(dataset)` | Returns a readable stream for the dataset (e.g. HTTP response) |
| `parseRow(rawRow)` | Converts a raw CSV/JSON row into a `IDiscoverySourceRow`, or `null` to skip |

Sources are registered in `src/sources/registry.ts` as a simple name → factory map.

**To add a new source:** create a class implementing `IDiscoverySource`, then add one line to the registry.

### Current sources

| Name | Folder | Description |
| ------------------------ | ------------------------------------- | ------------------------------------------------------------------------------------ |
| `ossf-criticality-score` | `src/sources/ossf-criticality-score/` | OSSF Criticality Score snapshots from a public GCS bucket (~750K repos per snapshot) |

### Workflow

```
discoverProjects({ mode: 'incremental' | 'full' })
├─ Activity: listDatasets(sourceName)
│ → returns dataset descriptors sorted newest-first
├─ Selection: incremental → latest only, full → all datasets
└─ For each dataset:
└─ Activity: processDataset(sourceName, dataset)
→ HTTP stream → csv-parse → batches of 5000 → bulkUpsertProjectCatalog
```

### Timeouts

| Activity | startToCloseTimeout | retries |
| ------------------ | ------------------- | ------- |
| `listDatasets` | 2 min | 3 |
| `processDataset` | 30 min | 3 |
| Workflow execution | 2 hours | 3 |

### Schedule

Runs daily at midnight via Temporal cron (`0 0 * * *`).

## File structure

```
src/
├── main.ts # Service bootstrap (postgres enabled)
├── activities.ts # Barrel re-export
├── workflows.ts # Barrel re-export
├── activities/
│ └── activities.ts # listDatasets, processDataset
├── workflows/
│ └── discoverProjects.ts # Orchestration with mode selection
├── schedules/
│ └── scheduleProjectsDiscovery.ts # Temporal cron schedule
└── sources/
├── types.ts # IDiscoverySource, IDatasetDescriptor
├── registry.ts # Source factory map
└── ossf-criticality-score/
├── source.ts # IDiscoverySource implementation
└── bucketClient.ts # GCS public bucket HTTP client
```
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@crowd/automatic-projects-discovery-worker",
"scripts": {
"start": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx --inspect=0.0.0.0:9232 src/main.ts",
"start:debug": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts",
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
Expand All @@ -24,6 +24,7 @@
"@temporalio/activity": "~1.11.8",
"@temporalio/client": "~1.11.8",
"@temporalio/workflow": "~1.11.8",
"csv-parse": "^5.5.6",
"tsx": "^4.7.1",
"typescript": "^5.6.3"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
export * from './activities/activities'
import { listDatasets, listSources, processDataset } from './activities/activities'

export { listDatasets, listSources, processDataset }
Original file line number Diff line number Diff line change
@@ -1,7 +1,119 @@
import { parse } from 'csv-parse'

import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer'
import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types'
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceLogger } from '@crowd/logging'

import { svc } from '../main'
import { getAvailableSourceNames, getSource } from '../sources/registry'
import { IDatasetDescriptor } from '../sources/types'

const log = getServiceLogger()

export async function logDiscoveryRun(): Promise<void> {
log.info('Automatic projects discovery workflow executed successfully.')
const BATCH_SIZE = 5000

export async function listSources(): Promise<string[]> {
return getAvailableSourceNames()
}

export async function listDatasets(sourceName: string): Promise<IDatasetDescriptor[]> {
const source = getSource(sourceName)
const datasets = await source.listAvailableDatasets()

log.info({ sourceName, count: datasets.length, newest: datasets[0]?.id }, 'Datasets listed.')

return datasets
}

export async function processDataset(
sourceName: string,
dataset: IDatasetDescriptor,
): Promise<void> {
const qx = pgpQx(svc.postgres.writer.connection())
const startTime = Date.now()

log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...')

const source = getSource(sourceName)
const stream = await source.fetchDatasetStream(dataset)

stream.on('error', (err: Error) => {
log.error({ datasetId: dataset.id, error: err.message }, 'Stream error.')
})

// For CSV sources: pipe through csv-parse to get Record<string, string> objects.
// For JSON sources: the stream already emits pre-parsed objects in object mode.
const records =
source.format === 'json'
? stream
: stream.pipe(
parse({
columns: true,
skip_empty_lines: true,
trim: true,
}),
)

if (source.format !== 'json') {
;(records as ReturnType<typeof parse>).on('error', (err) => {
log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.')
})
}

let batch: IDbProjectCatalogCreate[] = []
let totalProcessed = 0
let totalSkipped = 0
let batchNumber = 0
let totalRows = 0

for await (const rawRow of records) {
totalRows++

const parsed = source.parseRow(rawRow as Record<string, unknown>)
if (!parsed) {
totalSkipped++
continue
}

batch.push({
projectSlug: parsed.projectSlug,
repoName: parsed.repoName,
repoUrl: parsed.repoUrl,
ossfCriticalityScore: parsed.ossfCriticalityScore,
lfCriticalityScore: parsed.lfCriticalityScore,
})

if (batch.length >= BATCH_SIZE) {
batchNumber++

await bulkUpsertProjectCatalog(qx, batch)
totalProcessed += batch.length
batch = []

log.info({ totalProcessed, batchNumber, datasetId: dataset.id }, 'Batch upserted.')
}
}

// Flush remaining rows that didn't fill a complete batch
if (batch.length > 0) {
batchNumber++
await bulkUpsertProjectCatalog(qx, batch)
totalProcessed += batch.length
}

const elapsedSeconds = ((Date.now() - startTime) / 1000).toFixed(1)

log.info(
{
sourceName,
datasetId: dataset.id,
totalRows,
totalProcessed,
totalSkipped,
totalBatches: batchNumber,
elapsedSeconds,
},
'Dataset processing complete.',
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const config: Config = {

const options: Options = {
postgres: {
enabled: false,
enabled: true,
},
opensearch: {
enabled: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,15 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien
import { svc } from '../main'
import { discoverProjects } from '../workflows'

const DEFAULT_CRON = '0 2 * * *' // Daily at 2:00 AM

export const scheduleProjectsDiscovery = async () => {
const cronExpression = process.env.CROWD_AUTOMATIC_PROJECTS_DISCOVERY_CRON || DEFAULT_CRON

svc.log.info(`Scheduling projects discovery with cron: ${cronExpression}`)
svc.log.info(`Scheduling projects discovery`)

try {
await svc.temporal.schedule.create({
scheduleId: 'automaticProjectsDiscovery',
spec: {
cronExpressions: [cronExpression],
// Run every day at midnight
cronExpressions: ['0 0 * * *'],
},
policies: {
overlap: ScheduleOverlapPolicy.SKIP,
Expand All @@ -24,6 +21,8 @@ export const scheduleProjectsDiscovery = async () => {
type: 'startWorkflow',
workflowType: discoverProjects,
taskQueue: 'automatic-projects-discovery',
args: [{ mode: 'incremental' as const }],
workflowExecutionTimeout: '2 hours',
retry: {
initialInterval: '15 seconds',
backoffCoefficient: 2,
Expand Down
Loading
Loading