01

初识 CocoIndex

从安装到第一个管道——理解增量数据索引引擎的完整架构

CocoIndex 是什么?

想象你有一个超级数据管家:它能自动读取代码仓库、PDF 文档、会议纪要,提取结构化信息,生成向量嵌入,存入数据库——而且每当数据变化时只重新处理变动的部分。这就是 CocoIndex

💡
核心理念

CocoIndex 不是传统的 ETL 工具——它是声明式的增量索引引擎。你只需声明"目标应该长什么样",引擎自动跟踪源数据变化,只重新计算受影响的部分(delta)。就像 React 的状态驱动 UI 更新一样,但作用于数据管道。

🥥

增量引擎

只处理变化的数据(delta),不变的数据直接命中缓存。10,000 个文件改了 1 个,只重新处理那 1 个。

🐍

Python 声明式

用纯 Python 声明数据管道,不需要 DAG、不需要配置文件。代码即管道,直觉即架构。

🦀

Rust 核心

底层引擎用 Rust 编写,天然并行、零拷贝、故障隔离。生产级可靠性从第一天开始。

🔌

丰富连接器

内置 20+ 连接器:本地文件、S3、Google Drive、Postgres、Neo4j、Kafka、Qdrant……即插即用。

核心架构:数据如何从源头流向目标?

CocoIndex 的架构像一座精密的数据工厂——有原料仓库(Source 数据源)、加工车间(Transform 变换函数)、成品库房(Target 目标存储),以及一位永不疲倦的质检经理(增量引擎)。

数据源层 — 数据从哪里来

📁
Source 连接器
📂
LocalFS (本地文件)
☁️
外部连接器 (S3/GDrive/Kafka)

变换层 — 数据怎么处理

@coco.fn 变换函数
✂️
内置 Ops (分块/嵌入)
💾
Memo 记忆化缓存

目标层 — 数据存到哪里

🗄️
Target 目标存储
🎯
TargetState (声明式状态)
📊
独立处理组件
点击任意组件,了解它在架构中的作用

数据旅程:从源文件到可查询的向量索引

当你运行 CocoIndex 管道时,数据经历一段精心设计的旅程。想象一个自动化工厂流水线——原料进入、逐步加工、成品入库,全程由引擎调度。

📁
数据源
变换处理
💾
Memo 缓存
🗄️
目标存储
点击"下一步"开始旅程

代码翻译:读懂第一个 CocoIndex 管道

这是一个最小可运行的 CocoIndex 管道。左边是原始代码,右边是逐行中文解释:

CODE
import cocoindex as coco
from cocoindex.connectors import localfs
from cocoindex.ops.text import RecursiveSplitter

# 定义文件处理函数
@coco.fn(memo=True)
async def process_file(file, table):
    text = await file.read_text()
    chunks = splitter.split(text, chunk_size=2000)
    for chunk in chunks:
        table.declare_row(
            text=chunk.text,
            embedding=embed(chunk.text)
        )

# 创建 App 并运行
app = coco.App("my_index", main, src="./docs")
app.update_blocking()
中文翻译

导入 CocoIndex 核心库

导入连接器:本地文件系统(localfs)

导入文本操作:递归分块器(RecursiveSplitter)

定义文件处理函数,memo=True 启用缓存

@coco.fn 装饰器——把普通函数变成 CocoIndex 变换组件

异步读取文件内容为字符串

用 RecursiveSplitter 将文本按 2000 字节分块

遍历每个分块

声明目标数据行:文本内容和嵌入向量

调用 Embedding 函数生成向量

创建名为 "my_index" 的 App,源目录为 ./docs

update_blocking() 同步运行:扫描→分块→嵌入→存储

📖
@coco.fn 装饰器是什么?

@coco.fn 将普通 Python 函数转换为 CocoIndex 的处理单元。引擎会自动跟踪函数的输入参数和代码逻辑的变化——只要两者都没变,就复用之前的计算结果(Memo 缓存)。加上 memo=True 后,缓存粒度更细,按 hash(输入) + hash(代码) 存储。

安装与快速上手

CocoIndex 的安装只需要一行命令。它是一个纯 Python 包(底层引擎用 Rust 编写,通过 PyO3 绑定)。

TERMINAL
# 安装 CocoIndex
pip install -U cocoindex

# 验证安装
python -c "import cocoindex; print(cocoindex.__version__)"

# 创建项目目录
mkdir my-cocoindex-app && cd my-cocoindex-app

# 配置本地数据库(可选,默认使用 SQLite)
echo "COCOINDEX_DB=./cocoindex.db" > .env
中文翻译

通过 pip 安装最新版 CocoIndex

验证安装成功,打印版本号

CocoIndex 支持 Python 3.10 到 3.13

创建项目工作目录

配置环境变量,使用本地 SQLite 作为内部状态数据库

🎯
关键概念对比

CocoIndex 的思维模型就像 React——你声明目标状态(Target State),引擎自动计算差异(Delta)并同步。不需要手动写"插入/更新/删除"逻辑,框架帮你搞定。公式:TargetState = Transform(SourceState)

检验你的理解

CocoIndex 的核心编程模型是什么?

@coco.fn(memo=True) 中 memo 参数的作用是什么?

CocoIndex 的底层引擎用什么语言编写?

02

数据流管道

掌握 Source、Transform、Target 三段式管道——理解 mount_each 和处理组件

Source 数据源:数据从哪里来?

Source 是管道的入口。CocoIndex 提供了丰富的连接器,覆盖企业级数据接入场景。最常用的是 localfs——本地文件系统连接器。

📂

localfs(本地文件)

扫描本地目录,支持递归遍历和文件名过滤。支持 live 模式持续监控文件变化。

☁️

Amazon S3 / OCI Storage

连接对象存储服务,扫描 Bucket 中的文件。适合云原生场景和企业数据湖。

📊

Postgres / Google Drive

从数据库查询数据,或从 Google Drive 读取文档。支持增量扫描新数据。

📨

Kafka(消息队列)

消费 Kafka Topic 中的消息作为数据源,适合实时数据流场景。

CODE — 使用 localfs 扫描文件
from cocoindex.connectors import localfs
from cocoindex.resources.file import PatternFilePathMatcher

files = localfs.walk_dir(
    sourcedir,
    recursive=True,
    path_matcher=PatternFilePathMatcher(
        included_patterns=["**/*.md"],
    ),
    live=True,
)

# files.items() 返回 (StableKey, FileLike) 对
# 每个 FileLike 可以 read_text() 读取内容
中文翻译

导入 localfs 连接器和文件路径匹配器

walk_dir 扫描 sourcedir 目录

recursive=True 递归扫描子目录

included_patterns 只匹配 .md 文件

live=True 支持 live 模式持续监控

files.items() 返回可遍历的 (key, file) 序列

每个 FileLike 对象可以 read_text() 读取文件内容

Transform 变换:数据怎么处理?

变换函数是管道的"加工车间"。CocoIndex 使用 @coco.fn 装饰器将 Python 函数变成引擎可追踪的处理单元。关键是 mount_each——为每个数据项创建独立组件。

CODE — 文件分块处理函数
@coco.fn(memo=True)
async def process_file(
    file: FileLike,
    table: postgres.TableTarget[DocEmbedding],
) -> None:
    # 1. 读取文件内容
    text = await file.read_text()
    
    # 2. 文本分块
    chunks = _splitter.split(
        text,
        chunk_size=2000,
        chunk_overlap=500,
        language="markdown",
    )
    
    # 3. 为每个 chunk 生成 embedding 并声明目标行
    id_gen = IdGenerator()
    await coco.map(
        process_chunk, chunks,
        file.file_path.path, id_gen, table
    )
中文翻译

@coco.fn(memo=True)——标记为记忆化变换函数

参数:file(源文件对象)+ table(目标表)

返回 None,因为结果通过 table.declare_row 写入

第一步:异步读取文件文本内容

第二步:用 RecursiveSplitter 分块

chunk_size=2000 字节,chunk_overlap=500 字节重叠

language="markdown" 启用 Markdown 语法感知分块

第三步:为每个 chunk 生成 ID 和嵌入

IdGenerator 基于内容生成确定性 ID

coco.map 并发处理所有 chunks

📖
RecursiveSplitter 是什么?

RecursiveSplitter 是 CocoIndex 内置的文本分块器,基于 Rust 实现的递归分割算法。它会优先在段落边界分割,其次在句子边界,最后才是字符级分割。支持 tree-sitter 实现代码语法感知分块。

mount_each:并行处理的关键

mount_each 是 CocoIndex 最核心的 API 之一——它为数据源中的每个项创建独立的 Processing Component,每个组件独立运行、独立缓存。

CODE — App 主函数
@coco.fn
async def app_main(sourcedir: pathlib.Path) -> None:
    # 1. 挂载目标表
    target_table = await postgres.mount_table_target(
        PG_DB,
        table_name="doc_embeddings",
        table_schema=schema,
    )
    target_table.declare_vector_index(
        column="embedding"
    )
    
    # 2. 扫描源文件
    files = localfs.walk_dir(sourcedir, recursive=True)
    
    # 3. 为每个文件创建独立处理组件
    await coco.mount_each(
        process_file,
        files.items(),
        target_table
    )
中文翻译

app_main 是整个管道的入口函数

第一步:挂载 Postgres 目标表

指定表名和表结构(schema)

声明 embedding 列为向量索引

第二步:用 localfs.walk_dir 扫描文件

第三步(关键):mount_each 为每个文件创建独立组件

process_file 是每个文件的处理函数

files.items() 返回 (StableKey, FileLike) 序列

target_table 作为额外参数传给 process_file

🎯
mount_each vs map

mount_each 为每个项创建独立的处理组件——每个组件有自己的 TargetState 和 Memo 缓存,独立运行并尽快写入目标。map 只是并发执行函数,不创建组件。需要写入目标存储时用 mount_each;只做计算时用 map。

生命周期管理与依赖注入

CocoIndex 使用 ContextKeylifespan 来管理共享资源(如数据库连接池、Embedding 模型)。这就像 React 的 Context——在顶层提供,在深层消费。

CODE — 生命周期与上下文
# 定义上下文键
PG_DB = coco.ContextKey[asyncpg.Pool]("db")
EMBEDDER = coco.ContextKey[SentenceTransformerEmbedder](
    "embedder",
    detect_change=True  # 模型变化时触发重新计算
)

# 定义生命周期——创建和清理共享资源
@coco.lifespan
async def coco_lifespan(builder):
    async with asyncpg.create_pool(DATABASE_URL) as pool:
        builder.provide(PG_DB, pool)
        builder.provide(EMBEDDER, SentenceTransformerEmbedder(model))
        yield  # 生命周期持续到 App 停止

# 在变换函数中使用上下文
embedding = await coco.use_context(EMBEDDER).embed(text)
中文翻译

ContextKey 定义共享资源的键——类型安全

第二个键设置了 detect_change=True

当 Embedder 模型发生变化时,自动触发依赖它的组件重新计算

@coco.lifespan 定义生命周期——类似 FastAPI 的 lifespan

在 async with 中创建数据库连接池

builder.provide() 将资源绑定到 ContextKey

yield 暂停——App 运行期间资源一直可用

在变换函数中通过 use_context() 访问共享资源

检验你的理解

mount_each 和 map 的核心区别是什么?

ContextKey 设置 detect_change=True 的作用是什么?

RecursiveSplitter 的文本分块策略是什么?

03

索引构建

构建向量索引和知识图谱——从文本嵌入到图数据库的完整实战

向量嵌入索引:让 AI 理解文本语义

向量嵌入是 RAG(检索增强生成)系统的核心。CocoIndex 支持多种嵌入方式,最常用的是 Sentence Transformers。

CODE — 完整的文档嵌入管道
from dataclasses import dataclass
from typing import Annotated
from numpy.typing import NDArray

# 定义目标表的数据结构
@dataclass
class DocEmbedding:
    id: int
    filename: str
    text: str
    embedding: Annotated[NDArray, EMBEDDER]
    chunk_start: int
    chunk_end: int

# Annotated[NDArray, EMBEDDER] 告诉 CocoIndex:
# embedding 字段由 EMBEDDER ContextKey 自动生成
# 当模型变化时 (detect_change=True),自动重新计算
中文翻译

导入 dataclass 和类型注解工具

NDArray 是 NumPy 数组类型——向量就是多维浮点数组

用 @dataclass 定义目标表的数据行结构

id:主键(由 IdGenerator 生成)

filename:源文件名

text:分块后的文本内容

embedding:向量嵌入(关键!)

Annotated[NDArray, EMBEDDER] 是类型注解魔法

它告诉 CocoIndex:这个字段由 EMBEDDER 自动生成

当 EMBEDDER 模型变化时,所有依赖的行自动重新计算

Postgres + pgvector:向量存储与检索

CocoIndex 的 Postgres 目标连接器是最常用的目标存储。配合 pgvector 扩展,可以实现高效的向量相似度搜索。

CODE — 挂载 Postgres 目标表
from cocoindex.connectors import postgres

# 挂载目标表
target_table = await postgres.mount_table_target(
    PG_DB,  # ContextKey 绑定的连接池
    table_name="doc_embeddings",
    table_schema=await postgres.TableSchema.from_class(
        DocEmbedding,
        primary_key=["id"],
    ),
    pg_schema_name="coco_examples",
)

# 声明向量索引
target_table.declare_vector_index(
    column="embedding"
)
中文翻译

导入 Postgres 目标连接器

mount_table_target 挂载一个 Postgres 表作为目标

PG_DB 是 ContextKey,绑定到 asyncpg 连接池

table_name 指定表名

table_schema 从 DocEmbedding dataclass 自动推导表结构

primary_key=["id"] 设置主键列

pg_schema_name 指定 PostgreSQL schema

declare_vector_index 在 embedding 列上创建向量索引

引擎会自动管理 pgvector 扩展和索引创建

🎯
TargetState 的魔法

你不需要写 INSERT/UPDATE/DELETE 语句。table.declare_row() 声明"这一行应该存在",引擎自动计算差异:新文件 = INSERT,文件变化 = UPDATE,文件删除 = DELETE。这就是声明式编程的威力。

代码嵌入实战:为代码仓库构建语义搜索

这是 CocoIndex 最经典的应用场景——遍历代码仓库,AST 感知分块,生成嵌入向量,存入 pgvector。之后可以用自然语言搜索代码。

CODE — 代码嵌入管道
from cocoindex.ops.text import RecursiveSplitter, detect_code_language

@coco.fn(memo=True)
async def process_file(file: FileLike, table):
    text = await file.read_text()
    
    # 自动检测编程语言
    language = detect_code_language(
        filename=str(file.file_path.path.name)
    )
    
    # 语法感知分块——不会在函数中间切断
    chunks = _splitter.split(
        text,
        chunk_size=1000,
        min_chunk_size=300,
        chunk_overlap=300,
        language=language,  # Python → AST 感知
    )
    
    id_gen = IdGenerator()
    await coco.map(process_chunk, chunks,
              file.file_path.path, id_gen, table)
中文翻译

导入分块器和代码语言检测函数

memo=True 启用记忆化缓存

异步读取文件内容

detect_code_language 根据文件扩展名检测语言

例如 .py → "python",.rs → "rust"

语法感知分块的核心

chunk_size=1000 字节目标块大小

min_chunk_size=300 最小块大小

chunk_overlap=300 块间重叠

language 参数启用 AST 感知——不会在函数中间切分

为每个 chunk 生成确定性 ID

coco.map 并发处理所有 chunks

知识图谱构建:从非结构化文本到结构化图谱

除了向量索引,CocoIndex 还可以构建 知识图谱。使用 Neo4j 或 FalkorDB 作为目标存储,用 LLM 提取实体和关系。

📝

会议纪要 → 知识图谱

从会议转录中提取人物、议题、决策、行动项,存入 Neo4j。支持增量——只有变化的会议重新提取。

🎧

播客 → 知识图谱

从 YouTube 播客转录中提取说话人、观点、主题,跨集去重后存入 SurrealDB 或 Neo4j。

💬

Slack 对话 → 知识图谱

从 Slack 消息流中提取话题、决策、负责人,构建组织知识网络。

📊

结构化提取

用 BAML 或 DSPy 从表单、发票、简历中提取结构化字段,写入 Postgres。支持增量——只处理变化的文档。

增量机制深入:引擎如何判断哪些需要重算?

CocoIndex 的增量引擎有两个维度的变化追踪:数据变化(源文件内容变了)和代码变化(变换逻辑改了)。两者的重算策略不同。

数据变化 — 源文件被编辑

📝
Fingerprint 哈希比对
只重算受影响的组件

代码变化 — 变换逻辑被修改

🔧
代码 Fingerprint 追踪
🔄
模型变化触发全量重算
点击任意组件,了解增量追踪的具体机制

检验你的理解

DocEmbedding 中 embedding 字段使用 Annotated[NDArray, EMBEDDER] 的作用是什么?

代码嵌入管道中 detect_code_language 函数的作用是什么?

CocoIndex 增量引擎追踪哪两个维度的变化?

04

查询与检索

从向量相似度搜索到知识图谱查询——让 AI Agent 获取精准上下文

向量相似度搜索:用自然语言查找代码和文档

构建索引的最终目的是查询。CocoIndex 将数据存入 Postgres + pgvector 后,你可以用标准的 SQL 进行 向量相似度搜索

CODE — 向量查询函数
async def query_once(pool, embedder, query, *, top_k=5):
    # 1. 将查询文本转换为向量
    query_vec = await embedder.embed(query)
    
    # 2. 在 Postgres 中搜索最相似的向量
    async with pool.acquire() as conn:
        rows = await conn.fetch(
            f"""
            SELECT filename, code,
                   embedding <=> $1 AS distance,
                   start_line, end_line
            FROM "{SCHEMA}"."{TABLE}"
            ORDER BY distance ASC
            LIMIT $2
            """,
            query_vec,
            top_k,
        )
    
    # 3. 打印结果
    for r in rows:
        score = 1.0 - float(r["distance"])
        print(f"[{score:.3f}] {r['filename']}")
中文翻译

定义查询函数,接受连接池、嵌入器和查询文本

第一步:用同一个嵌入模型将查询文本转为向量

第二步:在 Postgres 中执行向量相似度搜索

从连接池获取一个连接

SQL 查询:计算每个行向量与查询向量的距离

embedding <=> $1 是 pgvector 的余弦距离运算符

ORDER BY distance ASC 按相似度排序(越小越相似)

LIMIT $2 只返回 top_k 个最相似的结果

第三步:将距离转换为相似度分数并打印

score = 1 - distance,范围 [0, 1],越大越相似

📖
余弦距离 <=> 是什么?

<=> 是 pgvector 提供的余弦距离运算符。它计算两个向量之间的夹角余弦值。距离为 0 表示完全相同,距离为 2 表示完全相反。查询时用 ORDER BY distance ASC 获取最相似的结果。CocoIndex 会自动管理 pgvector 扩展的安装和索引创建。

交互式查询 REPL:实时搜索体验

CocoIndex 的示例代码通常包含一个交互式查询函数,让你可以像聊天一样搜索索引——输入自然语言,即时返回相关代码片段。

TERMINAL — 交互式查询演示
# 运行查询(命令行传入查询文本)
$ python main.py "how to split text into chunks"

# 输出示例:
[0.847] python/cocoindex/ops/text.py (L120-L155)
    def split(self, text, chunk_size, ...):
        raw_chunks = self._splitter.split(
            text, chunk_size, min_chunk_size, ...
        )
---
[0.792] python/cocoindex/ops/text.py (L39-L84)
    class SeparatorSplitter:
        def split(self, text) -> list[Chunk]:
---

# 也可以进入 REPL 模式
$ python main.py
Enter search query (or Enter to quit): embedding model configuration
[0.831] python/cocoindex/ops/sentence_transformers.py
中文翻译

通过命令行参数传入查询文本

查询:"如何将文本分割为块"

输出示例——每个结果包含:

[0.847] 相似度分数,越高越匹配

文件名和行号范围

匹配的代码片段内容

第二个结果:分隔符分割器类

也可以不带参数运行,进入交互式 REPL

在 REPL 中可以连续输入查询

Live 模式:持续监控与实时更新

CocoIndex 的 Live 模式 让管道持续运行——源数据变化时自动增量更新,不需要手动触发。

CODE — Live 模式运行
# 命令行方式:-L 标志启用 Live 模式
$ cocoindex update -L main

# 或者代码方式:
app = coco.App("CodeEmbedding", app_main, sourcedir="./src")

# 同步运行一次(扫描 + 处理 + 完成)
app.update_blocking()

# Live 模式(持续监控,发现变化自动更新)
handle = app.update(live=True)

# 监控进度
async for snapshot in handle.watch():
    print(f"Status: {snapshot.status}")
    print(f"Stats: {snapshot.stats}")

# 获取最终结果
result = await handle.result()
中文翻译

命令行方式:cocoindex update -L 启用 Live 模式

创建 App 实例

update_blocking() 同步运行一次——扫描、处理、完成

update(live=True) 启动 Live 模式

返回 UpdateHandle 用于监控进度

watch() 返回异步迭代器,实时获取进度快照

snapshot.status 可能是 RUNNING 或 READY

snapshot.stats 包含各组件的处理统计

result() 等待并获取最终结果

🎯
Live 模式的核心价值

在 Live 模式下,CocoIndex 持续监控数据源变化。当你编辑了一个文件,引擎在亚秒级内检测到变化并增量更新索引。AI Agent 始终获得最新鲜的上下文——不需要定时任务、不需要手动触发、不需要全量重建。

多目标存储:向量数据库、图数据库、消息队列

CocoIndex 支持多种目标存储,满足不同的 AI 应用场景。选择合适的目标存储取决于你的查询需求。

🐘

Postgres + pgvector

最常用的组合。关系型数据 + 向量搜索一体化。支持 SQL 查询和混合检索。适合 RAG 场景。

🔮

Neo4j / FalkorDB

图数据库,存储实体和关系。适合知识图谱、社交网络分析、会议纪要智能等场景。使用 Cypher 查询。

🚀

LanceDB / Qdrant / Turbopuffer

专用向量数据库。LanceDB 嵌入式、零运维;Qdrant 高性能过滤;Turbopuffer 无服务器。适合纯向量检索。

📨

Kafka Target

将处理结果发布到 Kafka Topic。适合流式场景——处理完的文档立即通知下游消费者。

检验你的理解

pgvector 的 <=> 运算符计算的是什么距离?

Live 模式和普通模式的核心区别是什么?

05

部署实战

从 CLI 命令到生产部署——掌握 CocoIndex 的完整运维工具链

CocoIndex CLI:命令行工具

CocoIndex 提供了完整的 CLI 工具链。安装后即可使用 cocoindex 命令管理应用、运行更新、查看状态。

TERMINAL — CLI 命令大全
# 查看帮助
$ cocoindex --help

# 运行更新(一次性)
$ cocoindex update main

# 运行更新(Live 模式,持续监控)
$ cocoindex update -L main

# 显示进度详情
$ cocoindex update --report main

# 删除应用(清除所有目标状态)
$ cocoindex drop main

# 列出所有应用
$ cocoindex ls

# 全量重处理(忽略缓存)
$ cocoindex update --full-reprocess main
中文翻译

查看 CLI 帮助信息

update main 运行名为 "main" 的应用(一次性)

-L 标志启用 Live 模式——持续监控数据源变化

--report 在终端显示处理进度详情

drop main 删除应用的所有目标状态和内部数据

ls 列出所有已注册的应用

--full-reprocess 忽略所有缓存,全量重新处理

App 配置与运行模式

CocoIndex 的 App 类支持多种运行模式——同步阻塞、异步非阻塞、Live 模式。理解这些模式的区别是生产部署的基础。

CODE — App 运行模式对比
# 创建 App
app = coco.App(
    coco.AppConfig(
        name="CodeEmbedding",
        max_inflight_components=1024,
    ),
    app_main,
    sourcedir=pathlib.Path("./src"),
)

# 模式 1:同步阻塞(适合脚本、CLI)
result = app.update_blocking(
    report_to_stdout=True,
)

# 模式 2:异步非阻塞(适合 Web 服务)
handle = app.update()
result = await handle.result()

# 模式 3:Live 模式(持续监控)
handle = app.update(live=True)
async for snap in handle.watch():
    if snap.status == UpdateStatus.READY:
        print("Initial processing complete!")
中文翻译

创建 App 并配置参数

AppConfig 指定应用名和并发参数

max_inflight_components 控制最大并发组件数

模式 1:同步阻塞——运行完成后返回结果

report_to_stdout 在终端显示进度

模式 2:异步非阻塞——返回 UpdateHandle

await handle.result() 等待完成

模式 3:Live 模式——持续监控数据源

watch() 返回异步迭代器,实时获取进度

READY 状态表示初始处理完成

内部存储:引擎如何追踪状态?

CocoIndex 的增量引擎需要一个内部数据库来追踪状态——Memo 缓存、处理组件指纹、TargetState 版本等。默认使用 SQLite,也可以配置为 Postgres。

内部存储 — 引擎状态追踪

🗄️
COCOINDEX_DB 配置
💾
Memo 缓存存储
🔍
Fingerprint 指纹追踪

环境配置 — 连接外部服务

🌍
Environment 环境
🔧
多环境支持
⚙️
Settings 全局配置
点击任意组件,了解内部存储和配置的细节

生产部署最佳实践

将 CocoIndex 从开发环境迁移到生产环境需要关注几个关键点:数据库选型、并发控制、故障恢复和监控。

🐘

使用 Postgres 作为内部存储

生产环境推荐将 COCOINDEX_DB 配置为 Postgres 连接字符串。SQLite 在高并发写入时可能出现锁竞争,Postgres 更可靠。

🔄

并发控制

通过 max_inflight_components 和 COCOINDEX_MAX_INFLIGHT_COMPONENTS 环境变量控制并发处理组件数量。Embedding API 通常有速率限制,需要合理配置。

🛡️

异常处理与重试

使用 @coco.exception_handler 注册异常处理器。Rust 核心引擎内置重试和指数退避。单个组件失败不影响其他组件。

📊

监控与进度追踪

通过 UpdateHandle.watch() 实时获取处理统计。UpdateStats 包含每个组件的处理行数、缓存命中数等指标,便于性能分析。

CODE — 异常处理与监控
# 注册异常处理器
@coco.exception_handler
async def handle_error(exc, ctx):
    logger.error(
        f"Component {ctx.stable_path} failed: "
        f"{ctx.processor_name} in {ctx.env_name}"
    )

# 监控更新进度
handle = app.update(live=True)
async for snapshot in handle.watch():
    for name, stats in snapshot.stats.by_component.items():
        print(f"[{name}] "
              f"processed={stats.num_processed} "
              f"cached={stats.num_cached}")
中文翻译

使用 @coco.exception_handler 注册自定义异常处理器

exc 是异常对象,ctx 包含上下文信息

ctx.stable_path 是失败组件的路径

ctx.processor_name 是处理函数名

监控更新进度

watch() 返回异步迭代器

遍历每个组件的统计信息

num_processed 已处理的行数

num_cached 命中缓存的行数

与 AI Agent 集成:CocoIndex-code

CocoIndex 的旗舰产品 CocoIndex-code 将 CocoIndex 变成 AI 编程 Agent 的"眼睛"——让 Claude Code、Cursor 等 Agent 能语义搜索代码、分析调用图、理解项目架构。

🥥
CocoIndex-code 的关键能力

AST 感知分块:基于 tree-sitter,不会在函数/类中间切分。
增量更新:每次 commit 只重新索引变化的文件,80-90% 缓存命中。
语义搜索:按含义搜索代码,不是 grep 关键字匹配。
调用图分析:追踪函数调用链,分析修改的影响范围。
MCP 协议:一个安装包,Claude Code 和 Cursor 都能用。

CODE — 在 Claude Code 中使用 CocoIndex skill
# 1. 将 CocoIndex skill 添加到项目
# 复制 skills/cocoindex/ 到 .claude/skills/
cp -r skills/cocoindex/ .claude/skills/

# 2. Claude Code 自动识别 CocoIndex API
# 正确使用 @coco.fn、mount_each、ContextKey 等

# 3. 管道代码示例
@coco.fn(memo=True)
async def index_file(file, table):
    for chunk in RecursiveSplitter().split(
        await file.read_text()
    ):
        table.declare_row(
            text=chunk.text,
            embedding=embed(chunk.text)
        )
中文翻译

第一步:将 CocoIndex skill 复制到项目的 .claude/skills/ 目录

Skill 包含 API 概念、模式、最佳实践的完整知识

第二步:Claude Code 自动使用 skill 中的知识

确保生成的代码使用正确的 v1 API

第三步:Claude Code 生成的管道代码示例

自动使用 @coco.fn(memo=True) 启用缓存

正确使用 RecursiveSplitter 和 declare_row

完整管道回顾:从零到生产的全流程

让我们回顾整个 CocoIndex 管道——从安装到生产部署的完整流程。每一步都建立在前面学过的概念之上。

📦
安装/定义
🔧
配置管道
🚀
运行更新
🔍
查询检索
点击"下一步"开始全流程回顾

检验你的理解

update_blocking() 和 update() 的核心区别是什么?

什么时候需要使用 --full-reprocess 标志?

CocoIndex-code 的主要用途是什么?