Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions src/pipeline/steps/fetch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import { afterEach, describe, expect, it, vi } from 'vitest';
import { CliError } from '../../errors.js';
import type { IPage } from '../../types.js';
import { stepFetch } from './fetch.js';

// Reset test doubles after every case so one test's spies (vi.spyOn) and
// global stubs (vi.stubGlobal) cannot leak into the next.
afterEach(function resetTestDoubles(): void {
  // Put spied-on methods back to their original implementations.
  vi.restoreAllMocks();
  // Remove globals installed through vi.stubGlobal (the suite stubs `fetch`).
  vi.unstubAllGlobals();
});

/**
 * Error-handling tests for `stepFetch`.
 *
 * Covers four call shapes — single vs. batch, with and without a browser
 * page — and checks that HTTP failures and rejected fetches surface as a
 * thrown `CliError` (single) or as per-item `{ error }` entries plus a
 * warning log (batch).
 */
describe('stepFetch', () => {
  // URL used by the single-request tests.
  const SINGLE_URL = 'https://api.example.com/items';
  // Per-item URL template; the batch assertions expect the rendered URLs to
  // end in the item's id (".../items/1", ".../items/2").
  const ITEM_URL_TEMPLATE = 'https://api.example.com/items/${{ item.id }}';

  /** Replaces global `fetch` with a mock that resolves to a non-ok response. */
  function stubHttpFailure(status: number, statusText: string, json: unknown): void {
    const response = { ok: false, status, statusText, json };
    vi.stubGlobal('fetch', vi.fn().mockResolvedValue(response));
  }

  /** Replaces global `fetch` with a mock that rejects with a plain string. */
  function stubStringRejection(reason: string): void {
    vi.stubGlobal('fetch', vi.fn().mockRejectedValue(reason));
  }

  /**
   * Builds a fake page whose `evaluate` runs the given source in-process:
   * the source is wrapped as an expression, materialised into a function,
   * and that function is invoked. Its result is returned as a value, so any
   * error reporting has to happen in the caller of `evaluate`.
   */
  function makeEvaluatingPage(): IPage {
    const evaluate = vi.fn(async (js: string) => {
      const build = Function(`return (${js})`);
      const compiled = build();
      return compiled();
    });
    return { evaluate } as unknown as IPage;
  }

  /** Resolves to the rejection reason of `pending` (or its value if it resolves). */
  function settle(pending: Promise<unknown>): Promise<unknown> {
    return pending.catch((e: unknown) => e);
  }

  /** Runs a single (non-batch) fetch against SINGLE_URL and captures the outcome. */
  function runSingle(page: IPage | null): Promise<unknown> {
    return settle(stepFetch(page, { url: SINGLE_URL }, null, {}));
  }

  /** Runs a batch fetch over `items` using the per-item URL template. */
  function runBatch(page: IPage | null, items: Array<{ id: number }>) {
    return stepFetch(page, { url: ITEM_URL_TEMPLATE }, items, {});
  }

  /** Asserts that `err` is a CliError carrying the FETCH_ERROR code and the exact message. */
  function expectFetchError(err: unknown, message: string): void {
    expect(err).toBeInstanceOf(CliError);
    const cliError = err as CliError;
    expect(cliError.code).toBe('FETCH_ERROR');
    expect(cliError.message).toBe(message);
  }

  /** Installs a spy on the project logger's `warn` method. */
  async function spyOnWarn() {
    const { log } = await import('../../logger.js');
    return vi.spyOn(log, 'warn');
  }

  // W1 + W4: non-browser single fetch throws CliError with FETCH_ERROR code and full message
  it('throws CliError with FETCH_ERROR code on non-ok responses without a browser session', async () => {
    const jsonMock = vi.fn().mockResolvedValue({ error: 'rate limited' });
    stubHttpFailure(429, 'Too Many Requests', jsonMock);

    const err = await runSingle(null);

    expectFetchError(err, 'HTTP 429 Too Many Requests from https://api.example.com/items');
    // The body of a failed response must never be parsed.
    expect(jsonMock).not.toHaveBeenCalled();
  });

  // W1 + W3: browser single fetch returns error status from evaluate, outer code throws CliError
  it('throws CliError with FETCH_ERROR code on non-ok responses inside the browser session', async () => {
    const jsonMock = vi.fn().mockResolvedValue({ error: 'auth required' });
    stubHttpFailure(401, 'Unauthorized', jsonMock);

    // Simulate real CDP behavior: evaluate returns a value, errors are thrown outside
    const page = makeEvaluatingPage();

    const err = await runSingle(page);

    expectFetchError(err, 'HTTP 401 Unauthorized from https://api.example.com/items');
    expect(jsonMock).not.toHaveBeenCalled();
  });

  it('returns per-item HTTP errors for batch fetches without a browser session', async () => {
    const jsonMock = vi.fn().mockResolvedValue({ error: 'upstream unavailable' });
    stubHttpFailure(503, 'Service Unavailable', jsonMock);

    const expected = [
      { error: 'HTTP 503 Service Unavailable from https://api.example.com/items/1' },
    ];
    await expect(runBatch(null, [{ id: 1 }])).resolves.toEqual(expected);
    expect(jsonMock).not.toHaveBeenCalled();
  });

  it('returns per-item HTTP errors for batch browser fetches', async () => {
    const jsonMock = vi.fn().mockResolvedValue({ error: 'upstream unavailable' });
    stubHttpFailure(503, 'Service Unavailable', jsonMock);

    const page = makeEvaluatingPage();

    const expected = [
      { error: 'HTTP 503 Service Unavailable from https://api.example.com/items/1' },
    ];
    await expect(runBatch(page, [{ id: 1 }])).resolves.toEqual(expected);
    expect(jsonMock).not.toHaveBeenCalled();
  });

  it('stringifies non-Error batch browser failures consistently', async () => {
    // fetch rejects with a bare string rather than an Error instance.
    stubStringRejection('socket hang up');

    const page = makeEvaluatingPage();

    await expect(runBatch(page, [{ id: 1 }])).resolves.toEqual([
      { error: 'socket hang up' },
    ]);
  });

  it('stringifies non-Error batch non-browser failures consistently', async () => {
    stubStringRejection('socket hang up');

    await expect(runBatch(null, [{ id: 1 }])).resolves.toEqual([
      { error: 'socket hang up' },
    ]);
  });

  // W2: batch item failures emit a warning log
  it('logs a warning for each failed batch item in non-browser mode', async () => {
    const warnSpy = await spyOnWarn();
    stubHttpFailure(503, 'Service Unavailable', vi.fn());

    await runBatch(null, [{ id: 1 }, { id: 2 }]);

    // One warning per failed item, each mentioning that item's URL.
    expect(warnSpy).toHaveBeenCalledTimes(2);
    for (const failedUrl of ['https://api.example.com/items/1', 'https://api.example.com/items/2']) {
      expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining(failedUrl));
    }
  });

  it('logs a warning for each failed batch item in browser mode', async () => {
    const warnSpy = await spyOnWarn();
    stubHttpFailure(502, 'Bad Gateway', vi.fn());

    const page = makeEvaluatingPage();

    await runBatch(page, [{ id: 1 }, { id: 2 }]);

    expect(warnSpy).toHaveBeenCalledTimes(2);
    for (const failedUrl of ['https://api.example.com/items/1', 'https://api.example.com/items/2']) {
      expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining(failedUrl));
    }
  });
});
40 changes: 36 additions & 4 deletions src/pipeline/steps/fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
* Pipeline step: fetch — HTTP API requests.
*/

import { CliError, getErrorMessage } from '../../errors.js';
import { log } from '../../logger.js';
import type { IPage } from '../../types.js';
import { render } from '../template.js';

Expand All @@ -28,20 +30,33 @@ async function fetchSingle(

if (page === null) {
const resp = await fetch(finalUrl, { method: method.toUpperCase(), headers: renderedHeaders });
if (!resp.ok) {
throw new CliError('FETCH_ERROR', `HTTP ${resp.status} ${resp.statusText} from ${finalUrl}`);
}
return resp.json();
}

const headersJs = JSON.stringify(renderedHeaders);
const urlJs = JSON.stringify(finalUrl);
const methodJs = JSON.stringify(method.toUpperCase());
return page.evaluate(`
// Return error status instead of throwing inside evaluate to avoid CDP wrapper
// rewriting the message (CDP prepends "Evaluate error: " to thrown errors).
const result = await page.evaluate(`
async () => {
const resp = await fetch(${urlJs}, {
method: ${methodJs}, headers: ${headersJs}, credentials: "include"
});
if (!resp.ok) {
return { __httpError: resp.status, statusText: resp.statusText };
}
return await resp.json();
}
`);
if (result && typeof result === 'object' && '__httpError' in result) {
const { __httpError: status, statusText } = result as { __httpError: number; statusText: string };
throw new CliError('FETCH_ERROR', `HTTP ${status} ${statusText} from ${finalUrl}`);
}
return result;
}

/**
Expand Down Expand Up @@ -71,9 +86,13 @@ async function fetchBatchInBrowser(
const i = idx++;
try {
const resp = await fetch(urls[i], { method, headers, credentials: "include" });
if (!resp.ok) {
throw new Error('HTTP ' + resp.status + ' ' + resp.statusText + ' from ' + urls[i]);
}
results[i] = await resp.json();
} catch (e) {
results[i] = { error: e.message };
results[i] = { error: e instanceof Error ? e.message : String(e) };
// Note: getErrorMessage() is a Node.js utility — can't use it inside evaluate()
}
}
}
Expand Down Expand Up @@ -114,13 +133,26 @@ export async function stepFetch(page: IPage | null, params: unknown, data: unkno

// BATCH IPC: if browser is available, batch all fetches into a single evaluate() call
if (page !== null) {
return fetchBatchInBrowser(page, urls, method.toUpperCase(), renderedHeaders, concurrency);
const results = await fetchBatchInBrowser(page, urls, method.toUpperCase(), renderedHeaders, concurrency);
for (let i = 0; i < results.length; i++) {
const r = results[i];
if (r && typeof r === 'object' && 'error' in r) {
log.warn(`Batch fetch failed for ${urls[i]}: ${(r as { error: string }).error}`);
}
}
return results;
}

// Non-browser: use concurrent pool (already optimized)
return mapConcurrent(data, concurrency, async (item, index) => {
const itemUrl = String(render(urlTemplate, { args, data, item, index }));
return fetchSingle(null, itemUrl, method, queryParams, headers, args, data);
try {
return await fetchSingle(null, itemUrl, method, queryParams, headers, args, data);
} catch (error) {
const message = getErrorMessage(error);
log.warn(`Batch fetch failed for ${itemUrl}: ${message}`);
return { error: message };
}
});
}
const url = render(urlOrObj, { args, data });
Expand Down