Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/gooddata-sdk/src/gooddata_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
CatalogExportTemplate,
CatalogExportTemplateAttributes,
)
from gooddata_sdk.catalog.organization.entity_model.ip_allowlist_policy import CatalogIpAllowlistPolicy
from gooddata_sdk.catalog.organization.entity_model.jwk import (
CatalogJwk,
CatalogJwkAttributes,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# (C) 2026 GoodData Corporation
from __future__ import annotations

import json
from typing import Any

import attrs
from attrs import define

from gooddata_sdk.catalog.base import Base


@define(kw_only=True)
class CatalogIpAllowlistPolicy(Base):
"""Represents an IP allowlist policy entity."""

id: str
allowed_sources: list[str] = attrs.field(factory=list)
users: list[str] = attrs.field(factory=list)
user_groups: list[str] = attrs.field(factory=list)

@staticmethod
def client_class() -> type:
return NotImplemented # type: ignore[return-value]

@classmethod
def from_api(cls, entity: dict[str, Any]) -> CatalogIpAllowlistPolicy:
attrs_raw = entity.get("attributes") or {}
users_raw = attrs_raw.get("users") or []
user_groups_raw = attrs_raw.get("userGroups") or []
return cls(
id=entity["id"],
allowed_sources=attrs_raw.get("allowedSources") or [],
users=[u["id"] if isinstance(u, dict) else u for u in users_raw],
user_groups=[g["id"] if isinstance(g, dict) else g for g in user_groups_raw],
)

def to_api_dict(self) -> dict[str, Any]:
"""Serialize to JSON API document dict for POST/PUT requests."""
attributes: dict[str, Any] = {
"allowedSources": self.allowed_sources,
}
if self.users:
attributes["users"] = [{"id": uid, "type": "user"} for uid in self.users]
if self.user_groups:
attributes["userGroups"] = [{"id": gid, "type": "userGroup"} for gid in self.user_groups]
return {
"data": {
"id": self.id,
"type": "ipAllowlistPolicy",
"attributes": attributes,
}
}

def to_api_json_bytes(self) -> bytes:
"""Serialize to JSON bytes for HTTP request body."""
return json.dumps(self.to_api_dict()).encode("utf-8")
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import functools
from typing import Any, Literal

import requests as _requests
from gooddata_api_client.exceptions import NotFoundException
from gooddata_api_client.model.declarative_export_templates import DeclarativeExportTemplates
from gooddata_api_client.model.declarative_notification_channels import DeclarativeNotificationChannels
Expand All @@ -22,6 +23,7 @@
from gooddata_sdk.catalog.catalog_service_base import CatalogServiceBase
from gooddata_sdk.catalog.organization.entity_model.directive import CatalogCspDirective
from gooddata_sdk.catalog.organization.entity_model.identity_provider import CatalogIdentityProvider
from gooddata_sdk.catalog.organization.entity_model.ip_allowlist_policy import CatalogIpAllowlistPolicy
from gooddata_sdk.catalog.organization.entity_model.jwk import CatalogJwk, CatalogJwkDocument
from gooddata_sdk.catalog.organization.entity_model.llm_provider import (
CatalogLlmProvider,
Expand All @@ -35,6 +37,9 @@
from gooddata_sdk.client import GoodDataApiClient
from gooddata_sdk.utils import load_all_entities, load_all_entities_dict

_IP_ALLOWLIST_BASE_PATH = "/api/v1/entities/ipAllowlistPolicies"
_IP_ALLOWLIST_ACTIONS_PATH = "/api/v1/actions/ipAllowlistPolicies"

# Org-level setting controlling which HLL function family calcique uses when
# generating SQL over HLL synopses. `Native` (default) emits StarRocks-native
# `HLL_*` functions; `Presto` emits the Presto-compatible HLL function family
Expand Down Expand Up @@ -628,6 +633,172 @@ def delete_llm_provider(self, id: str) -> None:
"""
self._entities_api.delete_entity_llm_providers(id, _check_return_type=False)

# IP Allowlist Policy CRUD (entity endpoints not yet in generated client)

def _ip_api_call(
self,
method: str,
path: str,
body: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Make an authenticated HTTP request to an IP allowlist policy endpoint.

Uses requests directly (mirroring ``GoodDataApiClient._do_post_request``)
so that we are not dependent on the generated client having typed wrappers
for these endpoints. Returns the parsed JSON body, or an empty dict for
responses without a body (e.g. 204 No Content on DELETE).
"""
hostname: str = self._client._hostname # type: ignore[attr-defined]
token: str = self._client._token # type: ignore[attr-defined]

prefix = "" if hostname.endswith("/") else "/"
url = f"{hostname}{prefix}{path.lstrip('/')}"

headers: dict[str, str] = {
"Content-Type": "application/vnd.gooddata.api+json",
"Authorization": f"Bearer {token}",
"Accept": "application/vnd.gooddata.api+json",
}
kwargs: dict[str, Any] = {"headers": headers}
if body is not None:
kwargs["json"] = body

response = _requests.request(method, url, **kwargs)
response.raise_for_status()
if response.content:
return response.json() # type: ignore[no-any-return]
return {}

def list_ip_allowlist_policies(self) -> list[CatalogIpAllowlistPolicy]:
"""Return all IP allowlist policies in the organization.

Returns:
list[CatalogIpAllowlistPolicy]:
List of IP allowlist policies.
"""
all_items: list[CatalogIpAllowlistPolicy] = []
page = 0
page_size = 500
while True:
raw = self._ip_api_call(
"GET",
f"{_IP_ALLOWLIST_BASE_PATH}?page={page}&size={page_size}",
)
data = raw.get("data") or []
all_items.extend(CatalogIpAllowlistPolicy.from_api(item) for item in data)
if len(data) < page_size:
break
page += 1
return all_items

def get_ip_allowlist_policy(self, policy_id: str) -> CatalogIpAllowlistPolicy:
"""Get an individual IP allowlist policy.

Args:
policy_id (str):
IP allowlist policy identifier.

Returns:
CatalogIpAllowlistPolicy:
The requested IP allowlist policy.
"""
raw = self._ip_api_call("GET", f"{_IP_ALLOWLIST_BASE_PATH}/{policy_id}")
return CatalogIpAllowlistPolicy.from_api(raw["data"])

def create_ip_allowlist_policy(self, policy: CatalogIpAllowlistPolicy) -> CatalogIpAllowlistPolicy:
"""Create a new IP allowlist policy.

Args:
policy (CatalogIpAllowlistPolicy):
IP allowlist policy to create.

Returns:
CatalogIpAllowlistPolicy:
Created IP allowlist policy.
"""
raw = self._ip_api_call("POST", _IP_ALLOWLIST_BASE_PATH, body=policy.to_api_dict())
return CatalogIpAllowlistPolicy.from_api(raw["data"])

def update_ip_allowlist_policy(self, policy: CatalogIpAllowlistPolicy) -> CatalogIpAllowlistPolicy:
"""Update an existing IP allowlist policy.

Args:
policy (CatalogIpAllowlistPolicy):
IP allowlist policy with updated fields.

Returns:
CatalogIpAllowlistPolicy:
Updated IP allowlist policy.

Raises:
ValueError:
IP allowlist policy does not exist.
"""
raw = self._ip_api_call(
"PUT",
f"{_IP_ALLOWLIST_BASE_PATH}/{policy.id}",
body=policy.to_api_dict(),
)
return CatalogIpAllowlistPolicy.from_api(raw["data"])

def delete_ip_allowlist_policy(self, policy_id: str) -> None:
"""Delete an IP allowlist policy.

Args:
policy_id (str):
IP allowlist policy identifier.

Returns:
None
"""
self._ip_api_call("DELETE", f"{_IP_ALLOWLIST_BASE_PATH}/{policy_id}")

def add_targets_to_ip_allowlist_policy(
self,
policy_id: str,
targets: list[dict[str, str]],
) -> None:
"""Add targets to an IP allowlist policy.

Args:
policy_id (str):
IP allowlist policy identifier.
targets (list[dict[str, str]]):
List of targets to add. Each target is a dict with ``id`` and
``type`` keys (e.g. ``{"id": "user1", "type": "user"}``).

Returns:
None
"""
self._ip_api_call(
"POST",
f"{_IP_ALLOWLIST_ACTIONS_PATH}/{policy_id}/addTargets",
body={"targets": targets},
)

def remove_targets_from_ip_allowlist_policy(
self,
policy_id: str,
targets: list[dict[str, str]],
) -> None:
"""Remove targets from an IP allowlist policy.

Args:
policy_id (str):
IP allowlist policy identifier.
targets (list[dict[str, str]]):
List of targets to remove. Each target is a dict with ``id``
and ``type`` keys (e.g. ``{"id": "user1", "type": "user"}``).

Returns:
None
"""
self._ip_api_call(
"POST",
f"{_IP_ALLOWLIST_ACTIONS_PATH}/{policy_id}/removeTargets",
body={"targets": targets},
)

# Layout APIs

def get_declarative_notification_channels(self) -> list[CatalogDeclarativeNotificationChannel]:
Expand Down
Loading
Loading