Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,157 changes: 487 additions & 670 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ tokio-util = { version = "0.7", default-features = false, features = ["io", "tim
bytesize = { version = "1.3.0", default-features = false }
bytes = { version = "1.9.0", default-features = false, features = ["serde"] }
async-compression = { version = "0.4.18", default-features = false, features = ["tokio", "gzip", "zstd"] }
object_store = { version = "0.6.1", dependencies = { url = "=2.3.1" }, features = ["gcp"], default-features = false }
object_store = { version = "0.10.2", features = ["gcp"], default-features = false }
once_cell = { version = "1.20.2", default-features = false }
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
h2 = { version = "0.4.7", default-features = false }
Expand Down Expand Up @@ -391,9 +391,9 @@ k8s-openapi = { version = "0.22.0", default-features = false, features = ["v1_26
kube = { version = "0.93.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
listenfd = { version = "1.0.1", default-features = false, optional = true }
lru = { version = "0.12.5", default-features = false, optional = true }
maxminddb = { version = "0.24.0", default-features = false, optional = true }
maxminddb = { version = "0.27.0", default-features = false, optional = true }
md-5 = { version = "0.10", default-features = false, optional = true }
mongodb = { version = "2.8.2", default-features = false, features = ["tokio-runtime"], optional = true }
mongodb = { version = "3.2.5", default-features = false, features = ["rustls-tls", "compat-3-0-0"], optional = true }
async-nats = { version = "0.33.0", default-features = false, optional = true }
nkeys = { version = "0.4.4", default-features = false, optional = true }
nom = { version = "7.1.3", default-features = false, optional = true }
Expand Down
14 changes: 9 additions & 5 deletions distribution/docker/ubi-fips/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM registry.access.redhat.com/ubi9/ubi:9.7-1764794285 AS builder
FROM artifactory.eng.sentinelone.tech/docker-release/common/ubuntu-base/ubuntu:2.0.75 AS builder

WORKDIR /vector

Expand All @@ -7,21 +7,25 @@ RUN tar -xvf vector-*-$(uname -m)-unknown-linux-*.tar.gz --strip-components=2

RUN mkdir -p /var/lib/vector

FROM registry.access.redhat.com/ubi9/ubi:latest
FROM artifactory.eng.sentinelone.tech/docker-release/common/ubuntu-base/ubuntu:2.0.75

# Install dependencies for Vector and FIPS
RUN dnf install -y \
# hadolint ignore=DL3008
RUN apt-get update && apt-get install -y --no-install-recommends \
openssl \
openssl-devel \
libssl-dev \
ca-certificates \
tzdata \
systemd \
&& dnf clean all
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

# Set environment for Rust to use system OpenSSL
ENV OPENSSL_FIPS=1
ENV OPENSSL_DIR=/usr
ENV OPENSSL_STATIC=0
ENV OPENSSL_CONF=/etc/ssl/openssl.cnf.fips
ENV OPENSSL_MODULES=/usr/lib/x86_64-linux-gnu/ossl-modules

# Copy from builder stage
COPY --from=builder /vector/bin/* /usr/bin/
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ netflow_parser = { version = "=0.5.2", features = ["parse_unknown_fields"]}
memchr = { version = "2", default-features = false }
metrics.workspace = true
ordered-float = { version = "4.5.0", default-features = false }
parquet = {version = "39.0.0", default-feature = false}
parquet = { version = "55.2.0", default-features = false, features = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"] }
prost.workspace = true
prost-reflect.workspace = true
rand.workspace = true
Expand Down
44 changes: 28 additions & 16 deletions lib/codecs/src/encoding/format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,28 +857,40 @@ mod tests {
) where
<T as DataType>::T: Default,
{
let mut values = Vec::new();
values.resize(count, <T as DataType>::T::default());
let mut def_levels = Vec::new();
def_levels.resize(count, 0);
let mut rep_levels = Vec::new();
rep_levels.resize(count, 0);
let (read, level) = column_reader
.read_batch(
let mut values: Vec<<T as DataType>::T> = Vec::with_capacity(count);
let mut def_levels: Vec<i16> = Vec::with_capacity(count);
let mut rep_levels: Vec<i16> = Vec::with_capacity(count);
let (_records_read, _values_read, levels_read) = column_reader
.read_records(
count,
Some(def_levels.as_mut_slice()).filter(|_| expect_def_levels.is_some()),
Some(rep_levels.as_mut_slice()).filter(|_| expect_rep_levels.is_some()),
if expect_def_levels.is_some() {
Some(&mut def_levels)
} else {
None
},
if expect_rep_levels.is_some() {
Some(&mut rep_levels)
} else {
None
},
&mut values,
)
.unwrap();

assert_eq!(level, count);
assert_eq!(&values[..read], expect_values);
if expect_rep_levels.is_some() {
assert_eq!(rep_levels, expect_rep_levels.unwrap());
assert_eq!(levels_read, count);
assert_eq!(values.as_slice(), expect_values);
if let Some(expected) = expect_rep_levels {
// In parquet 55, columns with max_rep_level = 0 leave the rep_levels
// buffer empty rather than writing zeros. Treat empty as all zeros
// to keep tests independent of buffer-fill behavior.
if rep_levels.is_empty() && expected.iter().all(|&l| l == 0) {
// ok
} else {
assert_eq!(rep_levels, expected);
}
}
if expect_def_levels.is_some() {
assert_eq!(def_levels, expect_def_levels.unwrap());
if let Some(expected) = expect_def_levels {
assert_eq!(def_levels, expected);
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/vector-config-common/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ pub enum Validation {
///
/// Can only be used for numbers.
Range {
#[darling(default, rename = "min", with = "maybe_float_or_int")]
#[darling(default, rename = "min", with = maybe_float_or_int)]
minimum: Option<f64>,
#[darling(default, rename = "max", with = "maybe_float_or_int")]
#[darling(default, rename = "max", with = maybe_float_or_int)]
maximum: Option<f64>,
},

Expand Down
29 changes: 27 additions & 2 deletions lib/vector-core/src/tls/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use lookup::lookup_v2::OptionalValuePath;
use openssl::{
pkcs12::{ParsedPkcs12_2, Pkcs12},
pkey::{PKey, Private},
ssl::{select_next_proto, AlpnError, ConnectConfiguration, SslContextBuilder, SslVerifyMode},
ssl::{AlpnError, ConnectConfiguration, SslContextBuilder, SslVerifyMode},
stack::Stack,
x509::{store::X509StoreBuilder, X509},
};
Expand Down Expand Up @@ -331,7 +331,32 @@ impl TlsSettings {
if for_server {
let server_proto = alpn.clone();
context.set_alpn_select_callback(move |_, client_proto| {
select_next_proto(server_proto.as_slice(), client_proto).ok_or(AlpnError::NOACK)
// Walk the server's preference list and return the first
// protocol that also appears in the client's list, returned
// as a borrow of `client_proto` so the lifetime escapes the
// closure as required by the callback signature.
let mut server = server_proto.as_slice();
while let Some((&len, rest)) = server.split_first() {
let len = len as usize;
if rest.len() < len {
break;
}
let (sproto, after) = rest.split_at(len);
let mut client = client_proto;
while let Some((&clen, crest)) = client.split_first() {
let clen = clen as usize;
if crest.len() < clen {
break;
}
let (cproto, cafter) = crest.split_at(clen);
if cproto == sproto {
return Ok(cproto);
}
client = cafter;
}
server = after;
}
Err(AlpnError::NOACK)
});
} else {
context
Expand Down
94 changes: 41 additions & 53 deletions src/enrichment_tables/geoip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
//!
//! [maxmind]: https://dev.maxmind.com/geoip/geoip2/downloadable
//! [geolite]: https://dev.maxmind.com/geoip/geoip2/geolite2/#Download_Access
use std::{collections::BTreeMap, fs, net::IpAddr, sync::Arc, time::SystemTime};
use std::{fs, net::IpAddr, sync::Arc, time::SystemTime};

use maxminddb::{
geoip2::{AnonymousIp, City, ConnectionType, Isp},
MaxMindDBError, Reader,
geoip2::{AnonymousIp, City, ConnectionType, Isp, Names},
Reader,
};
use ordered_float::NotNan;
use vector_lib::configurable::configurable_component;
Expand Down Expand Up @@ -124,24 +124,16 @@ impl Geoip {
)
})?;

// Check if we can read database with dummy Ip.
// Verify the database is readable; missing-record is fine, not an error.
let ip = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
let result = match dbkind {
DatabaseKind::Asn | DatabaseKind::Isp => dbreader.lookup::<Isp>(ip).map(|_| ()),
DatabaseKind::ConnectionType => dbreader.lookup::<ConnectionType>(ip).map(|_| ()),
DatabaseKind::City => dbreader.lookup::<City>(ip).map(|_| ()),
DatabaseKind::AnonymousIp => dbreader.lookup::<AnonymousIp>(ip).map(|_| ()),
};
dbreader.lookup(ip)?;

match result {
Ok(_) | Err(MaxMindDBError::AddressNotFoundError(_)) => Ok(Geoip {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
dbkind,
config,
}),
Err(error) => Err(error.into()),
}
Ok(Geoip {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
dbkind,
config,
})
}

fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option<ObjectMap> {
Expand All @@ -163,7 +155,7 @@ impl Geoip {

match self.dbkind {
DatabaseKind::Asn | DatabaseKind::Isp => {
let data = self.dbreader.lookup::<Isp>(ip).ok()?;
let data: Isp = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!("autonomous_system_number", data.autonomous_system_number);
add_field!(
Expand All @@ -174,62 +166,53 @@ impl Geoip {
add_field!("organization", data.organization);
}
DatabaseKind::City => {
let data = self.dbreader.lookup::<City>(ip).ok()?;
let data: City = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!(
"city_name",
self.take_translation(data.city.as_ref().and_then(|c| c.names.as_ref()))
);
add_field!("city_name", self.take_translation(&data.city.names));

add_field!("continent_code", data.continent.and_then(|c| c.code));
add_field!("continent_code", data.continent.code);

let country = data.country.as_ref();
add_field!("country_code", country.and_then(|country| country.iso_code));
add_field!("country_code", data.country.iso_code);
add_field!(
"country_name",
self.take_translation(country.and_then(|c| c.names.as_ref()))
self.take_translation(&data.country.names)
);

let location = data.location.as_ref();
add_field!("timezone", location.and_then(|location| location.time_zone));
let location = &data.location;
add_field!("timezone", location.time_zone);
add_field!(
"latitude",
location
.and_then(|location| location.latitude)
.map(|latitude| Value::Float(
NotNan::new(latitude).expect("latitude cannot be Nan")
))
location.latitude.map(|latitude| Value::Float(
NotNan::new(latitude).expect("latitude cannot be Nan")
))
);
add_field!(
"longitude",
location
.and_then(|location| location.longitude)
.longitude
.map(|longitude| NotNan::new(longitude).expect("longitude cannot be Nan"))
);
add_field!(
"metro_code",
location.and_then(|location| location.metro_code)
);
add_field!("metro_code", location.metro_code);

// last subdivision is most specific per https://github.com/maxmind/GeoIP2-java/blob/39385c6ce645374039450f57208b886cf87ade47/src/main/java/com/maxmind/geoip2/model/AbstractCityResponse.java#L96-L107
let subdivision = data.subdivisions.as_ref().and_then(|s| s.last());
let subdivision = data.subdivisions.last();
add_field!(
"region_name",
self.take_translation(subdivision.and_then(|s| s.names.as_ref()))
subdivision.and_then(|s| self.take_translation(&s.names))
);
add_field!(
"region_code",
subdivision.and_then(|subdivision| subdivision.iso_code)
);
add_field!("postal_code", data.postal.and_then(|p| p.code));
add_field!("postal_code", data.postal.code);
}
DatabaseKind::ConnectionType => {
let data = self.dbreader.lookup::<ConnectionType>(ip).ok()?;
let data: ConnectionType = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!("connection_type", data.connection_type);
}
DatabaseKind::AnonymousIp => {
let data = self.dbreader.lookup::<AnonymousIp>(ip).ok()?;
let data: AnonymousIp = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!("is_anonymous", data.is_anonymous);
add_field!("is_anonymous_vpn", data.is_anonymous_vpn);
Expand All @@ -243,13 +226,18 @@ impl Geoip {
Some(map)
}

fn take_translation<'a>(
&self,
translations: Option<&BTreeMap<&str, &'a str>>,
) -> Option<&'a str> {
translations
.and_then(|translations| translations.get(&*self.config.locale))
.copied()
fn take_translation<'a>(&self, names: &Names<'a>) -> Option<&'a str> {
match self.config.locale.as_str() {
"de" => names.german,
"en" => names.english,
"es" => names.spanish,
"fr" => names.french,
"ja" => names.japanese,
"pt-BR" => names.brazilian_portuguese,
"ru" => names.russian,
"zh-CN" => names.simplified_chinese,
_ => names.english,
}
}
}

Expand Down
23 changes: 10 additions & 13 deletions src/enrichment_tables/mmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! [maxmind]: https://maxmind.com
use std::{fs, net::IpAddr, sync::Arc, time::SystemTime};

use maxminddb::{MaxMindDBError, Reader};
use maxminddb::Reader;
use vector_lib::configurable::configurable_component;
use vector_lib::enrichment::{Case, Condition, IndexHandle, Table};
use vrl::value::{ObjectMap, Value};
Expand Down Expand Up @@ -52,22 +52,19 @@ impl Mmdb {
pub fn new(config: MmdbConfig) -> crate::Result<Self> {
let dbreader = Arc::new(Reader::open_readfile(config.path.clone())?);

// Check if we can read database with dummy Ip.
// Verify the database is readable; missing-record is fine, not an error.
let ip = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
let result = dbreader.lookup::<ObjectMap>(ip).map(|_| ());

match result {
Ok(_) | Err(MaxMindDBError::AddressNotFoundError(_)) => Ok(Mmdb {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
config,
}),
Err(error) => Err(error.into()),
}
dbreader.lookup(ip)?;

Ok(Mmdb {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
config,
})
}

fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option<ObjectMap> {
let data = self.dbreader.lookup::<ObjectMap>(ip).ok()?;
let data: ObjectMap = self.dbreader.lookup(ip).ok()?.decode().ok()??;

if let Some(fields) = select {
let mut filtered = Value::from(ObjectMap::new());
Expand Down
Loading