Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 63 additions & 38 deletions crates/driver/src/domain/competition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ impl Competition {
}

/// Solve an auction as part of this competition.
pub async fn solve(&self, request: Request<Body>) -> Result<Option<Solved>, Error> {
pub async fn solve(&self, request: Request<Body>) -> Result<Vec<Solved>, Error> {
let start = Instant::now();
let timer = ::observe::metrics::metrics()
.on_auction_overhead_start("driver", "pre_processing_total");
Expand Down Expand Up @@ -344,7 +344,7 @@ impl Competition {

if auction.orders.is_empty() {
tracing::info!("no orders left after pre-processing; skipping solving");
return Ok(None);
return Ok(vec![]);
}

let auction = &auction;
Expand Down Expand Up @@ -498,59 +498,78 @@ impl Competition {
observe::score(settlement, score);
}

// Pick the best-scoring settlement.
let (mut score, settlement) = scores
// Build scored settlements sorted best-first.
let scored: Vec<(Solved, Settlement)> = scores
.into_iter()
.max_by_key(|(score, _)| score.to_owned())
.sorted_by(|(a, _), (b, _)| b.cmp(a))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the reason to switch away from max_by_key to avoid cloning the score? Otherwise I find max_by_key easier to understand by just reading the code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

max_by_key give 1 element (the max), but now we actually want a vector of N best solutions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, right. Then sorted_by_key would be easier to read, no?

            .sorted_by_key(|(score, _)| std::cmp::Reverse(*score))

.map(|(score, settlement)| {
(
Solved {
id: settlement.solution().clone(),
score,
trades: settlement.orders(),
prices: settlement.prices(),
gas: Some(settlement.gas.estimate),
},
settlement,
)
let solved = Solved {
id: settlement.solution().clone(),
score,
trades: settlement.orders(),
prices: settlement.prices(),
gas: Some(settlement.gas.estimate),
};
(solved, settlement)
})
.unzip();
.collect();

let Some(settlement) = settlement else {
// Don't wait for the deadline because we can't produce a solution anyway.
return Ok(score);
};
let solution_id = settlement.solution().get();
if scored.is_empty() {
return Ok(vec![]);
}

let max_to_propose = self.solver.max_solutions_to_propose();
let scored: Vec<(Solved, Settlement)> = scored.into_iter().take(max_to_propose).collect();

// Cache all settlements so they can be revealed/settled later.
// Use a multiple of max_to_propose so solutions from previous
// overlapping auctions survive until their /settle completes.
{
let mut lock = self.settlements.lock().unwrap();
lock.push_front(settlement.clone());

/// Number of solutions that may be cached at most.
const MAX_SOLUTION_STORAGE: usize = 5;
lock.truncate(MAX_SOLUTION_STORAGE);
for (_, settlement) in &scored {
lock.push_front(settlement.clone());
}
Comment on lines +529 to +531
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder if the following would be faster:

  • keeping scored as a vecdeque from the start
  • extending scored with settlements
  • replacing settlements with scored

lock.truncate(max_to_propose * 5);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think a name for this magic number would be helpful. Maybe MAX_CONCURRENT_AUCTIONS?

}

// Re-simulate the solution on every new block until the deadline ends to make
// sure we actually submit a working solution close to when the winner
// gets picked by the procotol.
// Re-simulate all solutions on every new block until the deadline ends to
// make sure we only propose solutions that are still working when the
// winner gets picked by the protocol.
let mut voided: HashSet<u64> = HashSet::new();
if let Ok(remaining) = deadline.remaining() {
let score_ref = &mut score;
let has_haircut = settlement.has_haircut();
let voided_ref = &mut voided;
let scored_ref = &scored;
let simulate_on_new_blocks = async move {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to extract this into a function? the nesting level gets deep and it would become easier to read

let mut stream =
ethrpc::block_stream::into_stream(self.eth.current_block().clone());
while let Some(block) = stream.next().await {
if let Err(simulator::Error::Revert(err)) =
self.simulate_settlement(&settlement).await
{
let active: Vec<_> = scored_ref
.iter()
.filter(|(solved, _)| !voided_ref.contains(&solved.id.get()))
.collect();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to .collect() here for join_all to work.

Comment on lines +546 to +549
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't you permanently remove solutions from scored to avoid this?
Or can it be that a solution that fails on block N, can pass again on N+1?


let results: Vec<_> =
futures::future::join_all(active.iter().map(|(solved, settlement)| {
let solution_id = solved.id.get();
async move {
let result = self.simulate_settlement(settlement).await;
(solution_id, settlement, result)
}
}))
.await;

for (solution_id, settlement, result) in results {
let err = match result {
Err(simulator::Error::Revert(err)) => err,
_ => continue,
};
let has_haircut = settlement.has_haircut();
observe::winner_voided(self.solver.name(), block, &err, has_haircut);
*score_ref = None;
voided_ref.insert(solution_id);
self.settlements
.lock()
.unwrap()
.retain(|s| s.solution().get() != solution_id);
Comment on lines +569 to +572
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels a bit weird to lock settlements multiple times in a loop after all the solutions already got invalidated.
WDYT about doing most of this error handling in the future itself and only return Option<SolutionId> from the future. Then you can add all of them to the voided_ref at once and lock self.settlements as the simulation results trickle in.

                    let voided_solutions =
                        futures::future::join_all(active.map(|(solved, settlement)| {
                            let solution_id = solved.id.get();
                            async move {
                                let result = self.simulate_settlement(settlement).await;
                                let err = match result {
                                    Err(simulator::Error::Revert(err)) => err,
                                    _ => return None,
                                };

                                let has_haircut = settlement.has_haircut();
                                observe::winner_voided(self.solver.name(), block, &err, has_haircut);
                                self.settlements
                                    .lock()
                                    .unwrap()
                                    .retain(|s| s.solution().get() != solution_id);
                                if !has_haircut {
                                    notify::simulation_failed(
                                        &self.solver,
                                        auction.id(),
                                        settlement.solution(),
                                        &simulator::Error::Revert(err),
                                        true,
                                    );
                                }
                                Some(solution_id)
                            }
                        }))
                        .await;

                    voided_ref.extend(voided_solutions.into_iter().flatten());

// Only notify solver if solution doesn't have haircut
if !has_haircut {
notify::simulation_failed(
&self.solver,
Expand All @@ -560,14 +579,20 @@ impl Competition {
true,
);
}
return;
}
if voided_ref.len() == scored_ref.len() {
return; // all solutions voided, no point waiting for more blocks
}
}
};
let _ = tokio::time::timeout(remaining, simulate_on_new_blocks).await;
}

Ok(score)
Ok(scored
.into_iter()
.filter(|(solved, _)| !voided.contains(&solved.id.get()))
.map(|(solved, _)| solved)
.collect())
}

// Oders already need to be sorted from most relevant to least relevant so that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use {
};

impl SolveResponse {
pub fn new(solved: Option<competition::Solved>, solver: &Solver) -> Self {
pub fn new(solved: Vec<competition::Solved>, solver: &Solver) -> Self {
let solutions = solved
.into_iter()
.map(|solved| Solution::new(solved.id.get(), solved, solver))
Expand Down
1 change: 1 addition & 0 deletions crates/driver/src/infra/config/file/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ pub async fn load(chain: Chain, path: &Path) -> infra::Config {
)
.await,
forwarder_contract: solver_config.forwarder_contract,
max_solutions_to_propose: solver_config.max_solutions_to_propose,
}
}))
.await,
Expand Down
13 changes: 12 additions & 1 deletion crates/driver/src/infra/config/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
serde::{Deserialize, Deserializer, Serialize},
serde_with::serde_as,
solver::solver::Arn,
std::{collections::HashMap, time::Duration},
std::{collections::HashMap, num::NonZeroUsize, time::Duration},
};

mod load;
Expand Down Expand Up @@ -234,6 +234,10 @@ pub fn default_solving_share_of_deadline() -> f64 {
0.8
}

fn default_max_solutions_to_propose() -> NonZeroUsize {
NonZeroUsize::new(1).unwrap()
}

#[serde_as]
#[derive(Debug, Deserialize)]
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
Expand Down Expand Up @@ -322,6 +326,13 @@ struct SolverConfig {
/// Address of the deployed CowSettlementForwarder contract for EIP-7702
/// delegation. Required when `submission_accounts` is non-empty.
forwarder_contract: Option<eth::Address>,

/// Maximum number of solutions the driver proposes to the autopilot per
/// auction. Defaults to 1 (only the best-scoring solution). Values > 1
/// require `submission-accounts` to be configured; the driver will refuse
/// to start otherwise.
#[serde(default = "default_max_solutions_to_propose")]
max_solutions_to_propose: NonZeroUsize,
}

#[derive(Clone, Copy, Debug, Default, Deserialize, PartialEq, Serialize)]
Expand Down
16 changes: 8 additions & 8 deletions crates/driver/src/infra/observe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,21 +254,21 @@ pub fn settled(solver: &solver::Name, result: &Result<competition::Settled, comp
}

/// Observe the result of solving an auction.
pub fn solved(solver: &str, result: &Result<Option<Solved>, competition::Error>) {
pub fn solved(solver: &str, result: &Result<Vec<Solved>, competition::Error>) {
match result {
Ok(Some(solved)) => {
tracing::info!(?solved, "solved auction");
Ok(solutions) if solutions.is_empty() => {
tracing::debug!("no solution found");
metrics::get()
.solutions
.with_label_values(&[solver, "Success"])
.with_label_values(&[solver, "SolutionNotFound"])
.inc();
}
Ok(None) => {
tracing::debug!("no solution found");
Ok(solutions) => {
tracing::info!(?solutions, "solved auction");
metrics::get()
.solutions
.with_label_values(&[solver, "SolutionNotFound"])
.inc();
.with_label_values(&[solver, "Success"])
.inc_by(solutions.len() as u64);
}
Err(err) => {
tracing::warn!(?err, "failed to solve auction");
Expand Down
23 changes: 21 additions & 2 deletions crates/driver/src/infra/solver/eip7702.rs
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Take it or leave it: Cover the error conditions in the function doc too

The error message is great though, this would just help when hovering over

Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,31 @@ pub async fn setup(solvers: &[Solver], eth: &Ethereum) -> anyhow::Result<()> {
for solver in solvers {
let config = solver.config();
if config.submission_accounts.is_empty() {
anyhow::ensure!(
config.max_solutions_to_propose.get() == 1,
"solver '{}': max-solutions-to-propose > 1 requires at least one \
submission-account (EIP-7702 parallel submission must be enabled)",
config.name,
);
continue;
}
if matches!(config.account, super::Account::Address(_)) {
tracing::debug!(solver = %config.name, "read-only mode, skipping EIP-7702 setup");
if config
.submission_accounts
.iter()
.all(|a| matches!(a, super::Account::Address(_)))
{
tracing::debug!(
solver = %config.name,
"all submission accounts are read-only, skipping EIP-7702 setup"
);
continue;
}
anyhow::ensure!(
!matches!(config.account, super::Account::Address(_)),
"solver '{}': main account must be a signer to set up EIP-7702 delegation when \
submission accounts are configured",
config.name,
);
let forwarder = config.forwarder_contract.ok_or_else(|| {
anyhow::anyhow!(
"solver {}: submission_accounts configured but forwarder_contract missing",
Expand Down
7 changes: 7 additions & 0 deletions crates/driver/src/infra/solver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ pub struct Config {
/// Address of the deployed CowSettlementForwarder contract for EIP-7702
/// delegation. Required when `submission_accounts` is non-empty.
pub forwarder_contract: Option<eth::Address>,
/// Maximum number of solutions the driver proposes to the autopilot per
/// auction. When 1 (the default), only the best-scoring solution is sent.
pub max_solutions_to_propose: std::num::NonZeroUsize,
}

impl Solver {
Expand Down Expand Up @@ -325,6 +328,10 @@ impl Solver {
self.config.forwarder_contract
}

pub fn max_solutions_to_propose(&self) -> usize {
self.config.max_solutions_to_propose.get()
}

/// Make a POST request instructing the solver to solve an auction.
/// Allocates at most `timeout` time for the solving.
#[instrument(name = "solver_engine", skip_all)]
Expand Down
Loading
Loading