Strata AI Documentation

Inter-Agent Communication

Agent Communication

In Strata AI, agents interact by exchanging messages that carry tasks and data from one agent to the next. Each message is an instance of the Message class, and agents deliver it through the publish_message capability provided by the Environment.

Key Concepts

  1. Message Sending:

    • When an agent sends a message, it specifies the source information (e.g., sent_from and cause_by attributes in the Message class).

  2. Message Consumption:

    • Agents subscribe to messages based on the cause_by attribute. This ensures each agent only receives relevant tasks or data.

  3. Message Broadcasting:

    • The Environment object handles broadcasting messages to agents, respecting their subscription requirements.
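
The three concepts above can be pictured with a small, self-contained sketch. ToyMessage, ToyAgent, and ToyEnvironment are hypothetical stand-ins written for illustration, not Strata AI classes, and the sketch assumes that a subscription is matched by comparing a message's cause_by against each agent's watch set.

```python
from dataclasses import dataclass, field


@dataclass
class ToyMessage:
    content: str
    cause_by: str        # name of the action that produced the message
    sent_from: str = ""  # name of the sending agent


@dataclass
class ToyAgent:
    name: str
    watch: set[str]  # action names this agent subscribes to
    inbox: list[ToyMessage] = field(default_factory=list)


class ToyEnvironment:
    def __init__(self, agents: list[ToyAgent]) -> None:
        self.agents = agents

    def publish_message(self, message: ToyMessage) -> None:
        # Deliver only to agents whose watch set contains the message's cause_by.
        for agent in self.agents:
            if message.cause_by in agent.watch:
                agent.inbox.append(message)


planner = ToyAgent(name="planner", watch={"UserRequirement"})
worker = ToyAgent(name="worker", watch={"SplitTask"})
env = ToyEnvironment([planner, worker])

env.publish_message(ToyMessage("build a parser", cause_by="UserRequirement", sent_from="user"))
env.publish_message(ToyMessage("tokenize input", cause_by="SplitTask", sent_from="planner"))

print([m.content for m in planner.inbox])  # ['build a parser']
print([m.content for m in worker.inbox])   # ['tokenize input']
```

Each agent ends up with only the message whose cause_by it watches, which is the filtering behavior the list describes.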

Message Class

from typing import Optional

from pydantic import BaseModel, Field

class Message(BaseModel):
    id: str = Field(default="", validate_default=True)  # Unique message identifier
    content: str  # Message body
    instruct_content: Optional[BaseModel] = Field(default=None, validate_default=True)  # Optional structured payload
    role: str = "user"  # system / user / assistant
    cause_by: str = Field(default="", validate_default=True)  # Action that produced the message
    sent_from: str = Field(default="", validate_default=True)  # Agent that sent the message
    send_to: set[str] = Field(default={MESSAGE_ROUTE_TO_ALL}, validate_default=True)  # Recipients; broadcast by default
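
To make the defaults concrete, here is a rough stand-in built with dataclasses rather than pydantic. SketchMessage and is_addressed_to are hypothetical names, and the sentinel strings "<all>" and "<none>" are assumed values chosen for illustration; check the Strata AI source for the real constants. The sketch shows how a send_to set might decide whether a given agent is a recipient.

```python
import uuid
from dataclasses import dataclass, field

# Assumed sentinel values; the real constants live in the Strata AI codebase.
MESSAGE_ROUTE_TO_ALL = "<all>"
MESSAGE_ROUTE_TO_NONE = "<none>"


@dataclass
class SketchMessage:
    content: str
    role: str = "user"
    cause_by: str = ""
    sent_from: str = ""
    # Broadcast by default, mirroring the send_to default in the Message class.
    send_to: set[str] = field(default_factory=lambda: {MESSAGE_ROUTE_TO_ALL})
    # Generate an identifier when the caller does not supply one.
    id: str = field(default_factory=lambda: uuid.uuid4().hex)


def is_addressed_to(message: SketchMessage, agent_name: str) -> bool:
    """Return True if the message is broadcast or names the agent explicitly."""
    return MESSAGE_ROUTE_TO_ALL in message.send_to or agent_name in message.send_to


broadcast = SketchMessage(content="hello everyone")
direct = SketchMessage(content="for B only", send_to={"AgentB"})
silent = SketchMessage(content="dummy message", send_to={MESSAGE_ROUTE_TO_NONE})

print(is_addressed_to(broadcast, "AgentA"))  # True
print(is_addressed_to(direct, "AgentC"))     # False
print(is_addressed_to(silent, "AgentA"))     # False
```

A message addressed only to the "none" sentinel matches no agent name, which is why it can serve as a placeholder return value.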

System Design

When designing an agent workflow, consider the following:

  • Inputs:

    • Define the data or messages an agent should process. This aligns with the rc.watch attribute in the agent object.

  • Outputs:

    • Specify the messages the agent generates. These are encapsulated in the Message class.

  • Tasks:

    • Determine the actions an agent performs and their transitions between states.

Example Workflow

Let’s illustrate the interaction with a complex scenario:

  • Agent A: Splits requirements into 10 subtasks.

  • Agent B: Processes each subtask.

  • Agent C: Compiles the processed results.

  • Agent D: Reviews and provides feedback to Agent B.

Agents B, C, and D repeat their steps several times, so each pass through the loop refines the result.

Implementation

  1. Agent A: Splits user requirements into subtasks and sends them to Agent B.

class AgentAAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> List[str]:
        # split_10_subtask is a placeholder for your own requirement-splitting logic
        subtasks: List[str] = split_10_subtask(with_messages[0].content)
        return subtasks

class AgentA(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentAAction])
        self._watch({UserRequirement})  # Subscribes to user requirements

    async def _act(self) -> Message:
        subtasks = await self.rc.todo.run(self.rc.history)
        # Publish one message per subtask so Agent B receives each separately
        for subtask in subtasks:
            self.rc.env.publish_message(Message(content=subtask, cause_by=AgentAAction))
        # The subtasks are already published, so return a placeholder addressed to no one
        return Message(content="dummy message", send_to=MESSAGE_ROUTE_TO_NONE)

  2. Agent B: Processes subtasks sent by Agent A or Agent D.

class AgentB(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentBAction])
        self._watch({AgentAAction, AgentDAction})

class AgentBAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Process subtask
        ...

  3. Agent C: Compiles outputs from Agent B.

class AgentC(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentCAction])
        self._watch({AgentBAction})

class AgentCAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Compile results
        ...

  4. Agent D: Reviews outputs and sends feedback to Agent B.

class AgentD(Role):
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.set_actions([AgentDAction])
        self._watch({AgentCAction})

class AgentDAction(Action):
    async def run(self, with_messages: List[Message] = None, **kwargs) -> Message:
        # Review and provide feedback
        ...
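
The subscriptions declared by the four roles above form a chain with a feedback loop. As a rough illustration, the following sketch copies the watch sets into plain dictionaries and traces who receives each agent's output. The names WATCHES, PRODUCES, downstream, and trace are hypothetical helpers, and the sketch assumes each agent emits messages whose cause_by is its own action.

```python
# Subscriptions and outputs copied from the four roles defined above.
WATCHES: dict[str, set[str]] = {
    "AgentA": {"UserRequirement"},
    "AgentB": {"AgentAAction", "AgentDAction"},
    "AgentC": {"AgentBAction"},
    "AgentD": {"AgentCAction"},
}
PRODUCES: dict[str, str] = {
    "AgentA": "AgentAAction",
    "AgentB": "AgentBAction",
    "AgentC": "AgentCAction",
    "AgentD": "AgentDAction",
}


def downstream(agent: str) -> list[str]:
    """Return the agents subscribed to the action that `agent` produces."""
    action = PRODUCES[agent]
    return sorted(name for name, watched in WATCHES.items() if action in watched)


def trace(start: str, hops: int) -> list[str]:
    """Follow the first subscriber at each step for at most `hops` deliveries."""
    path = [start]
    for _ in range(hops):
        receivers = downstream(path[-1])
        if not receivers:
            break
        path.append(receivers[0])
    return path


print(downstream("AgentD"))  # ['AgentB']
print(trace("AgentA", 6))
# ['AgentA', 'AgentB', 'AgentC', 'AgentD', 'AgentB', 'AgentC', 'AgentD']
```

The trace shows the B, C, D cycle repeating after Agent A's single hand-off, so some stopping condition is needed for the workflow to finish.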

Environment Setup

Finally, add all agents to an Environment and initialize the workflow:

import asyncio

async def main() -> None:
    context = Context()  # Load configuration
    env = Environment(context=context)
    env.add_roles([AgentA(), AgentB(), AgentC(), AgentD()])
    env.publish_message(Message(content='New user requirements', send_to=AgentA))
    while not env.is_idle:  # Runs until all messages are processed
        await env.run()

asyncio.run(main())
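
The run-until-idle loop can be tried without Strata AI installed by using a toy environment. Everything in this sketch is a hypothetical stand-in: ToyEnvironment, its subscribe method, the handler functions, and the MAX_REVIEWS cap are assumptions made for illustration. It also highlights a practical point: with a feedback loop from the reviewer back to the worker, the loop only ends once the reviewer stops publishing feedback.

```python
import asyncio
from collections import deque

MAX_REVIEWS = 2  # assumed cap so the review loop terminates


class ToyEnvironment:
    """Hypothetical stand-in for Environment: a queue plus per-action handlers."""

    def __init__(self) -> None:
        self.queue: deque[tuple[str, str]] = deque()  # (cause_by, content)
        self.handlers: dict[str, list] = {}           # cause_by -> async handlers
        self.log: list[str] = []

    def subscribe(self, cause_by: str, handler) -> None:
        self.handlers.setdefault(cause_by, []).append(handler)

    def publish_message(self, cause_by: str, content: str) -> None:
        self.queue.append((cause_by, content))

    @property
    def is_idle(self) -> bool:
        return not self.queue

    async def run(self) -> None:
        # Handle the messages pending at the start of this round; anything
        # published by a handler waits for the next round.
        for _ in range(len(self.queue)):
            cause_by, content = self.queue.popleft()
            for handler in self.handlers.get(cause_by, []):
                await handler(self, content)


async def work(env: ToyEnvironment, content: str) -> None:
    env.log.append(f"B:{content}")
    env.publish_message("AgentBAction", content)


async def compile_results(env: ToyEnvironment, content: str) -> None:
    env.log.append(f"C:{content}")
    env.publish_message("AgentCAction", content)


async def review(env: ToyEnvironment, content: str) -> None:
    reviews_so_far = sum(entry.startswith("D:") for entry in env.log)
    env.log.append(f"D:{content}")
    # Send feedback back to the worker only while under the cap.
    if reviews_so_far + 1 < MAX_REVIEWS:
        env.publish_message("AgentDAction", content)


async def main() -> list[str]:
    env = ToyEnvironment()
    env.subscribe("AgentAAction", work)
    env.subscribe("AgentDAction", work)
    env.subscribe("AgentBAction", compile_results)
    env.subscribe("AgentCAction", review)
    env.publish_message("AgentAAction", "subtask-1")
    while not env.is_idle:  # same loop shape as the setup code above
        await env.run()
    return env.log


log = asyncio.run(main())
print(log)
```

With the cap set to 2, the subtask passes through the worker, compiler, and reviewer twice before the queue drains and the loop exits.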

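Because each agent declares only the actions it watches and the messages it publishes, the workflow stays modular: an agent can be added, replaced, or rewired by changing its subscriptions rather than the other agents' code.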


Last updated 4 months ago