From 68404b6f7748113091b186adb677044eccbe3e14 Mon Sep 17 00:00:00 2001 From: Martin Varga Date: Tue, 19 May 2026 14:00:24 +0200 Subject: [PATCH] Add support for push idempotency keys Cache responses for push finish actions so we can return them to client as they were in case of repeated calls (e.g. on network issues). --- server/application.py | 6 + server/mergin/sync/config.py | 4 + server/mergin/sync/models.py | 31 +++ server/mergin/sync/public_api.yaml | 7 + server/mergin/sync/public_api_controller.py | 67 +++++ server/mergin/sync/public_api_v2.yaml | 7 + .../mergin/sync/public_api_v2_controller.py | 9 +- server/mergin/sync/tasks.py | 12 +- server/mergin/tests/test_push_idempotency.py | 260 ++++++++++++++++++ ...3d4e5f6_add_push_idempotency_keys_table.py | 40 +++ 10 files changed, 439 insertions(+), 4 deletions(-) create mode 100644 server/mergin/tests/test_push_idempotency.py create mode 100644 server/migrations/community/a1b2c3d4e5f6_add_push_idempotency_keys_table.py diff --git a/server/application.py b/server/application.py index e1085189..f5f94b09 100644 --- a/server/application.py +++ b/server/application.py @@ -24,6 +24,7 @@ from mergin.app import create_app from mergin.auth.tasks import anonymize_removed_users from mergin.sync.tasks import ( + cleanup_push_idempotency_keys, remove_projects_archives, remove_temp_files, remove_projects_backups, @@ -95,3 +96,8 @@ def setup_periodic_tasks(sender, **kwargs): remove_unused_chunks, name="clean up of outdated chunks", ) + sender.add_periodic_task( + crontab(hour=3, minute=30), + cleanup_push_idempotency_keys, + name="clean up expired push idempotency keys", + ) diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py index 8a5081ec..3c3bb21e 100644 --- a/server/mergin/sync/config.py +++ b/server/mergin/sync/config.py @@ -84,3 +84,7 @@ class Configuration(object): UPLOAD_FILES_WHITELIST = config("UPLOAD_FILES_WHITELIST", default="", cast=Csv()) # max batch size for fetch projects in batch endpoint MAX_BATCH_SIZE = config("MAX_BATCH_SIZE", default=100, cast=int) + # seconds before a push idempotency key expires (default 30 days) + PUSH_IDEMPOTENCY_KEY_EXPIRATION = config( + "PUSH_IDEMPOTENCY_KEY_EXPIRATION", default=30 * 24 * 3600, cast=int + ) diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 5f4aa967..09e263c0 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -2275,6 +2275,37 @@ def __init__( self.changes = GeoDiff().changes_count(diff_path) +class PushIdempotencyKey(db.Model): + """Stores idempotency keys for push finish operations. + + Clients send an optional Mm-push-id header (UUID) with push finish requests. + If the key is found here, the cached response is returned immediately without + re-processing. + """ + + __tablename__ = "push_idempotency_keys" + + key = db.Column(UUID(as_uuid=True), primary_key=True) + response = db.Column(JSONB, nullable=False) + created = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + @classmethod + def store(cls, key: str, status_code: int, body: str, content_type: str) -> None: + record = cls( + key=key, + response={ + "status_code": status_code, + "body": body, + "content_type": content_type, + }, + ) + try: + db.session.add(record) + db.session.commit() + except Exception: + db.session.rollback() + + class ProjectUser(db.Model): """Association table for project membership""" diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml index 157e8262..24813683 100644 --- a/server/mergin/sync/public_api.yaml +++ b/server/mergin/sync/public_api.yaml @@ -709,6 +709,13 @@ paths: schema: type: string example: 970181b5-7143-491b-91a6-36533021c9a2 + - name: Mm-push-id + in: header + description: Optional idempotency key (UUID) to prevent duplicate data push on client retries. + required: false + schema: + type: string + format: uuid responses: "200": $ref: "#/components/responses/Success" diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 8f142e71..8deae361 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -4,6 +4,7 @@ import binascii import functools +import http import json import os import logging @@ -19,8 +20,10 @@ from flask import ( abort, current_app, + g, jsonify, make_response, + Response, ) from pygeodiff import GeoDiffLibError from flask_login import current_user @@ -45,6 +48,7 @@ ProjectFilePath, ProjectUser, ProjectRole, + PushIdempotencyKey, project_version_created, push_finished, ) @@ -687,6 +691,67 @@ def update_project(namespace, project_name): # noqa: E501 # pylint: disable=W0 return ProjectSchema().dump(project), 200 +def idempotent_push(f): + """Decorator for push finish endpoints to support idempotency via Mm-push-id header. + + If the header is present and a cached response exists, it is returned immediately. + After processing, success and permanent sync failures are cached so retrying clients + receive the same response without re-processing. + """ + + @functools.wraps(f) + def wrapper(*args, **kwargs): + key = request.headers.get("Mm-push-id") + if key: + record = PushIdempotencyKey.query.get(key) + if record: + r = record.response + cached = Response( + r["body"], status=r["status_code"], content_type=r["content_type"] + ) + if r["status_code"] >= 400: + # raise so connexion's error path is used, bypassing response-body validation + raise HTTPException(response=cached) + return cached + + try: + result = f(*args, **kwargs) + body, code = result + if key and not getattr(g, "skip_idempotency", False): + if isinstance(body, Response): + serialized = body.get_data(as_text=True) + content_type = body.headers.get("Content-Type", "application/json") + else: + serialized = json.dumps(body) + content_type = "application/json" + PushIdempotencyKey.store(key, code, serialized, content_type) + return result + except HTTPException as e: + if key and getattr(g, "cacheable_sync_error", False): + if e.response: + body = e.response.get_data(as_text=True) + content_type = e.response.headers.get( + "Content-Type", "application/problem+json" + ) + # HTTPException(response=r) does not copy the status code onto e.code + status_code = e.response.status_code + else: + status_code = e.code + body = json.dumps( + { + "detail": e.description, + "status": status_code, + "title": http.HTTPStatus(status_code).phrase, + "type": "about:blank", + } + ) + content_type = "application/problem+json" + PushIdempotencyKey.store(key, status_code, body, content_type) + raise + + return wrapper + + def catch_sync_failure(f): """Decorator to catch sync failures in push related endpoints""" @@ -903,6 +968,7 @@ def chunk_upload(transaction_id, chunk_id): @auth_required +@idempotent_push @catch_sync_failure def push_finish(transaction_id): """Finalize project data upload. @@ -931,6 +997,7 @@ def push_finish(transaction_id): file_changes, errors = upload.process_chunks(use_shared_chunk_dir=False) if errors: upload.clear() + g.cacheable_sync_error = True unsupported_files = [ k for k, v in errors.items() if v == FileSyncErrorType.UNSUPPORTED.value diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index b351654f..63188f18 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -328,6 +328,13 @@ paths: operationId: create_project_version parameters: - $ref: "#/components/parameters/ProjectId" + - name: Mm-push-id + in: header + description: Optional idempotency key (UUID) to prevent duplicate data push on client retries. + required: false + schema: + type: string + format: uuid requestBody: description: Project files changes and current version head. required: true diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index ebd909ad..ecf3e52d 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -12,7 +12,7 @@ import psycopg2 from connexion import NoContent, request from datetime import datetime, timedelta, timezone -from flask import abort, jsonify, current_app +from flask import abort, g, jsonify, current_app from flask_login import current_user from marshmallow import ValidationError from sqlalchemy.exc import IntegrityError, SQLAlchemyError @@ -50,7 +50,7 @@ require_project_by_uuid, projects_query, ) -from .public_api_controller import catch_sync_failure +from .public_api_controller import catch_sync_failure, idempotent_push from .schemas import ( ProjectMemberSchema, UploadChunkSchema, @@ -217,6 +217,7 @@ def get_project(id, files_at_version=None): @auth_required +@idempotent_push @catch_sync_failure def create_project_version(id): """Create a new project version from pushed data""" @@ -285,8 +286,9 @@ def create_project_version(id): if requested_storage > project.workspace.storage: return StorageLimitHit(current_usage, project.workspace.storage).response(422) - # we have done all checks but this request is just a dry-run + # dry-run: skip idempotency entirely so a check_only response is never cached if request.json.get("check_only", False): + g.skip_idempotency = True return NoContent, 204 try: @@ -308,6 +310,7 @@ def create_project_version(id): # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted if errors: upload.clear() + g.cacheable_sync_error = True return DataSyncError(failed_files=errors).response(422) if os.path.exists(version_dir): diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index 480222e6..a3284e2d 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -11,7 +11,7 @@ from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app -from .models import Project, ProjectVersion, FileHistory +from .models import Project, ProjectVersion, FileHistory, PushIdempotencyKey from .storages.disk import move_to_tmp from .config import Configuration from .utils import get_chunk_location, remove_outdated_files @@ -178,6 +178,16 @@ def remove_unused_chunks(): remove_outdated_files(dir, time_delta) +@celery.task +def cleanup_push_idempotency_keys(): + """Remove expired push idempotency keys.""" + cutoff = datetime.utcnow() - timedelta( + seconds=Configuration.PUSH_IDEMPOTENCY_KEY_EXPIRATION + ) + PushIdempotencyKey.query.filter(PushIdempotencyKey.created < cutoff).delete() + db.session.commit() + + @celery.task def remove_transaction_chunks(chunks: Optional[List[str]] = None): """Remove chunks related to a specific sync transaction""" diff --git a/server/mergin/tests/test_push_idempotency.py b/server/mergin/tests/test_push_idempotency.py new file mode 100644 index 00000000..f8f528eb --- /dev/null +++ b/server/mergin/tests/test_push_idempotency.py @@ -0,0 +1,260 @@ +# Copyright (C) Lutra Consulting Limited +# +# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial + +import uuid +from datetime import datetime, timedelta +from unittest.mock import patch + +from ..app import db +from ..sync.config import Configuration +from ..sync.models import ( + FileSyncErrorType, + Project, + PushIdempotencyKey, + Upload, +) +from ..sync.tasks import cleanup_push_idempotency_keys +from . import json_headers, test_project, test_project_dir, test_workspace_id +from .test_project_controller import ( + _get_changes, + _get_changes_without_added, + create_transaction, + upload_chunks, +) +from .utils import file_info + + +def _idempotency_headers(key=None): + """Return json_headers extended with an Mm-push-id key.""" + return {**json_headers, "Mm-push-id": key or str(uuid.uuid4())} + + +def test_v1_idempotency_success(client): + """Second call with same key returns cached 200 without creating a new version.""" + changes = _get_changes(test_project_dir) + upload, upload_dir = create_transaction("mergin", changes) + upload_chunks(upload_dir, upload.changes) + key = str(uuid.uuid4()) + + resp1 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 200 + assert PushIdempotencyKey.query.get(key) is not None + + project = Project.query.filter_by( + name=test_project, workspace_id=test_workspace_id + ).first() + version_after_first = project.latest_version + + # second call with to prove it is not re-executed — the cached response is returned based on the key alone + resp2 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 200 + assert resp2.json == resp1.json + assert project.latest_version == version_after_first + + +def test_v1_idempotency_sync_error_cached(client): + """Sync error (corrupted files) is cached; second call returns same error.""" + changes = _get_changes(test_project_dir) + upload, _ = create_transaction("mergin", changes) + # intentionally do NOT upload chunks — process_chunks will report corrupted files + key = str(uuid.uuid4()) + + resp1 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 422 + assert "corrupted_files" in resp1.json["detail"] + + record = PushIdempotencyKey.query.get(key) + assert record is not None + assert record.response["status_code"] == 422 + + # second call — returns cached error + resp2 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 422 + assert resp2.json == resp1.json + + +def test_v1_non_sync_errors_not_cached(client): + """404 and 403 errors are not cached.""" + key = str(uuid.uuid4()) + + resp = client.post( + f"/v1/project/push/finish/{str(uuid.uuid4())}", + headers=_idempotency_headers(key), + ) + assert resp.status_code == 404 + assert PushIdempotencyKey.query.get(key) is None + + +def test_v1_no_idempotency_key(client): + """Requests without Mm-push-id header are unaffected — no record stored.""" + changes = _get_changes(test_project_dir) + upload, upload_dir = create_transaction("mergin", changes) + upload_chunks(upload_dir, upload.changes) + + resp = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", headers=json_headers + ) + assert resp.status_code == 200 + assert PushIdempotencyKey.query.count() == 0 + + +def test_v2_idempotency_success(client): + """Second call with same key returns cached 201 without creating a new version.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + # use only a remove — no chunks needed, avoids "file already exists" validation error + data = { + "version": "v1", + "changes": { + "added": [], + "removed": [file_info(test_project_dir, "base.gpkg")], + "updated": [], + }, + } + key = str(uuid.uuid4()) + + resp1 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 201 + assert PushIdempotencyKey.query.get(key) is not None + version_after_first = project.latest_version + + # second call — cached response, no new version created + resp2 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 201 + assert resp2.json == resp1.json + assert project.latest_version == version_after_first + + +def test_v2_idempotency_datasync_error_cached(client): + """DataSyncError response is cached; second call returns the same error.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + # no added files — avoid "file already exists" validation error + changes = _get_changes_without_added(test_project_dir) + data = {"version": "v1", "changes": changes} + key = str(uuid.uuid4()) + + sync_errors = { + f["path"]: f"{FileSyncErrorType.SYNC_ERROR.value}: err" + for f in changes["updated"] + } + with patch.object(Upload, "process_chunks", return_value=({}, sync_errors)): + resp1 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 422 + assert resp1.json["code"] == "DataSyncError" + assert resp1.headers["Content-Type"] == "application/problem+json" + + record = PushIdempotencyKey.query.get(key) + assert record is not None + assert record.response["content_type"] == "application/problem+json" + + resp2 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 422 + assert resp2.json == resp1.json + assert resp2.headers["Content-Type"] == "application/problem+json" + + +def test_v2_non_sync_errors_not_cached(client): + """StorageLimitHit, ProjectLocked and similar errors are not cached.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + data = {"version": "v1", "changes": {"added": [], "updated": [], "removed": []}} + key = str(uuid.uuid4()) + + project.locked_until = datetime.utcnow() + timedelta(days=1) + db.session.commit() + + resp = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp.status_code == 423 + assert PushIdempotencyKey.query.get(key) is None + + project.locked_until = None + db.session.commit() + + +def test_v2_check_only_not_cached(client): + """check_only dry-run responses are never cached.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + # use a valid remove-only change so all validation passes and check_only is reached + data = { + "version": "v1", + "changes": { + "added": [], + "removed": [file_info(test_project_dir, "base.gpkg")], + "updated": [], + }, + "check_only": True, + } + key = str(uuid.uuid4()) + + resp = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp.status_code == 204 + assert PushIdempotencyKey.query.get(key) is None + + +def test_cleanup_push_idempotency_keys(client): + """Cleanup task removes keys older than PUSH_IDEMPOTENCY_KEY_EXPIRATION.""" + fresh_key = str(uuid.uuid4()) + expired_key = str(uuid.uuid4()) + + PushIdempotencyKey.store(fresh_key, 201, "{}", "application/json") + + # manually insert an already-expired record + expired = PushIdempotencyKey( + key=expired_key, + response={"status_code": 201, "body": "{}", "content_type": "application/json"}, + created=datetime.utcnow() + - timedelta(seconds=Configuration.PUSH_IDEMPOTENCY_KEY_EXPIRATION + 1), + ) + db.session.add(expired) + db.session.commit() + + assert PushIdempotencyKey.query.count() == 2 + + cleanup_push_idempotency_keys() + + assert PushIdempotencyKey.query.count() == 1 + assert PushIdempotencyKey.query.get(fresh_key) is not None + assert PushIdempotencyKey.query.get(expired_key) is None diff --git a/server/migrations/community/a1b2c3d4e5f6_add_push_idempotency_keys_table.py b/server/migrations/community/a1b2c3d4e5f6_add_push_idempotency_keys_table.py new file mode 100644 index 00000000..18a5ad8d --- /dev/null +++ b/server/migrations/community/a1b2c3d4e5f6_add_push_idempotency_keys_table.py @@ -0,0 +1,40 @@ +"""Add push_idempotency_keys table + +Revision ID: a1b2c3d4e5f6 +Revises: f1d9e4a7b823 +Create Date: 2026-05-19 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, UUID + + +# revision identifiers, used by Alembic. +revision = "a1b2c3d4e5f6" +down_revision = "f1d9e4a7b823" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "push_idempotency_keys", + sa.Column("key", UUID(), nullable=False), + sa.Column("response", JSONB(), nullable=False), + sa.Column("created", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("key"), + ) + op.create_index( + op.f("ix_push_idempotency_keys_created"), + "push_idempotency_keys", + ["created"], + ) + + +def downgrade(): + op.drop_index( + op.f("ix_push_idempotency_keys_created"), table_name="push_idempotency_keys" + ) + op.drop_table("push_idempotency_keys")