from typing import Tuple, Dict
Production-grade input sanitization for LLM applications.
Combines length validation, pattern detection, and encoding analysis.
enable_fuzzy_matching: bool = True):
self.max_chars = max_chars
self.enable_fuzzy_matching = enable_fuzzy_matching
# Critical injection patterns (compiled for performance)
self.injection_patterns = [
re.compile(r'ignore\s+(?:all\s+)?previous\s+instructions?', re.I),
re.compile(r'you\s+are\s+now\s+(?:in\s+)?developer\s+mode', re.I),
re.compile(r'system\s+override', re.I),
re.compile(r'reveal\s+(?:your\s+)?prompt', re.I),
re.compile(r'base64\s*:\s*[A-Za-z0-9+/=]{20,}', re.I),
re.compile(r'forget\s+everything', re.I),
# Unicode smuggling detection
self.invisible_chars = re.compile(r'[\u200B-\u200D\uFEFF\u2060-\u206F]')
# Fuzzy targets for typoglycemia defense
'ignore', 'bypass', 'override', 'reveal',
'delete', 'system', 'prompt', 'instruction'
def sanitize(self, text: str) -> Tuple[bool, str, Dict[str, any]]:
Main sanitization pipeline.
Returns: (is_valid, sanitized_text, metadata)
if not text or not isinstance(text, str):
return False, "", {"error": "Invalid input type"}
"original_length": len(text),
"encoding_detected": False
# Step 1: Remove invisible characters
text = self.invisible_chars.sub('', text)
# Step 2: Length validation
if len(text) > self.max_chars:
metadata["violations"].append("length_exceeded")
text = text[:self.max_chars] + "... [truncated]"
return False, text, metadata
# Step 3: Detect encoding obfuscation
encoding_results = self._detect_encoding(text)
if any(encoding_results.values()):
metadata["encoding_detected"] = True
metadata["encoding_types"] = encoding_results
metadata["violations"].append("encoding_obfuscation")
return False, text, metadata
# Step 4: Pattern matching
if self._detect_injection_patterns(text):
metadata["violations"].append("injection_pattern")
return False, text, metadata
# Step 5: Fuzzy matching (optional, computationally expensive)
if self.enable_fuzzy_matching and self._detect_fuzzy_injection(text):
metadata["violations"].append("fuzzy_injection")
return False, text, metadata
return True, text, metadata
def _detect_encoding(self, text: str) -> Dict[str, bool]:
"""Detect common encoding obfuscation techniques"""
'unicode_smuggling': False
# Base64 with dangerous content
base64_matches = re.findall(r'[A-Za-z0-9+/]{20,}={0,2}', text)
for match in base64_matches:
decoded = base64.b64decode(match).decode('utf-8', errors='ignore')
if any(kw in decoded.lower() for kw in self.fuzzy_targets):
if re.search(r'\b[0-9a-fA-F]{20,}\b', text):
# Unicode smuggling (already cleaned, but check if was present)
if self.invisible_chars.search(text):
results['unicode_smuggling'] = True
def _detect_injection_patterns(self, text: str) -> bool:
"""Fast pattern matching"""
return any(pattern.search(text) for pattern in self.injection_patterns)
def _detect_fuzzy_injection(self, text: str) -> bool:
"""Detect typoglycemia attacks"""
words = re.findall(r'\b\w+\b', text.lower())
for target in self.fuzzy_targets:
if self._is_similar_word(word, target):
def _is_similar_word(word: str, target: str) -> bool:
"""Check typoglycemia similarity"""
if len(word) != len(target) or len(word) < 3:
return (word[0] == target[0] and word[-1] == target[-1] and
sorted(word[1:-1]) == sorted(target[1:-1]))
# FastAPI middleware example
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
sanitizer = LLMSanitizer(max_chars=5000)
async def sanitize_middleware(request: Request, call_next):
if request.url.path == "/api/chat":
body = await request.json()
text = body.get("text", "")
is_valid, sanitized, meta = sanitizer.sanitize(text)
"message": "Input validation failed",
"violations": meta["violations"],
"sanitized_preview": sanitized[:100]
return await call_next(request)