Inter-Agent Communication
Agent Communication
In Strata AI, agents interact by exchanging messages, ensuring an efficient flow of tasks and data between components. This communication is managed through the Message class and the publish_message capability provided by the Environment.
Key Concepts
Message Sending: When an agent sends a message, it specifies the source information (e.g., the sent_from and cause_by attributes in the Message class).
Message Consumption: Agents subscribe to messages based on the cause_by attribute. This ensures each agent only receives relevant tasks or data.
Message Broadcasting: The Environment object handles broadcasting messages to agents, respecting their subscription requirements.
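The three concepts above can be sketched as a small publish/subscribe loop. This is a minimal illustration using only the standard library, not Strata AI's implementation: ToyMessage, ToyAgent, ToyEnvironment, and ROUTE_TO_ALL are hypothetical stand-ins, and the delivery rule (deliver when the agent is addressed by name, or when a broadcast message's cause_by is in the agent's watch set) is an assumption about how subscriptions are respected.

```python
from dataclasses import dataclass, field

ROUTE_TO_ALL = "<all>"  # hypothetical stand-in for MESSAGE_ROUTE_TO_ALL


@dataclass
class ToyMessage:
    content: str
    cause_by: str = ""
    sent_from: str = ""
    send_to: set = field(default_factory=lambda: {ROUTE_TO_ALL})


@dataclass
class ToyAgent:
    name: str
    watch: set  # cause_by values this agent subscribes to
    inbox: list = field(default_factory=list)

    def accepts(self, msg: ToyMessage) -> bool:
        # Addressed to this agent by name: always deliver.
        if self.name in msg.send_to:
            return True
        # Broadcast: deliver only when the agent watches the message's cause.
        return ROUTE_TO_ALL in msg.send_to and msg.cause_by in self.watch


class ToyEnvironment:
    def __init__(self, agents):
        self.agents = list(agents)

    def publish_message(self, msg: ToyMessage) -> None:
        # Offer the message to every agent; each agent's filter decides.
        for agent in self.agents:
            if agent.accepts(msg):
                agent.inbox.append(msg)


planner = ToyAgent("planner", watch={"UserRequirement"})
worker = ToyAgent("worker", watch={"SplitTask"})
env = ToyEnvironment([planner, worker])

env.publish_message(ToyMessage("build a parser", cause_by="UserRequirement", sent_from="user"))
env.publish_message(ToyMessage("subtask 1", cause_by="SplitTask", sent_from="planner"))
env.publish_message(ToyMessage("ping", send_to={"worker"}))  # direct addressing

delivered = {a.name: [m.content for m in a.inbox] for a in env.agents}
```

Keeping the filter on the agent rather than in the environment means the sender never needs to know who consumes its output; it only stamps cause_by.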
Message Class
from typing import Optional

from pydantic import BaseModel, Field


class Message(BaseModel):
    id: str = Field(default="", validate_default=True)  # Unique message identifier
    content: str
    instruct_content: Optional[BaseModel] = Field(default=None, validate_default=True)
    role: str = "user"  # system / user / assistant
    cause_by: str = Field(default="", validate_default=True)
    sent_from: str = Field(default="", validate_default=True)
    send_to: set[str] = Field(default={MESSAGE_ROUTE_TO_ALL}, validate_default=True)
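The fields cause_by, sent_from, and send_to are typed as strings, yet the examples on this page pass Action and Role classes to them, so some conversion to a string key presumably happens during validation. The sketch below shows one way such a conversion could work, using a plain dataclass instead of pydantic. SketchMessage, to_route_key, ROUTE_TO_ALL, and the generated id are all assumptions made for illustration, not Strata AI's actual behavior.

```python
import uuid
from dataclasses import dataclass, field

ROUTE_TO_ALL = "<all>"  # hypothetical stand-in for MESSAGE_ROUTE_TO_ALL


def to_route_key(value) -> str:
    """Hypothetical helper: turn a class, instance, or string into a string key."""
    if isinstance(value, str):
        return value
    cls = value if isinstance(value, type) else type(value)
    return f"{cls.__module__}.{cls.__qualname__}"


@dataclass
class SketchMessage:
    content: str
    id: str = ""
    role: str = "user"
    cause_by: object = ""
    sent_from: object = ""
    send_to: set = field(default_factory=lambda: {ROUTE_TO_ALL})

    def __post_init__(self):
        # Assumption: an empty id is replaced by a generated one.
        self.id = self.id or uuid.uuid4().hex
        # Normalize routing fields so comparisons are always string-to-string.
        self.cause_by = to_route_key(self.cause_by)
        self.sent_from = to_route_key(self.sent_from)
        self.send_to = {to_route_key(x) for x in self.send_to}


class SplitTask:  # placeholder action class
    pass


msg = SketchMessage(content="subtask 1", cause_by=SplitTask)
```

Normalizing once at construction lets subscription checks stay simple set-membership tests on strings.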
System Design
When designing an agent workflow, consider the following:
Inputs: Define the data or messages an agent should process. This aligns with the rc.watch attribute in the agent object.
Outputs: Specify the messages the agent generates. These are encapsulated in the Message class.
Tasks: Determine the actions an agent performs and their transitions between states.
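Writing down each agent's inputs and outputs before implementing anything makes it possible to check the wiring mechanically. The sketch below is one way to do that under stated assumptions: AgentSpec and unproduced_inputs are hypothetical helpers, the agent and action names are placeholders, and each agent is assumed to emit messages with a single cause_by value.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentSpec:
    name: str
    watches: frozenset  # inputs: cause_by values the agent subscribes to
    emits: str          # output: cause_by value stamped on its messages


def unproduced_inputs(specs, external=frozenset()):
    """Return {agent name: watched causes that no agent or external source emits}."""
    produced = {s.emits for s in specs} | set(external)
    missing = {}
    for spec in specs:
        gap = set(spec.watches) - produced
        if gap:
            missing[spec.name] = gap
    return missing


specs = [
    AgentSpec("splitter", frozenset({"UserRequirement"}), "Split"),
    AgentSpec("worker", frozenset({"Split", "Review"}), "Work"),
    AgentSpec("compiler", frozenset({"Work"}), "Compile"),
    AgentSpec("reviewer", frozenset({"Compile"}), "Review"),
]

ok = unproduced_inputs(specs, external={"UserRequirement"})
broken = unproduced_inputs(specs)  # the external input was not declared
```

An empty result means every subscription has a producer; a non-empty one points at an agent that would wait forever.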
Example Workflow
Let’s illustrate the interaction with a complex scenario:
1. Agent A: Splits requirements into 10 subtasks.
2. Agent B: Processes each subtask.
3. Agent C: Compiles the processed results.
4. Agent D: Reviews and provides feedback to Agent B.
Steps 2–4 repeat multiple times for refinement.
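The scenario does not say how the repetition of steps 2–4 ends, so a real workflow needs some stopping rule. The simulation below assumes one possible rule, a fixed cap on review rounds carried along with each message. MAX_ROUNDS, run_workflow, and the tuple layout are hypothetical, and the agents are reduced to string transformations.

```python
from collections import deque

MAX_ROUNDS = 3  # hypothetical cap on review rounds


def run_workflow(requirement: str, n_subtasks: int = 10):
    """Simulate A -> B -> C -> D -> B ... with a bounded number of review rounds."""
    trace = []
    queue = deque([("A", requirement, 0)])  # (recipient, payload, review round)
    while queue:
        agent, payload, rnd = queue.popleft()
        trace.append((agent, rnd))
        if agent == "A":
            # Fan out: one message per subtask, all addressed to B.
            for i in range(n_subtasks):
                queue.append(("B", f"{payload}/subtask-{i}", rnd))
        elif agent == "B":
            queue.append(("C", f"done({payload})", rnd))
        elif agent == "C":
            queue.append(("D", payload, rnd))
        elif agent == "D" and rnd + 1 < MAX_ROUNDS:
            # Send feedback to B only while the round budget allows it.
            queue.append(("B", payload, rnd + 1))
    return trace


trace = run_workflow("req", n_subtasks=2)
b_visits = sum(1 for agent, _ in trace if agent == "B")
```

Without a rule like this, the B, C, D cycle never drains and the environment never becomes idle.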
Implementation
Agent A: Splits user requirements into subtasks and sends them to Agent B.
from typing import List


class AgentAAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> List[str]:
        # Split the content of the first message into subtasks
        subtasks: List[str] = split_10_subtask(with_messages[0].content)
        return subtasks


class AgentA(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentAAction])
        self._watch({UserRequirement})  # Subscribes to user requirements

    async def _act(self) -> Message:
        subtasks = await self.rc.todo.run(self.rc.history)
        # Publish each subtask as its own message so Agent B receives them one at a time
        for subtask in subtasks:
            self.rc.env.publish_message(Message(content=subtask, cause_by=AgentAAction))
        # The subtasks are already published, so the return value is routed to no one
        return Message(content="dummy message", send_to=MESSAGE_ROUTE_TO_NONE)
Agent B: Processes subtasks sent by Agent A or Agent D.
class AgentB(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentBAction])
        self._watch({AgentAAction, AgentDAction})


class AgentBAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Process subtask
        ...
Agent C: Compiles outputs from Agent B.
class AgentC(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentCAction])
        self._watch({AgentBAction})


class AgentCAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Compile results
        ...
Agent D: Reviews outputs and sends feedback to Agent B.
class AgentD(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentDAction])
        self._watch({AgentCAction})


class AgentDAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Review and provide feedback
        ...
Environment Setup
Finally, add all agents to an Environment and initialize the workflow:
import asyncio


async def main() -> None:
    context = Context()  # Load configuration
    env = Environment(context=context)
    env.add_roles([AgentA(), AgentB(), AgentC(), AgentD()])
    env.publish_message(Message(content='New user requirements', send_to=AgentA))
    while not env.is_idle:  # Runs until all messages are processed
        await env.run()


asyncio.run(main())  # await is only valid inside a coroutine
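The loop above keeps calling env.run() until env.is_idle reports that no messages are pending. To make that contract concrete, here is a self-contained toy version built on asyncio alone. ToyRole, ToyEnv, and deliver are hypothetical names, and the round-based scheduling (every role with pending messages acts once per run() call) is an assumption rather than a description of Strata AI's scheduler.

```python
import asyncio


class ToyRole:
    def __init__(self, name, next_name=None):
        self.name = name
        self.next_name = next_name  # who receives this role's output, if anyone
        self.inbox = []
        self.handled = []

    @property
    def is_idle(self) -> bool:
        return not self.inbox

    async def run(self, env) -> None:
        # Take the current batch; anything delivered meanwhile waits for the next round.
        pending, self.inbox = self.inbox, []
        for item in pending:
            await asyncio.sleep(0)  # stand-in for real asynchronous work
            self.handled.append(item)
            if self.next_name:
                env.deliver(self.next_name, f"{item}>{self.name}")


class ToyEnv:
    def __init__(self, roles):
        self.roles = {r.name: r for r in roles}

    def deliver(self, name, item) -> None:
        self.roles[name].inbox.append(item)

    @property
    def is_idle(self) -> bool:
        return all(r.is_idle for r in self.roles.values())

    async def run(self) -> None:
        # One round: every role with pending messages acts concurrently.
        busy = [r for r in self.roles.values() if not r.is_idle]
        await asyncio.gather(*(r.run(self) for r in busy))


async def main():
    env = ToyEnv([ToyRole("a", "b"), ToyRole("b", "c"), ToyRole("c")])
    env.deliver("a", "req")
    rounds = 0
    while not env.is_idle:
        await env.run()
        rounds += 1
    return rounds, env.roles["c"].handled


rounds, final = asyncio.run(main())
```

In this toy setup a three-stage chain takes three rounds to drain, one per hop, which is why the driver loops on is_idle instead of calling run() once.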
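Because each agent declares the messages it watches and the messages it publishes, agents can be added, removed, or rewired without changing one another, which keeps complex multi-agent workflows in Strata AI modular.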