Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions backend/app/api/agent_groups.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Agent Group Relationships API."""

import uuid
from datetime import datetime, timezone

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from sqlalchemy import select, delete
from sqlalchemy.ext.asyncio import AsyncSession

from app.database import get_db
from app.models.agent import Agent
from app.models.org import AgentGroup
from app.core.security import get_current_user
from app.models.user import User

router = APIRouter(prefix="/agents", tags=["agent-groups"])


class AgentGroupCreate(BaseModel):
group_name: str = Field(min_length=1, max_length=100)
chat_id: str = Field(min_length=1, max_length=200)
channel: str = Field(default="feishu")
description: str = ""


class AgentGroupOut(BaseModel):
id: uuid.UUID
agent_id: uuid.UUID
group_name: str
chat_id: str
channel: str
description: str
created_at: datetime

model_config = {"from_attributes": True}


@router.get("/{agent_id}/relationships/groups", response_model=list[AgentGroupOut])
async def list_groups(
agent_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
select(AgentGroup)
.where(AgentGroup.agent_id == agent_id)
.order_by(AgentGroup.created_at.desc())
)
groups = result.scalars().all()
return [AgentGroupOut.model_validate(g) for g in groups]


@router.put("/{agent_id}/relationships/groups", response_model=list[AgentGroupOut])
async def update_groups(
agent_id: uuid.UUID,
groups: list[AgentGroupCreate],
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
await db.execute(
delete(AgentGroup).where(AgentGroup.agent_id == agent_id)
)
new_groups = []
for g in groups:
new_group = AgentGroup(
agent_id=agent_id,
group_name=g.group_name,
chat_id=g.chat_id,
channel=g.channel,
description=g.description,
)
db.add(new_group)
new_groups.append(new_group)
await db.commit()
for g in new_groups:
await db.refresh(g)
return [AgentGroupOut.model_validate(g) for g in new_groups]


@router.delete("/{agent_id}/relationships/groups/{group_id}")
async def delete_group(
agent_id: uuid.UUID,
group_id: uuid.UUID,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = await db.execute(
delete(AgentGroup).where(
AgentGroup.id == group_id,
AgentGroup.agent_id == agent_id,
)
)
await db.commit()
if result.rowcount == 0:
raise HTTPException(status_code=404, detail="Group not found")
return {"status": "ok"}
240 changes: 240 additions & 0 deletions backend/app/api/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ async def send_message(
agent = await _get_agent_by_key(x_api_key, db)
agent.openclaw_last_seen = datetime.now(timezone.utc)

# Initialize broadcast results
broadcast_results = []

target_name = body.target.strip()
content = body.content.strip()
channel_hint = (body.channel or "").strip().lower()
Expand All @@ -544,10 +547,69 @@ async def send_message(
)
db.add(gw_msg)
await db.commit()

# Broadcast to groups if destinations specified
if body.destinations:
from app.models.channel_config import ChannelConfig
from app.services.feishu_service import feishu_service
import json as _json

config_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.agent_id == agent.id)
)
config = config_result.scalar_one_or_none()

if config and config.extra_config:
broadcast_groups = config.extra_config.get("broadcast_groups", [])

for dest in body.destinations:
matched_group = None
for group in broadcast_groups:
if (group.get("channel") == dest.channel and
group.get("name") == dest.group):
matched_group = group
break

if matched_group:
chat_id = matched_group.get("chat_id")
status = "failed"

try:
if dest.channel == "feishu":
resp = await feishu_service.send_message(
config.app_id, config.app_secret,
receive_id=chat_id,
msg_type="text",
content=_json.dumps({"text": content}, ensure_ascii=False),
receive_id_type="chat_id"
)
status = "sent" if resp.get("code") == 0 else "failed"
elif dest.channel == "wecom":
status = "not_implemented"
elif dest.channel == "dingtalk":
status = "not_implemented"
except Exception as e:
logger.error(f"[Gateway] Broadcast error: {e}")
status = "error"

broadcast_results.append({
"channel": dest.channel,
"group": dest.group,
"chat_id": chat_id,
"status": status
})
else:
broadcast_results.append({
"channel": dest.channel,
"group": dest.group,
"status": "not_found"
})

return {
"status": "accepted",
"target": target_agent.name,
"type": "openclaw_agent",
"broadcast": broadcast_results,
"message": f"Message sent to {target_agent.name}. Reply will appear in your next poll.",
}
else:
Expand All @@ -567,13 +629,133 @@ async def send_message(
))
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)

# Broadcast to groups if destinations specified
if body.destinations:
from app.models.channel_config import ChannelConfig
from app.services.feishu_service import feishu_service
import json as _json

config_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.agent_id == agent.id)
)
config = config_result.scalar_one_or_none()

if config and config.extra_config:
broadcast_groups = config.extra_config.get("broadcast_groups", [])

for dest in body.destinations:
matched_group = None
for group in broadcast_groups:
if (group.get("channel") == dest.channel and
group.get("name") == dest.group):
matched_group = group
break

if matched_group:
chat_id = matched_group.get("chat_id")
status = "failed"

try:
if dest.channel == "feishu":
resp = await feishu_service.send_message(
config.app_id, config.app_secret,
receive_id=chat_id,
msg_type="text",
content=_json.dumps({"text": content}, ensure_ascii=False),
receive_id_type="chat_id"
)
status = "sent" if resp.get("code") == 0 else "failed"
elif dest.channel == "wecom":
status = "not_implemented"
elif dest.channel == "dingtalk":
status = "not_implemented"
except Exception as e:
logger.error(f"[Gateway] Broadcast error: {e}")
status = "error"

broadcast_results.append({
"channel": dest.channel,
"group": dest.group,
"chat_id": chat_id,
"status": status
})
else:
broadcast_results.append({
"channel": dest.channel,
"group": dest.group,
"status": "not_found"
})

return {
"status": "accepted",
"target": target_agent.name,
"type": "agent",
"broadcast": broadcast_results,
"message": f"Message sent to {target_agent.name}. Reply will appear in your next poll.",
}


# 1.5. Try to find target as a Group (via AgentGroup relationships)
from app.models.org import AgentGroup
group_result = await db.execute(
select(AgentGroup).where(
AgentGroup.agent_id == agent.id,
AgentGroup.group_name.ilike(f"%{target_name}%")
)
)
target_group = group_result.scalars().first()

if target_group:
from app.models.channel_config import ChannelConfig
from app.services.feishu_service import feishu_service
import json as _json

config_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.agent_id == agent.id)
)
config = config_result.scalar_one_or_none()

if not config:
await db.commit()
raise HTTPException(status_code=400, detail="No channel configured for this agent")

status = "failed"
resp = None
try:
if target_group.channel == "feishu":
resp = await feishu_service.send_message(
config.app_id, config.app_secret,
receive_id=target_group.chat_id,
msg_type="text",
content=_json.dumps({"text": content}, ensure_ascii=False),
receive_id_type="chat_id"
)
status = "sent" if resp.get("code") == 0 else "failed"
elif target_group.channel == "wecom":
status = "not_implemented"
elif target_group.channel == "dingtalk":
status = "not_implemented"
except Exception as e:
logger.error(f"[Gateway] Group send error: {e}")
status = "error"

await db.commit()

if status == "sent":
return {
"status": "sent",
"target": target_group.group_name,
"type": "group",
"channel": target_group.channel,
"message": f"Message sent to group {target_group.group_name}.",
}
else:
raise HTTPException(
status_code=502,
detail=f"Failed to send to group {target_group.group_name}: {resp.get('msg') if resp else 'unknown error'} (code {resp.get('code') if resp else 'N/A'})"
)

# 2. Try to find target as a human (via relationships)
from app.models.org import AgentRelationship
from sqlalchemy.orm import selectinload
Expand Down Expand Up @@ -646,11 +828,69 @@ async def send_message(
await db.commit()

if resp and resp.get("code") == 0:
# Broadcast to groups if destinations specified
if body.destinations:
from app.models.channel_config import ChannelConfig
from app.services.feishu_service import feishu_service
import json as _json

config_result = await db.execute(
select(ChannelConfig).where(ChannelConfig.agent_id == agent.id)
)
config = config_result.scalar_one_or_none()

if config and config.extra_config:
broadcast_groups = config.extra_config.get("broadcast_groups", [])

for dest in body.destinations:
matched_group = None
for group in broadcast_groups:
if (group.get("channel") == dest.channel and
group.get("name") == dest.group):
matched_group = group
break

if matched_group:
chat_id = matched_group.get("chat_id")
status = "failed"

try:
if dest.channel == "feishu":
resp_bc = await feishu_service.send_message(
config.app_id, config.app_secret,
receive_id=chat_id,
msg_type="text",
content=_json.dumps({"text": content}, ensure_ascii=False),
receive_id_type="chat_id"
)
status = "sent" if resp_bc.get("code") == 0 else "failed"
elif dest.channel == "wecom":
status = "not_implemented"
elif dest.channel == "dingtalk":
status = "not_implemented"
except Exception as e:
logger.error(f"[Gateway] Broadcast error: {e}")
status = "error"

broadcast_results.append({
"channel": dest.channel,
"group": dest.group,
"chat_id": chat_id,
"status": status
})
else:
broadcast_results.append({
"channel": dest.channel,
"group": dest.group,
"status": "not_found"
})

return {
"status": "sent",
"target": target_member.name,
"type": "human",
"channel": "feishu",
"broadcast": broadcast_results,
}
else:
raise HTTPException(
Expand Down
2 changes: 2 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,11 @@ def _bg_task_error(t):
from app.api.webhooks import router as webhooks_router
from app.api.notification import router as notification_router
from app.api.gateway import router as gateway_router
from app.api.agent_groups import router as agent_groups_router
from app.api.admin import router as admin_router
from app.api.pages import router as pages_router, public_router as pages_public_router

app.include_router(agent_groups_router, prefix=settings.API_PREFIX)
app.include_router(auth_router, prefix=settings.API_PREFIX)
app.include_router(agents_router, prefix=settings.API_PREFIX)
app.include_router(tasks_router, prefix=settings.API_PREFIX)
Expand Down
Loading