"""Feedback collection, labeling, and human-review pipeline for LLM interactions."""

import hashlib
import json
import random
from datetime import datetime
from typing import Dict, List, Optional
class FeedbackCollector:
    """Logs LLM interactions and the feedback attached to them."""

    def __init__(self, storage_backend):
        self.storage = storage_backend

    def log_interaction(
        self,
        user_id: str,
        prompt: str,
        response: str,
        model: str,
        context: Optional[Dict] = None,
        metadata: Optional[Dict] = None
    ) -> str:
        """
        Capture the full context of an LLM interaction for later analysis.

        Returns a unique interaction_id for tracking feedback.
        """
        interaction_id = hashlib.sha256(
            f"{user_id}_{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:16]
        record = {
            "interaction_id": interaction_id,
            "timestamp": datetime.utcnow().isoformat(),
            "user_id": user_id,
            "model": model,
            "prompt": prompt,
            "response": response,
            "context": context or {},
            "metadata": metadata or {},
            # Short hashes enable cheap deduplication in cluster_failures
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest()[:8],
            "response_hash": hashlib.sha256(response.encode()).hexdigest()[:8]
        }
        self.storage.write(record)
        return interaction_id

    def capture_explicit_feedback(
        self,
        interaction_id: str,
        rating: int,
        comment: Optional[str] = None
    ) -> None:
        """Capture user ratings and comments."""
        feedback = {
            "type": "explicit",
            "interaction_id": interaction_id,
            "timestamp": datetime.utcnow().isoformat(),
            "rating": rating,
            "comment": comment,
            "is_helpful": rating >= 4
        }
        self.storage.write(feedback)

    def capture_implicit_feedback(
        self,
        interaction_id: str,
        signals: Dict
    ) -> None:
        """Capture behavioral signals like time-to-abandon, copy actions, etc."""
        feedback = {
            "type": "implicit",
            "interaction_id": interaction_id,
            "timestamp": datetime.utcnow().isoformat(),
            "signals": signals
        }
        self.storage.write(feedback)


class FeedbackAggregator:
    def __init__(self, storage_backend):
        self.storage = storage_backend

    def generate_labels(self, interaction_id: str) -> Dict[str, float]:
        """
        Convert raw signals into structured labels.

        Returns a dictionary of label probabilities.
        """
        # Fetch all feedback for this interaction
        feedbacks = self.storage.get_feedback_by_interaction(interaction_id)
        labels = {"helpful": 0.0, "hallucination": 0.0, "off_topic": 0.0, "unclear": 0.0}
        for fb in feedbacks:
            # The weights below are illustrative heuristics, not calibrated values
            if fb["type"] == "explicit":
                # Direct user rating is a strong signal
                if fb.get("is_helpful"):
                    labels["helpful"] += 2.0
                else:
                    # A low rating flags failure without naming the cause
                    labels["unclear"] += 2.0
            elif fb["type"] == "implicit":
                signals = fb.get("signals", {})
                # Time-to-abandon > 60s suggests confusion
                if signals.get("time_to_abandon", 0) > 60:
                    labels["unclear"] += 1.0
                # Escalation to a human agent is a strong negative signal
                if signals.get("escalated", False):
                    labels["hallucination"] += 1.0
                    labels["off_topic"] += 1.0
                # Copy/paste without follow-up suggests success
                if signals.get("copied", False) and not signals.get("follow_up", False):
                    labels["helpful"] += 1.0
        # Normalize to probabilities, guarding against an all-zero signal set
        total = sum(max(v, 0) for v in labels.values())
        if total > 0:
            labels = {k: v / total for k, v in labels.items()}
        return labels

    def cluster_failures(self, start_date: str, end_date: str) -> List[Dict]:
        """
        Group similar failures to identify patterns.

        Uses prompt_hash and response_hash for deduplication.
        """
        interactions = self.storage.get_interactions_in_range(start_date, end_date)
        failure_clusters: Dict[str, Dict] = {}
        for interaction in interactions:
            labels = self.generate_labels(interaction["interaction_id"])
            # Consider it a failure if any negative label > 0.3
            if any(labels.get(l, 0) > 0.3 for l in ["hallucination", "off_topic", "unclear"]):
                key = interaction["prompt_hash"]
                if key not in failure_clusters:
                    failure_clusters[key] = {
                        "prompt": interaction["prompt"],
                        "count": 0,
                        "models": set(),
                        "failure_types": set()
                    }
                cluster = failure_clusters[key]
                cluster["count"] += 1
                cluster["models"].add(interaction["model"])
                for label, score in labels.items():
                    # Only record labels that cross the failure threshold
                    if label != "helpful" and score > 0.3:
                        cluster["failure_types"].add(label)
        return list(failure_clusters.values())


class HumanInTheLoop:
    def __init__(self, storage_backend, sample_rate=0.05):
        self.storage = storage_backend
        self.sample_rate = sample_rate

    def should_review(self, interaction: Dict, labels: Dict) -> bool:
        """
        Decide whether a human should review this interaction.

        Uses sampling and heuristics for high-impact cases.
        """
        # Always review very low or very high confidence cases
        max_label = max(labels.values(), default=0.0)
        if max_label > 0.9 or max_label < 0.1:
            return True
        # Review expensive interactions (high token usage)
        if interaction.get("metadata", {}).get("token_usage", {}).get("total", 0) > 10000:
            return True
        # Otherwise fall back to random sampling
        return random.random() < self.sample_rate

    def create_review_task(self, interaction_id: str, labels: Dict) -> Dict:
        """Generate a review task for human annotators."""
        interaction = self.storage.get_interaction(interaction_id)
        return {
            "task_id": f"review_{interaction_id}",
            "interaction": interaction,
            "labels": labels,
            "questions": [
                "Is the response helpful and accurate?",
                "Does the response contain hallucinations?",
                "Is the response on-topic?",
                "What could be improved?"
            ],
            "priority": "high" if max(labels.values(), default=0.0) > 0.7 else "medium"
        }


class InMemoryStorage:
    """Minimal in-memory backend for demonstration; a real deployment would use a database."""

    def __init__(self):
        self.records: List[Dict] = []

    def write(self, record):
        self.records.append(record)

    def get_feedback_by_interaction(self, interaction_id):
        # Feedback records carry a "type"; the interaction record itself does not
        return [
            r for r in self.records
            if r.get("interaction_id") == interaction_id and "type" in r
        ]

    def get_interactions_in_range(self, start, end):
        # Only interaction records (those carrying a prompt) can be clustered
        return [
            r for r in self.records
            if "prompt" in r and start <= r.get("timestamp", "") <= end
        ]

    def get_interaction(self, interaction_id):
        for r in self.records:
            if r.get("interaction_id") == interaction_id and "prompt" in r:
                return r
        return None


if __name__ == "__main__":
    storage = InMemoryStorage()
    collector = FeedbackCollector(storage)
    aggregator = FeedbackAggregator(storage)
    hitl = HumanInTheLoop(storage)

    # Simulate a conversation
    interaction_id = collector.log_interaction(
        user_id="user_123",  # illustrative ID
        prompt="What's the weather in Tokyo?",
        response="Tokyo has a temperate climate with four distinct seasons. Spring is cherry blossom season...",
        model="example-model",  # placeholder model name
        context={"location": "Tokyo"},
        metadata={"token_usage": {"total": 150}}
    )

    # User provides explicit feedback
    collector.capture_explicit_feedback(interaction_id, rating=2, comment="Not helpful")
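
    # Hypothetical implicit signals to exercise the behavioral path as well;
    # the specific values here are illustrative, not measured
    collector.capture_implicit_feedback(
        interaction_id,
        signals={"time_to_abandon": 75, "copied": False, "escalated": False}
    )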
    labels = aggregator.generate_labels(interaction_id)
    print(f"Generated labels: {labels}")

    # Check if human review is needed
    if hitl.should_review(collector.storage.get_interaction(interaction_id), labels):
        review_task = hitl.create_review_task(interaction_id, labels)
        print(f"Human review required: {json.dumps(review_task, indent=2)}")
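
    # Sketch of failure clustering over a wide window; the ISO date bounds are
    # placeholders chosen to bracket the demo record's utcnow() timestamp
    clusters = aggregator.cluster_failures("2000-01-01", "2100-01-01")
    print(f"Failure clusters: {clusters}")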