Skip to content

Antalya 26.3: Cluster Joins part 2 - global mode#1782

Open
ianton-ru wants to merge 7 commits into
antalya-26.3from
frontport/antalya-26.3/json_part2
Open

Antalya 26.3: Cluster Joins part 2 - global mode#1782
ianton-ru wants to merge 7 commits into
antalya-26.3from
frontport/antalya-26.3/json_part2

Conversation

@ianton-ru
Copy link
Copy Markdown

@ianton-ru ianton-ru commented May 12, 2026

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Cluster Joins part 2 - global mode

Documentation entry for user-facing changes

Frontport of #1527

Setting object_storage_cluster_join_mode wiith value global.
In queries like

SELECT * FROM iceberg_table(...) JOIN local_table(...) ON ...

when left table is executed on cluster (s3Cluster, Iceberg with object_storage_cluster setting, etc.) data from right table is extracted and sent to swarm nodes as temorary tables. JOIN is executed on swarm nodes.

This PR also includes several fixes for issues, found by AI

These changes are in last three commits, and new for 26.3 port, do not exists in #1527 for 26.1.

CI/CD Options

Exclude tests:

  • Fast test
  • Integration Tests
  • Stateless tests
  • Stateful tests
  • Performance tests
  • All with ASAN
  • All with TSAN
  • All with MSAN
  • All with UBSAN
  • All with Coverage
  • All with Aarch64
  • All Regression
  • Disable CI Cache

Regression jobs to run:

  • Fast suites (mostly <1h)
  • Aggregate Functions (2h)
  • Alter (1.5h)
  • Benchmark (30m)
  • ClickHouse Keeper (1h)
  • Iceberg (2h)
  • LDAP (1h)
  • Parquet (1.5h)
  • RBAC (1.5h)
  • SSL Server (1h)
  • S3 (2h)
  • S3 Export (2h)
  • Swarms (30m)
  • Tiered Storage (2h)

@ianton-ru ianton-ru added antalya port-antalya PRs to be ported to all new Antalya releases antalya-26.3 labels May 12, 2026
@ianton-ru
Copy link
Copy Markdown
Author

@codex review

@github-actions
Copy link
Copy Markdown

github-actions Bot commented May 12, 2026

Workflow [PR], commit [fc39e6d]

Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 01d8b03ac1

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread src/Storages/IStorageCluster.cpp
Comment thread src/Storages/buildQueryTreeForShard.cpp
@svb-alt svb-alt added the forwardport This is a frontport of code that existed in previous Antalya versions label May 12, 2026
@svb-alt svb-alt requested a review from arthurpassos May 14, 2026 12:12
arthurpassos
arthurpassos previously approved these changes May 15, 2026
Copy link
Copy Markdown
Collaborator

@arthurpassos arthurpassos left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with this piece of code, I highly doubt I can add any value here. The code looks sane, and the tests as well.

@alsugiliazova
Copy link
Copy Markdown
Member

Audit: PR #1782 — Antalya 26.3: Cluster Joins part 2 - global mode

AI audit note: This review comment was generated by AI (Cursor agent, audit-review skill).

Scope reviewed

Head b63ad834 against base 918a26e6 on antalya-26.3. Feature scope: frontport of #1527 (cluster joins, GLOBAL mode for object_storage_cluster_join_mode) plus three 26.3-only follow-up fixes (Fix global IN, Fix cross join replacement, Fix rewriteInToGlobalIn). In-scope files: src/Core/Settings.cpp, src/Storages/IStorageCluster.{cpp,h}, src/Storages/buildQueryTreeForShard.{cpp,h}, src/Storages/StorageDistributed.cpp (deletion of RewriteInToGlobalInVisitor inline class, refactored into buildQueryTreeForShard.cpp), and integration tests test_s3_cluster/test.py, test_database_iceberg/test.py, test_storage_iceberg_with_spark/test_cluster_joins.py. Method: static analysis (call graph + transition matrix + logical fault injection on the new GLOBAL branch and the refactored helpers). No local build/run.

Confirmed defects

Medium: RewriteJoinToGlobalJoinVisitor leaks object_storage_cluster_join_mode=global into unrelated callers (parallel replicas + ClusterProxy)

  • Impact: Whenever a user has object_storage_cluster_join_mode='global' set in the session/profile, queries that go through parallel replicas or ClusterProxy::executeQueryWithCluster* paths now silently override parallel_replicas_prefer_local_join=true and force JOINs to JoinLocality::Global even when neither side is an IStorageCluster. Behavior of unrelated features changes based on an object-storage-only setting.
  • Anchor: src/Storages/buildQueryTreeForShard.cppRewriteJoinToGlobalJoinVisitor::enterImpl
  • Trigger: Session setting object_storage_cluster_join_mode='global' + any query that calls rewriteJoinToGlobalJoin from Planner/findParallelReplicasQuery.cpp:513 or Interpreters/ClusterProxy/executeQuery.cpp:832 (e.g., a normal MergeTree JOIN MergeTree under parallel replicas with parallel_replicas_prefer_local_join=1).
  • Why defect: The new check is prefer_local_join = ...prefer_local_join && ...object_storage_cluster_join_mode != GLOBAL. The visitor is shared by three callers but only one (the new IStorageCluster GLOBAL branch) should care about this setting. A session-wide flip of object_storage_cluster_join_mode thus mutates parallel-replicas/cluster-proxy JOIN locality decisions transparently.
  • Fix direction: Gate the override behind a visitor-constructor parameter (or pass it via the caller) instead of reading object_storage_cluster_join_mode from context inside the visitor.
  • Regression test direction: SELECT … FROM merge_tree A JOIN merge_tree B ON … under max_parallel_replicas=2, parallel_replicas_prefer_local_join=1, object_storage_cluster_join_mode='global'; assert plan keeps the JOIN local (not GLOBAL).

Medium: GLOBAL mode loses GLOBAL IN rewrite when the IN subquery only references StorageDistributed

  • Impact: For WHERE x IN (SELECT … FROM distributed_table) against an s3Cluster/iceberg cluster with object_storage_cluster_join_mode='global', the local IN is shipped verbatim to swarm nodes instead of being executed once on the initiator and broadcast as a temporary table. Each swarm node re-executes the subquery (potentially N× cost) and, if the swarm cluster does not know distributed_table's underlying cluster, the query fails on swarm nodes. User asked for GLOBAL semantics but got local-IN semantics.
  • Anchor: src/Storages/buildQueryTreeForShard.cppRewriteInToGlobalInVisitor::enterImpl (lines ~812–847); reached from IStorageCluster::updateQueryWithJoinToSendIfNeeded GLOBAL case (IStorageCluster.cpp ~324).
  • Trigger: object_storage_cluster_join_mode='global' + a query whose IN-subquery's join tree contains only StorageDistributed tables. Smallest case: SELECT … FROM iceberg_cluster(…) WHERE x IN (SELECT id FROM distributed_local) SETTINGS object_storage_cluster_join_mode='global'.
  • Why defect: The no_replace flag stays true when every table in extractTableExpressions(query->getJoinTree(), false, true) is a StorageDistributed, so the visitor early-returns without converting the function name. buildQueryTreeForShard then never picks the function up (it is still local in, not globalIn), so the temp-table materialization step is skipped. This logic was correct in its original home (StorageDistributed.cpp::buildQueryTreeDistributed, where peer shards are by construction reachable from each other) but is wrong when reused from an object-storage cluster initiator whose swarm nodes are a separate cluster from the distributed_local cluster.
  • Fix direction: When invoked from the IStorageCluster GLOBAL path, force-rewrite local IN to global IN unconditionally (skip the StorageDistributed-only early-return), e.g. by parameterizing the visitor with a force_global_in flag.
  • Regression test direction: Integration test that creates a Distributed table over a non-swarm cluster, runs SELECT … FROM iceberg(…) WHERE col IN (SELECT … FROM distributed_local) SETTINGS object_storage_cluster_join_mode='global'; assert system.query_log on swarm nodes shows the temp table read, not a distributed_local execution.

Low (latent): SearcherVisitor cannot find any match beyond the first when entry > 1

  • Impact: Dead today (all three call sites pass entry=1); future call with entry >= 2 will return nullptr and trip Can't find … LOGICAL_ERRORs in updateQueryWithJoinToSendIfNeeded/getQueryTreeInfo.
  • Anchor: src/Storages/IStorageCluster.cppSearcherVisitor::needChildVisit
  • Trigger: Future caller using entry >= 2 (parameter was added in this PR but never exercised with non-1 values).
  • Why defect: needChildVisit returns getSubqueryDepth() <= 2 && !passed_node && !current_entry. Once the first matching node increments current_entry to 1, recursion stops everywhere, so siblings and grandchildren are no longer visited and the second match is never reached.
  • Fix direction: Replace !current_entry with current_entry < entry (or drop the check entirely and rely on !passed_node).
  • Regression test direction: Add a unit/gtest that constructs a query with two TABLE_FUNCTION nodes and asserts SearcherVisitor({TABLE_FUNCTION}, 2, ctx) returns the second one.

Coverage summary

Item Detail
Scope reviewed IStorageCluster::{read, getQueryProcessingStage, updateQueryWithJoinToSendIfNeeded, getQueryTreeInfo}; SearcherVisitor/CollectUsedColumnsForSourceVisitor; new GLOBAL branch + external-tables propagation into ReadFromCluster; buildQueryTreeForShard extension for find_cross_join; DistributedProductModeRewriteInJoinVisitor extension; new rewriteInToGlobalIn/RewriteInToGlobalInVisitor move + signature fix (b63ad83); RewriteJoinToGlobalJoinVisitor setting interaction; the three new integration tests under test_s3_cluster/test_joins and test_storage_iceberg_with_spark/test_cluster_joins.
Categories failed Cross-setting interaction (object_storage_cluster_join_mode leaks into RewriteJoinToGlobalJoinVisitor for non-cluster callers); semantic mismatch of no_replace heuristic inherited from StorageDistributed.
Categories passed Memory lifetime (std::optional<Tables> external_tables is captured by value; temp-table StoragePtrs outlive the ReadFromCluster step via getMutableQueryContext); use-after-move (temporary_table_expression_node is loop-local in the new CROSS-JOIN branch — moved only on the final emplace, falls out of scope immediately after); RTTI (typeid_cast<CrossJoinNode> properly gated by find_cross_join); exception/rollback (no shared mutable state mutated before throw paths); getQueryProcessingStage asymmetric GLOBAL vs LOCAL routing matches intended swarm-side aggregation; buildQueryTreeForShard descendant-map walk correctly handles compound right-side expressions for both JOIN and the new CROSS_JOIN branch; rewriteInToGlobalIn QueryTreeNodePtr & fix in b63ad83 correctly propagates root-replacement back to caller (query_node.getWhere() reference).
Assumptions / limits Static analysis only. Did not verify whether planner_context->getMutableQueryContext() is identical to the context Context::createCopy()'d in the object_storage_remote_initiator path — temp tables added inside updateQueryWithJoinToSendIfNeeded may not be visible to the remote-initiator forwarding path (the new external_tables capture only feeds the local-cluster ReadFromCluster step, not the remote-initiator storage_and_context.storage->read(...) recursion at IStorageCluster.cpp:383). Flagged as an unknown, not a confirmed defect, because the planner context and read-context relationship in that branch was not exhaustively traced. Cross-join multi-table case (FROM A, B, C with the cluster table at index > 0) is by-design unsupported (consistent with LOCAL-mode behavior) and not classified as a defect.

@alsugiliazova alsugiliazova added the verified-with-issues Verified by QA and issues found. label May 15, 2026
@ianton-ru
Copy link
Copy Markdown
Author

Medium: RewriteJoinToGlobalJoinVisitor leaks object_storage_cluster_join_mode=global into unrelated callers

Expected. object_storage_cluster_join_mode has priority over parallel_replicas_prefer_local_join

Medium: GLOBAL mode loses GLOBAL IN rewrite when the IN subquery only references StorageDistributed

Fixed in a8dbd32

@alsugiliazova
Copy link
Copy Markdown
Member

Audit: PR #1782 — Antalya 26.3: Cluster Joins part 2 - global mode

AI audit note: This review was generated by AI (Cursor agent, audit-review skill). Static analysis only; no local build or test run.

Last updated: 2026-05-19
PR head: a8dbd327946
Base branch: antalya-26.3
Prior audit revision: b63ad834 (re-audited after a8dbd327)


Executive summary

PR #1782 frontports cluster-join GLOBAL mode (object_storage_cluster_join_mode='global') for object-storage cluster table functions (s3Cluster, Iceberg with object_storage_cluster, etc.). The initiator materializes right-hand / IN subquery data into temporary tables and sends a rewritten query plus external_tables to swarm nodes.

Four follow-up commits on 26.3 address AI review feedback (GLOBAL IN, cross join, rewriteInToGlobalIn signature, distributed-table GLOBAL IN). Commit a8dbd327 specifically fixes GLOBAL IN when the subquery references only StorageDistributed.

Open confirmed defects: 1 Medium, 1 Low (latent).
Resolved since first audit: 1 Medium (distributed-table GLOBAL IN).


Commits in scope (26.3 delta)

Commit Summary
01d8b03 Merge #1527 — cluster joins part 2 (GLOBAL mode feature)
3926fa3 Fix global IN
fb33a04 Fix cross join replacement
b63ad834 Fix rewriteInToGlobalIn (QueryTreeNodePtr &, root replacement)
a8dbd327 Fix cluster function global join with distributed table (rewrite_for_distributed + tests)

Files reviewed

Path Role
src/Core/Settings.cpp Documents object_storage_cluster_join_mode='global'
src/Storages/IStorageCluster.{cpp,h} GLOBAL/LOCAL query rewrite, ReadFromCluster + external_tables
src/Storages/buildQueryTreeForShard.{cpp,h} Temp-table materialization, rewriteInToGlobalIn, rewriteJoinToGlobalJoin, cross join
src/Storages/StorageDistributed.cpp Removed inline RewriteInToGlobalInVisitor; uses shared helpers
tests/integration/test_s3_cluster/test.py Join tests
tests/integration/test_database_iceberg/test.py Join tests
tests/integration/test_storage_iceberg_with_spark/test_cluster_joins.py Local/global join, CROSS JOIN, distributed table
tests/integration/test_storage_iceberg_with_spark/conftest.py with_zookeeper=True for ON CLUSTER setup (a8dbd327)

Architecture (call graph & transitions)

Entry points

  1. Client query → planner (allow_experimental_analyzer=true required for object_storage_cluster_join_mode != allow).
  2. IStorageCluster::read (initiator) → updateQueryWithJoinToSendIfNeeded → sample block → ReadFromClusterRemoteQueryExecutor per shard.
  3. Alternate: object_storage_remote_initiatorconvertToRemoteStorageDistributed::read (not fully traced for external_tables propagation).

GLOBAL mode transition (object_storage_cluster_join_mode='global')

When has_join || has_cross_join || has_local_columns_in_where:

  1. Clone query tree.
  2. rewriteJoinToGlobalJoin — set JoinLocality::Global (respects parallel_replicas_prefer_local_join unless object_storage_cluster_join_mode=GLOBAL — see defect below).
  3. If has_local_columns_in_where: rewriteInToGlobalIn(tree, context, /*rewrite_for_distributed*/ true) (a8dbd327).
  4. buildQueryTreeForShard(planner_context, tree, allow_global_join_for_right_table=false, find_cross_join=true) — execute right-side / IN / cross-join RHS subqueries on initiator; add temp tables to planner_context mutable query context.
  5. queryNodeToDistributedSelectQuery → AST sent to swarm nodes.
  6. ReadFromCluster passes planner_context->getMutableQueryContext()->getExternalTables() into RemoteQueryExecutor.

LOCAL mode transition (unchanged intent)

Strip join to left table expression only; project object-storage columns; prune WHERE to object-storage-only predicates; getQueryProcessingStage returns FetchColumns when join/cross join/local WHERE columns present.

Key invariants

  • Swarm nodes must not execute subqueries that reference tables/clusters they cannot reach (GLOBAL IN/JOIN + temp tables).
  • Temp table StoragePtr must outlive ReadFromCluster pipeline (held in query context / external_tables snapshot).
  • Leftmost table in join tree is assumed to be the object-storage cluster table (SearcherVisitor with entry=1).

Confirmed defects (open)

Medium: object_storage_cluster_join_mode=global leaks into unrelated rewriteJoinToGlobalJoin callers

Field Detail
Impact Session/profile object_storage_cluster_join_mode='global' forces JoinLocality::Global on queries that use rewriteJoinToGlobalJoin from parallel replicas or ClusterProxy, even for ordinary MergeTree JOIN MergeTree, overriding parallel_replicas_prefer_local_join=1.
Anchor src/Storages/buildQueryTreeForShard.cppRewriteJoinToGlobalJoinVisitor::enterImpl
Trigger object_storage_cluster_join_mode='global' + rewriteJoinToGlobalJoin from Planner/findParallelReplicasQuery.cpp:513 or Interpreters/ClusterProxy/executeQuery.cpp:832.
Why defect prefer_local_join = parallel_replicas_prefer_local_join && object_storage_cluster_join_mode != GLOBAL is evaluated inside a shared visitor; only IStorageCluster::updateQueryWithJoinToSendIfNeeded should apply this object-storage setting.
Fix direction Add a caller flag to rewriteJoinToGlobalJoin / visitor constructor; do not read object_storage_cluster_join_mode from context in the shared visitor.
Regression test MergeTree JOIN MergeTree with max_parallel_replicas>1, parallel_replicas_prefer_local_join=1, object_storage_cluster_join_mode='global'; assert plan keeps JOIN local.
            bool prefer_local_join = getContext()->getSettingsRef()[Setting::parallel_replicas_prefer_local_join]
                && getContext()->getSettingsRef()[Setting::object_storage_cluster_join_mode] != ObjectStorageClusterJoinMode::GLOBAL;
            bool should_use_global_join = !prefer_local_join || !allStoragesAreMergeTree(join_node->getRightTableExpression());
            if (should_use_global_join)
                join_node->setLocality(JoinLocality::Global);

Callers of rewriteJoinToGlobalJoin:

  • IStorageCluster.cpp — GLOBAL branch (intended)
  • findParallelReplicasQuery.cpp — parallel replicas (unintended side effect)
  • ClusterProxy/executeQuery.cpp — distributed parallel replicas (unintended side effect)

Low (latent): SearcherVisitor cannot find match index ≥ 2

Field Detail
Impact All call sites pass entry=1 today. Future entry>=2 returns nullptrLOGICAL_ERROR in getQueryTreeInfo / updateQueryWithJoinToSendIfNeeded.
Anchor src/Storages/IStorageCluster.cppSearcherVisitor::needChildVisit
Trigger SearcherVisitor(..., entry=2, ...) on a tree with two TABLE / TABLE_FUNCTION nodes.
Why defect needChildVisit uses !current_entry; after the first match current_entry>=1, traversal stops before a second sibling is visited.
Fix direction Use current_entry < entry or rely only on !passed_node.
Regression test Unit test: two table functions; entry=2 returns the second node.
    bool needChildVisit(QueryTreeNodePtr & /*parent*/, QueryTreeNodePtr & /*child*/)
    {
        return getSubqueryDepth() <= 2 && !passed_node && !current_entry;
    }

Resolved findings (addressed in PR)

Medium: GLOBAL IN not rewritten when IN subquery only references StorageDistributedfixed in a8dbd327

Field Detail
Original issue RewriteInToGlobalInVisitor skipped rewrite when every table in the IN subquery was StorageDistributed (no_replace early return). Wrong on object-storage initiator: swarm nodes are not the distributed table's cluster.
Fix rewrite_for_distributed parameter; when true, skip no_replace logic. IStorageCluster GLOBAL path calls rewriteInToGlobalIn(..., /*rewrite_for_distributed*/ true).
Tests test_cluster_joins.py: JOIN distributed table, GLOBAL IN / IN with distributed subquery; conftest with_zookeeper=True for ON CLUSTER.
            if (!rewrite_for_distributed)
            {
                bool no_replace = true;
                for (const auto & table_node : extractTableExpressions(query->getJoinTree(), false, true))
                {
                    // ... StorageDistributed check ...
                }
                if (no_replace)
                    return;
            }

Note: rewriteInToGlobalIn(..., true) runs only when info.has_local_columns_in_where. Pure WHERE tag IN (SELECT … FROM distributed) without other non–object-storage column nodes in WHERE/PREWHERE may not set that flag; behavior then relies on buildQueryTreeForShard (find_cross_join=true + object_storage_cluster_join_mode=GLOBAL in DistributedProductModeRewriteInJoinVisitor). New integration tests cover plain IN with a distributed table for both local and global join modes.


Other fixes verified (no remaining defect classification)

Commit Issue Resolution
3926fa3 GLOBAL IN rewrite in object-storage cluster path rewriteInToGlobalIn / visitor plumbing
fb33a04 CROSS JOIN right-side not materialized find_cross_join + CrossJoinNode handling in buildQueryTreeForShard and DistributedProductModeRewriteInJoinVisitor
b63ad834 Root IN replacement not visible to caller rewriteInToGlobalIn(QueryTreeNodePtr &) + visit full tree / WHERE reference
(feature) Temp tables not sent to swarm ReadFromCluster captures external_tables from planner_contextRemoteQueryExecutor
(feature) getQueryProcessingStage LOCAL: FetchColumns when join/cross join/local WHERE; GLOBAL: join stays on swarm (no forced FetchColumns for join strip) — intentional

Not confirmed / out of scope

Topic Status
object_storage_remote_initiator + GLOBAL join external_tables wired for direct ReadFromCluster path only. Remote-initiator recursion via StorageDistributed::read + Context::createCopy in convertToRemote not exhaustively traced; not classified as confirmed defect.
Cross join with cluster table not at index 0 Same limitation as LOCAL mode; by design / documented behavior.
IN only in JOIN ON (not WHERE) Not covered by has_local_columns_in_where; may rely on buildQueryTreeForShard stack logic — no failing test identified.
CI PR CI had integration/regression failures at a8dbd327; not analyzed for root cause vs. this feature.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

antalya antalya-26.3 forwardport This is a frontport of code that existed in previous Antalya versions port-antalya PRs to be ported to all new Antalya releases verified-with-issues Verified by QA and issues found.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants