Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from mergin.app import create_app
from mergin.auth.tasks import anonymize_removed_users
from mergin.sync.tasks import (
cleanup_push_idempotency_keys,
remove_projects_archives,
remove_temp_files,
remove_projects_backups,
Expand Down Expand Up @@ -95,3 +96,8 @@ def setup_periodic_tasks(sender, **kwargs):
remove_unused_chunks,
name="clean up of outdated chunks",
)
sender.add_periodic_task(
crontab(hour=3, minute=30),
cleanup_push_idempotency_keys,
name="clean up expired push idempotency keys",
)
4 changes: 4 additions & 0 deletions server/mergin/sync/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ class Configuration(object):
UPLOAD_FILES_WHITELIST = config("UPLOAD_FILES_WHITELIST", default="", cast=Csv())
# max batch size for fetch projects in batch endpoint
MAX_BATCH_SIZE = config("MAX_BATCH_SIZE", default=100, cast=int)
# seconds before a push idempotency key expires (default 30 days)
PUSH_IDEMPOTENCY_KEY_EXPIRATION = config(
"PUSH_IDEMPOTENCY_KEY_EXPIRATION", default=30 * 24 * 3600, cast=int
)
31 changes: 31 additions & 0 deletions server/mergin/sync/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,37 @@ def __init__(
self.changes = GeoDiff().changes_count(diff_path)


class PushIdempotencyKey(db.Model):
"""Stores idempotency keys for push finish operations.

Clients send an optional Mm-push-id header (UUID) with push finish requests.
If the key is found here, the cached response is returned immediately without
re-processing.
"""

__tablename__ = "push_idempotency_keys"

key = db.Column(UUID(as_uuid=True), primary_key=True)
response = db.Column(JSONB, nullable=False)
created = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

@classmethod
def store(cls, key: str, status_code: int, body: str, content_type: str) -> None:
record = cls(
key=key,
response={
"status_code": status_code,
"body": body,
"content_type": content_type,
},
)
try:
db.session.add(record)
db.session.commit()
except Exception:
db.session.rollback()


class ProjectUser(db.Model):
"""Association table for project membership"""

Expand Down
7 changes: 7 additions & 0 deletions server/mergin/sync/public_api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,13 @@ paths:
schema:
type: string
example: 970181b5-7143-491b-91a6-36533021c9a2
- name: Mm-push-id
in: header
description: Optional idempotency key (UUID) to prevent duplicate data push on client retries.
required: false
schema:
type: string
format: uuid
responses:
"200":
$ref: "#/components/responses/Success"
Expand Down
67 changes: 67 additions & 0 deletions server/mergin/sync/public_api_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import binascii
import functools
import http
import json
import os
import logging
Expand All @@ -19,8 +20,10 @@
from flask import (
abort,
current_app,
g,
jsonify,
make_response,
Response,
)
from pygeodiff import GeoDiffLibError
from flask_login import current_user
Expand All @@ -45,6 +48,7 @@
ProjectFilePath,
ProjectUser,
ProjectRole,
PushIdempotencyKey,
project_version_created,
push_finished,
)
Expand Down Expand Up @@ -687,6 +691,67 @@ def update_project(namespace, project_name): # noqa: E501 # pylint: disable=W0
return ProjectSchema().dump(project), 200


def idempotent_push(f):
"""Decorator for push finish endpoints to support idempotency via Mm-push-id header.

If the header is present and a cached response exists, it is returned immediately.
After processing, success and permanent sync failures are cached so retrying clients
receive the same response without re-processing.
"""

@functools.wraps(f)
def wrapper(*args, **kwargs):
key = request.headers.get("Mm-push-id")
if key:
record = PushIdempotencyKey.query.get(key)
if record:
r = record.response
cached = Response(
r["body"], status=r["status_code"], content_type=r["content_type"]
)
if r["status_code"] >= 400:
# raise so connexion's error path is used, bypassing response-body validation
raise HTTPException(response=cached)
return cached

try:
result = f(*args, **kwargs)
body, code = result
if key and not getattr(g, "skip_idempotency", False):
if isinstance(body, Response):
serialized = body.get_data(as_text=True)
content_type = body.headers.get("Content-Type", "application/json")
else:
serialized = json.dumps(body)
content_type = "application/json"
PushIdempotencyKey.store(key, code, serialized, content_type)
return result
except HTTPException as e:
if key and getattr(g, "cacheable_sync_error", False):
if e.response:
body = e.response.get_data(as_text=True)
content_type = e.response.headers.get(
"Content-Type", "application/problem+json"
)
# HTTPException(response=r) does not copy the status code onto e.code
status_code = e.response.status_code
else:
status_code = e.code
body = json.dumps(
{
"detail": e.description,
"status": status_code,
"title": http.HTTPStatus(status_code).phrase,
"type": "about:blank",
}
)
content_type = "application/problem+json"
PushIdempotencyKey.store(key, status_code, body, content_type)
raise

return wrapper


def catch_sync_failure(f):
"""Decorator to catch sync failures in push related endpoints"""

Expand Down Expand Up @@ -903,6 +968,7 @@ def chunk_upload(transaction_id, chunk_id):


@auth_required
@idempotent_push
@catch_sync_failure
def push_finish(transaction_id):
"""Finalize project data upload.
Expand Down Expand Up @@ -931,6 +997,7 @@ def push_finish(transaction_id):
file_changes, errors = upload.process_chunks(use_shared_chunk_dir=False)
if errors:
upload.clear()
g.cacheable_sync_error = True

unsupported_files = [
k for k, v in errors.items() if v == FileSyncErrorType.UNSUPPORTED.value
Expand Down
7 changes: 7 additions & 0 deletions server/mergin/sync/public_api_v2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,13 @@ paths:
operationId: create_project_version
parameters:
- $ref: "#/components/parameters/ProjectId"
- name: Mm-push-id
in: header
description: Optional idempotency key (UUID) to prevent duplicate data push on client retries.
required: false
schema:
type: string
format: uuid
requestBody:
description: Project files changes and current version head.
required: true
Expand Down
9 changes: 6 additions & 3 deletions server/mergin/sync/public_api_v2_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import psycopg2
from connexion import NoContent, request
from datetime import datetime, timedelta, timezone
from flask import abort, jsonify, current_app
from flask import abort, g, jsonify, current_app
from flask_login import current_user
from marshmallow import ValidationError
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
Expand Down Expand Up @@ -50,7 +50,7 @@
require_project_by_uuid,
projects_query,
)
from .public_api_controller import catch_sync_failure
from .public_api_controller import catch_sync_failure, idempotent_push
from .schemas import (
ProjectMemberSchema,
UploadChunkSchema,
Expand Down Expand Up @@ -217,6 +217,7 @@ def get_project(id, files_at_version=None):


@auth_required
@idempotent_push
@catch_sync_failure
def create_project_version(id):
"""Create a new project version from pushed data"""
Expand Down Expand Up @@ -285,8 +286,9 @@ def create_project_version(id):
if requested_storage > project.workspace.storage:
return StorageLimitHit(current_usage, project.workspace.storage).response(422)

# we have done all checks but this request is just a dry-run
# dry-run: skip idempotency entirely so a check_only response is never cached
if request.json.get("check_only", False):
g.skip_idempotency = True
return NoContent, 204

try:
Expand All @@ -308,6 +310,7 @@ def create_project_version(id):
# files consistency or geodiff related issues, project push would never succeed, whole upload is aborted
if errors:
upload.clear()
g.cacheable_sync_error = True
return DataSyncError(failed_files=errors).response(422)

if os.path.exists(version_dir):
Expand Down
12 changes: 11 additions & 1 deletion server/mergin/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from zipfile import ZIP_DEFLATED, ZipFile
from flask import current_app

from .models import Project, ProjectVersion, FileHistory
from .models import Project, ProjectVersion, FileHistory, PushIdempotencyKey
from .storages.disk import move_to_tmp
from .config import Configuration
from .utils import get_chunk_location, remove_outdated_files
Expand Down Expand Up @@ -178,6 +178,16 @@ def remove_unused_chunks():
remove_outdated_files(dir, time_delta)


@celery.task
def cleanup_push_idempotency_keys():
"""Remove expired push idempotency keys."""
cutoff = datetime.utcnow() - timedelta(
seconds=Configuration.PUSH_IDEMPOTENCY_KEY_EXPIRATION
)
PushIdempotencyKey.query.filter(PushIdempotencyKey.created < cutoff).delete()
db.session.commit()


@celery.task
def remove_transaction_chunks(chunks: Optional[List[str]] = None):
"""Remove chunks related to a specific sync transaction"""
Expand Down
Loading
Loading