实战客服 RAG 02:清洗、分块并写入向量库

先理解:入库质量决定检索上限

RAG 的“入库”不是把文件读出来塞进向量库那么简单。它实际做的是知识生产线:原始文档进入,变成一条条带元数据的 Chunk,再进入向量索引。后面的检索、回答和评测,全部依赖这一步的质量。

客服场景尤其适合先做简单但可控的分块:FAQ 一问一答天然是一块;政策文档按小节拆分,避免整篇政策被塞进一个 Chunk。这样用户问“进水还能保修吗”时,系统更容易命中“免费保修边界”,而不是命中一整篇售后政策。

为什么要保留元数据

向量相似度只知道语义是否接近,不知道这条知识属于哪个产品、是否过期、能不能直接对用户说。所以每个 Chunk 都要带上 sourceproductdoc_typerisk_level 等字段。

这些字段在后面会用来做过滤、引用展示、风险判断和评测统计。不要等上线后再补,那个时候历史索引已经有了,清理成本很高。

常见坑

最常见的坑是分块过大。一块里混了退款、保修、发票三类信息,检索命中后模型就容易串答案。另一个坑是没有抽检索引,只看脚本运行成功。脚本成功只能说明写入了数据,不能说明写入的是好数据。

本篇完成入库脚本。运行后,FAQ 和政策文档会被转换为 chunk 并写入 Chroma。

本篇成品

text
src/ingest.py
data/index/

入库脚本

src/ingest.py

python
import csv
import hashlib
import re
from chromadb import PersistentClient
from sentence_transformers import SentenceTransformer
from src.config import RAW_DIR, INDEX_DIR, COLLECTION_NAME, EMBEDDING_MODEL
from src.schema import Chunk

def clean_text(text: str) -> str:
    text = re.sub(r"1[3-9]\d{9}", "[PHONE]", text)
    text = re.sub(r"[\w.-]+@[\w.-]+", "[EMAIL]", text)
    return "\n".join(line.strip() for line in text.splitlines() if line.strip())

def stable_id(source: str, text: str) -> str:
    return f"{source}-{hashlib.sha1(text.encode()).hexdigest()[:12]}"

def load_faq() -> list[Chunk]:
    chunks = []
    with open(RAW_DIR / "faq.csv", newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f):
            text = clean_text(f"问题:{row['question']}\n答案:{row['answer']}")
            chunks.append(Chunk(
                id=stable_id("faq", text),
                text=text,
                source="faq.csv",
                product=row["product"],
                doc_type=row["doc_type"],
                effective_from=row["effective_from"],
            ))
    return chunks

def load_policy() -> list[Chunk]:
    raw = (RAW_DIR / "policy.md").read_text(encoding="utf-8")
    chunks = []
    for section in re.split(r"\n(?=## )", raw):
        section = clean_text(section)
        if len(section) < 20:
            continue
        risk = "high" if "不得承诺" in section or "赔偿" in section else "medium"
        chunks.append(Chunk(
            id=stable_id("policy", section),
            text=section,
            source="policy.md",
            product="all",
            doc_type="policy",
            effective_from="2026-04-01",
            risk_level=risk,
        ))
    return chunks

def main():
    INDEX_DIR.mkdir(parents=True, exist_ok=True)
    chunks = load_faq() + load_policy()
    model = SentenceTransformer(EMBEDDING_MODEL)
    client = PersistentClient(path=str(INDEX_DIR))
    col = client.get_or_create_collection(
        COLLECTION_NAME,
        metadata={"hnsw:space": "cosine"},
    )
    embeddings = model.encode([c.text for c in chunks], normalize_embeddings=True).tolist()
    col.upsert(
        ids=[c.id for c in chunks],
        documents=[c.text for c in chunks],
        embeddings=embeddings,
        metadatas=[c.model_dump(exclude={"id", "text"}) for c in chunks],
    )
    print(f"indexed {len(chunks)} chunks")

if __name__ == "__main__":
    main()

运行

bash
python -m src.ingest

预期输出类似:

text
indexed 6 chunks

国内环境应为网络问题可能访问不到HF,需要设置镜像地址

bash
export HF_ENDPOINT=https://hf-mirror.com
python -m src.ingest

如果你之前已经运行过入库脚本,需要先删除旧索引再重建。Chroma collection 创建后,距离类型不会因为代码变了自动更新:

bash
rm -rf data/index
python -m src.ingest

验收点

  • data/index 已生成。
  • FAQ 以一问一答为 chunk。
  • 政策按二级标题分块。
  • 每个 chunk 都有 source/product/doc_type/effective_from/risk_level 元数据。
  • collection 使用 cosine 距离,并且 Embedding 已归一化。
实战客服 RAG 03:实现检索、转人工和问答 API
实战客服 RAG 01:搭建项目骨架和知识源