Ragflow-Plus 数据库模式与模型教程 #
1. 目的与范围 #
本文档详细介绍了 Ragflow-Plus 系统的数据库模式、数据模型、数据访问模式和存储层集成。系统使用 MySQL 作为主数据库,Elasticsearch 作为搜索引擎,MinIO 作为对象存储。
有关系统架构概述,请参阅 系统架构。有关后端服务架构,请参阅 后端服务架构。
2. 数据架构概述 #
Ragflow-Plus 采用多存储架构,不同数据类型存储在不同的存储系统中。
2.1 存储架构 #
┌─────────────────────────────────────────┐
│ MySQL (关系型数据库) │
│ - 用户信息 │
│ - 知识库元数据 │
│ - 对话历史 │
│ - 租户和团队信息 │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ Elasticsearch (搜索引擎) │
│ - 文档分块索引 │
│ - 向量存储 │
│ - 全文检索 │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ MinIO (对象存储) │
│ - 原始文件 │
│ - 处理后的图像 │
│ - 文档附件 │
└─────────────────────────────────────────┘2.2 数据流向 #
用户上传文档
↓
MinIO 存储原始文件
↓
文档解析和分块
↓
MySQL 存储元数据
↓
Elasticsearch 存储索引和向量
↓
用户查询
↓
Elasticsearch 检索
↓
MySQL 获取元数据
↓
返回结果3. 核心数据库配置 #
3.1 MySQL 配置 #
# database.py
import pymysql
def get_db_connection():
"""获取 MySQL 连接"""
return pymysql.connect(
host=settings.MYSQL_HOST,
port=settings.MYSQL_PORT,
user=settings.MYSQL_USER,
password=settings.MYSQL_PASSWORD,
database=settings.MYSQL_DATABASE,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
autocommit=False
)3.2 Elasticsearch 配置 #
# elasticsearch_client.py
from elasticsearch import Elasticsearch
def get_es_client():
"""获取 Elasticsearch 客户端"""
return Elasticsearch(
[f"{settings.ES_HOST}:{settings.ES_PORT}"],
http_auth=(settings.ES_USER, settings.ES_PASSWORD),
verify_certs=False,
request_timeout=30
)3.3 MinIO 配置 #
# minio_client.py
from minio import Minio
def get_minio_client():
"""获取 MinIO 客户端"""
return Minio(
settings.MINIO_ENDPOINT,
access_key=settings.MINIO_ACCESS_KEY,
secret_key=settings.MINIO_SECRET_KEY,
secure=False
)4. MySQL 数据库模式 #
4.1 用户表 (user) #
CREATE TABLE `user` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`nickname` VARCHAR(255) NOT NULL,
`email` VARCHAR(255) UNIQUE NOT NULL,
`password` VARCHAR(255) NOT NULL,
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX `idx_email` (`email`),
INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:用户唯一标识nickname:用户昵称email:用户邮箱(唯一)password:加密后的密码create_time:创建时间update_time:更新时间
4.2 租户表 (tenant) #
CREATE TABLE `tenant` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`name` VARCHAR(255) NOT NULL,
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:租户唯一标识name:租户名称create_time:创建时间update_time:更新时间
4.3 用户租户关系表 (user_tenant) #
CREATE TABLE `user_tenant` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`user_id` VARCHAR(64) NOT NULL,
`tenant_id` VARCHAR(64) NOT NULL,
`role` VARCHAR(50) NOT NULL DEFAULT 'member',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY `uk_user_tenant` (`user_id`, `tenant_id`),
INDEX `idx_user_id` (`user_id`),
INDEX `idx_tenant_id` (`tenant_id`),
FOREIGN KEY (`user_id`) REFERENCES `user`(`id`) ON DELETE CASCADE,
FOREIGN KEY (`tenant_id`) REFERENCES `tenant`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:关系唯一标识user_id:用户IDtenant_id:租户IDrole:用户角色(owner/admin/member)create_time:创建时间
4.4 知识库表 (knowledgebase) #
CREATE TABLE `knowledgebase` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`name` VARCHAR(255) NOT NULL,
`tenant_id` VARCHAR(64) NOT NULL,
`embedding_model` VARCHAR(255) NOT NULL DEFAULT 'bge-m3',
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX `idx_tenant_id` (`tenant_id`),
INDEX `idx_create_time` (`create_time`),
FOREIGN KEY (`tenant_id`) REFERENCES `tenant`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:知识库唯一标识name:知识库名称tenant_id:所属租户IDembedding_model:嵌入模型名称create_time:创建时间update_time:更新时间
4.5 文档表 (document) #
CREATE TABLE `document` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`knowledgebase_id` VARCHAR(64) NOT NULL,
`file_id` VARCHAR(64) NOT NULL,
`name` VARCHAR(255) NOT NULL,
`type` VARCHAR(50) NOT NULL,
`size` BIGINT NOT NULL,
`status` VARCHAR(50) NOT NULL DEFAULT 'parsing',
`chunk_num` INT NOT NULL DEFAULT 0,
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX `idx_knowledgebase_id` (`knowledgebase_id`),
INDEX `idx_file_id` (`file_id`),
INDEX `idx_status` (`status`),
FOREIGN KEY (`knowledgebase_id`) REFERENCES `knowledgebase`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:文档唯一标识knowledgebase_id:所属知识库IDfile_id:文件ID(关联 file 表)name:文档名称type:文档类型(pdf/docx/txt等)size:文件大小(字节)status:处理状态(parsing/parsed/failed)chunk_num:分块数量create_time:创建时间update_time:更新时间
4.6 文件表 (file) #
CREATE TABLE `file` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`name` VARCHAR(255) NOT NULL,
`type` VARCHAR(50) NOT NULL,
`size` BIGINT NOT NULL,
`bucket` VARCHAR(255) NOT NULL,
`object_name` VARCHAR(255) NOT NULL,
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:文件唯一标识name:文件名type:文件类型size:文件大小bucket:MinIO 存储桶名称object_name:MinIO 对象名称create_time:创建时间
4.7 对话表 (dialog) #
CREATE TABLE `dialog` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`knowledgebase_id` VARCHAR(64),
`name` VARCHAR(255),
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX `idx_knowledgebase_id` (`knowledgebase_id`),
INDEX `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:对话唯一标识knowledgebase_id:关联的知识库ID(可选)name:对话名称create_time:创建时间update_time:更新时间
4.8 对话消息表 (dialog_message) #
CREATE TABLE `dialog_message` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`dialog_id` VARCHAR(64) NOT NULL,
`question` TEXT NOT NULL,
`answer` TEXT NOT NULL,
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX `idx_dialog_id` (`dialog_id`),
INDEX `idx_create_time` (`create_time`),
FOREIGN KEY (`dialog_id`) REFERENCES `dialog`(`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明:
id:消息唯一标识dialog_id:所属对话IDquestion:用户问题answer:助手回答create_time:创建时间
5. Elasticsearch 索引模式 #
5.1 文档分块索引 #
# 索引映射
mapping = {
"mappings": {
"properties": {
"kb_id": {"type": "keyword"},
"doc_id": {"type": "keyword"},
"chunk_id": {"type": "keyword"},
"content": {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
},
"embedding": {
"type": "dense_vector",
"dims": 1024,
"index": True,
"similarity": "cosine"
},
"image_url": {"type": "keyword"},
"page_num": {"type": "integer"},
"create_time": {"type": "date"}
}
}
}5.2 索引操作 #
# 创建索引
def create_kb_index(kb_id: str):
"""创建知识库索引"""
es = get_es_client()
index_name = f"kb_{kb_id}"
if not es.indices.exists(index=index_name):
es.indices.create(index=index_name, body=mapping)
# 添加文档
def add_chunk(kb_id: str, chunk: dict):
"""添加分块到索引"""
es = get_es_client()
index_name = f"kb_{kb_id}"
es.index(
index=index_name,
id=chunk['chunk_id'],
body=chunk
)
# 搜索
def search(kb_id: str, query: str, top_k: int = 5):
"""搜索文档"""
es = get_es_client()
index_name = f"kb_{kb_id}"
# 全文检索
text_query = {
"match": {
"content": query
}
}
# 向量检索
vector_query = {
"script_score": {
"query": {"match_all": {}},
"script": {
"source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
"params": {"query_vector": query_embedding}
}
}
}
# 混合检索
response = es.search(
index=index_name,
body={
"query": {
"bool": {
"should": [text_query, vector_query]
}
},
"size": top_k
}
)
return response['hits']['hits']6. 数据访问模式 #
6.1 ORM 模式 #
# models/knowledgebase.py
from sqlalchemy import Column, String, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
Base = declarative_base()
class Knowledgebase(Base):
__tablename__ = 'knowledgebase'
id = Column(String(64), primary_key=True)
name = Column(String(255), nullable=False)
tenant_id = Column(String(64), ForeignKey('tenant.id'), nullable=False)
embedding_model = Column(String(255), default='bge-m3')
create_time = Column(DateTime, nullable=False)
update_time = Column(DateTime, nullable=False)
# 关系
documents = relationship('Document', back_populates='knowledgebase')6.2 原生 SQL 模式 #
# 使用原生 SQL
def get_knowledgebase(kb_id: str):
"""获取知识库"""
with get_db_cursor() as cursor:
cursor.execute(
"SELECT * FROM knowledgebase WHERE id = %s",
(kb_id,)
)
return cursor.fetchone()
def create_knowledgebase(name: str, tenant_id: str):
"""创建知识库"""
kb_id = generate_id()
with get_db_cursor() as cursor:
cursor.execute(
"INSERT INTO knowledgebase (id, name, tenant_id) VALUES (%s, %s, %s)",
(kb_id, name, tenant_id)
)
return kb_id6.3 批量操作 #
def batch_insert_chunks(kb_id: str, chunks: List[dict]):
"""批量插入分块"""
es = get_es_client()
index_name = f"kb_{kb_id}"
actions = []
for chunk in chunks:
actions.append({
"_index": index_name,
"_id": chunk['chunk_id'],
"_source": chunk
})
from elasticsearch.helpers import bulk
bulk(es, actions)7. 存储层集成 #
7.1 MinIO 存储模式 #
# 上传文件
def upload_file(file_path: str, bucket: str, object_name: str):
"""上传文件到 MinIO"""
client = get_minio_client()
# 确保桶存在
if not client.bucket_exists(bucket):
client.make_bucket(bucket)
# 上传文件
client.fput_object(bucket, object_name, file_path)
return f"{bucket}/{object_name}"
# 下载文件
def download_file(bucket: str, object_name: str, file_path: str):
"""从 MinIO 下载文件"""
client = get_minio_client()
client.fget_object(bucket, object_name, file_path)
# 获取文件 URL
def get_file_url(bucket: str, object_name: str, expires: int = 3600):
"""获取文件访问 URL"""
client = get_minio_client()
return client.presigned_get_object(bucket, object_name, expires=expires)7.2 数据一致性 #
def add_document_to_kb(kb_id: str, file_path: str):
"""添加文档到知识库(保证数据一致性)"""
# 1. 上传文件到 MinIO
file_id = generate_id()
bucket = "rag_flow"
object_name = f"{kb_id}/{file_id}"
upload_file(file_path, bucket, object_name)
# 2. 插入文件记录到 MySQL
with get_db_cursor() as cursor:
cursor.execute(
"INSERT INTO file (id, name, bucket, object_name) VALUES (%s, %s, %s, %s)",
(file_id, os.path.basename(file_path), bucket, object_name)
)
# 3. 插入文档记录到 MySQL
doc_id = generate_id()
with get_db_cursor() as cursor:
cursor.execute(
"INSERT INTO document (id, knowledgebase_id, file_id, name, status) VALUES (%s, %s, %s, %s, %s)",
(doc_id, kb_id, file_id, os.path.basename(file_path), 'parsing')
)
# 4. 解析文档并存储到 Elasticsearch
chunks = parse_document(file_path)
store_chunks_to_es(kb_id, doc_id, chunks)
# 5. 更新文档状态
with get_db_cursor() as cursor:
cursor.execute(
"UPDATE document SET status = 'parsed', chunk_num = %s WHERE id = %s",
(len(chunks), doc_id)
)
return doc_id8. 前端数据模型 #
8.1 TypeScript 类型定义 #
// types/knowledgebase.ts
export interface KnowledgeBase {
id: string;
name: string;
tenant_id: string;
embedding_model: string;
create_time: string;
update_time: string;
}
export interface Document {
id: string;
knowledgebase_id: string;
file_id: string;
name: string;
type: string;
size: number;
status: 'parsing' | 'parsed' | 'failed';
chunk_num: number;
create_time: string;
update_time: string;
}
export interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
timestamp: Date;
citations?: Chunk[];
}
export interface Chunk {
id: string;
content: string;
image_url?: string;
page_num?: number;
score?: number;
}8.2 Vue 数据模型 #
// types/user.ts
export interface User {
id: string;
nickname: string;
email: string;
create_time: string;
}
export interface Team {
id: string;
name: string;
member_count: number;
create_time: string;
}
export interface TeamMember {
user_id: string;
nickname: string;
email: string;
role: 'owner' | 'admin' | 'member';
}9. 数据迁移 #
9.1 数据库迁移脚本 #
# migrations/001_create_tables.py
def up():
"""创建表"""
with get_db_connection() as conn:
cursor = conn.cursor()
# 创建用户表
cursor.execute("""
CREATE TABLE IF NOT EXISTS `user` (
`id` VARCHAR(64) NOT NULL PRIMARY KEY,
`nickname` VARCHAR(255) NOT NULL,
`email` VARCHAR(255) UNIQUE NOT NULL,
`password` VARCHAR(255) NOT NULL,
`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
""")
# 创建其他表...
conn.commit()
def down():
"""删除表"""
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("DROP TABLE IF EXISTS `user`")
# 删除其他表...
conn.commit()10. 性能优化 #
10.1 数据库优化 #
- 索引优化:为常用查询字段创建索引
- 查询优化:使用 EXPLAIN 分析查询计划
- 连接池:使用连接池管理数据库连接
- 分页查询:使用 LIMIT 和 OFFSET 进行分页
10.2 Elasticsearch 优化 #
- 分片策略:合理设置分片数量
- 刷新策略:调整刷新间隔
- 缓存策略:使用查询缓存
- 批量操作:使用 bulk API 进行批量操作
11. 总结 #
本文档介绍了 Ragflow-Plus 数据库模式与模型的各个方面:
- 数据架构:多存储架构设计
- MySQL 模式:核心数据表结构
- Elasticsearch 索引:文档索引和检索
- 数据访问模式:ORM 和原生 SQL
- 存储层集成:MinIO 对象存储
- 前端数据模型:TypeScript 类型定义
通过遵循本文档的数据模型设计,您可以构建高效、可扩展的数据存储系统。