diff --git a/Cargo.lock b/Cargo.lock index a0b06233..9bce0e12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1891,6 +1891,8 @@ dependencies = [ "cb-common", "cb-pbs", "cb-signer", + "eth2", + "ethereum_ssz 0.10.3", "eyre", "jsonwebtoken", "rcgen", diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index c144e2c0..49b3d2f0 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -12,20 +12,25 @@ use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ constants::APPLICATION_BUILDER_DOMAIN, pbs::{ - EMPTY_TX_ROOT_HASH, ExecutionPayloadHeaderRef, GetHeaderInfo, GetHeaderParams, - GetHeaderResponse, HEADER_START_TIME_UNIX_MS, HEADER_TIMEOUT_MS, RelayClient, + EMPTY_TX_ROOT_HASH, ExecutionPayloadHeaderRef, ForkName, ForkVersionDecode, GetHeaderInfo, + GetHeaderParams, GetHeaderResponse, HEADER_START_TIME_UNIX_MS, HEADER_TIMEOUT_MS, + RelayClient, SignedBuilderBid, error::{PbsError, ValidationError}, }, signature::verify_signed_message, types::{BlsPublicKey, BlsPublicKeyBytes, BlsSignature, Chain}, utils::{ - get_user_agent_with_version, ms_into_slot, read_chunked_body_with_max, - timestamp_of_slot_start_sec, utcnow_ms, + EncodingType, OUTBOUND_ACCEPT, get_user_agent_with_version, ms_into_slot, + parse_response_encoding_and_fork, read_chunked_body_with_max, timestamp_of_slot_start_sec, + utcnow_ms, }, }; use futures::future::join_all; use parking_lot::RwLock; -use reqwest::{StatusCode, header::USER_AGENT}; +use reqwest::{ + StatusCode, + header::{ACCEPT, USER_AGENT}, +}; use tokio::time::sleep; use tracing::{Instrument, debug, error, info, warn}; use tree_hash::TreeHash; @@ -41,6 +46,53 @@ use crate::{ utils::check_gas_limit, }; +/// Info about an incoming get_header request. +/// Sent from get_header to each send_timed_get_header call. +#[derive(Clone)] +struct RequestInfo { + /// The blockchain parameters of the get_header request (what slot it's for, + /// which pubkey is requesting it, etc) + params: GetHeaderParams, + + /// Common baseline of headers to send with each request + headers: Arc, + + /// The chain the request is for + chain: Chain, + + /// Context for validating the header returned by the relay + validation: ValidationContext, +} + +struct GetHeaderResponseInfo { + /// ID of the relay the response came from + relay_id: Arc, + + /// The raw body of the response + response_bytes: Vec, + + /// The content type the response is encoded with + content_type: EncodingType, + + /// Which fork the response bid is for (if provided as a header, rather than + /// part of the body) + fork: Option, + + /// The status code of the response, for logging + code: StatusCode, + + /// The round-trip latency of the request + request_latency: Duration, +} + +#[derive(Clone)] +struct ValidationContext { + skip_sigverify: bool, + min_bid_wei: U256, + extra_validation_enabled: bool, + parent_block: Arc>>, +} + /// Implements https://ethereum.github.io/builder-specs/#/Builder/getHeader /// Returns 200 if at least one relay returns 200, else 204 pub async fn get_header( @@ -97,22 +149,37 @@ pub async fn get_header( let mut send_headers = HeaderMap::new(); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); + // Create the Accept headers for requests + // Use the documented, deterministic preference: + // SSZ first (wire-efficient), JSON fallback. + let accept_types = OUTBOUND_ACCEPT.to_string(); + send_headers.insert( + ACCEPT, + HeaderValue::from_str(&accept_types) + .map_err(|e| PbsError::GeneralRequest(format!("invalid accept header value: {e}")))?, + ); + + // Send requests to all relays concurrently + let slot = params.slot as i64; + let request_info = Arc::new(RequestInfo { + params, + headers: Arc::new(send_headers), + chain: state.config.chain, + validation: ValidationContext { + skip_sigverify: state.pbs_config().skip_sigverify, + min_bid_wei: state.pbs_config().min_bid_wei, + extra_validation_enabled: state.extra_validation_enabled(), + parent_block, + }, + }); let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { handles.push( send_timed_get_header( - params.clone(), + request_info.clone(), relay.clone(), - state.config.chain, - send_headers.clone(), ms_into_slot, max_timeout_ms, - ValidationContext { - skip_sigverify: state.pbs_config().skip_sigverify, - min_bid_wei: state.pbs_config().min_bid_wei, - extra_validation_enabled: state.extra_validation_enabled(), - parent_block: parent_block.clone(), - }, ) .in_current_span(), ); @@ -125,7 +192,7 @@ pub async fn get_header( match res { Ok(Some(res)) => { - RELAY_LAST_SLOT.with_label_values(&[relay_id]).set(params.slot as i64); + RELAY_LAST_SLOT.with_label_values(&[relay_id]).set(slot); let value_gwei = (res.data.message.value() / U256::from(1_000_000_000)) .try_into() .unwrap_or_default(); @@ -179,15 +246,13 @@ async fn fetch_parent_block( } async fn send_timed_get_header( - params: GetHeaderParams, + request_info: Arc, relay: RelayClient, - chain: Chain, - headers: HeaderMap, ms_into_slot: u64, mut timeout_left_ms: u64, - validation: ValidationContext, ) -> Result, PbsError> { - let url = relay.get_header_url(params.slot, ¶ms.parent_hash, ¶ms.pubkey)?; + let params = &request_info.params; + let url = Arc::new(relay.get_header_url(params.slot, ¶ms.parent_hash, ¶ms.pubkey)?); if relay.config.enable_timing_games { if let Some(target_ms) = relay.config.target_first_request_ms { @@ -218,18 +283,12 @@ async fn send_timed_get_header( ); loop { - let params = params.clone(); handles.push(tokio::spawn( send_one_get_header( - params, + request_info.clone(), relay.clone(), - chain, - RequestContext { - timeout_ms: timeout_left_ms, - url: url.clone(), - headers: headers.clone(), - }, - validation.clone(), + url.clone(), + timeout_left_ms, ) .in_current_span(), )); @@ -285,54 +344,181 @@ async fn send_timed_get_header( } // if no timing games or no repeated send, just send one request - send_one_get_header( - params, - relay, - chain, - RequestContext { timeout_ms: timeout_left_ms, url, headers }, - validation, + send_one_get_header(request_info, relay, url, timeout_left_ms) + .await + .map(|(_, maybe_header)| maybe_header) +} + +/// Handles requesting a header from a relay, decoding, and validation. +/// Used by send_timed_get_header to handle each individual request. +async fn send_one_get_header( + request_info: Arc, + relay: RelayClient, + url: Arc, + timeout_left_ms: u64, +) -> Result<(u64, Option), PbsError> { + // Full processing: decode full response and validate + let (start_request_time, get_header_response) = send_get_header_full( + &relay, + url, + timeout_left_ms, + (*request_info.headers).clone(), /* Create a copy of the HeaderMap because the + * impl + * will + * modify it */ ) - .await - .map(|(_, maybe_header)| maybe_header) + .await?; + let get_header_response = match get_header_response { + None => { + // Break if there's no header + return Ok((start_request_time, None)); + } + Some(res) => res, + }; + + // Extract the basic header data needed for validation + let header_data = match &get_header_response.data.message.header() { + ExecutionPayloadHeaderRef::Bellatrix(_) | + ExecutionPayloadHeaderRef::Capella(_) | + ExecutionPayloadHeaderRef::Deneb(_) => { + Err(PbsError::Validation(ValidationError::UnsupportedFork)) + } + ExecutionPayloadHeaderRef::Electra(res) => Ok(HeaderData { + block_hash: res.block_hash.0, + parent_hash: res.parent_hash.0, + tx_root: res.transactions_root, + value: *get_header_response.value(), + timestamp: res.timestamp, + }), + ExecutionPayloadHeaderRef::Fulu(res) => Ok(HeaderData { + block_hash: res.block_hash.0, + parent_hash: res.parent_hash.0, + tx_root: res.transactions_root, + value: *get_header_response.value(), + timestamp: res.timestamp, + }), + }?; + + // Validate the header + let chain = request_info.chain; + let params = &request_info.params; + let validation = &request_info.validation; + validate_header_data( + &header_data, + chain, + params.parent_hash, + validation.min_bid_wei, + params.slot, + )?; + + // Validate the relay signature + if !validation.skip_sigverify { + validate_signature( + chain, + relay.pubkey(), + get_header_response.data.message.pubkey(), + &get_header_response.data.message, + &get_header_response.data.signature, + )?; + } + + // Validate the parent block if enabled + if validation.extra_validation_enabled { + let parent_block = validation.parent_block.read(); + if let Some(parent_block) = parent_block.as_ref() { + extra_validation(parent_block, &get_header_response)?; + } else { + warn!( + relay_id = relay.id.as_ref(), + "parent block not found, skipping extra validation" + ); + } + } + + Ok((start_request_time, Some(get_header_response))) } -struct RequestContext { - url: Url, - timeout_ms: u64, +/// Send and decode a full get_header response, with all of the fields. +async fn send_get_header_full( + relay: &RelayClient, + url: Arc, + timeout_left_ms: u64, headers: HeaderMap, +) -> Result<(u64, Option), PbsError> { + // Send the request + let (start_request_time, info) = + send_get_header_impl(relay, url, timeout_left_ms, headers).await?; + let info = match info { + Some(info) => info, + None => { + return Ok((start_request_time, None)); + } + }; + + // Decode the response + let get_header_response = decode_by_encoding(&info, decode_json_payload, decode_ssz_payload)?; + + // Log and return + info!( + relay_id = info.relay_id.as_ref(), + header_size_bytes = info.response_bytes.len(), + latency = ?info.request_latency, + version =? get_header_response.version, + value_eth = format_ether(*get_header_response.value()), + block_hash = %get_header_response.block_hash(), + content_type = ?info.content_type, + "received new header" + ); + Ok((start_request_time, Some(get_header_response))) } -#[derive(Clone)] -struct ValidationContext { - skip_sigverify: bool, - min_bid_wei: U256, - extra_validation_enabled: bool, - parent_block: Arc>>, +/// Dispatch a get_header response to the appropriate decoder based on the +/// negotiated content-type. SSZ requires a fork header; its absence is a +/// protocol violation reported as `PbsError::RelayResponse`. Callers supply +/// the format-specific decoders, keeping the encoding branch in one place. +fn decode_by_encoding( + info: &GetHeaderResponseInfo, + on_json: impl FnOnce(&[u8]) -> Result, + on_ssz: impl FnOnce(&[u8], ForkName) -> Result, +) -> Result { + match info.content_type { + EncodingType::Json => on_json(&info.response_bytes), + EncodingType::Ssz => { + let fork = info.fork.ok_or_else(|| PbsError::RelayResponse { + error_msg: "relay did not provide consensus version header for ssz payload" + .to_string(), + code: info.code.as_u16(), + })?; + on_ssz(&info.response_bytes, fork) + } + } } -async fn send_one_get_header( - params: GetHeaderParams, - relay: RelayClient, - chain: Chain, - mut req_config: RequestContext, - validation: ValidationContext, -) -> Result<(u64, Option), PbsError> { +/// Sends a get_header request to a relay, returning the response, the time the +/// request was started, and the encoding type of the response (if any). +/// Used by send_one_get_header to perform the actual request submission. +async fn send_get_header_impl( + relay: &RelayClient, + url: Arc, + timeout_left_ms: u64, + mut headers: HeaderMap, +) -> Result<(u64, Option), PbsError> { // the timestamp in the header is the consensus block time which is fixed, // use the beginning of the request as proxy to make sure we use only the // last one received let start_request_time = utcnow_ms(); - req_config.headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time)); + headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time)); // The timeout header indicating how long a relay has to respond, so they can // minimize timing games without losing the bid - req_config.headers.insert(HEADER_TIMEOUT_MS, HeaderValue::from(req_config.timeout_ms)); + headers.insert(HEADER_TIMEOUT_MS, HeaderValue::from(timeout_left_ms)); let start_request = Instant::now(); let res = match relay .client - .get(req_config.url) - .timeout(Duration::from_millis(req_config.timeout_ms)) - .headers(req_config.headers) + .get(url.as_ref().clone()) + .timeout(Duration::from_millis(timeout_left_ms)) + .headers(headers) .send() .await { @@ -353,120 +539,73 @@ async fn send_one_get_header( let code = res.status(); RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc(); - let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER_RESPONSE).await?; - let header_size_bytes = response_bytes.len(); - if !code.is_success() { - return Err(PbsError::RelayResponse { - error_msg: String::from_utf8_lossy(&response_bytes).into_owned(), - code: code.as_u16(), - }); - }; - if code == StatusCode::NO_CONTENT { - debug!( - relay_id = relay.id.as_ref(), - ?code, - latency = ?request_latency, - response = ?response_bytes, - "no header from relay" - ); - return Ok((start_request_time, None)); - } - - let get_header_response = match serde_json::from_slice::(&response_bytes) { - Ok(parsed) => parsed, - Err(err) => { - return Err(PbsError::JsonDecode { - err, - raw: String::from_utf8_lossy(&response_bytes).into_owned(), + // Per the builder spec, 200 carries a bid payload and 204 means no bid + // is available. Any other status is an error. We check before reading + // the body so that response headers are still accessible for + // Content-Type and Eth-Consensus-Version parsing on the 200 path. + if code != StatusCode::OK { + if code == StatusCode::NO_CONTENT { + let response_bytes = + read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER_RESPONSE).await?; + debug!( + relay_id = relay.id.as_ref(), + ?code, + latency = ?request_latency, + response = ?response_bytes, + "no header from relay" + ); + return Ok((start_request_time, None)); + } else { + return Err(PbsError::RelayResponse { + error_msg: format!("unexpected status code from relay: {code}"), + code: code.as_u16(), }); } - }; + } - info!( - relay_id = relay.id.as_ref(), - header_size_bytes, - latency = ?request_latency, - version =? get_header_response.version, - value_eth = format_ether(*get_header_response.value()), - block_hash = %get_header_response.block_hash(), - "received new header" - ); + // Parse Content-Type (tolerating MIME parameters per RFC 7231 §3.1.1.1) + // and Eth-Consensus-Version headers in one shot. + let (content_type, fork) = parse_response_encoding_and_fork(res.headers(), code.as_u16())?; - match &get_header_response.data.message.header() { - ExecutionPayloadHeaderRef::Bellatrix(_) | - ExecutionPayloadHeaderRef::Capella(_) | - ExecutionPayloadHeaderRef::Deneb(_) => { - return Err(PbsError::Validation(ValidationError::UnsupportedFork)) - } - ExecutionPayloadHeaderRef::Electra(res) => { - let header_data = HeaderData { - block_hash: res.block_hash.0, - parent_hash: res.parent_hash.0, - tx_root: res.transactions_root, - value: *get_header_response.value(), - timestamp: res.timestamp, - }; - - validate_header_data( - &header_data, - chain, - params.parent_hash, - validation.min_bid_wei, - params.slot, - )?; - - if !validation.skip_sigverify { - validate_signature( - chain, - relay.pubkey(), - get_header_response.data.message.pubkey(), - &get_header_response.data.message, - &get_header_response.data.signature, - )?; - } - } - ExecutionPayloadHeaderRef::Fulu(res) => { - let header_data = HeaderData { - block_hash: res.block_hash.0, - parent_hash: res.parent_hash.0, - tx_root: res.transactions_root, - value: *get_header_response.value(), - timestamp: res.timestamp, - }; - - validate_header_data( - &header_data, - chain, - params.parent_hash, - validation.min_bid_wei, - params.slot, - )?; - - if !validation.skip_sigverify { - validate_signature( - chain, - relay.pubkey(), - get_header_response.data.message.pubkey(), - &get_header_response.data.message, - &get_header_response.data.signature, - )?; - } - } - } + // Decode the body + let response_bytes = read_chunked_body_with_max(res, MAX_SIZE_GET_HEADER_RESPONSE).await?; - if validation.extra_validation_enabled { - let parent_block = validation.parent_block.read(); - if let Some(parent_block) = parent_block.as_ref() { - extra_validation(parent_block, &get_header_response)?; - } else { - warn!( - relay_id = relay.id.as_ref(), - "parent block not found, skipping extra validation" - ); - } + Ok(( + start_request_time, + Some(GetHeaderResponseInfo { + relay_id: relay.id.clone(), + response_bytes, + content_type, + fork, + code, + request_latency, + }), + )) +} + +/// Decode a JSON-encoded get_header response +fn decode_json_payload(response_bytes: &[u8]) -> Result { + match serde_json::from_slice::(response_bytes) { + Ok(parsed) => Ok(parsed), + Err(err) => Err(PbsError::JsonDecode { + err, + raw: String::from_utf8_lossy(response_bytes).into_owned(), + }), } +} - Ok((start_request_time, Some(get_header_response))) +/// Decode an SSZ-encoded get_header response +fn decode_ssz_payload( + response_bytes: &[u8], + fork: ForkName, +) -> Result { + let data = SignedBuilderBid::from_ssz_bytes_by_fork(response_bytes, fork).map_err(|e| { + PbsError::RelayResponse { + error_msg: (format!("error decoding relay payload: {e:?}")).to_string(), + code: 200, + } + })?; + Ok(GetHeaderResponse { version: fork, data, metadata: Default::default() }) } struct HeaderData { @@ -565,13 +704,18 @@ fn extra_validation( #[cfg(test)] mod tests { + use std::{fs, path::Path}; + use alloy::primitives::{B256, U256}; use cb_common::{ - pbs::{EMPTY_TX_ROOT_HASH, error::ValidationError}, + pbs::*, signature::sign_builder_message, - types::{BlsSecretKey, Chain}, - utils::{TestRandomSeed, timestamp_of_slot_start_sec}, + types::{BlsPublicKeyBytes, BlsSecretKey, BlsSignature, Chain}, + utils::{ + TestRandomSeed, get_bid_value_from_signed_builder_bid_ssz, timestamp_of_slot_start_sec, + }, }; + use ssz::Encode; use super::{validate_header_data, *}; @@ -673,4 +817,42 @@ mod tests { .is_ok() ); } + + #[test] + fn test_ssz_value_extraction() { + for fork_name in ForkName::list_all() { + match fork_name { + // Handle forks that didn't have builder bids yet + ForkName::Altair | ForkName::Base => continue, + + // Handle supported forks + ForkName::Bellatrix | + ForkName::Capella | + ForkName::Deneb | + ForkName::Electra | + ForkName::Fulu => {} + + // Skip unsupported forks + ForkName::Gloas => continue, + } + + // Load get_header JSON from test data + let fork_name_str = fork_name.to_string().to_lowercase(); + let path_str = format!("../../tests/data/get_header/{fork_name_str}.json"); + let path = Path::new(path_str.as_str()); + let json_bytes = fs::read(path).expect("file not found"); + let decoded = decode_json_payload(&json_bytes).expect("failed to decode JSON"); + + // Extract the bid value from the SSZ + let encoded = decoded.data.as_ssz_bytes(); + let bid_value = get_bid_value_from_signed_builder_bid_ssz(&encoded, fork_name) + .expect("failed to extract bid value from SSZ"); + + // Compare to the original value + println!("Testing fork: {}", fork_name); + println!("Original value: {}", decoded.value()); + println!("Extracted value: {}", bid_value); + assert_eq!(*decoded.value(), bid_value); + } + } } diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 9ed312af..34a10b7c 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -1,14 +1,17 @@ use alloy::primitives::utils::format_ether; use axum::{ extract::{Path, State}, - http::HeaderMap, + http::{HeaderMap, HeaderValue}, response::IntoResponse, }; use cb_common::{ pbs::{GetHeaderInfo, GetHeaderParams}, - utils::{get_user_agent, ms_into_slot}, + utils::{ + CONSENSUS_VERSION_HEADER, EncodingType, get_accept_types, get_user_agent, ms_into_slot, + }, }; -use reqwest::StatusCode; +use reqwest::{StatusCode, header::CONTENT_TYPE}; +use ssz::Encode; use tracing::{error, info}; use crate::{ @@ -32,16 +35,51 @@ pub async fn handle_get_header>( let ua = get_user_agent(&req_headers); let ms_into_slot = ms_into_slot(params.slot, state.config.chain); + let accept_types = get_accept_types(&req_headers).map_err(|e| { + error!(%e, "error parsing accept header"); + PbsClientError::DecodeError(format!("error parsing accept header: {e}")) + })?; + // Honor caller q-value preference: pick the highest-priority encoding that + // we can actually produce. Server preference for tiebreaks is SSZ first. + let response_encoding = accept_types.preferred(&[EncodingType::Ssz, EncodingType::Json]); info!(ua, ms_into_slot, "new request"); match A::get_header(params, req_headers, state).await { Ok(res) => { if let Some(max_bid) = res { + BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); + // Respond based on requester accept types info!(value_eth = format_ether(*max_bid.data.message.value()), block_hash =% max_bid.block_hash(), "received header"); - BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc(); - Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + // Three arms: no viable encoding (unreachable in + // practice — `get_accept_types` errors earlier if + // the caller offers nothing we support), SSZ, or JSON. + match response_encoding { + None => Err(PbsClientError::DecodeError( + "no viable accept types in request".to_string(), + )), + Some(EncodingType::Ssz) => { + // ForkName::to_string() always yields valid + // ASCII, so HeaderValue::from_str cannot + // fail here. + let consensus_version_header = + HeaderValue::from_str(&max_bid.version.to_string()) + .expect("fork name is always a valid header value"); + let content_type_header = EncodingType::Ssz.content_type_header().clone(); + + let mut res = max_bid.data.as_ssz_bytes().into_response(); + res.headers_mut() + .insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + res.headers_mut().insert(CONTENT_TYPE, content_type_header); + info!("sending response as SSZ"); + Ok(res) + } + Some(EncodingType::Json) => { + info!("sending response as JSON"); + Ok((StatusCode::OK, axum::Json(max_bid)).into_response()) + } + } } else { // spec: return 204 if request is valid but no bid available info!("no header available for slot"); diff --git a/tests/Cargo.toml b/tests/Cargo.toml index c1c51f58..88b2e377 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -11,8 +11,10 @@ cb-common.workspace = true cb-pbs.workspace = true cb-signer.workspace = true eyre.workspace = true +ethereum_ssz.workspace = true jsonwebtoken.workspace = true lh_types.workspace = true +lh_eth2.workspace = true rcgen.workspace = true reqwest.workspace = true serde.workspace = true diff --git a/tests/data/get_header/bellatrix.json b/tests/data/get_header/bellatrix.json new file mode 100644 index 00000000..16dfb330 --- /dev/null +++ b/tests/data/get_header/bellatrix.json @@ -0,0 +1,26 @@ +{ + "version": "bellatrix", + "data": { + "message": { + "header": { + "parent_hash": "0x114d1897fefa402a01a653c21a7f1f1db049d1373a5e73a2d25d7a8045dc02a1", + "fee_recipient": "0x477cc10a5b54aed5c88544c2e71ea0581cf64593", + "state_root": "0x6724be16ef8e65681cb66f9c144da67347b8983aa5e3f4662c9b5dba90ab5bc6", + "receipts_root": "0xf2f6d2fe6960e4dedad18cca0c7881e6509d551d3e04c1879a627fb8aba30272", + "logs_bloom": "0x00000400000000000000848008100000000000000000000004000000010080000000000100000400000000000000000000000000020100000000000000000000080004000000000800008008000000000000000020004000000400000000000000000000000400000000000000000000000000000010000002000010000000000000000000800000200100000000000000004000000000200002000004000000000800000000000000000000000000008000000000000000800000008000000400012002000000000000000000000000000200000000000000000000000000040000000000000000000000000000000000408000000000040000000000000000", + "prev_randao": "0x0fde820be6404bcb71d7bbeee140c16cd28b1940a40fa8a4e2c493114a08b38a", + "block_number": "1598034", + "gas_limit": "30000000", + "gas_used": "1939652", + "timestamp": "1716481836", + "extra_data": "0xd983010d0c846765746889676f312e32312e3130856c696e7578", + "base_fee_per_gas": "1266581747", + "block_hash": "0x0d9eccac62175d903e4242783d7252f4ab6cdd35995810646bda627b4c35adac", + "transactions_root": "0x9dca93e8c6c9a1b5fcc850990ed95cd44af96ff0a6094c87b119a34259eb64b0" + }, + "value": "1234567890", + "pubkey": "0x883827193f7627cd04e621e1e8d56498362a52b2a30c9a1c72036eb935c4278dee23d38a24d2f7dda62689886f0c39f4" + }, + "signature": "0xa9f158bca1d9d6b93a9104f48bd2d1e7689bef3fc974651fc755cc6f50d3649c5153a342a12f95cd8f9cac4f90144985189f498a7e0e1cb202ed5e7c98f3f504f371a53b9293bdd973fbb019c91242f808072d0ffcd9d17e2404baea3190fd18" + } +} \ No newline at end of file diff --git a/tests/data/get_header/capella.json b/tests/data/get_header/capella.json new file mode 100644 index 00000000..6cdbeb98 --- /dev/null +++ b/tests/data/get_header/capella.json @@ -0,0 +1,27 @@ +{ + "version": "capella", + "data": { + "message": { + "header": { + "parent_hash": "0x114d1897fefa402a01a653c21a7f1f1db049d1373a5e73a2d25d7a8045dc02a1", + "fee_recipient": "0x477cc10a5b54aed5c88544c2e71ea0581cf64593", + "state_root": "0x6724be16ef8e65681cb66f9c144da67347b8983aa5e3f4662c9b5dba90ab5bc6", + "receipts_root": "0xf2f6d2fe6960e4dedad18cca0c7881e6509d551d3e04c1879a627fb8aba30272", + "logs_bloom": "0x00000400000000000000848008100000000000000000000004000000010080000000000100000400000000000000000000000000020100000000000000000000080004000000000800008008000000000000000020004000000400000000000000000000000400000000000000000000000000000010000002000010000000000000000000800000200100000000000000004000000000200002000004000000000800000000000000000000000000008000000000000000800000008000000400012002000000000000000000000000000200000000000000000000000000040000000000000000000000000000000000408000000000040000000000000000", + "prev_randao": "0x0fde820be6404bcb71d7bbeee140c16cd28b1940a40fa8a4e2c493114a08b38a", + "block_number": "1598034", + "gas_limit": "30000000", + "gas_used": "1939652", + "timestamp": "1716481836", + "extra_data": "0xd983010d0c846765746889676f312e32312e3130856c696e7578", + "base_fee_per_gas": "1266581747", + "block_hash": "0x0d9eccac62175d903e4242783d7252f4ab6cdd35995810646bda627b4c35adac", + "transactions_root": "0x9dca93e8c6c9a1b5fcc850990ed95cd44af96ff0a6094c87b119a34259eb64b0", + "withdrawals_root": "0x2daccf0e476ca3e2644afbd13b2621d55b4d515b813a3b867cdacea24bb352d1" + }, + "value": "1234567890", + "pubkey": "0x883827193f7627cd04e621e1e8d56498362a52b2a30c9a1c72036eb935c4278dee23d38a24d2f7dda62689886f0c39f4" + }, + "signature": "0xa9f158bca1d9d6b93a9104f48bd2d1e7689bef3fc974651fc755cc6f50d3649c5153a342a12f95cd8f9cac4f90144985189f498a7e0e1cb202ed5e7c98f3f504f371a53b9293bdd973fbb019c91242f808072d0ffcd9d17e2404baea3190fd18" + } +} \ No newline at end of file diff --git a/tests/data/get_header/deneb.json b/tests/data/get_header/deneb.json new file mode 100644 index 00000000..28d3426a --- /dev/null +++ b/tests/data/get_header/deneb.json @@ -0,0 +1,37 @@ +{ + "version": "deneb", + "data": { + "message": { + "header": { + "parent_hash": "0x114d1897fefa402a01a653c21a7f1f1db049d1373a5e73a2d25d7a8045dc02a1", + "fee_recipient": "0x477cc10a5b54aed5c88544c2e71ea0581cf64593", + "state_root": "0x6724be16ef8e65681cb66f9c144da67347b8983aa5e3f4662c9b5dba90ab5bc6", + "receipts_root": "0xf2f6d2fe6960e4dedad18cca0c7881e6509d551d3e04c1879a627fb8aba30272", + "logs_bloom": "0x00000400000000000000848008100000000000000000000004000000010080000000000100000400000000000000000000000000020100000000000000000000080004000000000800008008000000000000000020004000000400000000000000000000000400000000000000000000000000000010000002000010000000000000000000800000200100000000000000004000000000200002000004000000000800000000000000000000000000008000000000000000800000008000000400012002000000000000000000000000000200000000000000000000000000040000000000000000000000000000000000408000000000040000000000000000", + "prev_randao": "0x0fde820be6404bcb71d7bbeee140c16cd28b1940a40fa8a4e2c493114a08b38a", + "block_number": "1598034", + "gas_limit": "30000000", + "gas_used": "1939652", + "timestamp": "1716481836", + "extra_data": "0xd983010d0c846765746889676f312e32312e3130856c696e7578", + "base_fee_per_gas": "1266581747", + "blob_gas_used": "786432", + "excess_blob_gas": "95158272", + "block_hash": "0x0d9eccac62175d903e4242783d7252f4ab6cdd35995810646bda627b4c35adac", + "transactions_root": "0x9dca93e8c6c9a1b5fcc850990ed95cd44af96ff0a6094c87b119a34259eb64b0", + "withdrawals_root": "0x2daccf0e476ca3e2644afbd13b2621d55b4d515b813a3b867cdacea24bb352d1" + }, + "blob_kzg_commitments": [ + "0x9559cce9cd71a3416793c8e28d3aaaae9f53732180f57e046bf725c74ab348a7b16693fd03194cac9dd2199a526461b7", + "0xabc493f754d156c7156eb8365d28eee13e5b3413767356ce4cb30cb0306fbe0ed45eaba92936a94e81ed976aa0d787c2", + "0xa5d87332b5dd391ed3153fe36dbd67775dcbc1818cbf6a68d2089a5c6015de1de02e5138f039f2375e6b3511cc94764b", + "0xa49c576627561ec9ae1ef7494e7cee7ede7fa7695d4462436c3e549cc3ce78674b407e8b5f8903b80f77a68814642d6c", + "0x83155fbeb04758d267193800fb89fa30eb13ac0e217005ae7e271733205ca8a6cd80fba08bf5c9a4a5cc0c9d463ac633", + "0xa20c71d1985996098aa63e8b5dc7b7fedb70de31478fe309dad3ac0e9b6d28d82be8e5e543021a0203dc785742e94b2f" + ], + "value": "1234567890", + "pubkey": "0x883827193f7627cd04e621e1e8d56498362a52b2a30c9a1c72036eb935c4278dee23d38a24d2f7dda62689886f0c39f4" + }, + "signature": "0xa9f158bca1d9d6b93a9104f48bd2d1e7689bef3fc974651fc755cc6f50d3649c5153a342a12f95cd8f9cac4f90144985189f498a7e0e1cb202ed5e7c98f3f504f371a53b9293bdd973fbb019c91242f808072d0ffcd9d17e2404baea3190fd18" + } +} \ No newline at end of file diff --git a/tests/data/get_header/electra.json b/tests/data/get_header/electra.json new file mode 100644 index 00000000..458018d6 --- /dev/null +++ b/tests/data/get_header/electra.json @@ -0,0 +1,62 @@ +{ + "version": "electra", + "data": { + "message": { + "header": { + "parent_hash": "0x114d1897fefa402a01a653c21a7f1f1db049d1373a5e73a2d25d7a8045dc02a1", + "fee_recipient": "0x477cc10a5b54aed5c88544c2e71ea0581cf64593", + "state_root": "0x6724be16ef8e65681cb66f9c144da67347b8983aa5e3f4662c9b5dba90ab5bc6", + "receipts_root": "0xf2f6d2fe6960e4dedad18cca0c7881e6509d551d3e04c1879a627fb8aba30272", + "logs_bloom": "0x00000400000000000000848008100000000000000000000004000000010080000000000100000400000000000000000000000000020100000000000000000000080004000000000800008008000000000000000020004000000400000000000000000000000400000000000000000000000000000010000002000010000000000000000000800000200100000000000000004000000000200002000004000000000800000000000000000000000000008000000000000000800000008000000400012002000000000000000000000000000200000000000000000000000000040000000000000000000000000000000000408000000000040000000000000000", + "prev_randao": "0x0fde820be6404bcb71d7bbeee140c16cd28b1940a40fa8a4e2c493114a08b38a", + "block_number": "1598034", + "gas_limit": "30000000", + "gas_used": "1939652", + "timestamp": "1716481836", + "extra_data": "0xd983010d0c846765746889676f312e32312e3130856c696e7578", + "base_fee_per_gas": "1266581747", + "blob_gas_used": "786432", + "excess_blob_gas": "95158272", + "block_hash": "0x0d9eccac62175d903e4242783d7252f4ab6cdd35995810646bda627b4c35adac", + "transactions_root": "0x9dca93e8c6c9a1b5fcc850990ed95cd44af96ff0a6094c87b119a34259eb64b0", + "withdrawals_root": "0x2daccf0e476ca3e2644afbd13b2621d55b4d515b813a3b867cdacea24bb352d1" + }, + "blob_kzg_commitments": [ + "0x9559cce9cd71a3416793c8e28d3aaaae9f53732180f57e046bf725c74ab348a7b16693fd03194cac9dd2199a526461b7", + "0xabc493f754d156c7156eb8365d28eee13e5b3413767356ce4cb30cb0306fbe0ed45eaba92936a94e81ed976aa0d787c2", + "0xa5d87332b5dd391ed3153fe36dbd67775dcbc1818cbf6a68d2089a5c6015de1de02e5138f039f2375e6b3511cc94764b", + "0xa49c576627561ec9ae1ef7494e7cee7ede7fa7695d4462436c3e549cc3ce78674b407e8b5f8903b80f77a68814642d6c", + "0x83155fbeb04758d267193800fb89fa30eb13ac0e217005ae7e271733205ca8a6cd80fba08bf5c9a4a5cc0c9d463ac633", + "0xa20c71d1985996098aa63e8b5dc7b7fedb70de31478fe309dad3ac0e9b6d28d82be8e5e543021a0203dc785742e94b2f" + ], + "execution_requests": { + "deposits": [ + { + "pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5", + "withdrawal_credentials": "0x0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f", + "amount": "100", + "signature": "0x8aeb4642fb2982039a43fd6a6d9cc0ebf7598dbf02343c4617d9a68d799393c162492add63f31099a25eacc2782ba27a190e977a8c58760b6636dccb503d528b3be9e885c93d5b79699e68fcca870b0c790cdb00d67604d8b4a3025ae75efa2f", + "index": "1" + } + ], + "withdrawals": [ + { + "source_address": "0x1100000000000000000000000000000000000000", + "validator_pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5", + "amount": "1" + } + ], + "consolidations": [ + { + "source_address": "0x1200000000000000000000000000000000000000", + "source_pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5", + "target_pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5" + } + ] + }, + "value": "1234567890", + "pubkey": "0x883827193f7627cd04e621e1e8d56498362a52b2a30c9a1c72036eb935c4278dee23d38a24d2f7dda62689886f0c39f4" + }, + "signature": "0xa9f158bca1d9d6b93a9104f48bd2d1e7689bef3fc974651fc755cc6f50d3649c5153a342a12f95cd8f9cac4f90144985189f498a7e0e1cb202ed5e7c98f3f504f371a53b9293bdd973fbb019c91242f808072d0ffcd9d17e2404baea3190fd18" + } +} \ No newline at end of file diff --git a/tests/data/get_header/fulu.json b/tests/data/get_header/fulu.json new file mode 100644 index 00000000..b4cef51a --- /dev/null +++ b/tests/data/get_header/fulu.json @@ -0,0 +1,62 @@ +{ + "version": "fulu", + "data": { + "message": { + "header": { + "parent_hash": "0x114d1897fefa402a01a653c21a7f1f1db049d1373a5e73a2d25d7a8045dc02a1", + "fee_recipient": "0x477cc10a5b54aed5c88544c2e71ea0581cf64593", + "state_root": "0x6724be16ef8e65681cb66f9c144da67347b8983aa5e3f4662c9b5dba90ab5bc6", + "receipts_root": "0xf2f6d2fe6960e4dedad18cca0c7881e6509d551d3e04c1879a627fb8aba30272", + "logs_bloom": "0x00000400000000000000848008100000000000000000000004000000010080000000000100000400000000000000000000000000020100000000000000000000080004000000000800008008000000000000000020004000000400000000000000000000000400000000000000000000000000000010000002000010000000000000000000800000200100000000000000004000000000200002000004000000000800000000000000000000000000008000000000000000800000008000000400012002000000000000000000000000000200000000000000000000000000040000000000000000000000000000000000408000000000040000000000000000", + "prev_randao": "0x0fde820be6404bcb71d7bbeee140c16cd28b1940a40fa8a4e2c493114a08b38a", + "block_number": "1598034", + "gas_limit": "30000000", + "gas_used": "1939652", + "timestamp": "1716481836", + "extra_data": "0xd983010d0c846765746889676f312e32312e3130856c696e7578", + "base_fee_per_gas": "1266581747", + "blob_gas_used": "786432", + "excess_blob_gas": "95158272", + "block_hash": "0x0d9eccac62175d903e4242783d7252f4ab6cdd35995810646bda627b4c35adac", + "transactions_root": "0x9dca93e8c6c9a1b5fcc850990ed95cd44af96ff0a6094c87b119a34259eb64b0", + "withdrawals_root": "0x2daccf0e476ca3e2644afbd13b2621d55b4d515b813a3b867cdacea24bb352d1" + }, + "blob_kzg_commitments": [ + "0x9559cce9cd71a3416793c8e28d3aaaae9f53732180f57e046bf725c74ab348a7b16693fd03194cac9dd2199a526461b7", + "0xabc493f754d156c7156eb8365d28eee13e5b3413767356ce4cb30cb0306fbe0ed45eaba92936a94e81ed976aa0d787c2", + "0xa5d87332b5dd391ed3153fe36dbd67775dcbc1818cbf6a68d2089a5c6015de1de02e5138f039f2375e6b3511cc94764b", + "0xa49c576627561ec9ae1ef7494e7cee7ede7fa7695d4462436c3e549cc3ce78674b407e8b5f8903b80f77a68814642d6c", + "0x83155fbeb04758d267193800fb89fa30eb13ac0e217005ae7e271733205ca8a6cd80fba08bf5c9a4a5cc0c9d463ac633", + "0xa20c71d1985996098aa63e8b5dc7b7fedb70de31478fe309dad3ac0e9b6d28d82be8e5e543021a0203dc785742e94b2f" + ], + "execution_requests": { + "deposits": [ + { + "pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5", + "withdrawal_credentials": "0x0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f0f", + "amount": "100", + "signature": "0x8aeb4642fb2982039a43fd6a6d9cc0ebf7598dbf02343c4617d9a68d799393c162492add63f31099a25eacc2782ba27a190e977a8c58760b6636dccb503d528b3be9e885c93d5b79699e68fcca870b0c790cdb00d67604d8b4a3025ae75efa2f", + "index": "1" + } + ], + "withdrawals": [ + { + "source_address": "0x1100000000000000000000000000000000000000", + "validator_pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5", + "amount": "1" + } + ], + "consolidations": [ + { + "source_address": "0x1200000000000000000000000000000000000000", + "source_pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5", + "target_pubkey": "0xac0a230bd98a766b8e4156f0626ee679dd280dee5b0eedc2b9455ca3dacc4c7618da5010b9db609450a712f095c9f7a5" + } + ] + }, + "value": "1234567890", + "pubkey": "0x883827193f7627cd04e621e1e8d56498362a52b2a30c9a1c72036eb935c4278dee23d38a24d2f7dda62689886f0c39f4" + }, + "signature": "0xa9f158bca1d9d6b93a9104f48bd2d1e7689bef3fc974651fc755cc6f50d3649c5153a342a12f95cd8f9cac4f90144985189f498a7e0e1cb202ed5e7c98f3f504f371a53b9293bdd973fbb019c91242f808072d0ffcd9d17e2404baea3190fd18" + } +} \ No newline at end of file diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index 4d7f0fc1..fd8a92e9 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -1,43 +1,56 @@ use std::{ + collections::HashSet, net::SocketAddr, sync::{ Arc, RwLock, atomic::{AtomicU64, Ordering}, }, + time::Duration, }; use alloy::{primitives::U256, rpc::types::beacon::relay::ValidatorRegistration}; use axum::{ Json, Router, extract::{Path, State}, - http::StatusCode, + http::{HeaderMap, HeaderValue, StatusCode}, response::{IntoResponse, Response}, routing::{get, post}, }; use cb_common::{ pbs::{ BUILDER_V1_API_PATH, BUILDER_V2_API_PATH, BlobsBundle, BuilderBid, BuilderBidElectra, - ExecutionPayloadElectra, ExecutionPayloadHeaderElectra, ExecutionRequests, ForkName, - GET_HEADER_PATH, GET_STATUS_PATH, GetHeaderParams, GetHeaderResponse, GetPayloadInfo, - PayloadAndBlobs, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, SignedBlindedBeaconBlock, - SignedBuilderBid, SubmitBlindedBlockResponse, + BuilderBidFulu, ExecutionPayloadElectra, ExecutionPayloadHeaderElectra, + ExecutionPayloadHeaderFulu, ExecutionRequests, ForkName, GET_HEADER_PATH, GET_STATUS_PATH, + GetHeaderParams, GetHeaderResponse, GetPayloadInfo, PayloadAndBlobs, + REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, SignedBuilderBid, SubmitBlindedBlockResponse, }, signature::sign_builder_root, types::{BlsSecretKey, Chain}, - utils::{TestRandomSeed, timestamp_of_slot_start_sec}, + utils::{ + CONSENSUS_VERSION_HEADER, EncodingType, TestRandomSeed, deserialize_body, get_accept_types, + get_consensus_version_header, get_content_type, timestamp_of_slot_start_sec, + }, }; use cb_pbs::MAX_SIZE_SUBMIT_BLOCK_RESPONSE; use lh_types::KzgProof; +use reqwest::header::CONTENT_TYPE; +use ssz::Encode; use tokio::net::TcpListener; -use tracing::debug; +use tracing::{debug, error}; use tree_hash::TreeHash; pub async fn start_mock_relay_service(state: Arc, port: u16) -> eyre::Result<()> { - let app = mock_relay_app_router(state); - let socket = SocketAddr::new("0.0.0.0".parse()?, port); let listener = TcpListener::bind(socket).await?; + start_mock_relay_service_with_listener(state, listener).await +} +/// Like [`start_mock_relay_service`], but accepts a pre-bound [`TcpListener`]. +pub async fn start_mock_relay_service_with_listener( + state: Arc, + listener: TcpListener, +) -> eyre::Result<()> { + let app = mock_relay_app_router(state); axum::serve(listener, app).await?; Ok(()) } @@ -45,14 +58,29 @@ pub async fn start_mock_relay_service(state: Arc, port: u16) -> pub struct MockRelayState { pub chain: Chain, pub signer: BlsSecretKey, + pub supported_content_types: Arc>, large_body: bool, supports_submit_block_v2: bool, use_not_found_for_submit_block: bool, + /// If set, `handle_submit_block_v1`/`v2` short-circuits with this status + /// when the inbound request carries `Content-Type: + /// application/octet-stream`. The counter is still incremented before + /// the short-circuit so tests can observe the attempt. Used to drive C3 + /// (retry-as-JSON) tests. + submit_block_ssz_status_override: Option, + /// If set, this literal string is sent as the outgoing `Content-Type` + /// header on `handle_get_header` and `handle_submit_block_v1` responses + /// instead of the canonical `application/json` / `application/octet-stream` + /// value. The body is still serialized according to the encoding that was + /// negotiated via `Accept`. Used to exercise PBS tolerance of + /// MIME-parameter suffixes like `application/octet-stream; charset=binary`. + response_content_type_override: Option, received_get_header: Arc, received_get_status: Arc, received_register_validator: Arc, received_submit_block: Arc, response_override: RwLock>, + bid_value: RwLock, } impl MockRelayState { @@ -77,6 +105,12 @@ impl MockRelayState { pub fn use_not_found_for_submit_block(&self) -> bool { self.use_not_found_for_submit_block } + pub fn submit_block_ssz_status_override(&self) -> Option { + self.submit_block_ssz_status_override + } + pub fn response_content_type_override(&self) -> Option<&str> { + self.response_content_type_override.as_deref() + } pub fn set_response_override(&self, status: StatusCode) { *self.response_override.write().unwrap() = Some(status); } @@ -90,14 +124,27 @@ impl MockRelayState { large_body: false, supports_submit_block_v2: true, use_not_found_for_submit_block: false, + submit_block_ssz_status_override: None, + response_content_type_override: None, received_get_header: Default::default(), received_get_status: Default::default(), received_register_validator: Default::default(), received_submit_block: Default::default(), response_override: RwLock::new(None), + bid_value: RwLock::new(U256::from(10)), + supported_content_types: Arc::new( + [EncodingType::Json, EncodingType::Ssz].iter().cloned().collect(), + ), } } + /// Override the bid value returned by this relay. Defaults to + /// `U256::from(10)`. + pub fn with_bid_value(self, value: U256) -> Self { + *self.bid_value.write().unwrap() = value; + self + } + pub fn with_large_body(self) -> Self { Self { large_body: true, ..self } } @@ -109,6 +156,23 @@ impl MockRelayState { pub fn with_not_found_for_submit_block(self) -> Self { Self { use_not_found_for_submit_block: true, ..self } } + + /// Make `handle_submit_block_v1`/`v2` respond with `status` whenever the + /// request comes in as SSZ (`Content-Type: application/octet-stream`). + /// JSON requests still go through the normal happy path, which lets a + /// single test cover the SSZ→JSON retry behavior. + pub fn with_submit_block_ssz_status(self, status: StatusCode) -> Self { + Self { submit_block_ssz_status_override: Some(status), ..self } + } + + /// Make the relay advertise `raw_content_type` as the `Content-Type` + /// header on `get_header` and `submit_block_v1` responses. The body is + /// still encoded via the negotiated [`EncodingType`] — only the header + /// string changes. Use this to drive PBS tolerance of MIME-parameter + /// suffixes (e.g. `application/octet-stream; charset=binary`). + pub fn with_response_content_type(self, raw_content_type: impl Into) -> Self { + Self { response_content_type_override: Some(raw_content_type.into()), ..self } + } } pub fn mock_relay_app_router(state: Arc) -> Router { @@ -132,40 +196,126 @@ pub fn mock_relay_app_router(state: Arc) -> Router { async fn handle_get_header( State(state): State>, Path(GetHeaderParams { parent_hash, .. }): Path, + headers: HeaderMap, ) -> Response { state.received_get_header.fetch_add(1, Ordering::Relaxed); + let accept_types = get_accept_types(&headers) + .map_err(|e| (StatusCode::BAD_REQUEST, format!("error parsing accept header: {e}"))); + if let Err(e) = accept_types { + return e.into_response(); + } + let accept_types = accept_types.unwrap(); + let consensus_version_header = + get_consensus_version_header(&headers).unwrap_or(ForkName::Electra); - let mut header = ExecutionPayloadHeaderElectra { - parent_hash: parent_hash.into(), - block_hash: Default::default(), - timestamp: timestamp_of_slot_start_sec(0, state.chain), - ..ExecutionPayloadHeaderElectra::test_random() + let content_type = if state.supported_content_types.contains(&EncodingType::Ssz) && + accept_types.contains(EncodingType::Ssz) + { + EncodingType::Ssz + } else if state.supported_content_types.contains(&EncodingType::Json) && + accept_types.contains(EncodingType::Json) + { + EncodingType::Json + } else { + return (StatusCode::NOT_ACCEPTABLE, "No acceptable content type found".to_string()) + .into_response(); }; - header.block_hash.0[0] = 1; + let bid_value = *state.bid_value.read().unwrap(); - let message = BuilderBid::Electra(BuilderBidElectra { - header, - blob_kzg_commitments: Default::default(), - execution_requests: ExecutionRequests::default(), - value: U256::from(10), - pubkey: state.signer.public_key().into(), - }); + let data = match consensus_version_header { + ForkName::Electra => { + let mut header = ExecutionPayloadHeaderElectra { + parent_hash: parent_hash.into(), + block_hash: Default::default(), + timestamp: timestamp_of_slot_start_sec(0, state.chain), + ..ExecutionPayloadHeaderElectra::test_random() + }; + header.block_hash.0[0] = 1; - let object_root = message.tree_hash_root(); - let signature = sign_builder_root(state.chain, &state.signer, &object_root); - let response = SignedBuilderBid { message, signature }; + let message = BuilderBid::Electra(BuilderBidElectra { + header, + blob_kzg_commitments: Default::default(), + execution_requests: ExecutionRequests::default(), + value: bid_value, + pubkey: state.signer.public_key().into(), + }); + let object_root = message.tree_hash_root(); + let signature = sign_builder_root(state.chain, &state.signer, &object_root); + let response = SignedBuilderBid { message, signature }; + if content_type == EncodingType::Ssz { + response.as_ssz_bytes() + } else { + let versioned_response = GetHeaderResponse { + version: ForkName::Electra, + data: response, + metadata: Default::default(), + }; + serde_json::to_vec(&versioned_response).unwrap() + } + } + ForkName::Fulu => { + let mut header = ExecutionPayloadHeaderFulu { + parent_hash: parent_hash.into(), + block_hash: Default::default(), + timestamp: timestamp_of_slot_start_sec(0, state.chain), + ..ExecutionPayloadHeaderFulu::test_random() + }; + header.block_hash.0[0] = 1; - let response = GetHeaderResponse { - version: ForkName::Electra, - data: response, - metadata: Default::default(), + let message = BuilderBid::Fulu(BuilderBidFulu { + header, + blob_kzg_commitments: Default::default(), + execution_requests: ExecutionRequests::default(), + value: bid_value, + pubkey: state.signer.public_key().into(), + }); + let object_root = message.tree_hash_root(); + let signature = sign_builder_root(state.chain, &state.signer, &object_root); + let response = SignedBuilderBid { message, signature }; + if content_type == EncodingType::Ssz { + response.as_ssz_bytes() + } else { + let versioned_response = GetHeaderResponse { + version: ForkName::Fulu, + data: response, + metadata: Default::default(), + }; + serde_json::to_vec(&versioned_response).unwrap() + } + } + _ => { + return ( + StatusCode::BAD_REQUEST, + format!("Unsupported fork {consensus_version_header}"), + ) + .into_response(); + } }; - (StatusCode::OK, Json(response)).into_response() + + let mut response = (StatusCode::OK, data).into_response(); + let consensus_version_header = + HeaderValue::from_str(&consensus_version_header.to_string()).unwrap(); + let content_type_str = state + .response_content_type_override() + .map(|s| s.to_string()) + .unwrap_or_else(|| content_type.to_string()); + let content_type_header = HeaderValue::from_str(&content_type_str).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } async fn handle_get_status(State(state): State>) -> impl IntoResponse { state.received_get_status.fetch_add(1, Ordering::Relaxed); + // Production `get_status` dispatches relays concurrently via `select_ok`, + // which cancels losing futures as soon as any relay returns OK. On a + // loaded runner this can abort a sibling relay's reqwest send before + // its handler is entered, so the test-side counter only reaches 1. A + // tiny response delay (counter already bumped above) guarantees every + // concurrent request lands in a handler before any response is written, + // eliminating the flake without altering production behavior. + tokio::time::sleep(Duration::from_millis(20)).await; StatusCode::OK } @@ -184,17 +334,61 @@ async fn handle_register_validator( } async fn handle_submit_block_v1( + headers: HeaderMap, State(state): State>, - Json(submit_block): Json, + body_bytes: axum::body::Bytes, ) -> Response { if state.use_not_found_for_submit_block() { return StatusCode::NOT_FOUND.into_response(); } state.received_submit_block.fetch_add(1, Ordering::Relaxed); - if state.large_body() { - (StatusCode::OK, Json(vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK_RESPONSE])).into_response() + // Short-circuit SSZ requests with an overridden status so tests can + // drive the PBS SSZ→JSON retry logic. JSON requests still take the + // normal path so a single mock run can exercise both attempts. + if let Some(status) = state.submit_block_ssz_status_override() && + get_content_type(&headers) == EncodingType::Ssz + { + return (status, "forced ssz override").into_response(); + } + let accept_types = get_accept_types(&headers) + .map_err(|e| (StatusCode::BAD_REQUEST, format!("error parsing accept header: {e}"))); + if let Err(e) = accept_types { + return e.into_response(); + } + let accept_types = accept_types.unwrap(); + let consensus_version_header = get_consensus_version_header(&headers); + let response_content_type = if state.supported_content_types.contains(&EncodingType::Ssz) && + accept_types.contains(EncodingType::Ssz) + { + EncodingType::Ssz + } else if state.supported_content_types.contains(&EncodingType::Json) && + accept_types.contains(EncodingType::Json) + { + EncodingType::Json + } else { + return (StatusCode::NOT_ACCEPTABLE, "No acceptable content type found".to_string()) + .into_response(); + }; + + // Error out if the request content type is not supported + let content_type = get_content_type(&headers); + if !state.supported_content_types.contains(&content_type) { + return (StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported content type".to_string()) + .into_response(); + }; + + let data = if state.large_body() { + vec![1u8; 1 + MAX_SIZE_SUBMIT_BLOCK_RESPONSE] } else { let mut execution_payload = ExecutionPayloadElectra::test_random(); + let submit_block = deserialize_body(&headers, body_bytes).await.map_err(|e| { + error!(%e, "failed to deserialize signed blinded block"); + (StatusCode::BAD_REQUEST, format!("failed to deserialize body: {e}")) + }); + if let Err(e) = submit_block { + return e.into_response(); + } + let submit_block = submit_block.unwrap(); execution_payload.block_hash = submit_block.block_hash().into(); let mut blobs_bundle = BlobsBundle::default(); @@ -207,19 +401,60 @@ async fn handle_submit_block_v1( let response = PayloadAndBlobs { execution_payload: execution_payload.into(), blobs_bundle }; - let response = SubmitBlindedBlockResponse { - version: ForkName::Electra, - metadata: Default::default(), - data: response, - }; + if response_content_type == EncodingType::Ssz { + response.as_ssz_bytes() + } else { + // Return JSON for everything else; this is fine for the mock + let response = SubmitBlindedBlockResponse { + version: ForkName::Electra, + metadata: Default::default(), + data: response, + }; + serde_json::to_vec(&response).unwrap() + } + }; - (StatusCode::OK, Json(response)).into_response() + let mut response = (StatusCode::OK, data).into_response(); + if response_content_type == EncodingType::Ssz { + let consensus_version_header = match consensus_version_header { + Some(header) => header, + None => { + return (StatusCode::BAD_REQUEST, "Missing consensus version header".to_string()) + .into_response() + } + }; + let consensus_version_header = + HeaderValue::from_str(&consensus_version_header.to_string()).unwrap(); + response.headers_mut().insert(CONSENSUS_VERSION_HEADER, consensus_version_header); } + let content_type_str = state + .response_content_type_override() + .map(|s| s.to_string()) + .unwrap_or_else(|| response_content_type.to_string()); + let content_type_header = HeaderValue::from_str(&content_type_str).unwrap(); + response.headers_mut().insert(CONTENT_TYPE, content_type_header); + response } -async fn handle_submit_block_v2(State(state): State>) -> Response { + +async fn handle_submit_block_v2( + headers: HeaderMap, + State(state): State>, +) -> Response { if state.use_not_found_for_submit_block() { return StatusCode::NOT_FOUND.into_response(); } state.received_submit_block.fetch_add(1, Ordering::Relaxed); + // See comment in `handle_submit_block_v1`. Override SSZ with the + // injected status so C3 tests can assert retry / no-retry behavior. + if let Some(status) = state.submit_block_ssz_status_override() && + get_content_type(&headers) == EncodingType::Ssz + { + return (status, "forced ssz override").into_response(); + } + let content_type = get_content_type(&headers); + if !state.supported_content_types.contains(&content_type) { + return (StatusCode::NOT_ACCEPTABLE, "No acceptable content type found".to_string()) + .into_response(); + }; (StatusCode::ACCEPTED, "").into_response() } diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index ab593277..07fa8f06 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -2,9 +2,9 @@ use alloy::{primitives::B256, rpc::types::beacon::relay::ValidatorRegistration}; use cb_common::{ pbs::{BuilderApiVersion, RelayClient, SignedBlindedBeaconBlock}, types::BlsPublicKey, - utils::bls_pubkey_from_hex, + utils::{CONSENSUS_VERSION_HEADER, EncodingType, ForkName, bls_pubkey_from_hex}, }; -use reqwest::Response; +use reqwest::{Response, header::ACCEPT}; use crate::utils::generate_mock_relay; @@ -20,13 +20,36 @@ impl MockValidator { Ok(Self { comm_boost: generate_mock_relay(port, pubkey)? }) } - pub async fn do_get_header(&self, pubkey: Option) -> eyre::Result { + pub async fn do_get_header( + &self, + pubkey: Option, + accept: Vec, + fork_name: ForkName, + ) -> eyre::Result { let default_pubkey = bls_pubkey_from_hex( "0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae", )?; let url = self.comm_boost.get_header_url(0, &B256::ZERO, &pubkey.unwrap_or(default_pubkey))?; - Ok(self.comm_boost.client.get(url).send().await?) + let accept = match accept.len() { + 0 => None, + 1 => Some(accept.into_iter().next().unwrap().to_string()), + _ => { + let accept_strings: Vec = + accept.into_iter().map(|e| e.to_string()).collect(); + Some(accept_strings.join(", ")) + } + }; + let mut res = self + .comm_boost + .client + .get(url) + .header(CONSENSUS_VERSION_HEADER, &fork_name.to_string()); + if let Some(accept_header) = accept { + res = res.header(ACCEPT, accept_header); + } + let res = res.send().await?; + Ok(res) } pub async fn do_get_status(&self) -> eyre::Result { @@ -49,16 +72,16 @@ impl MockValidator { pub async fn do_submit_block_v1( &self, - signed_blinded_block: Option, + signed_blinded_block_opt: Option, ) -> eyre::Result { - self.do_submit_block_impl(signed_blinded_block, BuilderApiVersion::V1).await + self.do_submit_block_impl(signed_blinded_block_opt, BuilderApiVersion::V1).await } pub async fn do_submit_block_v2( &self, - signed_blinded_block: Option, + signed_blinded_block_opt: Option, ) -> eyre::Result { - self.do_submit_block_impl(signed_blinded_block, BuilderApiVersion::V2).await + self.do_submit_block_impl(signed_blinded_block_opt, BuilderApiVersion::V2).await } async fn do_submit_block_impl( diff --git a/tests/src/utils.rs b/tests/src/utils.rs index aff7c335..f1ed0114 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -27,6 +27,18 @@ pub fn get_local_address(port: u16) -> String { format!("http://0.0.0.0:{port}") } +/// Bind to port 0 and let the OS assign an unused ephemeral port. +/// +/// The returned listener keeps the port reserved. Pass it to +/// [`start_mock_relay_service_with_listener`] so the socket is never released +/// between allocation and use (zero TOCTOU race). For servers that bind +/// internally (e.g. `PbsService::run`), read the port with +/// `listener.local_addr().unwrap().port()`, then `drop` the listener +/// immediately before starting the server. +pub async fn get_free_listener() -> tokio::net::TcpListener { + tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap() +} + static SYNC_SETUP: Once = Once::new(); pub fn setup_test_env() { SYNC_SETUP.call_once(|| { @@ -83,6 +95,7 @@ pub fn get_pbs_config(port: u16) -> PbsConfig { min_bid_wei: U256::ZERO, late_in_slot_time_ms: u64::MAX, extra_validation_enabled: false, + ssv_node_api_url: Url::parse("http://localhost:0").unwrap(), ssv_public_api_url: Url::parse("http://localhost:0").unwrap(), rpc_url: None, diff --git a/tests/tests/pbs_cfg_file_update.rs b/tests/tests/pbs_cfg_file_update.rs index 770421a3..37dd4eb3 100644 --- a/tests/tests/pbs_cfg_file_update.rs +++ b/tests/tests/pbs_cfg_file_update.rs @@ -9,11 +9,14 @@ use cb_common::{ }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ - mock_relay::{MockRelayState, start_mock_relay_service}, + mock_relay::{MockRelayState, start_mock_relay_service_with_listener}, mock_validator::MockValidator, - utils::{generate_mock_relay, get_pbs_config, setup_test_env, to_pbs_config}, + utils::{ + generate_mock_relay, get_free_listener, get_pbs_config, setup_test_env, to_pbs_config, + }, }; use eyre::Result; +use lh_types::ForkName; use reqwest::StatusCode; use tracing::info; use url::Url; @@ -28,20 +31,23 @@ async fn test_cfg_file_update() -> Result<()> { let pubkey = signer.public_key(); let chain = Chain::Hoodi; - let pbs_port = 3730; + let pbs_listener = get_free_listener().await; + let relay1_listener = get_free_listener().await; + let relay2_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay1_port = relay1_listener.local_addr().unwrap().port(); + let relay2_port = relay2_listener.local_addr().unwrap().port(); // Start relay 1 - let relay1_port = pbs_port + 1; let relay1 = generate_mock_relay(relay1_port, pubkey.clone())?; let relay1_state = Arc::new(MockRelayState::new(chain, signer.clone())); - tokio::spawn(start_mock_relay_service(relay1_state.clone(), relay1_port)); + tokio::spawn(start_mock_relay_service_with_listener(relay1_state.clone(), relay1_listener)); // Start relay 2 - let relay2_port = relay1_port + 1; let relay2 = generate_mock_relay(relay2_port, pubkey.clone())?; let relay2_id = relay2.id.clone().to_string(); let relay2_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(relay2_state.clone(), relay2_port)); + tokio::spawn(start_mock_relay_service_with_listener(relay2_state.clone(), relay2_listener)); // Make a config with relay 1 only let pbs_config = PbsConfig { @@ -104,6 +110,7 @@ async fn test_cfg_file_update() -> Result<()> { // Run the PBS service let config = to_pbs_config(chain, get_pbs_config(pbs_port), vec![relay1.clone()]); let state = PbsState::new(config, config_path.clone()); + drop(pbs_listener); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers - extra time for the file watcher @@ -112,7 +119,7 @@ async fn test_cfg_file_update() -> Result<()> { // Send a get header request - should go to relay 1 only let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await?; + let res = mock_validator.do_get_header(None, Vec::new(), ForkName::Fulu).await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(relay1_state.received_get_header(), 1); assert_eq!(relay2_state.received_get_header(), 0); @@ -154,7 +161,7 @@ async fn test_cfg_file_update() -> Result<()> { // Send another get header request - should go to relay 2 only info!("Sending get header after config update"); - let res = mock_validator.do_get_header(None).await?; + let res = mock_validator.do_get_header(None, Vec::new(), ForkName::Fulu).await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(relay1_state.received_get_header(), 1); // no change assert_eq!(relay2_state.received_get_header(), 1); // incremented diff --git a/tests/tests/pbs_get_header.rs b/tests/tests/pbs_get_header.rs index 1cfdc3bb..7228cb6e 100644 --- a/tests/tests/pbs_get_header.rs +++ b/tests/tests/pbs_get_header.rs @@ -1,60 +1,194 @@ -use std::{path::PathBuf, sync::Arc, time::Duration}; +use std::{collections::HashSet, path::PathBuf, sync::Arc, time::Duration}; use alloy::primitives::{B256, U256}; use cb_common::{ - pbs::GetHeaderResponse, + pbs::{GetHeaderResponse, SignedBuilderBid}, signature::sign_builder_root, signer::random_secret, types::{BlsPublicKeyBytes, Chain}, - utils::timestamp_of_slot_start_sec, + utils::{EncodingType, ForkName, get_consensus_version_header, timestamp_of_slot_start_sec}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ - mock_relay::{MockRelayState, start_mock_relay_service}, + mock_relay::{MockRelayState, start_mock_relay_service_with_listener}, mock_validator::MockValidator, - utils::{generate_mock_relay, get_pbs_config, setup_test_env, to_pbs_config}, + utils::{ + generate_mock_relay, get_free_listener, get_pbs_config, setup_test_env, to_pbs_config, + }, }; use eyre::Result; -use lh_types::ForkName; +use lh_eth2::EmptyMetadata; +use lh_types::ForkVersionDecode; use reqwest::StatusCode; use tracing::info; use tree_hash::TreeHash; +use url::Url; +/// Test requesting JSON when the relay supports JSON #[tokio::test] async fn test_get_header() -> Result<()> { + test_get_header_impl( + vec![EncodingType::Json], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + 1, + StatusCode::OK, + U256::from(10u64), + U256::ZERO, + None, + ForkName::Electra, + ) + .await +} + +/// Test requesting SSZ when the relay supports SSZ +#[tokio::test] +async fn test_get_header_ssz() -> Result<()> { + test_get_header_impl( + vec![EncodingType::Ssz], + HashSet::from([EncodingType::Ssz, EncodingType::Json]), + 1, + StatusCode::OK, + U256::from(10u64), + U256::ZERO, + None, + ForkName::Electra, + ) + .await +} + +/// Test requesting SSZ when the relay only supports JSON, which should be +/// handled because PBS supports both types internally and re-maps them on the +/// fly +#[tokio::test] +async fn test_get_header_ssz_into_json() -> Result<()> { + test_get_header_impl( + vec![EncodingType::Ssz], + HashSet::from([EncodingType::Json]), + 1, + StatusCode::OK, + U256::from(10u64), + U256::ZERO, + None, + ForkName::Electra, + ) + .await +} + +/// Test requesting multiple types when the relay supports SSZ, which should +/// return SSZ +#[tokio::test] +async fn test_get_header_multitype_ssz() -> Result<()> { + test_get_header_impl( + vec![EncodingType::Ssz, EncodingType::Json], + HashSet::from([EncodingType::Ssz]), + 1, + StatusCode::OK, + U256::from(10u64), + U256::ZERO, + None, + ForkName::Electra, + ) + .await +} + +/// Test requesting multiple types when the relay supports JSON, which should +/// still work +#[tokio::test] +async fn test_get_header_multitype_json() -> Result<()> { + test_get_header_impl( + vec![EncodingType::Ssz, EncodingType::Json], + HashSet::from([EncodingType::Json]), + 1, + StatusCode::OK, + U256::from(10u64), + U256::ZERO, + None, + ForkName::Electra, + ) + .await +} + +/// Core implementation for get_header tests. +/// Pass `rpc_url: Some(url)` when testing `HeaderValidationMode::Extra` — PBS +/// requires a non-None rpc_url to start in that mode. A non-existent address is +/// fine; if the parent block fetch fails the relay response is still returned +/// (extra validation is skipped with a warning). +#[allow(clippy::too_many_arguments)] +async fn test_get_header_impl( + accept_types: Vec, + relay_types: HashSet, + expected_try_count: u64, + expected_code: StatusCode, + bid_value: U256, + min_bid_wei: U256, + rpc_url: Option, + fork_name: ForkName, +) -> Result<()> { + // Setup test environment setup_test_env(); let signer = random_secret(); let pubkey = signer.public_key(); - let chain = Chain::Holesky; - let pbs_port = 3200; - let relay_port = pbs_port + 1; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); - // Run a mock relay - let mock_state = Arc::new(MockRelayState::new(chain, signer)); + let mut mock_state = MockRelayState::new(chain, signer).with_bid_value(bid_value); + mock_state.supported_content_types = Arc::new(relay_types); + let mock_state = Arc::new(mock_state); let mock_relay = generate_mock_relay(relay_port, pubkey)?; - tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port)); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); // Run the PBS service - let config = to_pbs_config(chain, get_pbs_config(pbs_port), vec![mock_relay.clone()]); + let mut pbs_config = get_pbs_config(pbs_port); + pbs_config.min_bid_wei = min_bid_wei; + pbs_config.rpc_url = rpc_url; + let config = to_pbs_config(chain, pbs_config, vec![mock_relay.clone()]); let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; + // Send the get_header request let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await?; - assert_eq!(res.status(), StatusCode::OK); + let res = mock_validator.do_get_header(None, accept_types.clone(), fork_name).await?; + assert_eq!(res.status(), expected_code); + assert_eq!(mock_state.received_get_header(), expected_try_count); + match expected_code { + StatusCode::OK => {} + _ => return Ok(()), + } - let res = serde_json::from_slice::(&res.bytes().await?)?; + // Get the content type + let content_type = match res + .headers() + .get(reqwest::header::CONTENT_TYPE) + .and_then(|ct| ct.to_str().ok()) + .unwrap() + { + ct if ct == EncodingType::Ssz.to_string() => EncodingType::Ssz, + ct if ct == EncodingType::Json.to_string() => EncodingType::Json, + _ => panic!("unexpected content type"), + }; + assert!(accept_types.contains(&content_type)); - assert_eq!(mock_state.received_get_header(), 1); - assert_eq!(res.version, ForkName::Electra); + // Get the data + let res = match content_type { + EncodingType::Json => serde_json::from_slice::(&res.bytes().await?)?, + EncodingType::Ssz => { + let fork = + get_consensus_version_header(res.headers()).expect("missing fork version header"); + let data = SignedBuilderBid::from_ssz_bytes_by_fork(&res.bytes().await?, fork).unwrap(); + GetHeaderResponse { version: fork, data, metadata: EmptyMetadata::default() } + } + }; assert_eq!(res.data.message.header().block_hash().0[0], 1); assert_eq!(res.data.message.header().parent_hash().0, B256::ZERO); - assert_eq!(*res.data.message.value(), U256::from(10)); + assert_eq!(*res.data.message.value(), bid_value); assert_eq!(*res.data.message.pubkey(), BlsPublicKeyBytes::from(mock_state.signer.public_key())); assert_eq!(res.data.message.header().timestamp(), timestamp_of_slot_start_sec(0, chain)); assert_eq!( @@ -65,25 +199,30 @@ async fn test_get_header() -> Result<()> { } #[tokio::test] -async fn test_get_header_returns_204_if_relay_down() -> Result<()> { +async fn test_get_header_returns_204_if_no_relay_reachable() -> Result<()> { setup_test_env(); let signer = random_secret(); let pubkey = signer.public_key(); let chain = Chain::Holesky; - let pbs_port = 3300; - let relay_port = pbs_port + 1; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); // Create a mock relay client let mock_state = Arc::new(MockRelayState::new(chain, signer)); let mock_relay = generate_mock_relay(relay_port, pubkey)?; // Don't start the relay - // tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port)); + // tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), + // relay_listener)); + drop(relay_listener); // Run the PBS service let config = to_pbs_config(chain, get_pbs_config(pbs_port), vec![mock_relay.clone()]); let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers @@ -91,7 +230,7 @@ async fn test_get_header_returns_204_if_relay_down() -> Result<()> { let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(None).await?; + let res = mock_validator.do_get_header(None, Vec::new(), ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::NO_CONTENT); // 204 error assert_eq!(mock_state.received_get_header(), 0); // no header received @@ -105,17 +244,20 @@ async fn test_get_header_returns_400_if_request_is_invalid() -> Result<()> { let pubkey = signer.public_key(); let chain = Chain::Holesky; - let pbs_port = 3400; - let relay_port = pbs_port + 1; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); // Run a mock relay let mock_state = Arc::new(MockRelayState::new(chain, signer)); let mock_relay = generate_mock_relay(relay_port, pubkey.clone())?; - tokio::spawn(start_mock_relay_service(mock_state.clone(), relay_port)); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); // Run the PBS service let config = to_pbs_config(chain, get_pbs_config(pbs_port), vec![mock_relay.clone()]); let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers @@ -141,3 +283,204 @@ async fn test_get_header_returns_400_if_request_is_invalid() -> Result<()> { assert_eq!(mock_state.received_get_header(), 0); // no header received Ok(()) } + +/// All validation modes (None, Standard, Extra) enforce the min-bid threshold. +/// None skips expensive crypto checks; Standard adds sigverify + structural +/// checks; Extra adds the parent-block check via EL RPC (which is skipped with +/// a warning if the fetch fails, so a non-existent RPC URL still passes here). +#[tokio::test] +async fn test_get_header_extra_validation_enforce_min_bid() -> Result<()> { + let relay_bid = U256::from(7u64); + let min_bid_above_relay = relay_bid + U256::from(1); + // A syntactically valid URL that will never connect — Extra mode config + // validation only requires rpc_url to be Some; the actual fetch failing is + // handled gracefully (extra validation is skipped with a warning). + let fake_rpc: Url = "http://127.0.0.1:1".parse()?; + + // Bid below min → all modes reject (204). + test_get_header_impl( + vec![EncodingType::Json], + HashSet::from([EncodingType::Json]), + 1, + StatusCode::NO_CONTENT, + relay_bid, + min_bid_above_relay, + Some(fake_rpc.clone()), + ForkName::Electra, + ) + .await?; + + // Bid above min → all modes accept (200). + test_get_header_impl( + vec![EncodingType::Json], + HashSet::from([EncodingType::Json]), + 1, + StatusCode::OK, + min_bid_above_relay, + U256::ZERO, + Some(fake_rpc), + ForkName::Electra, + ) + .await?; + + Ok(()) +} + +/// Verify the mock relay returns 400 when the validator requests an unsupported +/// fork. Tested by pointing MockValidator directly at the relay (no PBS) so the +/// assertion is on the relay's raw response, not PBS's 204 fallback. +#[tokio::test] +async fn test_get_header_unsupported_fork_returns_400() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let chain = Chain::Holesky; + + let relay_listener = get_free_listener().await; + let relay_port = relay_listener.local_addr().unwrap().port(); + let mock_state = Arc::new(MockRelayState::new(chain, signer.clone())); + tokio::spawn(start_mock_relay_service_with_listener(mock_state, relay_listener)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + // Point MockValidator directly at the relay (no PBS in the path). + let direct = MockValidator::new(relay_port)?; + for unsupported_fork in [ForkName::Base, ForkName::Altair] { + let res = direct.do_get_header(None, vec![EncodingType::Json], unsupported_fork).await?; + assert_eq!( + res.status(), + StatusCode::BAD_REQUEST, + "expected 400 for unsupported fork {unsupported_fork}" + ); + } + Ok(()) +} + +/// Exhaustive bid-acceptance matrix across every (fork, encoding, mode, bid) +/// combination. +#[tokio::test] +async fn test_get_header_bid_validation_matrix() -> Result<()> { + let bid_low = U256::from(5u64); + let bid_high = U256::from(100u64); + let min_bid = U256::from(50u64); + + // (fork, encoding, mode, relay_bid, expected_status) + let cases: &[(ForkName, EncodingType, U256, StatusCode)] = &[ + (ForkName::Electra, EncodingType::Json, bid_low, StatusCode::NO_CONTENT), + (ForkName::Electra, EncodingType::Json, bid_high, StatusCode::OK), + (ForkName::Electra, EncodingType::Ssz, bid_low, StatusCode::NO_CONTENT), + (ForkName::Electra, EncodingType::Ssz, bid_high, StatusCode::OK), + (ForkName::Fulu, EncodingType::Json, bid_low, StatusCode::NO_CONTENT), + (ForkName::Fulu, EncodingType::Json, bid_high, StatusCode::OK), + (ForkName::Fulu, EncodingType::Ssz, bid_low, StatusCode::NO_CONTENT), + (ForkName::Fulu, EncodingType::Ssz, bid_high, StatusCode::OK), + (ForkName::Electra, EncodingType::Json, bid_low, StatusCode::NO_CONTENT), + (ForkName::Electra, EncodingType::Json, bid_high, StatusCode::OK), + (ForkName::Electra, EncodingType::Ssz, bid_low, StatusCode::NO_CONTENT), + (ForkName::Electra, EncodingType::Ssz, bid_high, StatusCode::OK), + (ForkName::Fulu, EncodingType::Json, bid_low, StatusCode::NO_CONTENT), + (ForkName::Fulu, EncodingType::Json, bid_high, StatusCode::OK), + (ForkName::Fulu, EncodingType::Ssz, bid_low, StatusCode::NO_CONTENT), + (ForkName::Fulu, EncodingType::Ssz, bid_high, StatusCode::OK), + ]; + + for (i, &(fork, encoding, relay_bid, expected_status)) in cases.iter().enumerate() { + test_get_header_impl( + vec![encoding], + HashSet::from([encoding]), + 1, + expected_status, + relay_bid, + min_bid, + None, + fork, + ) + .await + .map_err(|e| { + eyre::eyre!("case {i} (fork={fork} enc={encoding} bid={relay_bid} min={min_bid}): {e}") + })?; + } + Ok(()) +} + +/// PBS must accept relay `Content-Type` values that include MIME parameters +/// (e.g. `application/octet-stream; charset=binary`). The audit fix for C2 +/// switched `EncodingType::from_str` to parse via the `mediatype` crate; +/// this test exercises the full relay→PBS→BN path to guard against +/// regressions at the wire boundary. +#[tokio::test] +async fn test_get_header_tolerates_mime_params_in_content_type() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey = signer.public_key(); + let chain = Chain::Holesky; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); + + let mut mock_state = MockRelayState::new(chain, signer) + .with_response_content_type("application/octet-stream; charset=binary"); + mock_state.supported_content_types = Arc::new(HashSet::from([EncodingType::Ssz])); + let mock_state = Arc::new(mock_state); + let mock_relay = generate_mock_relay(relay_port, pubkey)?; + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); + + let pbs_config = get_pbs_config(pbs_port); + let config = to_pbs_config(chain, pbs_config, vec![mock_relay]); + let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + let res = + mock_validator.do_get_header(None, vec![EncodingType::Ssz], ForkName::Electra).await?; + assert_eq!(res.status(), StatusCode::OK, "PBS should tolerate `; charset=binary` MIME param"); + assert_eq!(mock_state.received_get_header(), 1); + + let fork = get_consensus_version_header(res.headers()).expect("missing fork version header"); + let bytes = res.bytes().await?; + let data = SignedBuilderBid::from_ssz_bytes_by_fork(&bytes, fork).unwrap(); + assert_eq!(data.message.header().block_hash().0[0], 1); + Ok(()) +} + +/// Same guarantee on the JSON path: `application/json; charset=utf-8` (the +/// value some production relays actually emit) must be accepted as JSON. +#[tokio::test] +async fn test_get_header_tolerates_json_charset_param() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey = signer.public_key(); + let chain = Chain::Holesky; + let pbs_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); + + let mut mock_state = MockRelayState::new(chain, signer) + .with_response_content_type("application/json; charset=utf-8"); + mock_state.supported_content_types = Arc::new(HashSet::from([EncodingType::Json])); + let mock_state = Arc::new(mock_state); + let mock_relay = generate_mock_relay(relay_port, pubkey)?; + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_listener)); + + let pbs_config = get_pbs_config(pbs_port); + let config = to_pbs_config(chain, pbs_config, vec![mock_relay]); + let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + let res = + mock_validator.do_get_header(None, vec![EncodingType::Json], ForkName::Electra).await?; + assert_eq!(res.status(), StatusCode::OK, "PBS should tolerate `; charset=utf-8` MIME param"); + assert_eq!(mock_state.received_get_header(), 1); + + let body: GetHeaderResponse = serde_json::from_slice(&res.bytes().await?)?; + assert_eq!(body.data.message.header().block_hash().0[0], 1); + Ok(()) +} diff --git a/tests/tests/pbs_mux.rs b/tests/tests/pbs_mux.rs index 4f842d56..6d093bef 100644 --- a/tests/tests/pbs_mux.rs +++ b/tests/tests/pbs_mux.rs @@ -12,17 +12,17 @@ use cb_common::{ }, signer::random_secret, types::Chain, - utils::{ResponseReadError, set_ignore_content_length}, + utils::{ForkName, ResponseReadError, set_ignore_content_length}, }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ - mock_relay::{MockRelayState, start_mock_relay_service}, + mock_relay::{MockRelayState, start_mock_relay_service_with_listener}, mock_ssv_node::{SsvNodeMockState, create_mock_ssv_node_server}, mock_ssv_public::{PublicSsvMockState, TEST_HTTP_TIMEOUT, create_mock_public_ssv_server}, mock_validator::MockValidator, utils::{ - bls_pubkey_from_hex_unchecked, generate_mock_relay, get_pbs_config, setup_test_env, - to_pbs_config, + bls_pubkey_from_hex_unchecked, generate_mock_relay, get_free_listener, get_pbs_config, + setup_test_env, to_pbs_config, }, }; use eyre::Result; @@ -36,7 +36,9 @@ use url::Url; /// from the public API async fn test_ssv_public_network_fetch() -> Result<()> { // Start the mock server - let port = 30100; + let listener = get_free_listener().await; + let port = listener.local_addr().unwrap().port(); + drop(listener); let server_handle = create_mock_public_ssv_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/api/v4/test_chain/validators/in_operator/1")) @@ -74,7 +76,9 @@ async fn test_ssv_public_network_fetch() -> Result<()> { /// body is too large async fn test_ssv_network_fetch_big_data() -> Result<()> { // Start the mock server - let port = 30101; + let listener = get_free_listener().await; + let port = listener.local_addr().unwrap().port(); + drop(listener); let server_handle = cb_tests::mock_ssv_public::create_mock_public_ssv_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); @@ -106,7 +110,9 @@ async fn test_ssv_network_fetch_big_data() -> Result<()> { /// times out async fn test_ssv_network_fetch_timeout() -> Result<()> { // Start the mock server - let port = 30102; + let listener = get_free_listener().await; + let port = listener.local_addr().unwrap().port(); + drop(listener); let state = PublicSsvMockState { validators: Arc::new(RwLock::new(vec![])), force_timeout: Arc::new(RwLock::new(true)), @@ -135,7 +141,9 @@ async fn test_ssv_network_fetch_timeout() -> Result<()> { /// content-length header is missing async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()> { // Start the mock server - let port = 30103; + let listener = get_free_listener().await; + let port = listener.local_addr().unwrap().port(); + drop(listener); set_ignore_content_length(true); let server_handle = create_mock_public_ssv_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/big_data")).unwrap(); @@ -167,7 +175,9 @@ async fn test_ssv_network_fetch_big_data_without_content_length() -> Result<()> /// from the node API async fn test_ssv_node_network_fetch() -> Result<()> { // Start the mock server - let port = 30104; + let listener = get_free_listener().await; + let port = listener.local_addr().unwrap().port(); + drop(listener); let _server_handle = create_mock_ssv_node_server(port, None).await?; let url = Url::parse(&format!("http://localhost:{port}/v1/validators")).unwrap(); let response = request_ssv_pubkeys_from_ssv_node( @@ -200,17 +210,24 @@ async fn test_mux() -> Result<()> { let pubkey = signer.public_key(); let chain = Chain::Holesky; - let pbs_port = 3700; - - let mux_relay_1 = generate_mock_relay(pbs_port + 1, pubkey.clone())?; - let mux_relay_2 = generate_mock_relay(pbs_port + 2, pubkey.clone())?; - let default_relay = generate_mock_relay(pbs_port + 3, pubkey.clone())?; + let pbs_listener = get_free_listener().await; + let relay_1_listener = get_free_listener().await; + let relay_2_listener = get_free_listener().await; + let relay_3_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let relay_1_port = relay_1_listener.local_addr().unwrap().port(); + let relay_2_port = relay_2_listener.local_addr().unwrap().port(); + let relay_3_port = relay_3_listener.local_addr().unwrap().port(); + + let mux_relay_1 = generate_mock_relay(relay_1_port, pubkey.clone())?; + let mux_relay_2 = generate_mock_relay(relay_2_port, pubkey.clone())?; + let default_relay = generate_mock_relay(relay_3_port, pubkey.clone())?; // Run 3 mock relays let mock_state = Arc::new(MockRelayState::new(chain, signer)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 2)); - tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 3)); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_1_listener)); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_2_listener)); + tokio::spawn(start_mock_relay_service_with_listener(mock_state.clone(), relay_3_listener)); // Register all relays in PBS config let relays = vec![default_relay.clone()]; @@ -230,6 +247,7 @@ async fn test_mux() -> Result<()> { // Run PBS service let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers @@ -238,13 +256,19 @@ async fn test_mux() -> Result<()> { // Send default request without specifying a validator key let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header with default"); - assert_eq!(mock_validator.do_get_header(None).await?.status(), StatusCode::OK); + assert_eq!( + mock_validator.do_get_header(None, Vec::new(), ForkName::Electra).await?.status(), + StatusCode::OK + ); assert_eq!(mock_state.received_get_header(), 1); // only default relay was used // Send request specifying a validator key to use mux info!("Sending get header with mux"); assert_eq!( - mock_validator.do_get_header(Some(validator_pubkey)).await?.status(), + mock_validator + .do_get_header(Some(validator_pubkey), Vec::new(), ForkName::Electra) + .await? + .status(), StatusCode::OK ); assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used @@ -261,12 +285,12 @@ async fn test_mux() -> Result<()> { // v1 Submit block requests should go to all relays info!("Sending submit block v1"); - assert_eq!(mock_validator.do_submit_block_v1(None).await?.status(), StatusCode::OK); + assert_eq!(mock_validator.do_submit_block_v1(None,).await?.status(), StatusCode::OK); assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used // v2 Submit block requests should go to all relays info!("Sending submit block v2"); - assert_eq!(mock_validator.do_submit_block_v2(None).await?.status(), StatusCode::ACCEPTED); + assert_eq!(mock_validator.do_submit_block_v2(None,).await?.status(), StatusCode::ACCEPTED); assert_eq!(mock_state.received_submit_block(), 6); // default + 2 mux relays were used Ok(()) @@ -282,10 +306,20 @@ async fn test_ssv_multi_with_node() -> Result<()> { let pubkey2 = signer2.public_key(); let chain = Chain::Hoodi; - let pbs_port = 3711; + let pbs_listener = get_free_listener().await; + let ssv_node_listener = get_free_listener().await; + let ssv_public_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let ssv_node_port = ssv_node_listener.local_addr().unwrap().port(); + let ssv_public_port = ssv_public_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); + // Drop SSV node + public listeners because their mock server helpers bind the + // port themselves. + drop(ssv_node_listener); + drop(ssv_public_listener); // Start the mock SSV node - let ssv_node_port = pbs_port + 1; let ssv_node_url = Url::parse(&format!("http://localhost:{ssv_node_port}/v1/"))?; let mock_ssv_node_state = SsvNodeMockState { validators: Arc::new(RwLock::new(vec![ @@ -298,7 +332,6 @@ async fn test_ssv_multi_with_node() -> Result<()> { create_mock_ssv_node_server(ssv_node_port, Some(mock_ssv_node_state.clone())).await?; // Start the mock SSV public API - let ssv_public_port = ssv_node_port + 1; let ssv_public_url = Url::parse(&format!("http://localhost:{ssv_public_port}/api/v4/"))?; let mock_ssv_public_state = PublicSsvMockState { validators: Arc::new(RwLock::new(vec![SSVPublicValidator { pubkey: pubkey.clone() }])), @@ -308,11 +341,11 @@ async fn test_ssv_multi_with_node() -> Result<()> { create_mock_public_ssv_server(ssv_public_port, Some(mock_ssv_public_state.clone())).await?; // Start a mock relay to be used by the mux - let relay_port = ssv_public_port + 1; let relay = generate_mock_relay(relay_port, pubkey.clone())?; let relay_id = relay.id.clone().to_string(); let relay_state = Arc::new(MockRelayState::new(chain, signer)); - let relay_task = tokio::spawn(start_mock_relay_service(relay_state.clone(), relay_port)); + let relay_task = + tokio::spawn(start_mock_relay_service_with_listener(relay_state.clone(), relay_listener)); // Create the registry mux let loader = MuxKeysLoader::Registry { @@ -346,6 +379,7 @@ async fn test_ssv_multi_with_node() -> Result<()> { // Run PBS service let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); info!("Started PBS server with pubkey {pubkey}"); @@ -356,9 +390,10 @@ async fn test_ssv_multi_with_node() -> Result<()> { // relay only since it hasn't been seen in the mux yet let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(Some(pubkey2.clone())).await?; + let res = + mock_validator.do_get_header(Some(pubkey2.clone()), Vec::new(), ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::OK); - assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV node + assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV node // Shut down the server handles pbs_server.abort(); @@ -380,10 +415,20 @@ async fn test_ssv_multi_with_public() -> Result<()> { let pubkey2 = signer2.public_key(); let chain = Chain::Hoodi; - let pbs_port = 3720; + let pbs_listener = get_free_listener().await; + let ssv_node_listener = get_free_listener().await; + let ssv_public_listener = get_free_listener().await; + let relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let ssv_node_port = ssv_node_listener.local_addr().unwrap().port(); + let ssv_public_port = ssv_public_listener.local_addr().unwrap().port(); + let relay_port = relay_listener.local_addr().unwrap().port(); + // SSV node is intentionally down — release its reserved port. + drop(ssv_node_listener); + // SSV public mock helper binds the port itself. + drop(ssv_public_listener); // Start the mock SSV node - let ssv_node_port = pbs_port + 1; let ssv_node_url = Url::parse(&format!("http://localhost:{ssv_node_port}/v1/"))?; // Don't start the SSV node server to simulate it being down @@ -391,7 +436,6 @@ async fn test_ssv_multi_with_public() -> Result<()> { // Some(mock_ssv_node_state.clone())).await?; // Start the mock SSV public API - let ssv_public_port = ssv_node_port + 1; let ssv_public_url = Url::parse(&format!("http://localhost:{ssv_public_port}/api/v4/"))?; let mock_ssv_public_state = PublicSsvMockState { validators: Arc::new(RwLock::new(vec![ @@ -404,11 +448,11 @@ async fn test_ssv_multi_with_public() -> Result<()> { create_mock_public_ssv_server(ssv_public_port, Some(mock_ssv_public_state.clone())).await?; // Start a mock relay to be used by the mux - let relay_port = ssv_public_port + 1; let relay = generate_mock_relay(relay_port, pubkey.clone())?; let relay_id = relay.id.clone().to_string(); let relay_state = Arc::new(MockRelayState::new(chain, signer)); - let relay_task = tokio::spawn(start_mock_relay_service(relay_state.clone(), relay_port)); + let relay_task = + tokio::spawn(start_mock_relay_service_with_listener(relay_state.clone(), relay_listener)); // Create the registry mux let loader = MuxKeysLoader::Registry { @@ -442,6 +486,7 @@ async fn test_ssv_multi_with_public() -> Result<()> { // Run PBS service let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); info!("Started PBS server with pubkey {pubkey}"); @@ -452,9 +497,10 @@ async fn test_ssv_multi_with_public() -> Result<()> { // relay only since it hasn't been seen in the mux yet let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(Some(pubkey2.clone())).await?; + let res = + mock_validator.do_get_header(Some(pubkey2.clone()), Vec::new(), ForkName::Electra).await?; assert_eq!(res.status(), StatusCode::OK); - assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV public API + assert_eq!(relay_state.received_get_header(), 1); // pubkey2 was loaded from the SSV public API // Shut down the server handles pbs_server.abort(); diff --git a/tests/tests/pbs_mux_refresh.rs b/tests/tests/pbs_mux_refresh.rs index 1d590a49..5935af98 100644 --- a/tests/tests/pbs_mux_refresh.rs +++ b/tests/tests/pbs_mux_refresh.rs @@ -8,12 +8,13 @@ use cb_common::{ }; use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ - mock_relay::{MockRelayState, start_mock_relay_service}, + mock_relay::{MockRelayState, start_mock_relay_service_with_listener}, mock_ssv_public::{PublicSsvMockState, create_mock_public_ssv_server}, mock_validator::MockValidator, - utils::{generate_mock_relay, get_pbs_config, to_pbs_config}, + utils::{generate_mock_relay, get_free_listener, get_pbs_config, to_pbs_config}, }; use eyre::Result; +use lh_types::ForkName; use reqwest::StatusCode; use tokio::sync::RwLock; use tracing::info; @@ -38,10 +39,18 @@ async fn test_auto_refresh() -> Result<()> { let new_mux_pubkey = new_mux_signer.public_key(); let chain = Chain::Hoodi; - let pbs_port = 3710; + let pbs_listener = get_free_listener().await; + let ssv_api_listener = get_free_listener().await; + let default_relay_listener = get_free_listener().await; + let mux_relay_listener = get_free_listener().await; + let pbs_port = pbs_listener.local_addr().unwrap().port(); + let ssv_api_port = ssv_api_listener.local_addr().unwrap().port(); + let default_relay_port = default_relay_listener.local_addr().unwrap().port(); + let mux_relay_port = mux_relay_listener.local_addr().unwrap().port(); + // create_mock_public_ssv_server binds the port itself. + drop(ssv_api_listener); // Start the mock SSV API server - let ssv_api_port = pbs_port + 1; // Intentionally missing a trailing slash to ensure this is handled properly let ssv_api_url = Url::parse(&format!("http://localhost:{ssv_api_port}/api/v4"))?; let mock_ssv_state = PublicSsvMockState { @@ -54,19 +63,21 @@ async fn test_auto_refresh() -> Result<()> { create_mock_public_ssv_server(ssv_api_port, Some(mock_ssv_state.clone())).await?; // Start a default relay for non-mux keys - let default_relay_port = ssv_api_port + 1; let default_relay = generate_mock_relay(default_relay_port, default_pubkey.clone())?; let default_relay_state = Arc::new(MockRelayState::new(chain, default_signer.clone())); - let default_relay_task = - tokio::spawn(start_mock_relay_service(default_relay_state.clone(), default_relay_port)); + let default_relay_task = tokio::spawn(start_mock_relay_service_with_listener( + default_relay_state.clone(), + default_relay_listener, + )); // Start a mock relay to be used by the mux - let mux_relay_port = default_relay_port + 1; let mux_relay = generate_mock_relay(mux_relay_port, default_pubkey.clone())?; let mux_relay_id = mux_relay.id.clone().to_string(); let mux_relay_state = Arc::new(MockRelayState::new(chain, default_signer)); - let mux_relay_task = - tokio::spawn(start_mock_relay_service(mux_relay_state.clone(), mux_relay_port)); + let mux_relay_task = tokio::spawn(start_mock_relay_service_with_listener( + mux_relay_state.clone(), + mux_relay_listener, + )); // Create the registry mux let loader = MuxKeysLoader::Registry { @@ -99,6 +110,7 @@ async fn test_auto_refresh() -> Result<()> { // Run PBS service let state = PbsState::new(config, PathBuf::new()); + drop(pbs_listener); let pbs_server = tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); info!("Started PBS server with pubkey {default_pubkey}"); @@ -109,7 +121,9 @@ async fn test_auto_refresh() -> Result<()> { // relay only since it hasn't been seen in the mux yet let mock_validator = MockValidator::new(pbs_port)?; info!("Sending get header"); - let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + let res = mock_validator + .do_get_header(Some(new_mux_pubkey.clone()), Vec::new(), ForkName::Electra) + .await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 1); // default relay was used assert_eq!(mux_relay_state.received_get_header(), 0); // mux relay was not used @@ -137,14 +151,18 @@ async fn test_auto_refresh() -> Result<()> { assert!(logs_contain(&format!("fetched 2 pubkeys for registry mux {mux_relay_id}"))); // Try to run a get_header on the new pubkey - now it should use the mux relay - let res = mock_validator.do_get_header(Some(new_mux_pubkey.clone())).await?; + let res = mock_validator + .do_get_header(Some(new_mux_pubkey.clone()), Vec::new(), ForkName::Electra) + .await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 1); // default relay was not used here assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was used // Now try to do a get_header with the old pubkey - it should only use the // default relay - let res = mock_validator.do_get_header(Some(default_pubkey.clone())).await?; + let res = mock_validator + .do_get_header(Some(default_pubkey.clone()), Vec::new(), ForkName::Electra) + .await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 2); // default relay was used assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used @@ -161,7 +179,9 @@ async fn test_auto_refresh() -> Result<()> { // Try to do a get_header with the removed pubkey - it should only use the // default relay - let res = mock_validator.do_get_header(Some(existing_mux_pubkey.clone())).await?; + let res = mock_validator + .do_get_header(Some(existing_mux_pubkey.clone()), Vec::new(), ForkName::Electra) + .await?; assert_eq!(res.status(), StatusCode::OK); assert_eq!(default_relay_state.received_get_header(), 3); // default relay was used assert_eq!(mux_relay_state.received_get_header(), 1); // mux relay was not used