Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions src/common/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,15 @@ impl WebSocketConnector {
}
}

fn handle_connect_error(&self, error: WebSocketError) {
fn handle_connect_error(&self, error: WebSocketError, attempt: u32, elapsed_ms: u64) {
if let Some(status) = self.is_retriable_http_error(&error) {
// Retriable errors are logged at INFO to avoid unnecessary alarm.
info!(
message = "WebSocket connection failed with retriable HTTP status, will retry with backoff.",
status_code = %status.as_u16(),
status = %status,
attempt,
elapsed_ms,
internal_log_rate_limit = true,
);
} else {
Expand All @@ -176,21 +178,33 @@ impl WebSocketConnector {
timeout_per_attempt: Duration,
) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
let mut backoff = Self::fresh_backoff();
let start = std::time::Instant::now();
let mut attempt: u32 = 0;
loop {
attempt += 1;
// Apply timeout to individual connection attempts, not the entire loop.
let result = time::timeout(timeout_per_attempt, self.connect()).await;

match result {
Ok(Ok(ws_stream)) => {
info!(
message = "WebSocket connection established.",
attempts = attempt,
elapsed_ms = start.elapsed().as_millis() as u64,
);
emit!(WebSocketConnectionEstablished {});
return ws_stream;
}
Ok(Err(error)) => {
self.handle_connect_error(error);
self.handle_connect_error(error, attempt, start.elapsed().as_millis() as u64);
time::sleep(backoff.next().unwrap()).await;
}
Err(_) => {
self.handle_connect_error(Self::timeout_error());
self.handle_connect_error(
Self::timeout_error(),
attempt,
start.elapsed().as_millis() as u64,
);
time::sleep(backoff.next().unwrap()).await;
}
}
Expand Down Expand Up @@ -219,7 +233,11 @@ pub(crate) struct PingInterval {
impl PingInterval {
pub(crate) fn new(period: Option<u64>) -> Self {
Self {
interval: period.map(|period| time::interval(Duration::from_secs(period))),
interval: period.map(|period| {
let mut interval = time::interval(Duration::from_secs(period));
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
interval
}),
}
}

Expand Down
30 changes: 24 additions & 6 deletions src/sources/websocket/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use chrono::Utc;
use futures::{pin_mut, sink::SinkExt, Sink, Stream, StreamExt};
use snafu::Snafu;
use std::pin::Pin;
use std::time::Instant;
use tokio::time;
use tokio_tungstenite::tungstenite::protocol::CloseFrame;
use tokio_tungstenite::tungstenite::{error::Error as TungsteniteError, Message};
Expand Down Expand Up @@ -89,7 +90,9 @@ impl WebSocketSource {

let mut out = cx.out;

let connect_start = Instant::now();
let (ws_sink, ws_source) = self.connect(&mut out).await?;
let connect_duration_ms = connect_start.elapsed().as_millis() as u64;

pin_mut!(ws_sink, ws_source);

Expand All @@ -98,6 +101,7 @@ impl WebSocketSource {
uri = %self.config.common.uri,
ping_interval = ?self.config.common.ping_interval.map(|v| format!("{}s", v)),
ping_message = ?self.config.ping_message,
connect_duration_ms,
);

loop {
Expand All @@ -123,7 +127,7 @@ impl WebSocketSource {
if let Err(error) = result {
match error {
WebSocketSourceError::RemoteClosed { frame } => {
warn!(
info!(
message = "Connection closed by server.",
code = %frame.code,
reason = %frame.reason
Expand All @@ -134,12 +138,12 @@ impl WebSocketSource {
emit!(WebSocketConnectionShutdown);
}
WebSocketSourceError::RemoteClosedEmpty => {
warn!("Connection closed by server without a close frame.");
info!("Connection closed by server without a close frame.");
let _ = ws_sink.flush().await;
emit!(WebSocketConnectionShutdown);
}
WebSocketSourceError::PongTimeout => {
warn!(
info!(
message = "Pong timeout — connection appears dead, will reconnect.",
uri = %self.config.common.uri,
);
Expand Down Expand Up @@ -191,7 +195,7 @@ impl WebSocketSource {
}
Err(err) => {
let delay = backoff.next().unwrap_or(std::time::Duration::from_secs(30));
warn!(
info!(
message = "Reconnection failed, retrying after delay.",
error = ?err,
delay_ms = delay.as_millis()
Expand Down Expand Up @@ -238,7 +242,7 @@ impl WebSocketSource {
}
Message::Close(frame) => self.handle_close_frame(frame),
Message::Frame(_) => {
warn!("Unsupported message type received: frame.");
info!("Unsupported message type received: frame.");
Ok(())
}
}
Expand Down Expand Up @@ -315,14 +319,17 @@ impl WebSocketSource {
uri = %self.config.common.uri,
);

let reconnect_start = Instant::now();
let (new_sink, new_source) = self.connect(out).await?;
let connect_duration_ms = reconnect_start.elapsed().as_millis() as u64;

*ws_sink = new_sink;
*ws_source = new_source;

info!(
message = "Reconnected successfully.",
uri = %self.config.common.uri,
connect_duration_ms,
);

Ok(())
Expand Down Expand Up @@ -443,6 +450,7 @@ struct PingManager {
interval: PingInterval,
waiting_for_pong: bool,
message: Message,
last_ping_sent: Option<Instant>,
}

impl PingManager {
Expand All @@ -457,6 +465,7 @@ impl PingManager {
interval: PingInterval::new(config.common.ping_interval.map(u64::from)),
waiting_for_pong: false,
message: ping_message,
last_ping_sent: None,
}
}

Expand All @@ -466,13 +475,21 @@ impl PingManager {

fn reset(&mut self) {
self.waiting_for_pong = false;
self.last_ping_sent = None;
}

async fn tick(&mut self, ws_sink: &mut WebSocketSink) -> Result<(), WebSocketSourceError> {
self.interval.tick().await;

if self.waiting_for_pong {
warn!("Pong not received before next ping interval — declaring connection dead.");
let elapsed_since_ping_ms = self
.last_ping_sent
.map(|t| t.elapsed().as_millis() as u64)
.unwrap_or(0);
info!(
message = "Pong not received before next ping interval — declaring connection dead.",
elapsed_since_ping_ms,
);
return Err(WebSocketSourceError::PongTimeout);
}

Expand All @@ -482,6 +499,7 @@ impl PingManager {
WebSocketSourceError::Tungstenite { source: error }
})?;

self.last_ping_sent = Some(Instant::now());
self.waiting_for_pong = true;
Ok(())
}
Expand Down