命令队列 (2026-01-16)
我们通过一个小型进程内队列对入站自动回复运行(所有渠道)进行串行化,防止多个 agent 运行相互冲突,同时允许跨会话的安全并行。Why
- 自动回复运行可能成本很高(LLM 调用),且多条入站消息靠得很近时会发生冲突。
- 串行化可避免争用共享资源(会话文件、日志、CLI stdin),并降低上游限流风险。
How it works
- 一个具备 lane 的 FIFO 队列按 lane 进行排队,且每个 lane 有可配置并发上限(未配置的 lane 默认 1;main 默认 4,subagent 默认 8)。
runEmbeddedPiAgent按session key入队(lane 为session:<key>),确保同一会话同一时间仅一个运行。- 每个会话运行随后入队到全局 lane(默认
main),以agents.defaults.maxConcurrent作为整体并行上限。 - 启用 verbose 日志时,若排队超过 ~2s 才开始,会输出短提示。
- 当支持时,typing indicators 在入队时立即触发,因此等待排队不会改变用户体验。
队列模式(按渠道)
入站消息可以 steer 当前运行、等待 followup 回合,或两者兼具:steer:立即注入当前运行(在下一个工具边界后取消待执行工具调用)。若非 streaming,则回退为 followup。followup:在当前运行结束后排队进入下一个 agent 回合。collect:将所有排队消息合并为一个 followup 回合(默认)。若消息目标不同渠道/线程,会分别排空以保持路由。steer-backlog(即steer+backlog):立即 steer,并保留消息用于 followup 回合。interrupt(旧):中止该会话的活动运行,然后运行最新消息。queue(旧别名):等同于steer。
collect/steer。
以独立命令发送 /queue collect(按会话)或设置 messages.queue.byChannel.discord: "collect"。
默认值(配置未设置时):
- 所有表面 →
collect
messages.queue 全局或按渠道配置:
队列选项
选项适用于followup、collect 与 steer-backlog(以及 steer 回退为 followup 时):
debounceMs:等待静默后再启动 followup 回合(防止“继续、继续”)。cap:每个会话最大排队消息数。drop:溢出策略(old、new、summarize)。
debounceMs: 1000、cap: 20、drop: summarize。
按会话覆盖
- 以独立命令发送
/queue <mode>以保存当前会话模式。 - 选项可组合:
/queue collect debounce:2s cap:25 drop:summarize /queue default或/queue reset清除会话覆盖。
范围与保证
- 适用于所有使用 gateway 回复管线的入站渠道的自动回复 agent 运行(WhatsApp web、Telegram、Slack、Discord、Signal、iMessage、webchat 等)。
- 默认 lane(
main)为进程范围,涵盖入站与主 heartbeat;设置agents.defaults.maxConcurrent可允许多个会话并行。 - 可能存在其他 lanes(如
cron、subagent),使后台任务并行而不阻塞入站回复。 - 每会话 lane 保证同一会话同一时间仅一个 agent 运行。
- 无外部依赖或后台 worker 线程;纯 TypeScript + promises。
排查
- 若命令似乎卡住,启用 verbose 日志并查找 “queued for …ms” 行,确认队列正在排空。
- 若需要队列深度,启用 verbose 日志并观察队列计时行。