diff --git a/server/application.py b/server/application.py index e1085189..f5f94b09 100644 --- a/server/application.py +++ b/server/application.py @@ -24,6 +24,7 @@ from mergin.app import create_app from mergin.auth.tasks import anonymize_removed_users from mergin.sync.tasks import ( + cleanup_push_idempotency_keys, remove_projects_archives, remove_temp_files, remove_projects_backups, @@ -95,3 +96,8 @@ def setup_periodic_tasks(sender, **kwargs): remove_unused_chunks, name="clean up of outdated chunks", ) + sender.add_periodic_task( + crontab(hour=3, minute=30), + cleanup_push_idempotency_keys, + name="clean up expired push idempotency keys", + ) diff --git a/server/mergin/sync/config.py b/server/mergin/sync/config.py index 8a5081ec..3c3bb21e 100644 --- a/server/mergin/sync/config.py +++ b/server/mergin/sync/config.py @@ -84,3 +84,7 @@ class Configuration(object): UPLOAD_FILES_WHITELIST = config("UPLOAD_FILES_WHITELIST", default="", cast=Csv()) # max batch size for fetch projects in batch endpoint MAX_BATCH_SIZE = config("MAX_BATCH_SIZE", default=100, cast=int) + # seconds before a push idempotency key expires (default 30 days) + PUSH_IDEMPOTENCY_KEY_EXPIRATION = config( + "PUSH_IDEMPOTENCY_KEY_EXPIRATION", default=30 * 24 * 3600, cast=int + ) diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py index 5f4aa967..09e263c0 100644 --- a/server/mergin/sync/models.py +++ b/server/mergin/sync/models.py @@ -2275,6 +2275,37 @@ def __init__( self.changes = GeoDiff().changes_count(diff_path) +class PushIdempotencyKey(db.Model): + """Stores idempotency keys for push finish operations. + + Clients send an optional Mm-push-id header (UUID) with push finish requests. + If the key is found here, the cached response is returned immediately without + re-processing. + """ + + __tablename__ = "push_idempotency_keys" + + key = db.Column(UUID(as_uuid=True), primary_key=True) + response = db.Column(JSONB, nullable=False) + created = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + @classmethod + def store(cls, key: str, status_code: int, body: str, content_type: str) -> None: + record = cls( + key=key, + response={ + "status_code": status_code, + "body": body, + "content_type": content_type, + }, + ) + try: + db.session.add(record) + db.session.commit() + except Exception: + db.session.rollback() + + class ProjectUser(db.Model): """Association table for project membership""" diff --git a/server/mergin/sync/public_api.yaml b/server/mergin/sync/public_api.yaml index 157e8262..24813683 100644 --- a/server/mergin/sync/public_api.yaml +++ b/server/mergin/sync/public_api.yaml @@ -709,6 +709,13 @@ paths: schema: type: string example: 970181b5-7143-491b-91a6-36533021c9a2 + - name: Mm-push-id + in: header + description: Optional idempotency key (UUID) to prevent duplicate data push on client retries. + required: false + schema: + type: string + format: uuid responses: "200": $ref: "#/components/responses/Success" diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index 8f142e71..8deae361 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -4,6 +4,7 @@ import binascii import functools +import http import json import os import logging @@ -19,8 +20,10 @@ from flask import ( abort, current_app, + g, jsonify, make_response, + Response, ) from pygeodiff import GeoDiffLibError from flask_login import current_user @@ -45,6 +48,7 @@ ProjectFilePath, ProjectUser, ProjectRole, + PushIdempotencyKey, project_version_created, push_finished, ) @@ -687,6 +691,67 @@ def update_project(namespace, project_name): # noqa: E501 # pylint: disable=W0 return ProjectSchema().dump(project), 200 +def idempotent_push(f): + """Decorator for push finish endpoints to support idempotency via Mm-push-id header. + + If the header is present and a cached response exists, it is returned immediately. + After processing, success and permanent sync failures are cached so retrying clients + receive the same response without re-processing. + """ + + @functools.wraps(f) + def wrapper(*args, **kwargs): + key = request.headers.get("Mm-push-id") + if key: + record = PushIdempotencyKey.query.get(key) + if record: + r = record.response + cached = Response( + r["body"], status=r["status_code"], content_type=r["content_type"] + ) + if r["status_code"] >= 400: + # raise so connexion's error path is used, bypassing response-body validation + raise HTTPException(response=cached) + return cached + + try: + result = f(*args, **kwargs) + body, code = result + if key and not getattr(g, "skip_idempotency", False): + if isinstance(body, Response): + serialized = body.get_data(as_text=True) + content_type = body.headers.get("Content-Type", "application/json") + else: + serialized = json.dumps(body) + content_type = "application/json" + PushIdempotencyKey.store(key, code, serialized, content_type) + return result + except HTTPException as e: + if key and getattr(g, "cacheable_sync_error", False): + if e.response: + body = e.response.get_data(as_text=True) + content_type = e.response.headers.get( + "Content-Type", "application/problem+json" + ) + # HTTPException(response=r) does not copy the status code onto e.code + status_code = e.response.status_code + else: + status_code = e.code + body = json.dumps( + { + "detail": e.description, + "status": status_code, + "title": http.HTTPStatus(status_code).phrase, + "type": "about:blank", + } + ) + content_type = "application/problem+json" + PushIdempotencyKey.store(key, status_code, body, content_type) + raise + + return wrapper + + def catch_sync_failure(f): """Decorator to catch sync failures in push related endpoints""" @@ -903,6 +968,7 @@ def chunk_upload(transaction_id, chunk_id): @auth_required +@idempotent_push @catch_sync_failure def push_finish(transaction_id): """Finalize project data upload. @@ -931,6 +997,7 @@ def push_finish(transaction_id): file_changes, errors = upload.process_chunks(use_shared_chunk_dir=False) if errors: upload.clear() + g.cacheable_sync_error = True unsupported_files = [ k for k, v in errors.items() if v == FileSyncErrorType.UNSUPPORTED.value diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml index b351654f..63188f18 100644 --- a/server/mergin/sync/public_api_v2.yaml +++ b/server/mergin/sync/public_api_v2.yaml @@ -328,6 +328,13 @@ paths: operationId: create_project_version parameters: - $ref: "#/components/parameters/ProjectId" + - name: Mm-push-id + in: header + description: Optional idempotency key (UUID) to prevent duplicate data push on client retries. + required: false + schema: + type: string + format: uuid requestBody: description: Project files changes and current version head. required: true diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index ebd909ad..ecf3e52d 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -12,7 +12,7 @@ import psycopg2 from connexion import NoContent, request from datetime import datetime, timedelta, timezone -from flask import abort, jsonify, current_app +from flask import abort, g, jsonify, current_app from flask_login import current_user from marshmallow import ValidationError from sqlalchemy.exc import IntegrityError, SQLAlchemyError @@ -50,7 +50,7 @@ require_project_by_uuid, projects_query, ) -from .public_api_controller import catch_sync_failure +from .public_api_controller import catch_sync_failure, idempotent_push from .schemas import ( ProjectMemberSchema, UploadChunkSchema, @@ -217,6 +217,7 @@ def get_project(id, files_at_version=None): @auth_required +@idempotent_push @catch_sync_failure def create_project_version(id): """Create a new project version from pushed data""" @@ -285,8 +286,9 @@ def create_project_version(id): if requested_storage > project.workspace.storage: return StorageLimitHit(current_usage, project.workspace.storage).response(422) - # we have done all checks but this request is just a dry-run + # dry-run: skip idempotency entirely so a check_only response is never cached if request.json.get("check_only", False): + g.skip_idempotency = True return NoContent, 204 try: @@ -308,6 +310,7 @@ def create_project_version(id): # files consistency or geodiff related issues, project push would never succeed, whole upload is aborted if errors: upload.clear() + g.cacheable_sync_error = True return DataSyncError(failed_files=errors).response(422) if os.path.exists(version_dir): diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index 480222e6..a3284e2d 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -11,7 +11,7 @@ from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app -from .models import Project, ProjectVersion, FileHistory +from .models import Project, ProjectVersion, FileHistory, PushIdempotencyKey from .storages.disk import move_to_tmp from .config import Configuration from .utils import get_chunk_location, remove_outdated_files @@ -178,6 +178,16 @@ def remove_unused_chunks(): remove_outdated_files(dir, time_delta) +@celery.task +def cleanup_push_idempotency_keys(): + """Remove expired push idempotency keys.""" + cutoff = datetime.utcnow() - timedelta( + seconds=Configuration.PUSH_IDEMPOTENCY_KEY_EXPIRATION + ) + PushIdempotencyKey.query.filter(PushIdempotencyKey.created < cutoff).delete() + db.session.commit() + + @celery.task def remove_transaction_chunks(chunks: Optional[List[str]] = None): """Remove chunks related to a specific sync transaction""" diff --git a/server/mergin/tests/test_push_idempotency.py b/server/mergin/tests/test_push_idempotency.py new file mode 100644 index 00000000..f8f528eb --- /dev/null +++ b/server/mergin/tests/test_push_idempotency.py @@ -0,0 +1,260 @@ +# Copyright (C) Lutra Consulting Limited +# +# SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial + +import uuid +from datetime import datetime, timedelta +from unittest.mock import patch + +from ..app import db +from ..sync.config import Configuration +from ..sync.models import ( + FileSyncErrorType, + Project, + PushIdempotencyKey, + Upload, +) +from ..sync.tasks import cleanup_push_idempotency_keys +from . import json_headers, test_project, test_project_dir, test_workspace_id +from .test_project_controller import ( + _get_changes, + _get_changes_without_added, + create_transaction, + upload_chunks, +) +from .utils import file_info + + +def _idempotency_headers(key=None): + """Return json_headers extended with an Mm-push-id key.""" + return {**json_headers, "Mm-push-id": key or str(uuid.uuid4())} + + +def test_v1_idempotency_success(client): + """Second call with same key returns cached 200 without creating a new version.""" + changes = _get_changes(test_project_dir) + upload, upload_dir = create_transaction("mergin", changes) + upload_chunks(upload_dir, upload.changes) + key = str(uuid.uuid4()) + + resp1 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 200 + assert PushIdempotencyKey.query.get(key) is not None + + project = Project.query.filter_by( + name=test_project, workspace_id=test_workspace_id + ).first() + version_after_first = project.latest_version + + # second call with to prove it is not re-executed — the cached response is returned based on the key alone + resp2 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 200 + assert resp2.json == resp1.json + assert project.latest_version == version_after_first + + +def test_v1_idempotency_sync_error_cached(client): + """Sync error (corrupted files) is cached; second call returns same error.""" + changes = _get_changes(test_project_dir) + upload, _ = create_transaction("mergin", changes) + # intentionally do NOT upload chunks — process_chunks will report corrupted files + key = str(uuid.uuid4()) + + resp1 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 422 + assert "corrupted_files" in resp1.json["detail"] + + record = PushIdempotencyKey.query.get(key) + assert record is not None + assert record.response["status_code"] == 422 + + # second call — returns cached error + resp2 = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 422 + assert resp2.json == resp1.json + + +def test_v1_non_sync_errors_not_cached(client): + """404 and 403 errors are not cached.""" + key = str(uuid.uuid4()) + + resp = client.post( + f"/v1/project/push/finish/{str(uuid.uuid4())}", + headers=_idempotency_headers(key), + ) + assert resp.status_code == 404 + assert PushIdempotencyKey.query.get(key) is None + + +def test_v1_no_idempotency_key(client): + """Requests without Mm-push-id header are unaffected — no record stored.""" + changes = _get_changes(test_project_dir) + upload, upload_dir = create_transaction("mergin", changes) + upload_chunks(upload_dir, upload.changes) + + resp = client.post( + f"/v1/project/push/finish/{upload.transaction_id}", headers=json_headers + ) + assert resp.status_code == 200 + assert PushIdempotencyKey.query.count() == 0 + + +def test_v2_idempotency_success(client): + """Second call with same key returns cached 201 without creating a new version.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + # use only a remove — no chunks needed, avoids "file already exists" validation error + data = { + "version": "v1", + "changes": { + "added": [], + "removed": [file_info(test_project_dir, "base.gpkg")], + "updated": [], + }, + } + key = str(uuid.uuid4()) + + resp1 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 201 + assert PushIdempotencyKey.query.get(key) is not None + version_after_first = project.latest_version + + # second call — cached response, no new version created + resp2 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 201 + assert resp2.json == resp1.json + assert project.latest_version == version_after_first + + +def test_v2_idempotency_datasync_error_cached(client): + """DataSyncError response is cached; second call returns the same error.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + # no added files — avoid "file already exists" validation error + changes = _get_changes_without_added(test_project_dir) + data = {"version": "v1", "changes": changes} + key = str(uuid.uuid4()) + + sync_errors = { + f["path"]: f"{FileSyncErrorType.SYNC_ERROR.value}: err" + for f in changes["updated"] + } + with patch.object(Upload, "process_chunks", return_value=({}, sync_errors)): + resp1 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp1.status_code == 422 + assert resp1.json["code"] == "DataSyncError" + assert resp1.headers["Content-Type"] == "application/problem+json" + + record = PushIdempotencyKey.query.get(key) + assert record is not None + assert record.response["content_type"] == "application/problem+json" + + resp2 = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp2.status_code == 422 + assert resp2.json == resp1.json + assert resp2.headers["Content-Type"] == "application/problem+json" + + +def test_v2_non_sync_errors_not_cached(client): + """StorageLimitHit, ProjectLocked and similar errors are not cached.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + data = {"version": "v1", "changes": {"added": [], "updated": [], "removed": []}} + key = str(uuid.uuid4()) + + project.locked_until = datetime.utcnow() + timedelta(days=1) + db.session.commit() + + resp = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp.status_code == 423 + assert PushIdempotencyKey.query.get(key) is None + + project.locked_until = None + db.session.commit() + + +def test_v2_check_only_not_cached(client): + """check_only dry-run responses are never cached.""" + project = Project.query.filter_by( + workspace_id=test_workspace_id, name=test_project + ).first() + # use a valid remove-only change so all validation passes and check_only is reached + data = { + "version": "v1", + "changes": { + "added": [], + "removed": [file_info(test_project_dir, "base.gpkg")], + "updated": [], + }, + "check_only": True, + } + key = str(uuid.uuid4()) + + resp = client.post( + f"v2/projects/{project.id}/versions", + json=data, + headers=_idempotency_headers(key), + ) + assert resp.status_code == 204 + assert PushIdempotencyKey.query.get(key) is None + + +def test_cleanup_push_idempotency_keys(client): + """Cleanup task removes keys older than PUSH_IDEMPOTENCY_KEY_EXPIRATION.""" + fresh_key = str(uuid.uuid4()) + expired_key = str(uuid.uuid4()) + + PushIdempotencyKey.store(fresh_key, 201, "{}", "application/json") + + # manually insert an already-expired record + expired = PushIdempotencyKey( + key=expired_key, + response={"status_code": 201, "body": "{}", "content_type": "application/json"}, + created=datetime.utcnow() + - timedelta(seconds=Configuration.PUSH_IDEMPOTENCY_KEY_EXPIRATION + 1), + ) + db.session.add(expired) + db.session.commit() + + assert PushIdempotencyKey.query.count() == 2 + + cleanup_push_idempotency_keys() + + assert PushIdempotencyKey.query.count() == 1 + assert PushIdempotencyKey.query.get(fresh_key) is not None + assert PushIdempotencyKey.query.get(expired_key) is None diff --git a/server/migrations/community/a1b2c3d4e5f6_add_push_idempotency_keys_table.py b/server/migrations/community/a1b2c3d4e5f6_add_push_idempotency_keys_table.py new file mode 100644 index 00000000..18a5ad8d --- /dev/null +++ b/server/migrations/community/a1b2c3d4e5f6_add_push_idempotency_keys_table.py @@ -0,0 +1,40 @@ +"""Add push_idempotency_keys table + +Revision ID: a1b2c3d4e5f6 +Revises: f1d9e4a7b823 +Create Date: 2026-05-19 00:00:00.000000 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import JSONB, UUID + + +# revision identifiers, used by Alembic. +revision = "a1b2c3d4e5f6" +down_revision = "f1d9e4a7b823" +branch_labels = None +depends_on = None + + +def upgrade(): + op.create_table( + "push_idempotency_keys", + sa.Column("key", UUID(), nullable=False), + sa.Column("response", JSONB(), nullable=False), + sa.Column("created", sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint("key"), + ) + op.create_index( + op.f("ix_push_idempotency_keys_created"), + "push_idempotency_keys", + ["created"], + ) + + +def downgrade(): + op.drop_index( + op.f("ix_push_idempotency_keys_created"), table_name="push_idempotency_keys" + ) + op.drop_table("push_idempotency_keys")