#!/usr/bin/env python3
"""R.I.V.E.N CYBER AI LLC - Deepfake Detector API (All-in-One)."""
import json
import os
import tempfile
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path

import cv2
import numpy as np
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse

COMPANY = "R.I.V.E.N CYBER AI LLC"

app = FastAPI(title=f"{COMPANY} Deepfake Detector")
# NOTE(review): wide-open CORS — acceptable for a demo, tighten for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

STATIC = Path(__file__).parent / "static"
STATIC.mkdir(exist_ok=True)
TMP = Path(tempfile.gettempdir()) / "riven"
TMP.mkdir(exist_ok=True)

# Severity -> contribution to the 0-100 deepfake score.
_SEVERITY_WEIGHTS = {"CRITICAL": 25, "HIGH": 15, "MEDIUM": 8, "LOW": 3, "INFO": 1}
# Sort order so the most severe findings come first in the response.
_SEVERITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "INFO": 4}

# Hard cap on accepted upload size (bytes).
_MAX_UPLOAD = 500 * 1024 * 1024


def analyze_video(path):
    """Run heuristic deepfake checks on the video at *path*.

    Samples up to 20 frames evenly across the file and collects per-frame
    statistics (face size/sharpness via a Haar cascade, edge density, noise,
    brightness, inter-frame difference). Each anomalous statistic becomes a
    "finding" with a severity; severities are summed into a 0-100 score and
    mapped to a human-readable verdict.

    Returns a JSON-serializable dict. If the video cannot be opened, returns
    a minimal error dict with deepfake_score 0.
    """
    start = time.time()
    findings = []
    cap = cv2.VideoCapture(path)
    if not cap.isOpened():
        return {"error": "Cannot open video", "deepfake_score": 0, "findings": []}

    face_sizes, blur_scores, edge_scores = [], [], []
    noise_levels, brightness_vals, frame_diffs = [], [], []
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        duration = frames / fps if fps > 0 else 0
        # FOURCC comes back as a float-encoded 32-bit int; unpack byte-by-byte.
        codec_raw = int(cap.get(cv2.CAP_PROP_FOURCC))
        codec = "".join(chr((codec_raw >> 8 * i) & 0xFF) for i in range(4)) if codec_raw else "unknown"

        if fps and (fps < 20 or fps > 120):
            findings.append({"check": "METADATA_FPS", "severity": "MEDIUM",
                             "detail": f"Unusual frame rate: {fps:.1f} fps"})
        if w < 320 or h < 240:
            findings.append({"check": "METADATA_RES", "severity": "LOW",
                             "detail": f"Low resolution: {w}x{h}"})

        face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        sample_count = min(20, frames)
        # Guard against frames == 0 (sample_count would be 0 and the original
        # integer division raised ZeroDivisionError on empty containers).
        step = max(1, frames // sample_count) if sample_count > 0 else 1

        prev_gray = None
        for i in range(0, frames, step):
            cap.set(cv2.CAP_PROP_POS_FRAMES, i)
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, 1.3, 5)
            if len(faces) > 0:
                for (x, y, fw, fh) in faces:
                    face_sizes.append(fw * fh)
                    face_roi = gray[y:y + fh, x:x + fw]
                    # Laplacian variance: standard sharpness/blur proxy.
                    blur_scores.append(cv2.Laplacian(face_roi, cv2.CV_64F).var())
            edges = cv2.Canny(gray, 100, 200)
            edge_scores.append(np.mean(edges))
            noise_levels.append(np.std(gray.astype(float)))
            brightness_vals.append(np.mean(gray))
            if prev_gray is not None:
                frame_diffs.append(np.mean(cv2.absdiff(gray, prev_gray)))
            prev_gray = gray.copy()
    finally:
        # Always release the capture, even if sampling raises (original leaked
        # the handle on any exception inside the loop).
        cap.release()

    # Each statistic is flagged when its coefficient of variation (std/mean)
    # exceeds a hand-tuned threshold; +1e-6 avoids division by zero.
    if len(face_sizes) > 3:
        sv = np.std(face_sizes) / (np.mean(face_sizes) + 1e-6)
        if sv > 0.4:
            findings.append({"check": "FACE_SIZE_VARIANCE", "severity": "HIGH",
                             "detail": f"Face size varies abnormally ({sv:.2f}). Possible face swap."})
    if len(blur_scores) > 3:
        bv = np.std(blur_scores) / (np.mean(blur_scores) + 1e-6)
        if bv > 0.5:
            findings.append({"check": "FACE_BLUR_INCONSISTENCY", "severity": "HIGH",
                             "detail": f"Face sharpness varies significantly ({bv:.2f}). Manipulation indicator."})
        if np.mean(blur_scores) < 50:
            findings.append({"check": "FACE_BLUR_LOW", "severity": "MEDIUM",
                             "detail": f"Faces unusually blurry (avg {np.mean(blur_scores):.0f}). May indicate generation artifacts."})
    elif frames > 30 and duration > 1:
        findings.append({"check": "NO_FACES", "severity": "LOW",
                         "detail": "No consistent faces detected in video."})
    if len(edge_scores) > 3:
        ev = np.std(edge_scores) / (np.mean(edge_scores) + 1e-6)
        if ev > 0.35:
            findings.append({"check": "EDGE_INCONSISTENCY", "severity": "MEDIUM",
                             "detail": f"Edge patterns vary abnormally ({ev:.2f}). Possible compositing."})
    if len(noise_levels) > 3:
        nv = np.std(noise_levels) / (np.mean(noise_levels) + 1e-6)
        if nv > 0.15:
            findings.append({"check": "NOISE_INCONSISTENCY", "severity": "HIGH",
                             "detail": f"Noise levels inconsistent ({nv:.2f}). Regions may be spliced."})
        if np.mean(noise_levels) < 10:
            findings.append({"check": "NOISE_TOO_CLEAN", "severity": "MEDIUM",
                             "detail": "Video unusually clean. AI-generated content often lacks natural noise."})
    if len(frame_diffs) > 3:
        dv = np.std(frame_diffs) / (np.mean(frame_diffs) + 1e-6)
        if dv > 0.6:
            findings.append({"check": "TEMPORAL_INCONSISTENCY", "severity": "HIGH",
                             "detail": f"Frame changes erratic ({dv:.2f}). Possible frame manipulation."})
    if len(brightness_vals) > 3:
        bv2 = np.std(brightness_vals) / (np.mean(brightness_vals) + 1e-6)
        if bv2 > 0.2:
            findings.append({"check": "LIGHTING_INCONSISTENCY", "severity": "MEDIUM",
                             "detail": f"Lighting varies abnormally ({bv2:.2f}). May indicate composited elements."})

    score = min(100, sum(_SEVERITY_WEIGHTS.get(f["severity"], 5) for f in findings))
    if score >= 75:
        verdict = "LIKELY DEEPFAKE"
    elif score >= 50:
        verdict = "SUSPICIOUS — Multiple manipulation indicators"
    elif score >= 25:
        verdict = "INCONCLUSIVE — Some anomalies found"
    else:
        verdict = "LIKELY AUTHENTIC — No significant indicators"

    # Histogram of findings per severity level.
    sc = {}
    for f in findings:
        sc[f["severity"]] = sc.get(f["severity"], 0) + 1

    return {
        "analyzed_by": COMPANY,
        "version": "1.0.0",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "file_info": {"width": w, "height": h, "fps": round(fps, 2),
                      "frames": frames, "duration_sec": round(duration, 2),
                      "codec": codec},
        "deepfake_score": score,
        "verdict": verdict,
        "findings": sorted(findings, key=lambda f: _SEVERITY_ORDER.get(f["severity"], 5)),
        "findings_by_severity": sc,
        "analysis_time_sec": round(time.time() - start, 2),
        # NOTE(review): reports the *requested* sample count; fewer frames may
        # have been read if the stream ended early.
        "frames_analyzed": sample_count,
    }


@app.get("/", response_class=HTMLResponse)
async def serve_ui():
    """Serve the bundled single-page UI, or a placeholder if it is missing."""
    html = STATIC / "riven_deepfake_web.html"
    if html.exists():
        # Pin encoding: read_text() without it is locale-dependent.
        return HTMLResponse(html.read_text(encoding="utf-8"))
    return HTMLResponse("Place riven_deepfake_web.html in static/")


@app.get("/api/health")
async def health():
    """Liveness probe."""
    return {"status": "online", "service": f"{COMPANY} Deepfake Detector"}


@app.post("/api/analyze")
async def analyze(file: UploadFile = File(...)):
    """Accept a video upload, run analyze_video on it, return the report.

    Streams the upload to a temp file in 1 MiB chunks, enforcing a 500 MB
    cap; the temp file is always deleted afterwards.
    """
    job = str(uuid.uuid4())[:8]
    # Only the extension of the client filename is trusted (no path parts).
    ext = Path(file.filename or "v.mp4").suffix or ".mp4"
    tmp = TMP / f"df_{job}{ext}"
    try:
        total = 0
        with open(tmp, "wb") as f:
            while chunk := await file.read(1024 * 1024):
                total += len(chunk)
                if total > _MAX_UPLOAD:
                    raise HTTPException(413, "Max 500MB")
                f.write(chunk)
        result = analyze_video(str(tmp))
        result["job_id"] = job
        return JSONResponse(result)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(500, {"error": str(e)})
    finally:
        tmp.unlink(missing_ok=True)


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=8000)