What are Ambient Agents?
Ambient agents are smart agents that work quietly in the background, watching what's happening and taking action when needed. Unlike regular chatbots that wait for you to talk to them, ambient agents are
always working and
aware of what's going on.
Think of them as smart assistants that:
•
Watch continuously without bothering you
•
Act automatically based on what's happening
•
Learn from patterns to get better over time
•
Work smoothly with your existing systems
Main Parts of Ambient Agents
1. Watching for Events
from typing import Dict, Any, List
from datetime import datetime
import asyncio
class EventWatcher:
def __init__(self):
self.event_handlers = {}
self.running = False
def register_handler(self, event_type: str, handler):
"""Register a handler for a specific event type."""
if event_type not in self.event_handlers:
self.event_handlers[event_type] = []
self.event_handlers[event_type].append(handler)
async def emit_event(self, event_type: str, data: Dict[str, Any]):
"""Tell all handlers about an event."""
if event_type in self.event_handlers:
for handler in self.event_handlers[event_type]:
try:
await handler(data)
except Exception as e:
print(f"Error in event handler: {e}")
async def start_watching(self):
"""Start watching for events."""
self.running = True
while self.running:
# Check for events (add your specific checking logic here)
await self.check_for_events()
await asyncio.sleep(1) # Check every second
async def check_for_events(self):
"""Override this method to check for specific events."""
pass
2. Remembering Information
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
import json
class AmbientState(TypedDict):
current_context: Annotated[Dict[str, Any], "What's happening right now"]
user_preferences: Annotated[Dict[str, Any], "What the user likes and wants"]
recent_events: Annotated[List[Dict[str, Any]], "Recent things that happened"]
agent_memory: Annotated[Dict[str, Any], "What the agent has learned"]
pending_actions: Annotated[List[Dict[str, Any]], "Actions waiting to be done"]
class AmbientStateManager:
def __init__(self):
self.state = AmbientState(
current_context={},
user_preferences={},
recent_events=[],
agent_memory={},
pending_actions=[]
)
def update_context(self, context: Dict[str, Any]):
"""Update what's happening right now."""
self.state["current_context"].update(context)
def add_event(self, event: Dict[str, Any]):
"""Add a new event to the recent events list."""
self.state["recent_events"].append({
**event,
"timestamp": datetime.now().isoformat()
})
# Keep only the last 100 events
if len(self.state["recent_events"]) > 100:
self.state["recent_events"] = self.state["recent_events"][-100:]
def add_pending_action(self, action: Dict[str, Any]):
"""Add an action to the waiting list."""
self.state["pending_actions"].append(action)
def get_state(self) -> AmbientState:
"""Get the current state."""
return self.state
Building the Ambient Agent with LangGraph
1. Agent Structure
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import Dict, Any
class AmbientAgent:
def __init__(self):
self.llm = ChatOpenAI(temperature=0.1)
self.state_manager = AmbientStateManager()
self.event_watcher = EventWatcher()
self.workflow = self._create_workflow()
def _create_workflow(self) -> StateGraph:
"""Create the LangGraph workflow for the ambient agent."""
workflow = StateGraph(AmbientState)
# Add steps
workflow.add_node("analyze_context", self._analyze_context)
workflow.add_node("decide_action", self._decide_action)
workflow.add_node("execute_action", self._execute_action)
workflow.add_node("update_memory", self._update_memory)
# Set up the flow
workflow.add_edge("analyze_context", "decide_action")
workflow.add_edge("decide_action", "execute_action")
workflow.add_edge("execute_action", "update_memory")
workflow.add_edge("update_memory", END)
return workflow.compile()
async def _analyze_context(self, state: AmbientState) -> AmbientState:
"""Look at what's happening and recent events."""
context = state["current_context"]
recent_events = state["recent_events"]
# Create a prompt for analysis
prompt = f"""
Look at what's happening and recent events:
Current Situation: {json.dumps(context, indent=2)}
Recent Events: {json.dumps(recent_events[-5:], indent=2)}
What patterns do you see? What might be important for the user?
"""
response = self.llm.invoke(prompt)
# Save the analysis
state["agent_memory"]["last_analysis"] = response.content
return state
async def _decide_action(self, state: AmbientState) -> AmbientState:
"""Decide what action to take based on the analysis."""
analysis = state["agent_memory"].get("last_analysis", "")
context = state["current_context"]
prompt = f"""
Based on this analysis: {analysis}
Current situation: {json.dumps(context, indent=2)}
Should I do anything? If yes, what should I do?
Respond with JSON format:
{{
"should_act": true/false,
"action": "action_name",
"parameters": {{}},
"priority": "high/medium/low"
}}
"""
response = self.llm.invoke(prompt)
try:
action_decision = json.loads(response.content)
if action_decision.get("should_act", False):
state["pending_actions"].append(action_decision)
except json.JSONDecodeError:
print("Failed to understand action decision")
return state
async def _execute_action(self, state: AmbientState) -> AmbientState:
"""Do the waiting actions."""
for action in state["pending_actions"]:
await self._do_action(action)
# Clear waiting actions
state["pending_actions"] = []
return state
async def _update_memory(self, state: AmbientState) -> AmbientState:
"""Update what the agent has learned."""
# Save important patterns and learnings
state["agent_memory"]["last_update"] = datetime.now().isoformat()
return state
async def _do_action(self, action: Dict[str, Any]):
"""Do a specific action."""
action_type = action.get("action")
if action_type == "send_notification":
await self._send_notification(action.get("parameters", {}))
elif action_type == "adjust_environment":
await self._adjust_environment(action.get("parameters", {}))
elif action_type == "log_event":
await self._log_event(action.get("parameters", {}))
async def _send_notification(self, params: Dict[str, Any]):
"""Send a message to the user."""
message = params.get("message", "Ambient agent notification")
# Add your notification logic here
print(f"Notification: {message}")
async def _adjust_environment(self, params: Dict[str, Any]):
"""Change the environment based on parameters."""
# Add your environment control logic here
print(f"Adjusting environment: {params}")
async def _log_event(self, params: Dict[str, Any]):
"""Write down an event for later reference."""
# Add your logging logic here
print(f"Logging event: {params}")
2. Event-Driven System
class AmbientAgentWithEvents(AmbientAgent):
def __init__(self):
super().__init__()
self._setup_event_handlers()
def _setup_event_handlers(self):
"""Set up handlers for different types of events."""
self.event_watcher.register_handler("user_activity", self._handle_user_activity)
self.event_watcher.register_handler("environment_change", self._handle_environment_change)
self.event_watcher.register_handler("time_event", self._handle_time_event)
self.event_watcher.register_handler("system_alert", self._handle_system_alert)
async def _handle_user_activity(self, data: Dict[str, Any]):
"""Handle when the user does something."""
self.state_manager.update_context({
"last_user_activity": data.get("activity_type"),
"user_location": data.get("location"),
"user_mood": data.get("mood", "neutral")
})
self.state_manager.add_event({
"type": "user_activity",
"data": data
})
# Start the workflow
await self._trigger_workflow()
async def _handle_environment_change(self, data: Dict[str, Any]):
"""Handle when the environment changes."""
self.state_manager.update_context({
"temperature": data.get("temperature"),
"lighting": data.get("lighting"),
"noise_level": data.get("noise_level")
})
self.state_manager.add_event({
"type": "environment_change",
"data": data
})
await self._trigger_workflow()
async def _handle_time_event(self, data: Dict[str, Any]):
"""Handle time-based events."""
self.state_manager.update_context({
"current_time": data.get("time"),
"day_of_week": data.get("day_of_week"),
"is_work_hours": data.get("is_work_hours", False)
})
self.state_manager.add_event({
"type": "time_event",
"data": data
})
await self._trigger_workflow()
async def _handle_system_alert(self, data: Dict[str, Any]):
"""Handle system alerts."""
self.state_manager.add_event({
"type": "system_alert",
"data": data,
"priority": "high"
})
# High priority events start workflow immediately
await self._trigger_workflow()
async def _trigger_workflow(self):
"""Start the LangGraph workflow."""
current_state = self.state_manager.get_state()
try:
# Run the workflow
result = await self.workflow.ainvoke(current_state)
# Update state with result
self.state_manager.state = result
except Exception as e:
print(f"Error in workflow execution: {e}")
async def start(self):
"""Start the ambient agent."""
print("Starting ambient agent...")
# Start event watching in background
asyncio.create_task(self.event_watcher.start_watching())
# Start regular context updates
asyncio.create_task(self._regular_context_update())
print("Ambient agent started successfully!")
async def _regular_context_update(self):
"""Update context regularly even without events."""
while True:
await asyncio.sleep(60) # Update every minute
# Update time-based context
now = datetime.now()
self.state_manager.update_context({
"current_time": now.isoformat(),
"day_of_week": now.strftime("%A"),
"is_work_hours": 9 <= now.hour <= 17
})
# Start workflow for regular updates
await self._trigger_workflow()
Real Examples
1. Smart Home Helper
class SmartHomeAgent(AmbientAgentWithEvents):
def __init__(self):
super().__init__()
self._setup_smart_home_handlers()
def _setup_smart_home_handlers(self):
"""Set up handlers for smart home events."""
self.event_watcher.register_handler("motion_detected", self._handle_motion)
self.event_watcher.register_handler("door_opened", self._handle_door)
self.event_watcher.register_handler("temperature_change", self._handle_temperature)
self.event_watcher.register_handler("light_level_change", self._handle_lighting)
async def _handle_motion(self, data: Dict[str, Any]):
"""Handle when motion is detected."""
location = data.get("location")
time_of_day = data.get("time_of_day")
# Update context
self.state_manager.update_context({
"motion_detected": True,
"motion_location": location,
"time_of_day": time_of_day
})
# Add event
self.state_manager.add_event({
"type": "motion_detected",
"location": location,
"time": datetime.now().isoformat()
})
await self._trigger_workflow()
async def _handle_door(self, data: Dict[str, Any]):
"""Handle when doors open or close."""
door_id = data.get("door_id")
action = data.get("action") # "opened" or "closed"
self.state_manager.add_event({
"type": "door_event",
"door_id": door_id,
"action": action,
"time": datetime.now().isoformat()
})
await self._trigger_workflow()
async def _do_action(self, action: Dict[str, Any]):
"""Override to handle smart home specific actions."""
action_type = action.get("action")
if action_type == "adjust_lighting":
await self._adjust_lighting(action.get("parameters", {}))
elif action_type == "adjust_temperature":
await self._adjust_temperature(action.get("parameters", {}))
elif action_type == "send_security_alert":
await self._send_security_alert(action.get("parameters", {}))
else:
await super()._do_action(action)
async def _adjust_lighting(self, params: Dict[str, Any]):
"""Adjust home lighting based on context."""
brightness = params.get("brightness", 50)
location = params.get("location", "living_room")
# Add your smart lighting control here
print(f"Adjusting lighting in {location} to {brightness}%")
async def _adjust_temperature(self, params: Dict[str, Any]):
"""Adjust home temperature based on context."""
temperature = params.get("temperature", 22)
# Add your smart thermostat control here
print(f"Setting temperature to {temperature}°C")
async def _send_security_alert(self, params: Dict[str, Any]):
"""Send security alerts."""
alert_type = params.get("alert_type", "general")
location = params.get("location", "unknown")
# Add your security alert system here
print(f"Security alert: {alert_type} at {location}")
2. Work Helper
class WorkHelperAgent(AmbientAgentWithEvents):
def __init__(self):
super().__init__()
self._setup_work_handlers()
def _setup_work_handlers(self):
"""Set up handlers for work-related events."""
self.event_watcher.register_handler("app_usage", self._handle_app_usage)
self.event_watcher.register_handler("calendar_event", self._handle_calendar)
self.event_watcher.register_handler("email_received", self._handle_email)
self.event_watcher.register_handler("focus_session", self._handle_focus)
async def _handle_app_usage(self, data: Dict[str, Any]):
"""Handle when apps are used."""
app_name = data.get("app_name")
duration = data.get("duration")
activity_type = data.get("activity_type") # "productive", "distracting", "neutral"
self.state_manager.update_context({
"current_app": app_name,
"app_activity_type": activity_type
})
self.state_manager.add_event({
"type": "app_usage",
"app_name": app_name,
"duration": duration,
"activity_type": activity_type
})
await self._trigger_workflow()
async def _handle_calendar(self, data: Dict[str, Any]):
"""Handle calendar events."""
event_title = data.get("title")
event_time = data.get("time")
event_duration = data.get("duration")
self.state_manager.update_context({
"current_calendar_event": event_title,
"event_time": event_time
})
self.state_manager.add_event({
"type": "calendar_event",
"title": event_title,
"time": event_time,
"duration": event_duration
})
await self._trigger_workflow()
async def _do_action(self, action: Dict[str, Any]):
"""Override to handle work-specific actions."""
action_type = action.get("action")
if action_type == "send_work_reminder":
await self._send_work_reminder(action.get("parameters", {}))
elif action_type == "adjust_notifications":
await self._adjust_notifications(action.get("parameters", {}))
elif action_type == "suggest_break":
await self._suggest_break(action.get("parameters", {}))
else:
await super()._do_action(action)
async def _send_work_reminder(self, params: Dict[str, Any]):
"""Send work reminders."""
message = params.get("message", "Time to focus!")
reminder_type = params.get("type", "general")
# Add your notification system here
print(f"Work reminder ({reminder_type}): {message}")
async def _adjust_notifications(self, params: Dict[str, Any]):
"""Adjust notification settings based on context."""
enable_notifications = params.get("enable", True)
notification_level = params.get("level", "normal")
# Add your notification control here
print(f"Adjusting notifications: {enable_notifications}, level: {notification_level}")
async def _suggest_break(self, params: Dict[str, Any]):
"""Suggest taking a break."""
break_duration = params.get("duration", 5)
reason = params.get("reason", "You've been working for a while")
# Add your break suggestion system here
print(f"Break suggestion: {reason}. Take a {break_duration}-minute break.")
Good Practices
1. Privacy and Safety
•
Minimize Data: Only collect and store necessary information
•
Local Processing: Process sensitive data on your own computer when possible
•
Encrypt Data: Protect data with encryption
•
User Control: Let users control what data is collected
•
Be Transparent: Tell users what the agent is doing
2. Make It Fast
•
Filter Events: Only process important events
•
Batch Processing: Group similar events together
•
Remember Results: Save frequently accessed data
•
Use Async: Use non-blocking operations
•
Watch Resources: Monitor how much resources are used
3. User Experience
•
Don't Bother: Don't interrupt the user unnecessarily
•
Be Relevant: Provide helpful information at the right time
•
Learn: Allow users to teach the agent their preferences
•
Be Clear: Show users what the agent is doing
•
Give Control: Let users easily control the agent
Deployment
1. Infrastructure
# Docker setup for ambient agent
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "-m", "ambient_agent"]
2. Monitoring
import logging
import prometheus_client
from prometheus_client import Counter, Histogram, Gauge
# Track metrics
EVENT_COUNTER = Counter('ambient_events_total', 'Total events processed', ['event_type'])
ACTION_COUNTER = Counter('ambient_actions_total', 'Total actions executed', ['action_type'])
WORKFLOW_DURATION = Histogram('workflow_duration_seconds', 'Workflow execution time')
ACTIVE_AGENTS = Gauge('active_agents', 'Number of active ambient agents')
class MonitoredAmbientAgent(AmbientAgentWithEvents):
def __init__(self):
super().__init__()
self.logger = logging.getLogger(__name__)
async def _handle_event(self, event_type: str, data: Dict[str, Any]):
"""Handle events with monitoring."""
EVENT_COUNTER.labels(event_type=event_type).inc()
start_time = time.time()
try:
await super()._handle_event(event_type, data)
except Exception as e:
self.logger.error(f"Error handling event {event_type}: {e}")
finally:
duration = time.time() - start_time
WORKFLOW_DURATION.observe(duration)
Summary
Building ambient agents with LangGraph opens up exciting possibilities for creating smart, aware systems that work quietly in the background. By combining event-driven systems with LangGraph's workflow capabilities, you can create agents that:
•
Watch continuously without being annoying
•
Learn and adapt to user preferences and patterns
•
Take action automatically based on context
•
Work smoothly with existing systems
The key to success is:
•
Start simple: Begin with basic event watching and add more later
•
Focus on user experience: Make sure the agent is helpful, not annoying
•
Respect privacy: Be clear about data collection and usage
•
Test thoroughly: Watch performance and user feedback
•
Improve continuously: Make the agent better based on real usage
Ambient agents represent the future of AI agents—smart, aware, and always ready to help when needed. With LangGraph, you have the tools to build these sophisticated systems that truly understand and adapt to their environment.