# rcnn/backend/apps/coaster/views.py
# 2026-04-21 16:33:15 +02:00
#
# 291 lines
# 10 KiB
# Python

"""
POST /api/v1/coaster/simulate/
Accepts a list of geographic coordinates, runs the physics-based rail
generator, optionally builds a GLB model, and returns the rail point arrays
plus a presigned S3 URL for the model.
"""
import logging
import math
import uuid
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework import status
# numpy and physics deps are imported lazily inside the view so the server
# can start (and serve all other endpoints) even before the container has been
# rebuilt with the new requirements.
logger = logging.getLogger(__name__)
# ── Coordinate helpers ─────────────────────────────────────────────────────────
_R_EARTH = 6_371_000.0  # Earth radius used by the flat-Earth projection, metres


def _to_enu(coords: list) -> tuple:
    """
    Project [[lon, lat, alt], …] into a local East-North-Up frame (metres).

    The first coordinate becomes the frame origin. A flat-Earth approximation
    is used; accurate to < 0.01 % for areas < 10 km.

    Returns:
        (points_m, origin): ``points_m`` is a numpy array with one
        [east, north, up] row per input coordinate, ``origin`` is the tuple
        (lon0, lat0, alt0) taken from the first coordinate.
    """
    lon0, lat0, alt0 = coords[0]
    # East-west distance per degree shrinks with the cosine of the latitude.
    cos_lat = math.cos(math.radians(lat0))

    enu_rows = [
        [
            (lon - lon0) * math.radians(1) * cos_lat * _R_EARTH,  # East
            (lat - lat0) * math.radians(1) * _R_EARTH,            # North
            alt - alt0,                                           # Up
        ]
        for lon, lat, alt in coords
    ]

    # numpy is imported lazily so the module loads even without it installed.
    import numpy as np

    origin = (lon0, lat0, alt0)
    return np.array(enu_rows), origin
def _from_enu(pts_m, origin: tuple) -> list:
    """
    Map local ENU points (metres) back to [[lon, lat, alt], …].

    ``origin`` is the (lon0, lat0, alt0) tuple the ENU frame is anchored at;
    each row of ``pts_m`` is read as (east, north, up).
    """
    lon0, lat0, alt0 = origin
    # Same cosine-of-latitude scaling as the forward projection, inverted.
    cos_lat = math.cos(math.radians(lat0))
    return [
        [
            lon0 + east / (cos_lat * _R_EARTH) * math.degrees(1),
            lat0 + north / _R_EARTH * math.degrees(1),
            alt0 + up,
        ]
        for east, north, up in pts_m
    ]
# ── S3 upload ──────────────────────────────────────────────────────────────────
def _upload_glb(glb_bytes: bytes) -> str | None:
    """
    Store a GLB payload in S3 and hand back a presigned GET URL (valid 1 h).

    Best-effort: returns None when S3 storage is not configured (development)
    and also when any step of the upload raises; failures are logged rather
    than propagated.
    """
    presigned_url = None
    try:
        # Imported inside the try so an import failure is treated like any
        # other upload failure.
        from apps.utils.storage import _is_s3_storage, _get_client
        from django.conf import settings

        if not _is_s3_storage():
            logger.info("S3 not configured — skipping GLB upload")
        else:
            object_key = f"coaster/models/{uuid.uuid4()}.glb"
            s3 = _get_client()
            s3.put_object(
                Bucket=settings.AWS_STORAGE_BUCKET_NAME,
                Key=object_key,
                Body=glb_bytes,
                ContentType="model/gltf-binary",
            )
            presigned_url = s3.generate_presigned_url(
                "get_object",
                Params={
                    "Bucket": settings.AWS_STORAGE_BUCKET_NAME,
                    "Key": object_key,
                },
                ExpiresIn=3600,
            )
    except Exception:
        logger.exception("GLB upload failed")
    return presigned_url
# ── Allowed simulation params ──────────────────────────────────────────────────
# Whitelist of simulation parameters a client may override.
# Each entry maps a parameter name to (type, min, max, default):
# _parse_params coerces the request value with `type`, falls back to
# `default` when the value is missing or cannot be coerced, and clamps the
# result into [min, max].
# NOTE(review): units are not stated in this file — presumably SI; confirm
# against the physics module.
_PARAM_SCHEMA = {
    "rail_spacing": (float, 0.5, 10.0, 1.5),
    "mass": (float, 1.0, 50000, 1000.0),
    "initial_velocity": (float, 0.0, 100.0, 1.0),
    "friction_coeff": (float, 0.0, 1.0, 0.02),
    "junction_window_length": (float, 0.5, 50.0, 5.0),
    "junction_threshold": (float, 50.0, 99.0, 88.0),
    "binormal_smooth_iterations": (int, 0, 50, 5),
    "downsample_factor": (int, 1, 100, 10),
    "internal_steps": (int, 500, 20000, 5000),
}
def _parse_strips(raw) -> list:
    """
    Validate and sanitize the acceleration strip list from the request body.

    Each strip: { start_frac: float, end_frac: float, accel_ms2: float }

    Args:
        raw: Untrusted value taken from the request; anything that is not a
            list yields an empty result.

    Returns:
        A list of dicts with keys ``start_frac``, ``end_frac`` (clamped to
        [0, 1]) and ``accel_ms2`` (clamped to [-50, 50]). Entries are dropped
        silently when they are not dicts, miss a key, hold a value that
        cannot be converted to a finite-or-infinite float, contain NaN, or
        describe an empty/inverted interval (start >= end after clamping).
    """
    if not isinstance(raw, list):
        return []
    result = []
    for item in raw:
        if not isinstance(item, dict):
            continue
        try:
            sf = float(item['start_frac'])
            ef = float(item['end_frac'])
            ac = float(item['accel_ms2'])
        except (KeyError, TypeError, ValueError, OverflowError):
            # OverflowError: float() of an integer too large for a double.
            continue
        # NaN compares false with everything, so the min/max clamp below would
        # silently turn it into the upper bound (e.g. 50 m/s²). Drop it instead.
        if math.isnan(sf) or math.isnan(ef) or math.isnan(ac):
            continue
        sf = max(0.0, min(1.0, sf))
        ef = max(0.0, min(1.0, ef))
        ac = max(-50.0, min(50.0, ac))
        if sf >= ef:
            continue
        result.append({'start_frac': sf, 'end_frac': ef, 'accel_ms2': ac})
    return result
def _parse_params(raw: dict) -> dict:
    """
    Build the full simulation parameter set from untrusted overrides.

    Args:
        raw: Mapping of parameter name to requested value. Anything that is
            not a dict is treated as "no overrides". Keys not present in
            ``_PARAM_SCHEMA`` are ignored.

    Returns:
        A dict with exactly one entry per ``_PARAM_SCHEMA`` key. Each value
        is coerced to the schema type and clamped into the schema's
        [min, max] range; values that are missing, cannot be coerced, or are
        NaN are replaced by the schema default.
    """
    if not isinstance(raw, dict):
        # e.g. "params": [1, 2] or "params": "x" in the request body.
        raw = {}
    out = {}
    for name, (typ, lo, hi, default) in _PARAM_SCHEMA.items():
        try:
            val = typ(raw.get(name, default))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(inf) — JSON such as 1e999 parses to infinity.
            val = default
        if math.isnan(val):
            # NaN would fall through min/max and come out as the upper bound.
            val = default
        # Re-apply the type so a clamp that lands on an int bound of a float
        # parameter (e.g. mass → 50000) still yields the declared type.
        out[name] = typ(max(lo, min(hi, val)))
    return out
# ── View ───────────────────────────────────────────────────────────────────────
class CoasterSimulateView(APIView):
    """
    POST /api/v1/coaster/simulate/

    Request body:
    {
      "path": [[lon, lat, alt_m], …],        // ≥ 2 points, finite numbers
      "params": { … },                       // optional overrides
      "acceleration_strips": [ { … }, … ]    // optional
    }

    Response 200:
    {
      "rail_1": [[lon, lat, alt], …],
      "rail_2": [[lon, lat, alt], …],
      "origin": [lon0, lat0, alt0],
      "model_url": "<presigned URL or null>",
      "diagnostics": { … },
      "profile": …
    }

    Response 400 (``invalid_path``) when the path is missing or malformed.
    Response 500 (``simulation_failed``) when the physics step raises.
    """

    @staticmethod
    def _clean_path(raw_path: list) -> list:
        """
        Coerce ``raw_path`` into a list of [lon, lat, alt] float triples.

        Raises:
            TypeError, ValueError, OverflowError: when a point is not a
                3-element list/tuple, a component cannot be converted to
                float, or a component is NaN/±inf.
        """
        cleaned = []
        for pt in raw_path:
            # Strings and dicts are iterable too ("123" would otherwise pass
            # as three coordinates), so insist on a real sequence.
            if not isinstance(pt, (list, tuple)) or len(pt) != 3:
                raise ValueError("point must be a 3-element list")
            triple = [float(v) for v in pt]
            # JSON like 1e999 parses to inf; non-finite input would only
            # produce garbage (or a crash) in the physics step.
            if not all(math.isfinite(v) for v in triple):
                raise ValueError("coordinates must be finite")
            cleaned.append(triple)
        return cleaned

    def post(self, request):
        """Run the rail simulation for the posted path and return the rails."""
        body = request.data
        # A JSON body may be a list or scalar, which has no .get().
        path = body.get("path") if isinstance(body, dict) else None
        if not isinstance(path, list) or len(path) < 2:
            return Response(
                {"error": "invalid_path", "detail": "path must be a list of ≥ 2 [lon, lat, alt] coordinates."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        # Validate each coordinate
        try:
            path = self._clean_path(path)
        except (TypeError, ValueError, OverflowError):
            return Response(
                {"error": "invalid_path", "detail": "Each point must be [lon, lat, alt_m] (three numbers)."},
                status=status.HTTP_400_BAD_REQUEST,
            )

        raw_params = body.get("params")
        if not isinstance(raw_params, dict):
            # Missing, null, or wrong-typed overrides all mean "use defaults".
            raw_params = {}
        params = _parse_params(raw_params)
        strips = _parse_strips(body.get("acceleration_strips") or [])

        # Convert to local ENU
        pts_enu, origin = _to_enu(path)

        # Run physics (imported lazily so the server starts without the deps)
        try:
            from .physics import generate_rails
            rail_1, rail_2, diag, profile = generate_rails(
                pts_enu, **params, acceleration_strips=strips
            )
        except Exception as exc:
            logger.exception("Physics simulation failed")
            # NOTE(review): str(exc) exposes internal error text to the
            # client; kept for compatibility with existing consumers.
            return Response(
                {"error": "simulation_failed", "detail": str(exc)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

        # Build GLB (build123d + trimesh) — gracefully skip on failure
        model_url = None
        try:
            from .mesh import build_glb
            glb_bytes = build_glb(rail_1, rail_2)
            model_url = _upload_glb(glb_bytes)
        except Exception:
            logger.exception("GLB generation failed — returning coordinates only")

        return Response({
            "rail_1": _from_enu(rail_1, origin),
            "rail_2": _from_enu(rail_2, origin),
            "origin": list(origin),
            "model_url": model_url,
            "diagnostics": diag,
            "profile": profile,
        })
# ── Coaster persistence ────────────────────────────────────────────────────────
from rest_framework.permissions import IsAuthenticated # noqa: E402
class CoasterListCreateView(APIView):
    """
    Coaster collection of a single challenge (authenticated users only).

    GET  /api/v1/coaster/challenges/{challenge_id}/coasters/ — list all coasters
    POST /api/v1/coaster/challenges/{challenge_id}/coasters/ — save (upsert)
    """

    permission_classes = [IsAuthenticated]

    def get(self, request, challenge_id):
        """Return every coaster of the challenge, most recently updated first."""
        from .models import Coaster
        from .serializers import CoasterSerializer

        queryset = Coaster.objects.filter(challenge_id=challenge_id)
        queryset = queryset.select_related('creator').order_by('-updated_at')
        serialized = CoasterSerializer(queryset, many=True)
        return Response(serialized.data)

    def post(self, request, challenge_id):
        """
        Create or overwrite the requesting user's coaster for the challenge.

        The coaster is keyed by (creator, challenge). Fields absent from the
        request body are stored as "" (name) or [] (anchors,
        acceleration_strips). Responds 201 when a row was created, 200 when
        an existing one was updated, 404 when the challenge does not exist.
        """
        from .models import Coaster
        from .serializers import CoasterSerializer
        from apps.challenges.models import Challenge

        try:
            challenge = Challenge.objects.get(id=challenge_id)
        except Challenge.DoesNotExist:
            not_found = {"error": "challenge_not_found"}
            return Response(not_found, status=status.HTTP_404_NOT_FOUND)

        payload = request.data
        field_values = {
            "name": payload.get("name", ""),
            "anchors": payload.get("anchors", []),
            "acceleration_strips": payload.get("acceleration_strips", []),
        }
        coaster, created = Coaster.objects.update_or_create(
            creator=request.user,
            challenge=challenge,
            defaults=field_values,
        )

        body = CoasterSerializer(coaster).data
        if created:
            return Response(body, status=status.HTTP_201_CREATED)
        return Response(body, status=status.HTTP_200_OK)
class CoasterDeleteView(APIView):
    """DELETE /api/v1/coaster/{pk}/"""

    permission_classes = [IsAuthenticated]

    def delete(self, request, pk):
        """
        Delete the coaster with id ``pk`` on behalf of its creator.

        Responds 404 when no such coaster exists, 403 when the requesting
        user is not the coaster's creator, and 204 once it has been deleted.
        """
        from .models import Coaster

        try:
            target = Coaster.objects.get(id=pk)
        except Coaster.DoesNotExist:
            outcome = status.HTTP_404_NOT_FOUND
        else:
            if target.creator != request.user:
                outcome = status.HTTP_403_FORBIDDEN
            else:
                target.delete()
                outcome = status.HTTP_204_NO_CONTENT
        return Response(status=outcome)