Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,23 @@ export class SequencerPublisherFactory {
const rollup = this.deps.rollupContract;
const slashingProposerContract = await rollup.getSlashingProposer();

const getNextPublisher = async (excludeAddresses: EthAddress[]): Promise<L1TxUtils | undefined> => {
const exclusionFilter: PublisherFilter<L1TxUtils> = (utils: L1TxUtils) => {
if (excludeAddresses.some(addr => addr.equals(utils.getSenderAddress()))) {
return false;
}
return filter(utils);
};
try {
return await this.deps.publisherManager.getAvailablePublisher(exclusionFilter);
} catch {
return undefined;
}
};

const publisher = new SequencerPublisher(this.sequencerConfig, {
l1TxUtils: l1Publisher,
getNextPublisher,
telemetry: this.deps.telemetry,
blobClient: this.deps.blobClient,
rollupContract: this.deps.rollupContract,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from '@aztec/ethereum/l1-tx-utils';
import { FormattedViemError } from '@aztec/ethereum/utils';
import { BlockNumber, EpochNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { TimeoutError } from '@aztec/foundation/error';
import { EthAddress } from '@aztec/foundation/eth-address';
import { sleep } from '@aztec/foundation/sleep';
import { TestDateProvider } from '@aztec/foundation/timer';
Expand Down Expand Up @@ -299,6 +300,140 @@ describe('SequencerPublisher', () => {
expect(result).toEqual(undefined);
});

describe('publisher rotation on send failure', () => {
let secondL1TxUtils: MockProxy<L1TxUtils>;
let getNextPublisher: jest.MockedFunction<(excludeAddresses: EthAddress[]) => Promise<L1TxUtils | undefined>>;
let rotatingPublisher: SequencerPublisher;

beforeEach(() => {
secondL1TxUtils = mock<L1TxUtils>();
secondL1TxUtils.getBlockNumber.mockResolvedValue(1n);
secondL1TxUtils.getSenderAddress.mockReturnValue(EthAddress.random());
secondL1TxUtils.getSenderBalance.mockResolvedValue(1000n);

getNextPublisher = jest.fn();

const epochCache = mock<EpochCache>();
epochCache.getEpochAndSlotNow.mockReturnValue({
epoch: EpochNumber(1),
slot: SlotNumber(2),
ts: 3n,
nowMs: 3000n,
});
epochCache.getCommittee.mockResolvedValue({
committee: [],
seed: 1n,
epoch: EpochNumber(1),
isEscapeHatchOpen: false,
});

rotatingPublisher = new SequencerPublisher({ ethereumSlotDuration: 12, l1ChainId: 1 } as any, {
blobClient,
rollupContract: rollup,
l1TxUtils,
epochCache,
slashingProposerContract,
governanceProposerContract,
slashFactoryContract,
dateProvider: new TestDateProvider(),
metrics: l1Metrics,
lastActions: {},
getNextPublisher,
});
});

it('rotates to next publisher when forward throws and retries successfully', async () => {
forwardSpy
.mockRejectedValueOnce(new Error('RPC error'))
.mockResolvedValueOnce({ receipt: proposeTxReceipt, errorMsg: undefined });
getNextPublisher.mockResolvedValueOnce(secondL1TxUtils);

await rotatingPublisher.enqueueProposeCheckpoint(
new Checkpoint(l2Block.archive, header, [l2Block], l2Block.checkpointNumber),
CommitteeAttestationsAndSigners.empty(),
Signature.empty(),
);
const result = await rotatingPublisher.sendRequests();

expect(forwardSpy).toHaveBeenCalledTimes(2);
// First call uses original publisher, second uses the rotated one
expect(forwardSpy).toHaveBeenNthCalledWith(
1,
expect.anything(),
l1TxUtils,
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
);
expect(forwardSpy).toHaveBeenNthCalledWith(
2,
expect.anything(),
secondL1TxUtils,
expect.anything(),
expect.anything(),
expect.anything(),
expect.anything(),
);
expect(getNextPublisher).toHaveBeenCalledWith([l1TxUtils.getSenderAddress()]);
// Result is defined (rotation succeeded and tx was sent)
expect(result).toBeDefined();
expect(result?.sentActions).toContain('propose');
// l1TxUtils updated to the one that succeeded
expect(rotatingPublisher.l1TxUtils).toBe(secondL1TxUtils);
});

it('does not rotate on TimeoutError, re-throws instead', async () => {
forwardSpy.mockRejectedValueOnce(new TimeoutError('timed out'));

await rotatingPublisher.enqueueProposeCheckpoint(
new Checkpoint(l2Block.archive, header, [l2Block], l2Block.checkpointNumber),
CommitteeAttestationsAndSigners.empty(),
Signature.empty(),
);
// TimeoutError propagates to the outer catch in sendRequests which returns undefined
const result = await rotatingPublisher.sendRequests();

expect(result).toBeUndefined();
expect(getNextPublisher).not.toHaveBeenCalled();
expect(forwardSpy).toHaveBeenCalledTimes(1);
});

it('returns undefined when all publishers are exhausted', async () => {
forwardSpy
.mockRejectedValueOnce(new Error('RPC error on first'))
.mockRejectedValueOnce(new Error('RPC error on second'));
getNextPublisher.mockResolvedValueOnce(secondL1TxUtils).mockResolvedValueOnce(undefined);

await rotatingPublisher.enqueueProposeCheckpoint(
new Checkpoint(l2Block.archive, header, [l2Block], l2Block.checkpointNumber),
CommitteeAttestationsAndSigners.empty(),
Signature.empty(),
);
const result = await rotatingPublisher.sendRequests();

expect(forwardSpy).toHaveBeenCalledTimes(2);
expect(getNextPublisher).toHaveBeenCalledTimes(2);
expect(result).toBeUndefined();
});

it('does not rotate when forward returns a revert (on-chain failure)', async () => {
forwardSpy.mockResolvedValue({ receipt: { ...proposeTxReceipt, status: 'reverted' }, errorMsg: 'revert reason' });

await rotatingPublisher.enqueueProposeCheckpoint(
new Checkpoint(l2Block.archive, header, [l2Block], l2Block.checkpointNumber),
CommitteeAttestationsAndSigners.empty(),
Signature.empty(),
);
const result = await rotatingPublisher.sendRequests();

expect(forwardSpy).toHaveBeenCalledTimes(1);
expect(getNextPublisher).not.toHaveBeenCalled();
// Result contains the reverted receipt (no rotation)
expect(result?.result).toMatchObject({ receipt: { status: 'reverted' } });
});
});

it('does not send propose tx if rollup validation fails', async () => {
l1TxUtils.simulate.mockRejectedValueOnce(new Error('Test error'));

Expand Down
67 changes: 59 additions & 8 deletions yarn-project/sequencer-client/src/publisher/sequencer-publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { toHex as toPaddedHex } from '@aztec/foundation/bigint-buffer';
import { CheckpointNumber, SlotNumber } from '@aztec/foundation/branded-types';
import { pick } from '@aztec/foundation/collection';
import type { Fr } from '@aztec/foundation/curves/bn254';
import { TimeoutError } from '@aztec/foundation/error';
import { EthAddress } from '@aztec/foundation/eth-address';
import { Signature, type ViemSignature } from '@aztec/foundation/eth-signature';
import { type Logger, createLogger } from '@aztec/foundation/log';
Expand Down Expand Up @@ -126,6 +127,9 @@ export class SequencerPublisher {
/** Address to use for simulations in fisherman mode (actual proposer's address) */
private proposerAddressForSimulation?: EthAddress;

/** Optional callback to obtain a replacement publisher when the current one fails to send. */
private getNextPublisher?: (excludeAddresses: EthAddress[]) => Promise<L1TxUtils | undefined>;

/** L1 fee analyzer for fisherman mode */
private l1FeeAnalyzer?: L1FeeAnalyzer;

Expand Down Expand Up @@ -164,6 +168,7 @@ export class SequencerPublisher {
metrics: SequencerPublisherMetrics;
lastActions: Partial<Record<Action, SlotNumber>>;
log?: Logger;
getNextPublisher?: (excludeAddresses: EthAddress[]) => Promise<L1TxUtils | undefined>;
},
) {
this.log = deps.log ?? createLogger('sequencer:publisher');
Expand All @@ -177,6 +182,7 @@ export class SequencerPublisher {
this.metrics = deps.metrics ?? new SequencerPublisherMetrics(telemetry, 'SequencerPublisher');
this.tracer = telemetry.getTracer('SequencerPublisher');
this.l1TxUtils = deps.l1TxUtils;
this.getNextPublisher = deps.getNextPublisher;

this.rollupContract = deps.rollupContract;

Expand Down Expand Up @@ -390,14 +396,10 @@ export class SequencerPublisher {
validRequests: validRequests.map(request => request.action),
txConfig,
});
const result = await Multicall3.forward(
validRequests.map(request => request.request),
this.l1TxUtils,
txConfig,
blobConfig,
this.rollupContract.address,
this.log,
);
const result = await this.forwardWithPublisherRotation(validRequests, txConfig, blobConfig);
if (result === undefined) {
return undefined;
}
const { successfulActions = [], failedActions = [] } = this.callbackBundledTransactions(validRequests, result);
return { result, expiredActions, sentActions: validActions, successfulActions, failedActions };
} catch (err) {
Expand All @@ -416,6 +418,55 @@ export class SequencerPublisher {
}
}

/**
* Forwards transactions via Multicall3, rotating to the next available publisher if a send
* failure occurs (i.e. the tx never reached the chain).
* On-chain reverts and simulation errors are returned as-is without rotation.
*/
private async forwardWithPublisherRotation(
validRequests: RequestWithExpiry[],
txConfig: RequestWithExpiry['gasConfig'],
blobConfig: L1BlobInputs | undefined,
) {
const triedAddresses: EthAddress[] = [];
let currentPublisher = this.l1TxUtils;

while (true) {
triedAddresses.push(currentPublisher.getSenderAddress());
try {
const result = await Multicall3.forward(
validRequests.map(r => r.request),
currentPublisher,
txConfig,
blobConfig,
this.rollupContract.address,
this.log,
);
this.l1TxUtils = currentPublisher;
return result;
} catch (err) {
if (err instanceof TimeoutError) {
throw err;
}
const viemError = formatViemError(err);
if (!this.getNextPublisher) {
this.log.error('Failed to publish bundled transactions', viemError);
return undefined;
}
this.log.warn(
`Publisher ${currentPublisher.getSenderAddress()} failed to send, rotating to next publisher`,
viemError,
);
const nextPublisher = await this.getNextPublisher([...triedAddresses]);
if (!nextPublisher) {
this.log.error('All available publishers exhausted, failed to publish bundled transactions');
return undefined;
}
currentPublisher = nextPublisher;
}
}
}

private callbackBundledTransactions(
requests: RequestWithExpiry[],
result?: { receipt: TransactionReceipt } | FormattedViemError,
Expand Down
Loading