开启左侧

Openclaw Pre-compaction memory 核心代码分析

[复制链接]
作者:CSDN博客
Openclaw Pre-compaction memory 核心代码分析
# "Pre-compaction memory" 操作代码分析

## 核心代码位置
主要代码位于 e:\2fen\node_modules\openclaw\dist\extensionAPI.js 文件中,对应源码文件 src/auto-reply/reply/memory-flush.ts 。
## 关键常量定义
// 默认内存刷新软阈值,设置为 4000 个 token
const DEFAULT_MEMORY_FLUSH_SOFT_TOKENS = 4e3;
// 默认内存刷新提示,当会话接近自动压缩时发送给代理
const DEFAULT_MEMORY_FLUSH_PROMPT = [
    "Pre-compaction memory flush.",
    "Store durable memories now (use memory/YYYY-MM-DD.md; create memory/ if needed).",
    `If nothing to store, reply with ${SILENT_REPLY_TOKEN}.`
].join(" ");
// 默认内存刷新系统提示,为代理提供上下文
const DEFAULT_MEMORY_FLUSH_SYSTEM_PROMPT = [
    "Pre-compaction memory flush turn.",
    "The session is near auto-compaction; capture durable memories to disk.",
    `You may reply, but usually ${SILENT_REPLY_TOKEN} is correct.`
].join(" ");
## 关键函数分析
### 1. 解析内存刷新设置
function resolveMemoryFlushSettings(cfg) {
    // 从配置中获取默认的内存刷新设置
    const defaults = cfg?.agents?.defaults?.compaction?.memoryFlush;
    // 检查内存刷新是否启用,默认为 true
    const enabled = defaults?.enabled ?? true;
    if (!enabled) return null;

    // 解析软阈值 token 数,默认为 DEFAULT_MEMORY_FLUSH_SOFT_TOKENS
    const softThresholdTokens = normalizeNonNegativeInt(defaults?.softThresholdTokens) ?? DEFAULT_MEMORY_FLUSH_SOFT_TOKENS;

    // 解析提示文本,默认为 DEFAULT_MEMORY_FLUSH_PROMPT
    const prompt = defaults?.prompt?.trim() || DEFAULT_MEMORY_FLUSH_PROMPT;

    // 解析系统提示文本,默认为 DEFAULT_MEMORY_FLUSH_SYSTEM_PROMPT
    const systemPrompt = defaults?.systemPrompt?.trim() || DEFAULT_MEMORY_FLUSH_SYSTEM_PROMPT;

    // 解析保留 token 数下限
    const reserveTokensFloor = normalizeNonNegativeInt(cfg?.agents?.defaults?.compaction?.reserveTokensFloor) ?? DEFAULT_PI_COMPACTION_RESERVE_TOKENS_FLOOR;

    return {
        enabled,
        softThresholdTokens,
        prompt: ensureNoReplyHint(prompt),
        systemPrompt: ensureNoReplyHint(systemPrompt),
        reserveTokensFloor
    };
}
2. 检查是否应该运行内存刷新
function shouldRunMemoryFlush(params) {
    // 获取总 token 数
    const totalTokens = params.entry?.totalTokens;
    if (!totalTokens || totalTokens <= 0) return false;

    // 计算上下文窗口大小
    const contextWindow = Math.max(1, Math.floor(params.contextWindowTokens));

    // 获取保留 token 数
    const reserveTokens = Math.max(0, Math.floor(params.reserveTokensFloor));

    // 获取软阈值
    const softThreshold = Math.max(0, Math.floor(params.softThresholdTokens));

    // 计算触发内存刷新的阈值
    const threshold = Math.max(0, contextWindow - reserveTokens - softThreshold);
    if (threshold <= 0) return false;

    // 检查总 token 数是否超过阈值
    if (totalTokens < threshold) return false;

    // 获取当前压缩计数和上一次内存刷新的压缩计数
    const compactionCount = params.entry?.compactionCount ?? 0;
    const lastFlushAt = params.entry?.memoryFlushCompactionCount;

    // 检查上一次内存刷新是否已经在当前压缩计数中执行过
    if (typeof lastFlushAt === "number" && lastFlushAt === compactionCount) return false;

    return true;
}
3. 运行内存刷新(如果需要)
async function runMemoryFlushIfNeeded(params) {
    // 解析内存刷新设置
    const memoryFlushSettings = resolveMemoryFlushSettings(params.cfg);
    if (!memoryFlushSettings) return params.sessionEntry;

    // 检查内存刷新是否可写
    const memoryFlushWritable = (() => {
        if (!params.sessionKey) return true;
        const runtime = resolveSandboxRuntimeStatus({
            cfg: params.cfg,
            sessionKey: params.sessionKey
        });
        if (!runtime.sandboxed) return true;
        return resolveSandboxConfigForAgent(params.cfg, runtime.agentId).workspaceAccess === "rw";
    })();

    // 检查是否需要运行内存刷新
    if (!(memoryFlushSettings && memoryFlushWritable && !params.isHeartbeat && !isCliProvider(params.followupRun.run.provider, params.cfg) && shouldRunMemoryFlush({
        entry: params.sessionEntry ?? (params.sessionKey ? params.sessionStore?.[params.sessionKey] : void 0),
        contextWindowTokens: resolveMemoryFlushContextWindowTokens({
            modelId: params.followupRun.run.model ?? params.defaultModel,
            agentCfgContextTokens: params.agentCfgContextTokens
        }),
        reserveTokensFloor: memoryFlushSettings.reserveTokensFloor,
        softThresholdTokens: memoryFlushSettings.softThresholdTokens
    }))) return params.sessionEntry;

    let activeSessionEntry = params.sessionEntry;
    const activeSessionStore = params.sessionStore;
    const flushRunId = crypto.randomUUID();

    if (params.sessionKey) registerAgentRunContext(flushRunId, {
        sessionKey: params.sessionKey,
        verboseLevel: params.resolvedVerboseLevel
    });

    let memoryCompactionCompleted = false;
    const flushSystemPrompt = [params.followupRun.run.extraSystemPrompt, memoryFlushSettings.systemPrompt].filter(Boolean).join("\n\n");

    try {
        await runWithModelFallback({
            cfg: params.followupRun.run.config,
            provider: params.followupRun.run.provider,
            model: params.followupRun.run.model,
            agentDir: params.followupRun.run.agentDir,
            fallbacksOverride: resolveAgentModelFallbacksOverride(params.followupRun.run.config, resolveAgentIdFromSessionKey(params.followupRun.run.sessionKey)),
            run: (provider, model) => {
                const authProfileId = provider === params.followupRun.run.provider ? params.followupRun.run.authProfileId : void 0;
                return runEmbeddedPiAgent({
                    sessionId: params.followupRun.run.sessionId,
                    sessionKey: params.sessionKey,
                    agentId: params.followupRun.run.agentId,
                    messageProvider: params.sessionCtx.Provider?.trim().toLowerCase() || void 0,
                    agentAccountId: params.sessionCtx.AccountId,
                    messageTo: params.sessionCtx.OriginatingTo ?? params.sessionCtx.To,
                    messageThreadId: params.sessionCtx.MessageThreadId ?? void 0,
                    ...buildThreadingToolContext({
                        sessionCtx: params.sessionCtx,
                        config: params.followupRun.run.config,
                        hasRepliedRef: params.opts?.hasRepliedRef
                    }),
                    senderId: params.sessionCtx.SenderId?.trim() || void 0,
                    senderName: params.sessionCtx.SenderName?.trim() || void 0,
                    senderUsername: params.sessionCtx.SenderUsername?.trim() || void 0,
                    senderE164: params.sessionCtx.SenderE164?.trim() || void 0,
                    sessionFile: params.followupRun.run.sessionFile,
                    workspaceDir: params.followupRun.run.workspaceDir,
                    agentDir: params.followupRun.run.agentDir,
                    config: params.followupRun.run.config,
                    skillsSnapshot: params.followupRun.run.skillsSnapshot,
                    prompt: memoryFlushSettings.prompt,
                    extraSystemPrompt: flushSystemPrompt,
                    ownerNumbers: params.followupRun.run.ownerNumbers,
                    enforceFinalTag: resolveEnforceFinalTag(params.followupRun.run, provider),
                    provider,
                    model,
                    authProfileId,
                    authProfileIdSource: authProfileId ? params.followupRun.run.authProfileIdSource : void 0,
                    thinkLevel: params.followupRun.run.thinkLevel,
                    verboseLevel: params.followupRun.run.verboseLevel,
                    reasoningLevel: params.followupRun.run.reasoningLevel,
                    execOverrides: params.followupRun.run.execOverrides,
                    bashElevated: params.followupRun.run.bashElevated,
                    timeoutMs: params.followupRun.run.timeoutMs,
                    runId: flushRunId,
                    onAgentEvent: (evt) => {
                        if (evt.stream === "compaction") {
                            const phase = typeof evt.data.phase === "string" ? evt.data.phase : "";
                            const willRetry = Boolean(evt.data.willRetry);
                            if (phase === "end" && !willRetry) memoryCompactionCompleted = true;
                        }
                    }
                });
            }
        });

        let memoryFlushCompactionCount = activeSessionEntry?.compactionCount ?? (params.sessionKey ? activeSessionStore?.[params.sessionKey]?.compactionCount : 0) ?? 0;
        if (memoryCompactionCompleted) {
            const nextCount = await incrementCompactionCount({
                sessionEntry: activeSessionEntry,
                sessionStore: activeSessionStore,
                sessionKey: params.sessionKey,
                storePath: params.storePath
            });
            if (typeof nextCount === "number") memoryFlushCompactionCount = nextCount;
        }

        if (params.storePath && params.sessionKey) try {
            const updatedEntry = await updateSessionStoreEntry({
                storePath: params.storePath,
                sessionKey: params.sessionKey,
                update: async () => ({
                    memoryFlushAt: Date.now(),
                    memoryFlushCompactionCount
                })
            });
            if (updatedEntry) activeSessionEntry = updatedEntry;
        } catch (err) {
            logVerbose(`failed to persist memory flush metadata: ${String(err)}`);
        }
    } catch (err) {
        logVerbose(`memory flush run failed: ${String(err)}`);
    }

    return activeSessionEntry;
}
### 函数功能说明:
1. 内存刷新设置解析 :首先解析配置中的内存刷新设置,如果没有设置则直接返回当前会话条目。
2. 可写性检查 :通过立即执行函数表达式(IIFE)检查内存刷新是否可写,考虑了沙箱环境的权限限制。
3. 触发条件检查 :综合多个条件判断是否需要执行内存刷新:

   - 存在内存刷新设置
   - 内存可写
   - 不是心跳操作
   - 不是CLI提供商
   - 满足内存刷新的令牌阈值条件
4. 内存刷新执行 :

   - 生成唯一的刷新运行ID
   - 注册代理运行上下文
   - 使用模型回退机制运行嵌入式PI代理
   - 执行内存压缩操作
5. 状态更新 :

   - 跟踪内存压缩是否完成
   - 更新压缩计数
   - 持久化内存刷新元数据到会话存储
6. 错误处理 :捕获并记录内存刷新过程中的错误,但不影响主流程。
7. 返回结果 :返回更新后的会话条目。
该函数是OpenClaw内存管理系统的核心部分,负责在适当的时候触发内存刷新操作,以优化模型的上下文窗口使用并持久化重要信息。
## 内存刷新机制详解
### 1. 什么是 "Pre-compaction memory flush"?
"Pre-compaction memory flush" 是 OpenClaw 的一种内存管理机制,当会话接近自动压缩时触发,用于将临时内存中的重要信息持久化到磁盘上的内存文件中。
### 2. 触发条件
内存刷新会在以下条件满足时触发:
- 会话总 token 数大于 0
- 计算阈值 = 上下文窗口大小 - 保留 token 数 - 软阈值
- 会话总 token 数超过计算阈值
- 上一次内存刷新的压缩计数不等于当前压缩计数
### 3. 执行流程
1. 解析内存刷新设置,包括软阈值、提示文本等
2. 检查是否满足内存刷新条件
3. 如果满足条件,运行内存刷新
4. 发送内存刷新提示给代理,要求代理将重要信息存储到 memory/YYYY-MM-DD.md 文件中
5. 代理可以选择回复,或者发送 NO_REPLY 表示没有需要存储的信息
### 4. 目的和作用
- 防止信息丢失 :在会话压缩前,将重要信息持久化到磁盘
- 优化内存使用 :通过定期刷新,减少会话中的冗余信息
- 提高代理性能 :让代理能够将重要信息存储到长期内存中,减少临时内存占用
## 如何禁用 "Pre-compaction memory flush"
可以通过修改 OpenClaw 配置文件 openclaw.json 来禁用内存刷新:
"agents": {
    "defaults": {
        "compaction": {
            "memoryFlush": {
                "enabled": false
            }
        }
    }
}
## 代码优化建议
1. 可配置性增强 :

   - 增加内存刷新频率的配置选项
   - 允许用户自定义内存刷新的触发阈值
2. 错误处理 :

   - 增加内存文件写入失败的错误处理
   - 增加内存刷新过程中的异常捕获
3. 监控和日志 :

   - 增加内存刷新的详细日志
   - 提供内存使用情况的监控指标
## 总结
"Pre-compaction memory flush" 是 OpenClaw 的一项重要内存管理机制,通过在会话压缩前将重要信息持久化到磁盘,防止信息丢失并优化内存使用。该机制通过配置文件可灵活调整,默认设置适用于大多数场景。


原文地址:https://blog.csdn.net/Wange_video/article/details/158206286
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

发布主题
阅读排行更多+

Powered by Discuz! X3.4© 2001-2013 Discuz Team.( 京ICP备17022993号-3 )