From b678c55e8bef5986bfda394fab12a3124b7016e7 Mon Sep 17 00:00:00 2001 From: YuanZhang Date: Sun, 29 Mar 2026 09:36:34 +0800 Subject: [PATCH 1/5] feat: add group relationship support for agent broadcast messages - Add AgentGroup model for managing group relationships - Add agent_groups API for CRUD operations - Modify gateway send-message to support group targets - Add group relationships UI in AgentDetail page - Add broadcast_groups UI in ChannelConfig (feishu) --- backend/app/api/agent_groups.py | 97 +++++++++ backend/app/api/gateway.py | 240 ++++++++++++++++++++++ backend/app/main.py | 2 + backend/app/models/org.py | 16 ++ backend/app/schemas/schemas.py | 8 + frontend/src/components/ChannelConfig.tsx | 54 ++++- frontend/src/pages/AgentDetail.tsx | 84 ++++++++ 7 files changed, 500 insertions(+), 1 deletion(-) create mode 100644 backend/app/api/agent_groups.py diff --git a/backend/app/api/agent_groups.py b/backend/app/api/agent_groups.py new file mode 100644 index 00000000..9f3e77d8 --- /dev/null +++ b/backend/app/api/agent_groups.py @@ -0,0 +1,97 @@ +"""Agent Group Relationships API.""" + +import uuid +from datetime import datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel, Field +from sqlalchemy import select, delete +from sqlalchemy.ext.asyncio import AsyncSession + +from app.database import get_db +from app.models.agent import Agent +from app.models.org import AgentGroup +from app.core.security import get_current_user +from app.models.user import User + +router = APIRouter(prefix="/agents", tags=["agent-groups"]) + + +class AgentGroupCreate(BaseModel): + group_name: str = Field(min_length=1, max_length=100) + chat_id: str = Field(min_length=1, max_length=200) + channel: str = Field(default="feishu") + description: str = "" + + +class AgentGroupOut(BaseModel): + id: uuid.UUID + agent_id: uuid.UUID + group_name: str + chat_id: str + channel: str + description: str + created_at: datetime + + model_config = {"from_attributes": True} + + +@router.get("/{agent_id}/relationships/groups", response_model=list[AgentGroupOut]) +async def list_groups( + agent_id: uuid.UUID, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + result = await db.execute( + select(AgentGroup) + .where(AgentGroup.agent_id == agent_id) + .order_by(AgentGroup.created_at.desc()) + ) + groups = result.scalars().all() + return [AgentGroupOut.model_validate(g) for g in groups] + + +@router.put("/{agent_id}/relationships/groups", response_model=list[AgentGroupOut]) +async def update_groups( + agent_id: uuid.UUID, + groups: list[AgentGroupCreate], + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + await db.execute( + delete(AgentGroup).where(AgentGroup.agent_id == agent_id) + ) + new_groups = [] + for g in groups: + new_group = AgentGroup( + agent_id=agent_id, + group_name=g.group_name, + chat_id=g.chat_id, + channel=g.channel, + description=g.description, + ) + db.add(new_group) + new_groups.append(new_group) + await db.commit() + for g in new_groups: + await db.refresh(g) + return [AgentGroupOut.model_validate(g) for g in new_groups] + + +@router.delete("/{agent_id}/relationships/groups/{group_id}") +async def delete_group( + agent_id: uuid.UUID, + group_id: uuid.UUID, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_current_user), +): + result = await db.execute( + delete(AgentGroup).where( + AgentGroup.id == group_id, + AgentGroup.agent_id == agent_id, + ) + ) + await db.commit() + if result.rowcount == 0: + raise HTTPException(status_code=404, detail="Group not found") + return {"status": "ok"} diff --git a/backend/app/api/gateway.py b/backend/app/api/gateway.py index 8182562b..af00cad9 100644 --- a/backend/app/api/gateway.py +++ b/backend/app/api/gateway.py @@ -518,6 +518,9 @@ async def send_message( agent = await _get_agent_by_key(x_api_key, db) agent.openclaw_last_seen = datetime.now(timezone.utc) + # Initialize broadcast results + broadcast_results = [] + target_name = body.target.strip() content = body.content.strip() channel_hint = (body.channel or "").strip().lower() @@ -544,10 +547,69 @@ async def send_message( ) db.add(gw_msg) await db.commit() + + # Broadcast to groups if destinations specified + if body.destinations: + from app.models.channel_config import ChannelConfig + from app.services.feishu_service import feishu_service + import json as _json + + config_result = await db.execute( + select(ChannelConfig).where(ChannelConfig.agent_id == agent.id) + ) + config = config_result.scalar_one_or_none() + + if config and config.extra_config: + broadcast_groups = config.extra_config.get("broadcast_groups", []) + + for dest in body.destinations: + matched_group = None + for group in broadcast_groups: + if (group.get("channel") == dest.channel and + group.get("name") == dest.group): + matched_group = group + break + + if matched_group: + chat_id = matched_group.get("chat_id") + status = "failed" + + try: + if dest.channel == "feishu": + resp = await feishu_service.send_message( + config.app_id, config.app_secret, + receive_id=chat_id, + msg_type="text", + content=_json.dumps({"text": content}, ensure_ascii=False), + receive_id_type="chat_id" + ) + status = "sent" if resp.get("code") == 0 else "failed" + elif dest.channel == "wecom": + status = "not_implemented" + elif dest.channel == "dingtalk": + status = "not_implemented" + except Exception as e: + logger.error(f"[Gateway] Broadcast error: {e}") + status = "error" + + broadcast_results.append({ + "channel": dest.channel, + "group": dest.group, + "chat_id": chat_id, + "status": status + }) + else: + broadcast_results.append({ + "channel": dest.channel, + "group": dest.group, + "status": "not_found" + }) + return { "status": "accepted", "target": target_agent.name, "type": "openclaw_agent", + "broadcast": broadcast_results, "message": f"Message sent to {target_agent.name}. Reply will appear in your next poll.", } else: @@ -567,13 +629,133 @@ async def send_message( )) _background_tasks.add(task) task.add_done_callback(_background_tasks.discard) + + # Broadcast to groups if destinations specified + if body.destinations: + from app.models.channel_config import ChannelConfig + from app.services.feishu_service import feishu_service + import json as _json + + config_result = await db.execute( + select(ChannelConfig).where(ChannelConfig.agent_id == agent.id) + ) + config = config_result.scalar_one_or_none() + + if config and config.extra_config: + broadcast_groups = config.extra_config.get("broadcast_groups", []) + + for dest in body.destinations: + matched_group = None + for group in broadcast_groups: + if (group.get("channel") == dest.channel and + group.get("name") == dest.group): + matched_group = group + break + + if matched_group: + chat_id = matched_group.get("chat_id") + status = "failed" + + try: + if dest.channel == "feishu": + resp = await feishu_service.send_message( + config.app_id, config.app_secret, + receive_id=chat_id, + msg_type="text", + content=_json.dumps({"text": content}, ensure_ascii=False), + receive_id_type="chat_id" + ) + status = "sent" if resp.get("code") == 0 else "failed" + elif dest.channel == "wecom": + status = "not_implemented" + elif dest.channel == "dingtalk": + status = "not_implemented" + except Exception as e: + logger.error(f"[Gateway] Broadcast error: {e}") + status = "error" + + broadcast_results.append({ + "channel": dest.channel, + "group": dest.group, + "chat_id": chat_id, + "status": status + }) + else: + broadcast_results.append({ + "channel": dest.channel, + "group": dest.group, + "status": "not_found" + }) + return { "status": "accepted", "target": target_agent.name, "type": "agent", + "broadcast": broadcast_results, "message": f"Message sent to {target_agent.name}. Reply will appear in your next poll.", } + + # 1.5. Try to find target as a Group (via AgentGroup relationships) + from app.models.org import AgentGroup + group_result = await db.execute( + select(AgentGroup).where( + AgentGroup.agent_id == agent.id, + AgentGroup.group_name.ilike(f"%{target_name}%") + ) + ) + target_group = group_result.scalars().first() + + if target_group: + from app.models.channel_config import ChannelConfig + from app.services.feishu_service import feishu_service + import json as _json + + config_result = await db.execute( + select(ChannelConfig).where(ChannelConfig.agent_id == agent.id) + ) + config = config_result.scalar_one_or_none() + + if not config: + await db.commit() + raise HTTPException(status_code=400, detail="No channel configured for this agent") + + status = "failed" + resp = None + try: + if target_group.channel == "feishu": + resp = await feishu_service.send_message( + config.app_id, config.app_secret, + receive_id=target_group.chat_id, + msg_type="text", + content=_json.dumps({"text": content}, ensure_ascii=False), + receive_id_type="chat_id" + ) + status = "sent" if resp.get("code") == 0 else "failed" + elif target_group.channel == "wecom": + status = "not_implemented" + elif target_group.channel == "dingtalk": + status = "not_implemented" + except Exception as e: + logger.error(f"[Gateway] Group send error: {e}") + status = "error" + + await db.commit() + + if status == "sent": + return { + "status": "sent", + "target": target_group.group_name, + "type": "group", + "channel": target_group.channel, + "message": f"Message sent to group {target_group.group_name}.", + } + else: + raise HTTPException( + status_code=502, + detail=f"Failed to send to group {target_group.group_name}: {resp.get('msg') if resp else 'unknown error'} (code {resp.get('code') if resp else 'N/A'})" + ) + # 2. Try to find target as a human (via relationships) from app.models.org import AgentRelationship from sqlalchemy.orm import selectinload @@ -646,11 +828,69 @@ async def send_message( await db.commit() if resp and resp.get("code") == 0: + # Broadcast to groups if destinations specified + if body.destinations: + from app.models.channel_config import ChannelConfig + from app.services.feishu_service import feishu_service + import json as _json + + config_result = await db.execute( + select(ChannelConfig).where(ChannelConfig.agent_id == agent.id) + ) + config = config_result.scalar_one_or_none() + + if config and config.extra_config: + broadcast_groups = config.extra_config.get("broadcast_groups", []) + + for dest in body.destinations: + matched_group = None + for group in broadcast_groups: + if (group.get("channel") == dest.channel and + group.get("name") == dest.group): + matched_group = group + break + + if matched_group: + chat_id = matched_group.get("chat_id") + status = "failed" + + try: + if dest.channel == "feishu": + resp_bc = await feishu_service.send_message( + config.app_id, config.app_secret, + receive_id=chat_id, + msg_type="text", + content=_json.dumps({"text": content}, ensure_ascii=False), + receive_id_type="chat_id" + ) + status = "sent" if resp_bc.get("code") == 0 else "failed" + elif dest.channel == "wecom": + status = "not_implemented" + elif dest.channel == "dingtalk": + status = "not_implemented" + except Exception as e: + logger.error(f"[Gateway] Broadcast error: {e}") + status = "error" + + broadcast_results.append({ + "channel": dest.channel, + "group": dest.group, + "chat_id": chat_id, + "status": status + }) + else: + broadcast_results.append({ + "channel": dest.channel, + "group": dest.group, + "status": "not_found" + }) + return { "status": "sent", "target": target_member.name, "type": "human", "channel": "feishu", + "broadcast": broadcast_results, } else: raise HTTPException( diff --git a/backend/app/main.py b/backend/app/main.py index 83c46dba..b585a03a 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -279,9 +279,11 @@ def _bg_task_error(t): from app.api.webhooks import router as webhooks_router from app.api.notification import router as notification_router from app.api.gateway import router as gateway_router +from app.api.agent_groups import router as agent_groups_router from app.api.admin import router as admin_router from app.api.pages import router as pages_router, public_router as pages_public_router +app.include_router(agent_groups_router, prefix=settings.API_PREFIX) app.include_router(auth_router, prefix=settings.API_PREFIX) app.include_router(agents_router, prefix=settings.API_PREFIX) app.include_router(tasks_router, prefix=settings.API_PREFIX) diff --git a/backend/app/models/org.py b/backend/app/models/org.py index 017e3318..a543075c 100644 --- a/backend/app/models/org.py +++ b/backend/app/models/org.py @@ -76,6 +76,22 @@ class AgentRelationship(Base): member: Mapped["OrgMember"] = relationship() + + +class AgentGroup(Base): + """Group relationship for an agent - allows broadcasting to chat groups.""" + + __tablename__ = "agent_groups" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + agent_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("agents.id", ondelete="CASCADE"), nullable=False) + group_name: Mapped[str] = mapped_column(String(100), nullable=False) + chat_id: Mapped[str] = mapped_column(String(200), nullable=False) + channel: Mapped[str] = mapped_column(String(20), nullable=False, default="feishu") # feishu | wecom | dingtalk + description: Mapped[str] = mapped_column(Text, default="") + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + class AgentAgentRelationship(Base): """Relationship between two agents (digital employees).""" diff --git a/backend/app/schemas/schemas.py b/backend/app/schemas/schemas.py index 35172338..68f9efab 100644 --- a/backend/app/schemas/schemas.py +++ b/backend/app/schemas/schemas.py @@ -448,7 +448,15 @@ class GatewayReportRequest(BaseModel): result: str = Field(min_length=1) + + +class MessageDestination(BaseModel): + """Destination for broadcasting a message to a channel/group.""" + channel: str # feishu | wecom | dingtalk + group: str # Group name (matches config name) + class GatewaySendMessageRequest(BaseModel): target: str # Name of target person or agent content: str = Field(min_length=1) channel: str | None = None # Optional: "feishu", "agent", etc. Auto-detected if omitted. + destinations: list[MessageDestination] | None = None # Optional: broadcast destinations diff --git a/frontend/src/components/ChannelConfig.tsx b/frontend/src/components/ChannelConfig.tsx index d2fdc498..92c80406 100644 --- a/frontend/src/components/ChannelConfig.tsx +++ b/frontend/src/components/ChannelConfig.tsx @@ -284,6 +284,13 @@ export default function ChannelConfig({ mode, agentId, canManage = true, values, discord: 'gateway', }); + // Broadcast groups state + const [broadcastGroups, setBroadcastGroups] = useState>([]); + const addBroadcastGroup = () => setBroadcastGroups(prev => [...prev, {channel: 'feishu', name: '', chat_id: ''}]); + const removeBroadcastGroup = (index: number) => setBroadcastGroups(prev => prev.filter((_, i) => i !== index)); + const updateBroadcastGroup = (index: number, field: string, value: string) => + setBroadcastGroups(prev => prev.map((g, i) => i === index ? {...g, [field]: value} : g)); + // Password visibility const [showPwds, setShowPwds] = useState>({}); const togglePwd = (fieldId: string) => setShowPwds(p => ({ ...p, [fieldId]: !p[fieldId] })); @@ -460,7 +467,10 @@ export default function ChannelConfig({ mode, agentId, canManage = true, values, app_id: form.app_id, app_secret: form.app_secret, encrypt_key: form.encrypt_key || undefined, - extra_config: { connection_mode: connectionModes.feishu || 'websocket' }, + extra_config: { + connection_mode: connectionModes.feishu || 'websocket', + broadcast_groups: broadcastGroups.filter(g => g.name && g.chat_id), + }, }; } if (ch.id === 'wecom') { @@ -839,6 +849,7 @@ export default function ChannelConfig({ mode, agentId, canManage = true, values, prefill.app_secret = config.app_secret || ''; prefill.encrypt_key = config.encrypt_key || ''; setConnectionModes(prev => ({ ...prev, feishu: config.extra_config?.connection_mode || 'websocket' })); + setBroadcastGroups(config.extra_config?.broadcast_groups || []); } else if (ch.id === 'wecom') { const cm = config.extra_config?.connection_mode === 'websocket' ? 'websocket' : 'webhook'; setConnectionModes(prev => ({ ...prev, wecom: cm })); @@ -917,6 +928,47 @@ export default function ChannelConfig({ mode, agentId, canManage = true, values, renderField(field, ch.id, form[field.key] || '', (val) => setFormField(ch.id, field.key, val)) )} + {/* Broadcast Groups Configuration (Feishu only) */} + {ch.id === 'feishu' && ( +
+
+ 📢 群组广播配置 + +
+
+ 配置后,Agent发送消息时会自动同步到指定的飞书群 +
+ {broadcastGroups.map((group, index) => ( +
+ + updateBroadcastGroup(index, 'name', e.target.value)} + placeholder="群组名称" + style={{ flex: 1, fontSize: '12px', padding: '4px 8px', borderRadius: '4px', border: '1px solid var(--border-default)' }} /> + updateBroadcastGroup(index, 'chat_id', e.target.value)} + placeholder="Chat ID" + style={{ flex: 1, fontSize: '12px', padding: '4px 8px', borderRadius: '4px', border: '1px solid var(--border-default)' }} /> + +
+ ))} + {broadcastGroups.length === 0 && ( +
+ 暂未配置群组广播 +
+ )} +
+ )} + {/* Atlassian extra hints */} {ch.id === 'atlassian' && ( <> diff --git a/frontend/src/pages/AgentDetail.tsx b/frontend/src/pages/AgentDetail.tsx index 0a89e51f..8bc8b8dd 100644 --- a/frontend/src/pages/AgentDetail.tsx +++ b/frontend/src/pages/AgentDetail.tsx @@ -593,6 +593,32 @@ function RelationshipEditor({ agentId, readOnly = false }: { agentId: string; re const [editAgentRelation, setEditAgentRelation] = useState(''); const [editAgentDescription, setEditAgentDescription] = useState(''); + // Group relationships state + const [addingGroup, setAddingGroup] = useState(false); + const [newGroupName, setNewGroupName] = useState(''); + const [newGroupChatId, setNewGroupChatId] = useState(''); + const [newGroupChannel, setNewGroupChannel] = useState('feishu'); + const [newGroupDesc, setNewGroupDesc] = useState(''); + + const { data: agentGroups = [], refetch: refetchGroups } = useQuery({ + queryKey: ['agent-groups', agentId], + queryFn: () => fetchAuth(`/agents/${agentId}/relationships/groups`), + }); + + const addGroupRelationship = async () => { + if (!newGroupName || !newGroupChatId) return; + const existing = agentGroups.map((g: any) => ({ group_name: g.group_name, chat_id: g.chat_id, channel: g.channel, description: g.description })); + existing.push({ group_name: newGroupName, chat_id: newGroupChatId, channel: newGroupChannel, description: newGroupDesc }); + await fetchAuth(`/agents/${agentId}/relationships/groups`, { method: 'PUT', body: JSON.stringify(existing) }); + setAddingGroup(false); setNewGroupName(''); setNewGroupChatId(''); setNewGroupChannel('feishu'); setNewGroupDesc(''); + refetchGroups(); + }; + + const removeGroupRelationship = async (groupId: string) => { + await fetchAuth(`/agents/${agentId}/relationships/groups/${groupId}`, { method: 'DELETE' }); + refetchGroups(); + }; + const { data: relationships = [], refetch } = useQuery({ queryKey: ['relationships', agentId], queryFn: () => fetchAuth(`/agents/${agentId}/relationships/`), @@ -819,6 +845,64 @@ function RelationshipEditor({ agentId, readOnly = false }: { agentId: string; re )} + + {/* ── Group Relationships ── */} +
+

+ 📢 群组广播 + — Agent可向以下群组发送广播消息 +

+

+ 配置Agent的关系网络中的群组,Agent发送消息时可直接发送到这些群。 +

+ {agentGroups.length > 0 && ( +
+ {agentGroups.map((g: any) => ( +
+
+
G
+
+
+ {g.group_name} + + {g.channel === 'feishu' ? '飞书' : g.channel === 'wecom' ? '企微' : g.channel === 'dingtalk' ? '钉钉' : g.channel} + +
+
Chat ID: {g.chat_id}
+ {g.description &&
{g.description}
} +
+ {!readOnly && ( + + )} +
+
+ ))} +
+ )} + {!readOnly && !addingGroup && ( + + )} + {!readOnly && addingGroup && ( +
+
+ setNewGroupName(e.target.value)} style={{ flex: 1, fontSize: '12px' }} /> + +
+ setNewGroupChatId(e.target.value)} style={{ fontSize: '12px', marginBottom: '8px', width: '100%' }} /> +