Skip to content

fix(sync): drain sync_queue on daemon startup (GRA-1245 / epic GRA-1198)#213

Merged
Gradata merged 1 commit into
mainfrom
fix/sync-drain-on-startup-gra-1245
May 20, 2026
Merged

fix(sync): drain sync_queue on daemon startup (GRA-1245 / epic GRA-1198)#213
Gradata merged 1 commit into
mainfrom
fix/sync-drain-on-startup-gra-1245

Conversation

@Gradata
Copy link
Copy Markdown
Owner

@Gradata Gradata commented May 20, 2026

GRA-1245: drain sync_queue on daemon startup

Summary

Daemon restart used to leave rows in sync_queue with synced_at IS NULL until the next correction event triggered a flush. If the daemon crashed mid-batch, pending syncs sat indefinitely.

This PR adds a one-shot synchronous drain that runs before the HTTP listener opens, so the first request after a daemon (re)start never observes a stale backlog.

What changed

  • src/gradata/_sync_worker.py — new module-level drain_sync_queue(brain_dir, api_key, ingest_url=...) -> int that synchronously flushes pending rows. Bounded (max 64 batches × 50 rows), idempotent, and gracefully no-ops on missing system.db / missing api_key / missing sync_queue table so startup never fails because cloud sync is misconfigured.
  • src/gradata/daemon.pyGradataDaemon.start() now calls a new _drain_sync_queue_at_startup() hook before _try_bind(). Wraps the drain in try/except so a drain error never blocks daemon boot; the background SyncWorker still handles steady-state.
  • Emits a single discoverable log line: sync queue drained at startup: N rows (info, logger gradata.sync_worker) so operators can confirm via journalctl or daemon.log.

Concurrency contract

Safe to call concurrently with the background SyncWorker:

  • Both paths open their own short-lived sqlite connections.
  • Worst-case race produces an at-most-once double-POST that the cloud /api/v1/ingest endpoint deduplicates on event_id.
  • Rows already marked synced_at are filtered out by peek_pending, so re-invocation is a no-op.

Test plan

New tests/test_sync_drain_on_startup.py (7 tests, all passing):

  1. test_drain_sync_queue_flushes_pending_rows — seed 5 pending rows, call drain, assert all synced_at populated + log line emitted + 5 HTTP POSTs.
  2. test_drain_sync_queue_is_idempotent — second call on a clean queue drains 0 rows.
  3. test_drain_sync_queue_noop_without_api_keyapi_key=None → 0 rows, no crash, rows remain pending.
  4. test_drain_sync_queue_noop_when_db_missing — fresh brain dir.
  5. test_drain_sync_queue_noop_when_table_missing — DB without sync_queue table.
  6. test_daemon_startup_hook_drains_before_listener — calls GradataDaemon._drain_sync_queue_at_startup() directly (no HTTP server bound), proves the integration.
  7. test_drain_sync_queue_safe_concurrent_with_worker — two concurrent drainers; queue ends fully drained, no errors, no double-mark from the local DB's perspective.

Existing test_sync_worker.py, test_sync_queue.py, test_daemon_sync.py all still pass (34 tests total across all sync files).

pytest tests/test_sync_drain_on_startup.py
# 7 passed in 3.02s

Ruff: ruff check --fix + ruff format clean on all touched files.

Layering check

  • gradata._sync_worker (Layer 0) gains a module-level helper.
  • gradata.daemon (Layer 2) imports from gradata._sync_worker (Layer 0).
  • No Layer 0 → Layer 2 import introduced.

Risk

Low.

  • Startup-drain failures are caught and logged; daemon comes up regardless.
  • Bounded batch loop (64 × 50 rows) prevents pathological startup hangs on huge backlogs.
  • Background SyncWorker continues to handle steady-state and retries anything the startup drain skipped.
  • No schema migration, no public-API change.

Kanban: t_bd7b2bf4 · Epic: GRA-1198

Previously, daemon restart left rows in sync_queue with synced_at IS NULL until the next correction event triggered a flush. If the daemon crashed mid-batch, pending syncs sat indefinitely.

This change adds gradata._sync_worker.drain_sync_queue(), a synchronous one-shot drainer that GradataDaemon.start() now invokes BEFORE binding the HTTP listener. The hook is idempotent, safe to call concurrently with the background SyncWorker (both paths open their own short-lived sqlite connections; the cloud /ingest endpoint dedupes any duplicate POST on event_id), and emits a discoverable info log line 'sync queue drained at startup: N rows' (visible in journalctl / daemon.log).

Summary:

- New gradata._sync_worker.drain_sync_queue(brain_dir, api_key, ingest_url=...) -> int

- New GradataDaemon._drain_sync_queue_at_startup() called before _try_bind()

- New tests/test_sync_drain_on_startup.py covers happy path, idempotency, missing api_key, fresh brain, concurrent callers, and the daemon hook itself

Layering: Layer 0 helper called from Layer 2 daemon; no Layer 0 -> 2 import introduced.

Risk: low — startup-drain failures are caught and logged; the background SyncWorker still handles steady-state and retries.
Copy link
Copy Markdown

@greptile-apps greptile-apps Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your free trial has ended. If you'd like to continue receiving code reviews, you can add a payment method here.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 20, 2026

Review Change Stack

📝 Walkthrough
  • New public API: Exported drain_sync_queue(brain_dir, api_key, ingest_url=...) function in _sync_worker.py that synchronously flushes pending rows on startup (bounded to 64 batches × 50 rows)
  • Daemon startup integration: GradataDaemon.start() now calls _drain_sync_queue_at_startup() before binding the HTTP listener to prevent pending unsynced rows from persisting
  • Graceful error handling: Drain failures are logged as warnings but do not block daemon startup; background SyncWorker remains responsible for steady-state syncing
  • Idempotent and safe: No-ops when system.db, api_key, or sync_queue are missing; safe to run concurrently with background worker (each uses short-lived SQLite connections)
  • Logging: Emits single info log line "sync queue drained at startup: N rows" via gradata.sync_worker logger
  • Comprehensive test coverage: New test_sync_drain_on_startup.py module with 7 tests covering flush behavior, idempotency, missing dependencies, daemon integration, and concurrent access
  • No breaking changes or security fixes in this PR
  • Low risk: Layer 0 helper used by Layer 2 daemon with no new cross-layer imports; worst-case race may produce at-most-once duplicate POST (deduplicated by cloud API on event_id)

Walkthrough

This PR introduces a synchronous drain_sync_queue() helper that flushes pending sync_queue rows by repeatedly invoking SyncWorker._tick(). The daemon integrates this drain into its startup sequence before binding the HTTP listener, ensuring stale backlogs do not block early requests. The PR includes comprehensive tests validating drain behavior, edge cases, daemon integration, and concurrent safety.

Changes

Sync Queue Startup Drain

Layer / File(s) Summary
Sync worker drain implementation
Gradata/src/gradata/_sync_worker.py
Introduces drain_sync_queue(brain_dir, api_key, ingest_url) that validates configuration, pre-counts pending rows via internal _count_pending() helper, then repeatedly calls SyncWorker._tick() up to a hard cap while tracking progress by re-counting and stopping on completion, exceptions, or stalled progress. Module exports are updated to include the new function. Linter suppression comment is removed from urllib.request.urlopen call.
Daemon startup integration
Gradata/src/gradata/daemon.py
Adds _drain_sync_queue_at_startup() helper that resolves the cloud API key, reads ingest URL from GRADATA_CLOUD_INGEST_URL environment variable, and calls drain_sync_queue with graceful error handling. The start() method invokes this drain before binding the HTTP server to prevent stale sync_queue backlog on first request.
Test infrastructure and drain validation
Gradata/tests/test_sync_drain_on_startup.py
Adds helpers to create minimal on-disk system.db with sync_queue schema, enqueue pending rows, and count pending state. Implements an in-process HTTP stub server for testing ingest POST requests. Core tests validate drain_sync_queue flushes all pending rows, marks them synced, emits correct log messages, performs expected POSTs, and is idempotent. Edge-case tests assert safe no-op behavior when api_key is missing, system.db does not exist, or sync_queue table is absent. Daemon-specific test directly calls GradataDaemon._drain_sync_queue_at_startup with stubbed Brain. Concurrency test runs two simultaneous drain invocations from separate threads and verifies no errors and full drain completion.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related issues

Possibly related PRs

  • Gradata/gradata#200: Introduces SyncWorker._tick() and local sync_queue draining behavior that this PR builds upon for daemon startup integration.

Suggested labels

bug

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix(sync): drain sync_queue on daemon startup (GRA-1245 / epic GRA-1198)' clearly and concisely describes the main change: adding a synchronous drain of the sync_queue during daemon startup to prevent stale rows.
Description check ✅ Passed The description comprehensively explains the problem, solution, implementation details, testing approach, concurrency contract, and risk assessment, all directly related to the changeset.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/sync-drain-on-startup-gra-1245

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 OpenGrep (1.21.0)

OpenGrep fatal error (exit code 2):
┌──────────────┐
│ Opengrep CLI │
└──────────────┘

�[32m✔�[39m �[1mOpengrep OSS�[0m
�[32m✔�[39m Basic security coverage for first-party code vulnerabilities.

�[1m Loading rules from local config...�[0m
[00.25][ERROR]: Error: exception Glob.Lexer.Syntax_error("malformed glob pattern: missing ']'")
Raised at Glob__Lexer.syntax_error in file "libs/glob/Lexer.mll", line 8, characters 2-26
Called from Glob__Lexer.__ocaml_lex_token_rec in file "libs/glob/Lexer.mll", line 29, characters 26-53
Cal


Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot added the bug Something isn't working label May 20, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@Gradata/src/gradata/_sync_worker.py`:
- Around line 85-90: The function drain_sync_queue has multiple early-return
paths that don't log the startup no-op; add a discoverability debug log before
each early return so all no-op exits emit the same startup-drain message.
Specifically, in _sync_worker.py add a logger.debug call (matching the existing
style, e.g. "drain_sync_queue: startup no-op - reason") immediately before the
early returns where db_path is missing (the not db_path.exists() branch) and the
other early-return block around lines 101-105, referencing the same logger used
elsewhere so that drain_sync_queue, db_path, and api_key code paths all log the
startup-drain info.

In `@Gradata/tests/test_sync_drain_on_startup.py`:
- Around line 315-317: The assertion is tautological because a prior check
already ensures pending is zero; replace the current assertion with a true
validation of returned accounting by removing the fallback clause and asserting
that sum(results) >= 10 (i.e., assert sum(results) >= 10) so the test actually
verifies drain_sync_queue's reported drained count; locate the assertion
referencing results and _count_pending(db_path) in the
test_sync_drain_on_startup test and update it accordingly.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 094e5896-95cb-4b9c-bec7-306de8732ef1

📥 Commits

Reviewing files that changed from the base of the PR and between bc4e066 and baf8300.

📒 Files selected for processing (3)
  • Gradata/src/gradata/_sync_worker.py
  • Gradata/src/gradata/daemon.py
  • Gradata/tests/test_sync_drain_on_startup.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: pytest (py3.11)
  • GitHub Check: pytest macos-latest / py3.11
  • GitHub Check: pytest (py3.12)
  • GitHub Check: pytest ubuntu-latest / py3.11
  • GitHub Check: pytest macos-latest / py3.12
  • GitHub Check: pytest windows-latest / py3.11
  • GitHub Check: pytest ubuntu-latest / py3.12
  • GitHub Check: pytest windows-latest / py3.12
🧰 Additional context used
📓 Path-based instructions (2)
Gradata/src/**/*.py

📄 CodeRabbit inference engine (Gradata/AGENTS.md)

Gradata/src/**/*.py: Prefer sentence-transformers for local embeddings, google-genai for Gemini embeddings, cryptography for AES-GCM encrypted system.db, bm25s for BM25 rule ranking, and mem0ai for external memory adapters — guard all optional dependency imports with try / except ImportError at the call site, never at module level
Maintain strict layering: Layer 0 (Primitives: _types.py, _db.py, _events.py, _paths.py, _file_lock.py; Patterns: contrib/patterns/) must never import from Layer 1 (Enhancements: enhancements/, rules/) or Layer 2 (Public API: brain.py, cli.py, daemon.py, mcp_server.py)
Never use bare except: pass — use typed exceptions or at minimum logger.warning(...) with exc_info=True to avoid silent failure in a memory product
Never import from out-of-scope sibling directories ../Sprites/ or ../Hausgem/ within gradata/* code — that is a layering bug
Never leak private-sibling paths into public docs/code — no references to ../Sprites/, ../Hausgem/, email addresses, OneDrive paths, or Sprites-specific examples from inside gradata/*
Use atomic-write helper when writing JSON files to prevent corruption from mid-write crashes

Files:

  • Gradata/src/gradata/daemon.py
  • Gradata/src/gradata/_sync_worker.py
Gradata/tests/**/*.py

📄 CodeRabbit inference engine (Gradata/AGENTS.md)

Gradata/tests/**/*.py: Set BRAIN_DIR environment variable via tmp_path in conftest.py for test isolation — ensure _paths.py module cache refreshes when calling Brain.init() directly inside tests
Add unit tests in tests/test_*.py for every CI push without LLM calls (deterministic); mark integration tests with @pytest.mark.integration and skip them by default (they hit real LLM APIs)

Files:

  • Gradata/tests/test_sync_drain_on_startup.py
🔇 Additional comments (3)
Gradata/src/gradata/_sync_worker.py (1)

36-36: LGTM!

Also applies to: 47-84, 92-100, 107-151, 318-318

Gradata/src/gradata/daemon.py (1)

1040-1068: LGTM!

Also applies to: 1075-1080

Gradata/tests/test_sync_drain_on_startup.py (1)

1-314: LGTM!

Comment on lines +85 to +90
if not db_path.exists():
return 0
if not api_key:
# Cloud sync not configured — nothing to do. Don't block startup.
logger.debug("drain_sync_queue: no api_key, skipping")
return 0
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Emit the startup-drain info log on all early no-op exits.

Line 85, Line 87, and Line 101 return early without the expected discoverability log. That makes startup behavior harder to audit in production.

🔧 Suggested patch
     db_path = brain_dir / "system.db"
     if not db_path.exists():
+        logger.info("sync queue drained at startup: 0 rows")
         return 0
     if not api_key:
         # Cloud sync not configured — nothing to do. Don't block startup.
         logger.debug("drain_sync_queue: no api_key, skipping")
+        logger.info("sync queue drained at startup: 0 rows")
         return 0
@@
     except sqlite3.DatabaseError as exc:
         # No sync_queue table yet (fresh brain) or DB locked — let the
         # background worker handle it on its first tick.
         logger.debug("drain_sync_queue: pre-check failed (%s); skipping", exc)
+        logger.info("sync queue drained at startup: 0 rows")
         return 0

Also applies to: 101-105

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/src/gradata/_sync_worker.py` around lines 85 - 90, The function
drain_sync_queue has multiple early-return paths that don't log the startup
no-op; add a discoverability debug log before each early return so all no-op
exits emit the same startup-drain message. Specifically, in _sync_worker.py add
a logger.debug call (matching the existing style, e.g. "drain_sync_queue:
startup no-op - reason") immediately before the early returns where db_path is
missing (the not db_path.exists() branch) and the other early-return block
around lines 101-105, referencing the same logger used elsewhere so that
drain_sync_queue, db_path, and api_key code paths all log the startup-drain
info.

Comment on lines +315 to +317
# Total drained across both runners equals 10 (sum across threads; one
# may see 10, the other 0, or any split — but no row is lost).
assert sum(results) >= 10 or _count_pending(db_path) == 0
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix the tautological assertion in the concurrent test.

Line 314 already guarantees pending is zero, so Line 317 always passes and doesn’t validate drain_sync_queue return accounting.

✅ Suggested assertion
-    assert sum(results) >= 10 or _count_pending(db_path) == 0
+    assert 10 <= sum(results) <= 20
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Total drained across both runners equals 10 (sum across threads; one
# may see 10, the other 0, or any split — but no row is lost).
assert sum(results) >= 10 or _count_pending(db_path) == 0
# Total drained across both runners equals 10 (sum across threads; one
# may see 10, the other 0, or any split — but no row is lost).
assert 10 <= sum(results) <= 20
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@Gradata/tests/test_sync_drain_on_startup.py` around lines 315 - 317, The
assertion is tautological because a prior check already ensures pending is zero;
replace the current assertion with a true validation of returned accounting by
removing the fallback clause and asserting that sum(results) >= 10 (i.e., assert
sum(results) >= 10) so the test actually verifies drain_sync_queue's reported
drained count; locate the assertion referencing results and
_count_pending(db_path) in the test_sync_drain_on_startup test and update it
accordingly.

@Gradata Gradata merged commit ecc287b into main May 20, 2026
9 checks passed
@Gradata Gradata deleted the fix/sync-drain-on-startup-gra-1245 branch May 20, 2026 16:11
Gradata added a commit that referenced this pull request May 20, 2026
…-1198) (#214)

The .claude-plugin/ directory itself was already removed in a prior cleanup
(see CHANGELOG: 'Remove orphaned gradata-plugin/ subdir (#54)'). What
remained were stale string references in docs and examples now that the
SDK ships all subcommands directly (PRs #208/#209/#210/#211 + #213).

Changes:
- .dockerignore: removed dead .claude-plugin exclude line
- examples/with_claude_code.py: replaced '/plugin install gradata' language
  with the canonical 'gradata install --agent claude-code'
- examples/README.md: fix broken link to .claude-plugin/README.md
- CHANGELOG.md: BREAKING entry under Unreleased documenting the retirement

This closes out the kill-the-plugin epic (GRA-1198 / GH #206) from the
references side. Anyone who installed via /plugin marketplace before
2026-05-20 must migrate to the SDK install path.

Verified:
- pip install /home/olive/work/gradata-sdk/Gradata in a fresh venv succeeds
- gradata install --agent claude-code --brain /tmp/test-brain --help works
- pytest tests/ -x -q passes (816 tests, 7 skipped, 1 known-skip on
  test_byo_key_provider for missing httpx in dev env unrelated to this)
- ruff check clean on touched files
- grep for 'claude-plugin|gradata-plugin' on src/ + docs/ shows only the
  intentional CHANGELOG entries (current BREAKING note + historical refs)

Branch authored by delegate_task subagent (hit max_iterations on PR-open);
parent agent verified + extracted clean diff + opened PR.

Co-authored-by: data-engineer <data-engineer@gradata.ai>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant