Inter-Agent Communication
In Strata AI, agents interact by exchanging messages, ensuring an efficient flow of tasks and data between components. This communication is managed through the Message class and the publish_message capability provided by the Environment.
Key Concepts
- Message Sending: When an agent sends a message, it specifies the source information (e.g., the sent_from and cause_by attributes in the Message class).
 
- Message Consumption: Agents subscribe to messages based on the cause_by attribute. This ensures each agent only receives relevant tasks or data.
 
- Message Broadcasting: The Environment object handles broadcasting messages to agents, respecting their subscription requirements.
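The three concepts above can be sketched without the framework. The following is a minimal, self-contained illustration, not Strata AI's implementation: ToyMessage, ToyAgent, and ToyEnvironment are hypothetical stand-ins, and the delivery rule (broadcast to everyone except the sender, then let each agent filter by cause_by) is an assumption made for the example.

```python
from dataclasses import dataclass, field


@dataclass
class ToyMessage:
    """Hypothetical stand-in for Message; only the routing fields are kept."""
    content: str
    cause_by: str = ""   # name of the action that produced the message
    sent_from: str = ""  # name of the sending agent


@dataclass
class ToyAgent:
    """Hypothetical agent that keeps only messages it has subscribed to."""
    name: str
    watch: set[str] = field(default_factory=set)
    inbox: list[ToyMessage] = field(default_factory=list)

    def receive(self, message: ToyMessage) -> None:
        # Consumption: accept a message only if its cause_by is watched.
        if message.cause_by in self.watch:
            self.inbox.append(message)


@dataclass
class ToyEnvironment:
    """Hypothetical environment that broadcasts every message to all agents."""
    agents: list[ToyAgent] = field(default_factory=list)

    def publish_message(self, message: ToyMessage) -> None:
        # Broadcasting: offer the message to every agent except the sender;
        # each agent then filters by its own subscriptions.
        for agent in self.agents:
            if agent.name != message.sent_from:
                agent.receive(message)


planner = ToyAgent(name="planner", watch={"UserRequirement"})
worker = ToyAgent(name="worker", watch={"SplitTask"})
env = ToyEnvironment(agents=[planner, worker])

# Sending: each message carries its source in sent_from and cause_by.
env.publish_message(ToyMessage("build a CLI", cause_by="UserRequirement", sent_from="user"))
env.publish_message(ToyMessage("parse arguments", cause_by="SplitTask", sent_from="planner"))

print([m.content for m in planner.inbox])
print([m.content for m in worker.inbox])
```

Each agent ends up with only the message whose cause_by it watches, which is the behavior the subscription model is meant to provide.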
 
Message Class
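from typing import Optional
from pydantic import BaseModel, Field
# MESSAGE_ROUTE_TO_ALL is a routing constant supplied by the framework
class Message(BaseModel):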
    id: str = Field(default="", validate_default=True)  # Unique message identifier
    content: str
    instruct_content: Optional[BaseModel] = Field(default=None, validate_default=True)
    role: str = "user"  # system / user / assistant
    cause_by: str = Field(default="", validate_default=True)
    sent_from: str = Field(default="", validate_default=True)
    send_to: set[str] = Field(default={MESSAGE_ROUTE_TO_ALL}, validate_default=True)
System Design
When designing an agent workflow, consider the following:
- Inputs: Define the data or messages an agent should process. This aligns with the rc.watch attribute in the agent object.
 
- Outputs: Specify the messages the agent generates. These are encapsulated in the Message class.
 
- Tasks: Determine the actions an agent performs and their transitions between states.
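One way to apply these three questions is to write the design down as data and check the wiring before coding any agent. The sketch below is a hypothetical aid, not part of Strata AI: AgentSpec and unproduced_inputs are names invented for this example, and the action names are placeholders.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentSpec:
    """Hypothetical design record; not part of Strata AI."""
    name: str
    inputs: frozenset[str]  # cause_by values the agent would watch
    task: str               # what the agent does with each input
    output: str             # cause_by value stamped on the messages it emits


def unproduced_inputs(specs: list[AgentSpec], external: set[str]) -> dict[str, set[str]]:
    """Return, per agent, the watched inputs that nothing in the design produces."""
    produced = {spec.output for spec in specs} | external
    gaps: dict[str, set[str]] = {}
    for spec in specs:
        missing = set(spec.inputs) - produced
        if missing:
            gaps[spec.name] = missing
    return gaps


specs = [
    AgentSpec("Splitter", frozenset({"UserRequirement"}), "split into subtasks", "SplitTask"),
    AgentSpec("Worker", frozenset({"SplitTask", "Review"}), "process one subtask", "Process"),
    AgentSpec("Compiler", frozenset({"Process"}), "merge results", "Compile"),
]

# "Review" is watched by Worker, but no agent in this design emits it yet.
print(unproduced_inputs(specs, external={"UserRequirement"}))
```

A non-empty result points at an agent that would wait for a message nobody sends, which is cheaper to catch in the design than at run time.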
 
Example Workflow
Let’s illustrate the interaction with a complex scenario:
- Agent A: Splits requirements into 10 subtasks. 
- Agent B: Processes each subtask. 
- Agent C: Compiles the processed results. 
- Agent D: Reviews and provides feedback to Agent B. 
The steps for Agents B, C, and D (steps 2–4) repeat multiple times for refinement.
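Before wiring this up with messages, the control flow can be sketched as plain functions. This is a simplified, synchronous illustration under stated assumptions: every function body is a placeholder, the stopping rule (the reviewer accepts once every subtask has been revised) is invented for the example, and max_rounds is a hypothetical safeguard against an endless review loop.

```python
def split_into_subtasks(requirement: str, count: int = 10) -> list[str]:
    """Agent A: placeholder split; a real agent would use its own logic."""
    return [f"{requirement} [part {i}]" for i in range(1, count + 1)]


def process(subtask: str, feedback: str) -> str:
    """Agent B: placeholder processing that marks work redone after feedback."""
    suffix = " (revised)" if feedback else ""
    return f"done: {subtask}{suffix}"


def compile_results(results: list[str]) -> str:
    """Agent C: join the processed subtasks into one report."""
    return "\n".join(results)


def review(report: str) -> str:
    """Agent D: return feedback, or an empty string when the report is accepted.

    Placeholder rule: accept only reports in which every line was revised.
    """
    if all(line.endswith("(revised)") for line in report.splitlines()):
        return ""
    return "please revise every subtask"


def run_workflow(requirement: str, max_rounds: int = 5) -> tuple[str, int]:
    """Run Agent A once, then repeat Agents B, C, and D until accepted."""
    subtasks = split_into_subtasks(requirement)
    feedback = ""
    report = ""
    rounds = 0
    for rounds in range(1, max_rounds + 1):
        results = [process(s, feedback) for s in subtasks]
        report = compile_results(results)
        feedback = review(report)
        if not feedback:  # the reviewer accepted the report
            break
    return report, rounds


report, rounds = run_workflow("build a CLI")
print(rounds)
print(len(report.splitlines()))
```

In the message-based version, the same loop emerges from subscriptions rather than from an explicit for statement, so a termination condition of some kind is still worth designing in.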
Implementation
- Agent A: Splits user requirements into subtasks and sends them to Agent B. 
class AgentAAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> List[str]:
        # split_10_subtask is a placeholder for your own splitting logic (not defined here)
        subtasks: List[str] = split_10_subtask(with_messages[0].content)
        return subtasks
class AgentA(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentAAction])
        self._watch({UserRequirement})  # Subscribes to user requirements
    async def _act(self) -> Message:
        subtasks = await self.rc.todo.run(self.rc.history)
        for i in subtasks:
            # Publish each subtask as its own message, tagged with the action that produced it
            self.rc.env.publish_message(Message(content=i, cause_by=AgentAAction))
        return Message(content="dummy message", send_to=MESSAGE_ROUTE_TO_NONE)
- Agent B: Processes subtasks sent by Agent A or Agent D.
class AgentB(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentBAction])
        self._watch({AgentAAction, AgentDAction})
class AgentBAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Process subtask
        ...
- Agent C: Compiles outputs from Agent B.
class AgentC(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentCAction])
        self._watch({AgentBAction})
class AgentCAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Compile results
        ...
- Agent D: Reviews outputs and sends feedback to Agent B.
class AgentD(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentDAction])
        self._watch({AgentCAction})
class AgentDAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Review and provide feedback
        ...
Environment Setup
Finally, add all agents to an Environment and initialize the workflow:
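# The loop below uses await, so run this inside an async function (for example via asyncio.run) or a notebook
context = Context()  # Load configuration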
env = Environment(context=context)
env.add_roles([AgentA(), AgentB(), AgentC(), AgentD()])
env.publish_message(Message(content='New user requirements', send_to=AgentA))
while not env.is_idle:  # Runs until all messages are processed
    await env.run()
This structure ensures an efficient and modular design for managing complex multi-agent workflows in Strata AI.
