先理解:入库质量决定检索上限
RAG 的“入库”不是把文件读出来塞进向量库那么简单。它实际做的是知识生产线:原始文档进入,变成一条条带元数据的 Chunk,再进入向量索引。后面的检索、回答和评测,全部依赖这一步的质量。
客服场景尤其适合先做简单但可控的分块:FAQ 一问一答天然是一块;政策文档按小节拆分,避免整篇政策被塞进一个 Chunk。这样用户问“进水还能保修吗”时,系统更容易命中“免费保修边界”,而不是命中一整篇售后政策。
为什么要保留元数据
向量相似度只知道语义是否接近,不知道这条知识属于哪个产品、是否过期、能不能直接对用户说。所以每个 Chunk 都要带上 source、product、doc_type、risk_level 等字段。
这些字段在后面会用来做过滤、引用展示、风险判断和评测统计。不要等上线后再补,那个时候历史索引已经有了,清理成本很高。
常见坑
最常见的坑是分块过大。一块里混了退款、保修、发票三类信息,检索命中后模型就容易串答案。另一个坑是没有抽检索引,只看脚本运行成功。脚本成功只能说明写入了数据,不能说明写入的是好数据。
本篇完成入库脚本。运行后,FAQ 和政策文档会被转换为 chunk 并写入 Chroma。
本篇成品
text
src/ingest.py
data/index/入库脚本
src/ingest.py:
python
import csv
import hashlib
import re
from chromadb import PersistentClient
from sentence_transformers import SentenceTransformer
from src.config import RAW_DIR, INDEX_DIR, COLLECTION_NAME, EMBEDDING_MODEL
from src.schema import Chunk
def clean_text(text: str) -> str:
text = re.sub(r"1[3-9]\d{9}", "[PHONE]", text)
text = re.sub(r"[\w.-]+@[\w.-]+", "[EMAIL]", text)
return "\n".join(line.strip() for line in text.splitlines() if line.strip())
def stable_id(source: str, text: str) -> str:
return f"{source}-{hashlib.sha1(text.encode()).hexdigest()[:12]}"
def load_faq() -> list[Chunk]:
chunks = []
with open(RAW_DIR / "faq.csv", newline="", encoding="utf-8") as f:
for row in csv.DictReader(f):
text = clean_text(f"问题:{row['question']}\n答案:{row['answer']}")
chunks.append(Chunk(
id=stable_id("faq", text),
text=text,
source="faq.csv",
product=row["product"],
doc_type=row["doc_type"],
effective_from=row["effective_from"],
))
return chunks
def load_policy() -> list[Chunk]:
raw = (RAW_DIR / "policy.md").read_text(encoding="utf-8")
chunks = []
for section in re.split(r"\n(?=## )", raw):
section = clean_text(section)
if len(section) < 20:
continue
risk = "high" if "不得承诺" in section or "赔偿" in section else "medium"
chunks.append(Chunk(
id=stable_id("policy", section),
text=section,
source="policy.md",
product="all",
doc_type="policy",
effective_from="2026-04-01",
risk_level=risk,
))
return chunks
def main():
INDEX_DIR.mkdir(parents=True, exist_ok=True)
chunks = load_faq() + load_policy()
model = SentenceTransformer(EMBEDDING_MODEL)
client = PersistentClient(path=str(INDEX_DIR))
col = client.get_or_create_collection(
COLLECTION_NAME,
metadata={"hnsw:space": "cosine"},
)
embeddings = model.encode([c.text for c in chunks], normalize_embeddings=True).tolist()
col.upsert(
ids=[c.id for c in chunks],
documents=[c.text for c in chunks],
embeddings=embeddings,
metadatas=[c.model_dump(exclude={"id", "text"}) for c in chunks],
)
print(f"indexed {len(chunks)} chunks")
if __name__ == "__main__":
main()运行
bash
python -m src.ingest预期输出类似:
text
indexed 6 chunks国内环境应为网络问题可能访问不到HF,需要设置镜像地址
bash
export HF_ENDPOINT=https://hf-mirror.com
python -m src.ingest如果你之前已经运行过入库脚本,需要先删除旧索引再重建。Chroma collection 创建后,距离类型不会因为代码变了自动更新:
bash
rm -rf data/index
python -m src.ingest验收点
data/index已生成。- FAQ 以一问一答为 chunk。
- 政策按二级标题分块。
- 每个 chunk 都有
source/product/doc_type/effective_from/risk_level元数据。 - collection 使用 cosine 距离,并且 Embedding 已归一化。

