diff --git a/src/common/websocket.rs b/src/common/websocket.rs index 704313aed..dcc944baf 100644 --- a/src/common/websocket.rs +++ b/src/common/websocket.rs @@ -155,13 +155,15 @@ impl WebSocketConnector { } } - fn handle_connect_error(&self, error: WebSocketError) { + fn handle_connect_error(&self, error: WebSocketError, attempt: u32, elapsed_ms: u64) { if let Some(status) = self.is_retriable_http_error(&error) { // Retriable errors are logged at INFO to avoid unnecessary alarm. info!( message = "WebSocket connection failed with retriable HTTP status, will retry with backoff.", status_code = %status.as_u16(), status = %status, + attempt, + elapsed_ms, internal_log_rate_limit = true, ); } else { @@ -176,21 +178,33 @@ impl WebSocketConnector { timeout_per_attempt: Duration, ) -> WebSocketStream> { let mut backoff = Self::fresh_backoff(); + let start = std::time::Instant::now(); + let mut attempt: u32 = 0; loop { + attempt += 1; // Apply timeout to individual connection attempts, not the entire loop. let result = time::timeout(timeout_per_attempt, self.connect()).await; match result { Ok(Ok(ws_stream)) => { + info!( + message = "WebSocket connection established.", + attempts = attempt, + elapsed_ms = start.elapsed().as_millis() as u64, + ); emit!(WebSocketConnectionEstablished {}); return ws_stream; } Ok(Err(error)) => { - self.handle_connect_error(error); + self.handle_connect_error(error, attempt, start.elapsed().as_millis() as u64); time::sleep(backoff.next().unwrap()).await; } Err(_) => { - self.handle_connect_error(Self::timeout_error()); + self.handle_connect_error( + Self::timeout_error(), + attempt, + start.elapsed().as_millis() as u64, + ); time::sleep(backoff.next().unwrap()).await; } } @@ -219,7 +233,11 @@ pub(crate) struct PingInterval { impl PingInterval { pub(crate) fn new(period: Option) -> Self { Self { - interval: period.map(|period| time::interval(Duration::from_secs(period))), + interval: period.map(|period| { + let mut interval = time::interval(Duration::from_secs(period)); + interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); + interval + }), } } diff --git a/src/sources/websocket/source.rs b/src/sources/websocket/source.rs index 3509f8c37..368b0cc89 100644 --- a/src/sources/websocket/source.rs +++ b/src/sources/websocket/source.rs @@ -18,6 +18,7 @@ use chrono::Utc; use futures::{pin_mut, sink::SinkExt, Sink, Stream, StreamExt}; use snafu::Snafu; use std::pin::Pin; +use std::time::Instant; use tokio::time; use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::tungstenite::{error::Error as TungsteniteError, Message}; @@ -89,7 +90,9 @@ impl WebSocketSource { let mut out = cx.out; + let connect_start = Instant::now(); let (ws_sink, ws_source) = self.connect(&mut out).await?; + let connect_duration_ms = connect_start.elapsed().as_millis() as u64; pin_mut!(ws_sink, ws_source); @@ -98,6 +101,7 @@ impl WebSocketSource { uri = %self.config.common.uri, ping_interval = ?self.config.common.ping_interval.map(|v| format!("{}s", v)), ping_message = ?self.config.ping_message, + connect_duration_ms, ); loop { @@ -123,7 +127,7 @@ impl WebSocketSource { if let Err(error) = result { match error { WebSocketSourceError::RemoteClosed { frame } => { - warn!( + info!( message = "Connection closed by server.", code = %frame.code, reason = %frame.reason @@ -134,12 +138,12 @@ impl WebSocketSource { emit!(WebSocketConnectionShutdown); } WebSocketSourceError::RemoteClosedEmpty => { - warn!("Connection closed by server without a close frame."); + info!("Connection closed by server without a close frame."); let _ = ws_sink.flush().await; emit!(WebSocketConnectionShutdown); } WebSocketSourceError::PongTimeout => { - warn!( + info!( message = "Pong timeout — connection appears dead, will reconnect.", uri = %self.config.common.uri, ); @@ -191,7 +195,7 @@ impl WebSocketSource { } Err(err) => { let delay = backoff.next().unwrap_or(std::time::Duration::from_secs(30)); - warn!( + info!( message = "Reconnection failed, retrying after delay.", error = ?err, delay_ms = delay.as_millis() @@ -238,7 +242,7 @@ impl WebSocketSource { } Message::Close(frame) => self.handle_close_frame(frame), Message::Frame(_) => { - warn!("Unsupported message type received: frame."); + info!("Unsupported message type received: frame."); Ok(()) } } @@ -315,7 +319,9 @@ impl WebSocketSource { uri = %self.config.common.uri, ); + let reconnect_start = Instant::now(); let (new_sink, new_source) = self.connect(out).await?; + let connect_duration_ms = reconnect_start.elapsed().as_millis() as u64; *ws_sink = new_sink; *ws_source = new_source; @@ -323,6 +329,7 @@ impl WebSocketSource { info!( message = "Reconnected successfully.", uri = %self.config.common.uri, + connect_duration_ms, ); Ok(()) @@ -443,6 +450,7 @@ struct PingManager { interval: PingInterval, waiting_for_pong: bool, message: Message, + last_ping_sent: Option, } impl PingManager { @@ -457,6 +465,7 @@ impl PingManager { interval: PingInterval::new(config.common.ping_interval.map(u64::from)), waiting_for_pong: false, message: ping_message, + last_ping_sent: None, } } @@ -466,13 +475,21 @@ impl PingManager { fn reset(&mut self) { self.waiting_for_pong = false; + self.last_ping_sent = None; } async fn tick(&mut self, ws_sink: &mut WebSocketSink) -> Result<(), WebSocketSourceError> { self.interval.tick().await; if self.waiting_for_pong { - warn!("Pong not received before next ping interval — declaring connection dead."); + let elapsed_since_ping_ms = self + .last_ping_sent + .map(|t| t.elapsed().as_millis() as u64) + .unwrap_or(0); + info!( + message = "Pong not received before next ping interval — declaring connection dead.", + elapsed_since_ping_ms, + ); return Err(WebSocketSourceError::PongTimeout); } @@ -482,6 +499,7 @@ impl PingManager { WebSocketSourceError::Tungstenite { source: error } })?; + self.last_ping_sent = Some(Instant::now()); self.waiting_for_pong = true; Ok(()) }