Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/schematic/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@
from typing import Any, Callable, Dict, List, Optional, Union

import httpx

from .base_client import AsyncBaseSchematic, BaseSchematic
from .cache import DEFAULT_CACHE_SIZE, DEFAULT_CACHE_TTL, AsyncCacheProvider, CacheProvider, LocalCache
from .datastream import DataStreamClient, DataStreamClientOptions
from .event_buffer import AsyncEventBuffer, EventBuffer
from .http_client import AsyncOfflineHTTPClient, OfflineHTTPClient
from .logging import get_default_logger
from .logging import DEFAULT_LOG_LEVEL, LogLevel, get_default_logger
from .types import (
CheckFlagRequestBody,
CheckFlagResponseData,
Expand Down Expand Up @@ -63,6 +62,9 @@ class SchematicConfig:
follow_redirects: Optional[bool] = True
httpx_client: Optional[httpx.Client] = None
logger: Optional[logging.Logger] = None
# Level for the default logger; ignored when `logger` is provided so the
# consumer's own logger configuration is the source of truth.
log_level: LogLevel = DEFAULT_LOG_LEVEL
offline: bool = False
timeout: Optional[float] = None
cache_providers: Optional[List[CacheProvider[CheckFlagResponseData]]] = None
Expand All @@ -81,7 +83,7 @@ def __init__(self, api_key: str, config: Optional[SchematicConfig] = None):
timeout=config.timeout,
)
self.event_buffer_period = config.event_buffer_period
self.logger = config.logger or get_default_logger()
self.logger = config.logger or get_default_logger(level=config.log_level)
self.flag_defaults = config.flag_defaults or {}
self.event_buffer = EventBuffer(
events_api=self.events,
Expand Down Expand Up @@ -321,6 +323,9 @@ class AsyncSchematicConfig:
follow_redirects: Optional[bool] = True
httpx_client: Optional[httpx.AsyncClient] = None
logger: Optional[logging.Logger] = None
# Level for the default logger; ignored when `logger` is provided so the
# consumer's own logger configuration is the source of truth.
log_level: LogLevel = DEFAULT_LOG_LEVEL
offline: bool = False
timeout: Optional[float] = None
cache_providers: Optional[List[CacheProvider[CheckFlagResponseData]]] = None
Expand Down Expand Up @@ -370,7 +375,7 @@ def __init__(self, api_key: str, config: Optional[AsyncSchematicConfig] = None):
timeout=config.timeout,
)
self.event_buffer_period = config.event_buffer_period
self.logger = config.logger or get_default_logger()
self.logger = config.logger or get_default_logger(level=config.log_level)
self.flag_defaults = config.flag_defaults or {}
self.event_buffer = AsyncEventBuffer(
events_api=self.events,
Expand Down
42 changes: 29 additions & 13 deletions src/schematic/logging.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,37 @@
import logging
from typing import Optional
from typing import Union

LogLevel = Union[int, str]
DEFAULT_LOG_LEVEL: int = logging.WARNING

def get_default_logger(name: str = "schematic") -> logging.Logger:
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# Name tag for the StreamHandler we attach to the default logger. Lets us
# find and update *our* handler without touching any other handlers a
# consumer may have attached to a same-named logger out-of-band.
_SDK_HANDLER_NAME = "schematichq-default"

if not logger.handlers:
# Create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)

# Create formatter and add it to the handlers
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
def get_default_logger(name: str = "schematic", level: LogLevel = DEFAULT_LOG_LEVEL) -> logging.Logger:
"""Build the SDK's default console logger.

# Add the handler to the logger
logger.addHandler(ch)
The level defaults to WARNING so production consumers don't get flooded
with diagnostics they didn't ask for. Pass an explicit level (e.g.
``logging.DEBUG`` or ``"DEBUG"``) to enable verbose output. Consumers
who want full control should pass their own ``logging.Logger`` to the
Schematic client instead — the SDK leaves that logger's level alone.
"""
logger = logging.getLogger(name)
logger.setLevel(level)

sdk_handler = next(
(h for h in logger.handlers if getattr(h, "name", None) == _SDK_HANDLER_NAME),
None,
)
if sdk_handler is None:
sdk_handler = logging.StreamHandler()
sdk_handler.name = _SDK_HANDLER_NAME
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
sdk_handler.setFormatter(formatter)
logger.addHandler(sdk_handler)
sdk_handler.setLevel(level)

return logger
158 changes: 158 additions & 0 deletions tests/custom/test_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from __future__ import annotations

import logging
import uuid

from schematic.client import AsyncSchematic, AsyncSchematicConfig, Schematic, SchematicConfig
from schematic.logging import DEFAULT_LOG_LEVEL, get_default_logger


def _unique(name: str) -> str:
"""Use unique names so Python's global logger cache doesn't bleed state across tests."""
return f"{name}-{uuid.uuid4().hex[:8]}"


class TestDefaultLogLevel:
def test_default_level_is_warning(self) -> None:
"""Default must be WARNING — debug/info output is opt-in, not on by default."""
assert DEFAULT_LOG_LEVEL == logging.WARNING

def test_default_logger_uses_warning_when_no_level_specified(self) -> None:
logger = get_default_logger(_unique("schematic-test"))
assert logger.level == logging.WARNING

def test_default_logger_drops_info_and_debug(self) -> None:
"""isEnabledFor is the canonical "would this level emit?" check.
caplog can't be used here since pytest forcibly raises the level."""
logger = get_default_logger(_unique("schematic-test"))
assert not logger.isEnabledFor(logging.DEBUG)
assert not logger.isEnabledFor(logging.INFO)
assert logger.isEnabledFor(logging.WARNING)
assert logger.isEnabledFor(logging.ERROR)


class TestConfigurableLevel:
def test_explicit_level_is_applied(self) -> None:
logger = get_default_logger(_unique("schematic-test"), level=logging.DEBUG)
assert logger.level == logging.DEBUG

def test_handler_level_matches_logger_level(self) -> None:
"""The StreamHandler must also get the configured level — otherwise the
logger lets the message through but the handler still suppresses it."""
logger = get_default_logger(_unique("schematic-test"), level=logging.ERROR)
assert len(logger.handlers) >= 1
for handler in logger.handlers:
assert handler.level == logging.ERROR

def test_string_level_is_accepted(self) -> None:
"""Python's logging module accepts string levels; we shouldn't reject them."""
logger = get_default_logger(_unique("schematic-test"), level="DEBUG")
assert logger.level == logging.DEBUG

def test_re_call_updates_handler_level(self) -> None:
"""Python's getLogger() returns the same Logger instance for a given name,
so a second call with a different level must keep the handler in sync —
otherwise the first call's handler level wins forever."""
name = _unique("schematic-test")
logger = get_default_logger(name, level=logging.ERROR)
for handler in logger.handlers:
assert handler.level == logging.ERROR

logger = get_default_logger(name, level=logging.DEBUG)
assert logger.level == logging.DEBUG
for handler in logger.handlers:
assert handler.level == logging.DEBUG

def test_does_not_mutate_foreign_handlers(self) -> None:
"""If a consumer has attached their own handlers to a same-named logger
out-of-band, the SDK must only touch its own tagged handler."""
name = _unique("schematic-test")
logger = logging.getLogger(name)
foreign = logging.StreamHandler()
foreign.name = "consumer-handler"
foreign.setLevel(logging.CRITICAL)
logger.addHandler(foreign)

get_default_logger(name, level=logging.DEBUG)

# Foreign handler's level must be untouched.
assert foreign.level == logging.CRITICAL
# And the SDK's own handler is present and at the requested level.
sdk_handlers = [h for h in logger.handlers if getattr(h, "name", None) == "schematichq-default"]
assert len(sdk_handlers) == 1
assert sdk_handlers[0].level == logging.DEBUG

def test_re_call_does_not_add_duplicate_sdk_handler(self) -> None:
"""Calling get_default_logger twice with the same name should not stack
SDK handlers — otherwise each call doubles the output."""
name = _unique("schematic-test")
get_default_logger(name, level=logging.WARNING)
get_default_logger(name, level=logging.DEBUG)

logger = logging.getLogger(name)
sdk_handlers = [h for h in logger.handlers if getattr(h, "name", None) == "schematichq-default"]
assert len(sdk_handlers) == 1


class TestSchematicConfigLogLevel:
def test_sync_client_applies_log_level_to_default_logger(self) -> None:
client = Schematic("test-key", config=SchematicConfig(
offline=True, log_level=logging.DEBUG,
))
assert client.logger.level == logging.DEBUG

async def test_async_client_applies_log_level_to_default_logger(self) -> None:
client = AsyncSchematic("test-key", config=AsyncSchematicConfig(
offline=True, log_level=logging.DEBUG,
))
try:
assert client.logger.level == logging.DEBUG
finally:
await client.shutdown()

def test_sync_client_default_is_warning(self) -> None:
client = Schematic("test-key", config=SchematicConfig(offline=True))
assert client.logger.level == logging.WARNING

async def test_async_client_default_is_warning(self) -> None:
client = AsyncSchematic("test-key", config=AsyncSchematicConfig(offline=True))
try:
assert client.logger.level == logging.WARNING
finally:
await client.shutdown()


class TestCustomLoggerRespectsItsOwnLevel:
"""When a consumer provides their own logger, the SDK must not touch its
level — even if log_level is also set on the config."""

def test_provided_logger_level_is_unchanged(self) -> None:
custom = logging.getLogger(_unique("user-app"))
custom.setLevel(logging.CRITICAL)
client = Schematic("test-key", config=SchematicConfig(
offline=True, logger=custom,
))
assert client.logger is custom
assert client.logger.level == logging.CRITICAL

def test_log_level_config_ignored_when_logger_provided(self) -> None:
"""If both `logger` and `log_level` are passed, the explicit logger
wins and its level is untouched."""
custom = logging.getLogger(_unique("user-app"))
custom.setLevel(logging.CRITICAL)
client = Schematic("test-key", config=SchematicConfig(
offline=True, logger=custom, log_level=logging.DEBUG,
))
assert client.logger.level == logging.CRITICAL # NOT DEBUG

async def test_async_provided_logger_level_is_unchanged(self) -> None:
custom = logging.getLogger(_unique("user-app"))
custom.setLevel(logging.CRITICAL)
client = AsyncSchematic("test-key", config=AsyncSchematicConfig(
offline=True, logger=custom,
))
try:
assert client.logger is custom
assert client.logger.level == logging.CRITICAL
finally:
await client.shutdown()
Loading