class TelemetrySettings(BaseSettings):
    """Telemetry configuration for Grafana Cloud (Loki / OTLP) and Grafana Alloy.

    Values are read from ``settings/telemetry-settings.env`` (UTF-8); entries
    that do not correspond to a declared field are ignored. Every field has a
    default, so the class can be instantiated without any configuration, in
    which case all ``has_*_config`` properties are ``False``.
    """

    model_config = SettingsConfigDict(
        env_file='settings/telemetry-settings.env',
        env_file_encoding='utf-8',
        extra='ignore',
    )

    # Grafana Cloud: Loki (logs) and OTLP (metrics/traces) endpoints plus the
    # instance ids and API key used to authenticate against them.
    grafana_loki_url: str = ""
    grafana_loki_instance_id: str = ""
    grafana_otlp_url: str = ""
    grafana_otlp_instance_id: str = ""
    grafana_cloud_api_key: str = ""
    # Interval between periodic metric exports, in milliseconds.
    metrics_export_interval_millis: int = 60_000

    # Grafana Alloy collector endpoint.
    alloy_host: str = ""
    alloy_insecure: bool = False
    alloy_port: int = 4317
    alloy_metrics_port: int = 4318

    @property
    def has_otlp_config(self) -> bool:
        """Whether both the OTLP URL and the Grafana Cloud API key are set."""
        return all((self.grafana_otlp_url, self.grafana_cloud_api_key))

    @property
    def has_loki_config(self) -> bool:
        """Whether both the Loki URL and the Grafana Cloud API key are set."""
        return all((self.grafana_loki_url, self.grafana_cloud_api_key))

    @property
    def has_alloy_config(self) -> bool:
        """Whether an Alloy host is set and the Alloy port is non-zero."""
        return all((self.alloy_host, self.alloy_port))


# Shared settings instance, created once when this module is imported.
telemetry_settings = TelemetrySettings()
+1,76 @@ +import logging +import queue + +from zcs.core.logger import ZcsLogging +from zcs.core.session import request_context, RequestState +from zcs.core.settings.telemetry_settings import telemetry_settings + + +def _get_span_context(): + try: + from opentelemetry import trace + except ImportError: + return None + + span_context = trace.get_current_span().get_span_context() + if span_context and span_context.is_valid: + return span_context + + return None + + +class LokiContextFilter(logging.Filter): + """Adds per-request context as dynamic Loki tags.""" + + def filter(self, record: logging.LogRecord) -> bool: + tags = {} + request_state: RequestState = request_context.get() + if request_state and request_state.getOpCode(): + tags.update({ + "request_op_code": request_state.getOpCode(), + "request_request_id": request_state.getRequestId(), + "request_follia_module": str(request_state.getFolliaModule()), + "auth_client_id": request_state.getAuthInfo().client_id if request_state.getAuthInfo() else None, + "auth_tenant_id": request_state.getAuthInfo().tenant_id if request_state.getAuthInfo() else None, + "auth_company_id": request_state.getAuthInfo().company_id if request_state.getAuthInfo() else None, + "auth_user_id": request_state.getAuthInfo().user_id if request_state.getAuthInfo() else None, + "auth_user_mail": request_state.getAuthInfo().user_email if request_state.getAuthInfo() else None + }) + + span_context = _get_span_context() + if span_context: + tags.update({ + "trace_id": format(span_context.trace_id, "032x"), + "span_id": format(span_context.span_id, "016x"), + }) + + if tags: + record.tags = tags + + return True + + +def setup_logging(logging_context: ZcsLogging, app_name: str, app_version: str, app_environment: str): + + # Grafana Cloud - Loki handler (background queue to avoid blocking) + if telemetry_settings.has_loki_config: + import logging_loki + _loki_push_url = telemetry_settings.grafana_loki_url.rstrip("/") + if not 
_loki_push_url.endswith("/loki/api/v1/push"): + _loki_push_url += "/loki/api/v1/push" + loki_handler = logging_loki.LokiQueueHandler( + queue.Queue(-1), + url=_loki_push_url, + tags={ + "service": app_name, + "env": app_environment, + "version": app_version + }, + auth=(telemetry_settings.grafana_loki_instance_id, telemetry_settings.grafana_cloud_api_key), + version="1", + ) + loki_handler.addFilter(LokiContextFilter()) + logging_context.get_logger().addHandler(loki_handler) + logging_context.get_logger().info("Loki logging handler enabled") + else: + logging_context.get_logger().warning("Loki logging handler not configured") diff --git a/app/src/zcs/core/telemetry/set_trace_attributes.py b/app/src/zcs/core/telemetry/set_trace_attributes.py new file mode 100644 index 0000000..87ae0eb --- /dev/null +++ b/app/src/zcs/core/telemetry/set_trace_attributes.py @@ -0,0 +1,66 @@ +from collections.abc import Mapping +from typing import Any + +from zcs.core.session import RequestState, request_context + + +def _safe_set_attribute(span, key: str, value: Any) -> None: + if value is None: + return + + if isinstance(value, (str, bool, int, float)): + span.set_attribute(key, value) + return + + span.set_attribute(key, str(value)) + + +def set_trace_attributes( + request_state: RequestState | None = None, + *, + app_name: str | None = None, + app_environment: str | None = None, + app_version: str | None = None, + extra_attributes: Mapping[str, Any] | None = None, +) -> None: + """Set trace attributes from app metadata and request context.""" + try: + from opentelemetry import trace + except ImportError: + return + + span = trace.get_current_span() + if not span: + return + + span_context = span.get_span_context() + if not span_context or not span_context.is_valid: + return + + current_state = request_state or request_context.get() + + _safe_set_attribute(span, "app_name", app_name) + _safe_set_attribute(span, "app_environment", app_environment) + _safe_set_attribute(span, 
"app_version", app_version) + + if not current_state: + if extra_attributes: + for key, value in extra_attributes.items(): + _safe_set_attribute(span, key, value) + return + + _safe_set_attribute(span, "request_op_code", current_state.getOpCode()) + _safe_set_attribute(span, "request_request_id", current_state.getRequestId()) + _safe_set_attribute(span, "request_follia_module", current_state.getFolliaModule()) + + auth_info = current_state.getAuthInfo() + if auth_info: + _safe_set_attribute(span, "auth_client_id", auth_info.client_id) + _safe_set_attribute(span, "auth_tenant_id", auth_info.tenant_id) + _safe_set_attribute(span, "auth_company_id", auth_info.company_id) + _safe_set_attribute(span, "auth_user_id", auth_info.user_id) + _safe_set_attribute(span, "auth_user_mail", auth_info.user_email) + + if extra_attributes: + for key, value in extra_attributes.items(): + _safe_set_attribute(span, key, value) diff --git a/app/src/zcs/core/telemetry/zcs_telemetry.py b/app/src/zcs/core/telemetry/zcs_telemetry.py new file mode 100644 index 0000000..72124f1 --- /dev/null +++ b/app/src/zcs/core/telemetry/zcs_telemetry.py @@ -0,0 +1,292 @@ +import base64 +import logging +import threading + +from typing import Any + +from zcs.core.settings.telemetry_settings import TelemetrySettings, telemetry_settings + + +class ZcsTelemetry: + + def __init__( + self, + telemetry_settings: TelemetrySettings = telemetry_settings, + logger: logging.Logger | None = None, + service_name: str | None = "zcs-app", + service_version: str | None = "0.0.0", + service_environment: str | None = "undefined", + ) -> None: + self._telemetry_initialized = False + self._telemetry_lock = threading.Lock() + self._telemetry_settings = telemetry_settings + self._logger = logger or logging.getLogger(__name__) + self._service_name = service_name + self._service_version = service_version + self._service_environment = service_environment + self._meter = None + self._meters = {} + self._tracer = None + 
class ZcsTelemetry:
    """Sets up OpenTelemetry / Prometheus telemetry for a service.

    On construction the instance tries to initialize, depending on the given
    settings:

    * Grafana Cloud OTLP (HTTP) metric and trace exporters, and/or
    * Grafana Alloy gRPC trace and log exporters, with metrics exposed for
      Prometheus scraping through ``instrument_fastapi_app``.

    All OpenTelemetry / Prometheus packages are imported lazily; when they are
    missing, or when neither OTLP nor Alloy is configured, the instance stays
    disabled and its public methods log a warning and/or return ``None``.
    """

    def __init__(
        self,
        telemetry_settings: TelemetrySettings = telemetry_settings,
        logger: logging.Logger | None = None,
        service_name: str | None = "zcs-app",
        service_version: str | None = "0.0.0",
        service_environment: str | None = "undefined",
    ) -> None:
        """Store the configuration and initialize telemetry immediately.

        Args:
            telemetry_settings: Settings object; defaults to the module-level
                instance.
            logger: Logger for diagnostics; defaults to this module's logger.
            service_name: Reported as the resource service name.
            service_version: Reported as the resource service version.
            service_environment: Reported as ``deployment.environment``.
        """
        self._telemetry_initialized = False
        self._telemetry_lock = threading.Lock()
        self._telemetry_settings = telemetry_settings
        self._logger = logger or logging.getLogger(__name__)
        self._service_name = service_name
        self._service_version = service_version
        self._service_environment = service_environment
        self._meter = None
        self._meters = {}
        self._tracer = None
        self._instrumented_apps = set()
        self._custom_metrics = {}
        self._prometheus_counters = {}
        # Label names each Prometheus counter was registered with, by counter name.
        self._prometheus_label_names = {}
        self._alloy_metrics_via_prometheus = False
        self._fastapi_metrics = None

        self._initialize_telemetry()

    def instrument_fastapi_app(self, fastapi_app) -> None:
        """Instrument a FastAPI app for tracing and Prometheus metrics.

        No-op when telemetry is not initialized or the app was already
        instrumented by this instance. The OpenTelemetry FastAPI instrumentor
        and ``prometheus_fastapi_instrumentator`` are independent optional
        dependencies: each one is applied when importable, and a warning
        naming the missing package is logged otherwise.
        """
        if not self._telemetry_initialized:
            return

        app_id = id(fastapi_app)
        if app_id in self._instrumented_apps:
            return

        instrumented = False

        try:
            from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
        except ImportError:
            self._logger.warning("FastAPI OpenTelemetry instrumentation is not installed.")
        else:
            FastAPIInstrumentor.instrument_app(fastapi_app)
            instrumented = True

        try:
            from prometheus_fastapi_instrumentator import Instrumentator, metrics
        except ImportError:
            self._logger.warning(
                "prometheus_fastapi_instrumentator is not installed - Prometheus metrics endpoint skipped."
            )
        else:
            self._fastapi_metrics = metrics
            Instrumentator().instrument(fastapi_app).expose(fastapi_app)
            instrumented = True

        # Only remember the app when something was actually applied, so a
        # later call can retry once the packages are available.
        if instrumented:
            self._instrumented_apps.add(app_id)

    def get_meter(self):
        """Return the default meter, or ``None`` when telemetry is disabled.

        The meter is only set by the OTLP setup, so this also returns ``None``
        when only Alloy is configured.
        """
        if not self._telemetry_initialized:
            self._logger.warning("Telemetry is not initialized - returning None for meter")
            return None
        return self._meter

    def get_tracer(self):
        """Return the tracer, or ``None`` when telemetry is disabled."""
        if not self._telemetry_initialized:
            self._logger.warning("Telemetry is not initialized - returning None for tracer")
            return None
        return self._tracer

    def is_telemetry_enabled(self) -> bool:
        """Return whether telemetry was successfully initialized."""
        return self._telemetry_initialized

    def counter_create(self, name, unit, description, label_names=None):
        """Create a counter on every configured backend.

        Args:
            name: Counter name. For Prometheus, ``.`` and ``-`` are replaced
                by ``_``.
            unit: Unit passed to the OpenTelemetry counter.
            description: Human-readable description.
            label_names: Optional iterable of Prometheus label names. Only
                these labels are forwarded to Prometheus by ``counter_add``;
                without them the Prometheus counter is unlabelled.
        """
        if not self._telemetry_initialized:
            self._logger.warning("Telemetry is not initialized - cannot create counter")
            return

        if not self._meters and not self._alloy_metrics_via_prometheus:
            self._logger.warning("No telemetry meters are initialized - cannot create counter")
            return

        self._custom_metrics[name] = {
            meter_key: meter.create_counter(name=name, unit=unit, description=description)
            for meter_key, meter in self._meters.items()
        }

        # Registering the same Prometheus metric twice raises, so an already
        # registered counter is kept as is.
        if not self._alloy_metrics_via_prometheus or name in self._prometheus_counters:
            return

        try:
            from prometheus_client import Counter as PrometheusCounter
        except ImportError:
            self._logger.warning("prometheus_client is not installed - Alloy Prometheus counter skipped")
            return

        safe_name = name.replace(".", "_").replace("-", "_")
        declared_labels = tuple(label_names or ())
        try:
            prometheus_counter = PrometheusCounter(safe_name, description, labelnames=declared_labels)
        except ValueError as err:
            # e.g. the metric is already present in the default registry
            # because another instance registered it.
            self._logger.warning("Prometheus counter '%s' could not be registered: %s", safe_name, err)
            return

        self._prometheus_counters[name] = prometheus_counter
        self._prometheus_label_names[name] = declared_labels

    def counter_add(self, name, data=None):
        """Increment a counter previously created with ``counter_create``.

        Args:
            name: Counter name as passed to ``counter_create``.
            data: Optional dict. A numeric ``"value"`` entry is the increment
                (default ``1``) and the remaining entries are attributes;
                without a numeric ``"value"`` the whole dict is used as
                attributes. Non-dict values are ignored.

        OpenTelemetry counters receive all attributes. The Prometheus counter
        only receives the labels declared at creation time (missing ones are
        sent as an empty string), because Prometheus rejects undeclared
        labels. Negative increments are rejected with a warning.
        """
        if not self._telemetry_initialized:
            self._logger.warning("Telemetry is not initialized - cannot add to counter")
            return

        amount = 1
        attributes = {}
        if isinstance(data, dict):
            if "value" in data and isinstance(data["value"], (int, float)):
                amount = data["value"]
                attributes = {k: v for k, v in data.items() if k != "value"}
            else:
                attributes = data

        otel_counters = self._custom_metrics.get(name)
        prometheus_counter = self._prometheus_counters.get(name)
        if not otel_counters and prometheus_counter is None:
            self._logger.warning("Counter '%s' is not initialized", name)
            return

        if amount < 0:
            self._logger.warning("Counter '%s' cannot be incremented by a negative amount", name)
            return

        if otel_counters:
            for counter in otel_counters.values():
                counter.add(amount, attributes)

        if prometheus_counter is None:
            return

        declared_labels = self._prometheus_label_names.get(name, ())
        if declared_labels:
            label_values = {label: str(attributes.get(label, "")) for label in declared_labels}
            prometheus_counter.labels(**label_values).inc(amount)
        else:
            prometheus_counter.inc(amount)

    def _initialize_telemetry(self) -> None:
        """Initialize the configured exporters once.

        Skipped when telemetry is already initialized, when neither OTLP nor
        Alloy is configured, or when the OpenTelemetry packages are missing.
        """
        if self._telemetry_initialized:
            return

        settings = self._telemetry_settings
        if not settings.has_otlp_config and not settings.has_alloy_config:
            self._logger.warning("Neither OTLP nor Alloy configuration found - telemetry initialization skipped")
            return

        # Import OpenTelemetry modules here to avoid a hard dependency.
        modules = self._import_telemetry_modules()
        if modules is None:
            return

        with self._telemetry_lock:
            # Re-check under the lock: another thread may have completed the
            # initialization while this one was waiting.
            if self._telemetry_initialized:
                return

            resource = self._build_resource()

            tracer_provider = None
            if settings.has_otlp_config:
                tracer_provider = self._setup_otlp_exporters(modules, resource)
            if settings.has_alloy_config:
                self._setup_alloy_exporters(modules, resource, tracer_provider)

            # At least one of the two branches ran (checked above).
            self._telemetry_initialized = True

    def _setup_otlp_exporters(self, modules: dict[str, Any], resource):
        """Configure Grafana Cloud OTLP metrics and tracing; return the tracer provider."""
        auth_header = self._build_basic_auth_header()
        # Tolerate a trailing slash so the endpoints do not contain "//v1/...".
        base_url = self._telemetry_settings.grafana_otlp_url.rstrip("/")

        metric_endpoint = f"{base_url}/v1/metrics"
        metric_exporter = modules["OTLPMetricExporter"](
            endpoint=metric_endpoint,
            headers={"Authorization": auth_header},
        )
        metric_reader = modules["PeriodicExportingMetricReader"](
            metric_exporter,
            export_interval_millis=self._telemetry_settings.metrics_export_interval_millis,
        )
        meter_provider = modules["MeterProvider"](resource=resource, metric_readers=[metric_reader])
        modules["metrics"].set_meter_provider(meter_provider)
        otlp_meter = meter_provider.get_meter(self._service_name, self._service_version)
        self._meters["otlp"] = otlp_meter
        if self._meter is None:
            self._meter = otlp_meter
        self._logger.info("OTLP metrics exporter enabled: %s", metric_endpoint)

        trace_endpoint = f"{base_url}/v1/traces"
        tracer_provider = modules["TracerProvider"](resource=resource)
        tracer_provider.add_span_processor(
            modules["BatchSpanProcessor"](
                modules["OTLPSpanExporter"](
                    endpoint=trace_endpoint,
                    headers={"Authorization": auth_header},
                )
            )
        )
        modules["trace"].set_tracer_provider(tracer_provider)
        self._tracer = modules["trace"].get_tracer(self._service_name, self._service_version)
        self._logger.info("OTLP tracing exporter enabled: %s", trace_endpoint)

        return tracer_provider

    def _setup_alloy_exporters(self, modules: dict[str, Any], resource, tracer_provider) -> None:
        """Configure Alloy tracing and logging and enable Prometheus-based metrics.

        ``tracer_provider`` is the provider already registered by the OTLP
        setup, or ``None``; in the latter case a new provider is created and
        registered here.
        """
        alloy_endpoint = self._build_alloy_endpoint()
        insecure = self._telemetry_settings.alloy_insecure

        self._alloy_metrics_via_prometheus = True
        self._logger.info("Alloy metrics via Prometheus /metrics scrape endpoint (instrument_fastapi_app)")

        needs_registration = tracer_provider is None
        if needs_registration:
            tracer_provider = modules["TracerProvider"](resource=resource)

        tracer_provider.add_span_processor(
            modules["BatchSpanProcessor"](
                modules["OTLPGrpcSpanExporter"](
                    endpoint=alloy_endpoint,
                    insecure=insecure,
                )
            )
        )

        if needs_registration:
            modules["trace"].set_tracer_provider(tracer_provider)

        self._tracer = modules["trace"].get_tracer(self._service_name, self._service_version)
        self._logger.info("Alloy tracing exporter enabled: %s", alloy_endpoint)

        logger_provider = modules["LoggerProvider"](resource=resource)
        modules["_logs"].set_logger_provider(logger_provider)
        logger_provider.add_log_record_processor(
            modules["BatchLogRecordProcessor"](
                modules["OTLPLogExporter"](
                    endpoint=alloy_endpoint,
                    insecure=insecure,
                )
            )
        )

        # The handler goes on the root logger, which is shared process-wide:
        # do not add a second one, or every record would be exported twice.
        root_logger = logging.getLogger()
        handler_type = modules["LoggingHandler"]
        if not any(isinstance(existing, handler_type) for existing in root_logger.handlers):
            root_logger.addHandler(handler_type(level=logging.INFO, logger_provider=logger_provider))
        self._logger.info("Alloy logging exporter enabled: %s", alloy_endpoint)

    def _build_basic_auth_header(self) -> str:
        """Return the HTTP Basic ``Authorization`` value for Grafana Cloud OTLP.

        The credentials are ``<grafana_otlp_instance_id>:<grafana_cloud_api_key>``,
        base64-encoded.
        """
        credentials = base64.b64encode(
            f"{self._telemetry_settings.grafana_otlp_instance_id}:{self._telemetry_settings.grafana_cloud_api_key}".encode()
        ).decode()
        return f"Basic {credentials}"

    def _build_alloy_endpoint(self) -> str:
        """Return the Alloy endpoint as ``host:port``."""
        return f"{self._telemetry_settings.alloy_host}:{self._telemetry_settings.alloy_port}"

    def _build_alloy_metrics_endpoint(self) -> str:
        """Return the Alloy metrics URL (``http`` when insecure, else ``https``)."""
        scheme = "http" if self._telemetry_settings.alloy_insecure else "https"
        return f"{scheme}://{self._telemetry_settings.alloy_host}:{self._telemetry_settings.alloy_metrics_port}"

    def _build_resource(self):
        """Build the OpenTelemetry resource describing this service."""
        from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION

        return Resource.create(
            {
                SERVICE_NAME: self._service_name,
                SERVICE_VERSION: self._service_version,
                "deployment.environment": self._service_environment,
            }
        )

    def _import_telemetry_modules(self) -> dict[str, Any] | None:
        """Import the OpenTelemetry modules and classes used during setup.

        Returns:
            A mapping from short names to the imported objects, or ``None``
            (after logging a warning) when any import fails.
        """
        try:
            from opentelemetry import _logs, metrics, trace
            from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as OTLPGrpcSpanExporter
            from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
            from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
            from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
            from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
            from opentelemetry.sdk.metrics import MeterProvider
            from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
            from opentelemetry.sdk.trace import TracerProvider
            from opentelemetry.sdk.trace.export import BatchSpanProcessor
        except ImportError as e:
            self._logger.warning(
                "OpenTelemetry dependencies are not installed. Telemetry initialization skipped. Error: %s", e
            )
            return None

        return {
            "_logs": _logs,
            "metrics": metrics,
            "trace": trace,
            "OTLPMetricExporter": OTLPMetricExporter,
            "OTLPSpanExporter": OTLPSpanExporter,
            "OTLPGrpcSpanExporter": OTLPGrpcSpanExporter,
            "OTLPLogExporter": OTLPLogExporter,
            "LoggerProvider": LoggerProvider,
            "LoggingHandler": LoggingHandler,
            "BatchLogRecordProcessor": BatchLogRecordProcessor,
            "MeterProvider": MeterProvider,
            "PeriodicExportingMetricReader": PeriodicExportingMetricReader,
            "TracerProvider": TracerProvider,
            "BatchSpanProcessor": BatchSpanProcessor,
        }