# Ragflow-Plus 后端服务架构教程
1. 目的与范围 #
本文档详细介绍了 Ragflow-Plus 后端服务的架构设计、服务层组织、API 路由架构、数据库连接管理和配置管理。后端服务采用 Flask 框架,遵循分层架构设计原则。
有关系统架构概述,请参阅 系统架构。有关开发指南,请参阅 开发指南。
2. 服务架构概述 #
Ragflow-Plus 后端服务采用分层架构,包含 API 层、服务层、数据访问层和存储层。
2.1 架构层次 #
┌─────────────────────────────────────────┐
│ API 路由层 (API Route Layer) │
│ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ Flask │ │ Blueprint│ │路由 │ │
│ │ Routes │ │ │ │处理 │ │
│ └──────────┘ └──────────┘ └──────┘ │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ 服务层 (Service Layer) │
│ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ 对话服务 │ │ 知识库服务│ │文档服务│ │
│ │ │ │ │ │ │ │
│ └──────────┘ └──────────┘ └──────┘ │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ 数据访问层 (Data Access Layer) │
│ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ MySQL │ │Elasticsearch│ │MinIO│ │
│ │ 连接 │ │ 连接 │ │连接 │ │
│ └──────────┘ └──────────┘ └──────┘ │
└─────────────────────────────────────────┘
↕
┌─────────────────────────────────────────┐
│ 存储层 (Storage Layer) │
│ ┌──────────┐ ┌──────────┐ ┌──────┐ │
│ │ MySQL │ │Elasticsearch│ │MinIO│ │
│ │ (元数据) │ │ (索引) │ │(文件)│ │
│ └──────────┘ └──────────┘ └──────┘ │
└─────────────────────────────────────────┘
2.2 核心服务模块 #
- 对话服务: api/db/services/dialog_service.py
- 知识库服务: api/db/services/knowledgebase_service.py
- 文档服务: api/db/services/document_service.py
- 用户服务: management/server/services/users/service.py
- 团队服务: management/server/services/teams/service.py
3. 数据库连接架构 #
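系统对 MySQL、Elasticsearch 和 MinIO 均采用延迟创建、模块级共享的客户端,并通过上下文管理器统一处理事务提交、回滚和游标关闭。注意:下面 MySQL 示例中的 _connection_pool 实际上是单个 PyMySQL 连接而不是连接池;PyMySQL 连接不能在线程间共享,多线程部署时应改用真正的连接池或为每个线程分配独立连接。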
3.1 MySQL 连接管理 #
# api/db/database.py
import pymysql
from contextlib import contextmanager
from settings import settings
_connection_pool = None


def get_db_connection():
    """Return the shared MySQL connection, creating or reviving it as needed.

    NOTE: despite its name, ``_connection_pool`` holds ONE ``pymysql``
    connection, not a pool. PyMySQL connections must not be shared between
    threads, so under a multi-threaded server concurrent requests would share
    this connection (and its transaction). Use a real pool or a per-thread
    connection in production.

    Returns:
        The module-level ``pymysql`` connection (``DictCursor`` rows,
        ``autocommit=False``).
    """
    global _connection_pool
    if _connection_pool is None:
        _connection_pool = pymysql.connect(
            host=settings.MYSQL_HOST,
            port=settings.MYSQL_PORT,
            user=settings.MYSQL_USER,
            password=settings.MYSQL_PASSWORD,
            database=settings.MYSQL_DATABASE,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=False,
        )
    else:
        # A long-lived connection is eventually closed by the server
        # (e.g. after wait_timeout). ping(reconnect=True) transparently
        # reopens it instead of letting the next query fail.
        _connection_pool.ping(reconnect=True)
    return _connection_pool
@contextmanager
def get_db_cursor():
    """Yield a cursor on the shared connection, wrapped in a transaction.

    The transaction is committed when the ``with`` body finishes normally.
    It is rolled back if the body (or the commit) raises anything, including
    non-``Exception`` errors such as ``KeyboardInterrupt``. The cursor is
    always closed.

    Yields:
        A cursor obtained from ``get_db_connection().cursor()``.
    """
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except BaseException:
        # BaseException (not just Exception) so interrupts also roll back
        # instead of leaving the shared connection mid-transaction.
        # A bare ``raise`` re-raises with the original traceback untouched.
        conn.rollback()
        raise
    finally:
        cursor.close()
def execute_query(sql: str, params: tuple = None):
    """Execute ``sql`` with ``params`` and return all fetched rows.

    Runs inside ``get_db_cursor()``, so the surrounding transaction is
    committed after the rows have been fetched.
    """
    with get_db_cursor() as cur:
        cur.execute(sql, params)
        rows = cur.fetchall()
    return rows
def execute_update(sql: str, params: tuple = None):
    """Execute a write statement in its own transaction.

    Returns:
        The cursor's ``rowcount`` for the statement.
    """
    with get_db_cursor() as cur:
        cur.execute(sql, params)
        affected = cur.rowcount
    return affected


# 3.2 Elasticsearch 连接管理 #
# api/db/elasticsearch_client.py
from elasticsearch import Elasticsearch
from settings import settings
_es_client = None


def _es_node_url(host, port, scheme='http'):
    """Build the node URL handed to ``Elasticsearch(...)``.

    elasticsearch-py 8.x rejects node strings without a scheme (such as
    ``"es01:9200"``). ``scheme`` is therefore prepended unless ``host``
    already contains one.
    """
    host = str(host).rstrip('/')
    if '://' not in host:
        host = f"{scheme}://{host}"
    return f"{host}:{port}"


def get_es_client():
    """Return the module-wide Elasticsearch client, creating it on first use.

    The node URL scheme defaults to ``http``. Set ``settings.ES_SCHEME``
    (e.g. ``"https"``) to override it. ``verify_certs=False`` disables TLS
    certificate verification, which is only appropriate for development.
    """
    global _es_client
    if _es_client is None:
        node = _es_node_url(
            settings.ES_HOST,
            settings.ES_PORT,
            getattr(settings, 'ES_SCHEME', 'http'),
        )
        _es_client = Elasticsearch(
            [node],
            # basic_auth replaces http_auth, which is deprecated in 8.x.
            basic_auth=(settings.ES_USER, settings.ES_PASSWORD),
            verify_certs=False,
            request_timeout=30,
        )
    return _es_client
def create_index(index_name: str, mapping: dict = None):
    """Create ``index_name`` unless an index with that name already exists.

    ``mapping`` is sent verbatim as the create-index request body, e.g.
    ``{"mappings": {...}}``. ``None`` sends an empty body.

    NOTE: the exists/create pair is not atomic, so a concurrent creator can
    still make ``create`` fail.
    """
    client = get_es_client()
    if client.indices.exists(index=index_name):
        return
    client.indices.create(index=index_name, body=mapping or {})


# 3.3 MinIO 连接管理 #
# api/db/minio_client.py
from minio import Minio
from settings import settings
_minio_client = None


def get_minio_client():
    """Return the module-wide MinIO client, building it on first call.

    ``secure=False`` means the client talks plain HTTP to
    ``settings.MINIO_ENDPOINT``.
    """
    global _minio_client
    if _minio_client is not None:
        return _minio_client
    _minio_client = Minio(
        settings.MINIO_ENDPOINT,
        access_key=settings.MINIO_ACCESS_KEY,
        secret_key=settings.MINIO_SECRET_KEY,
        secure=False,
    )
    return _minio_client
def upload_file(bucket_name: str, object_name: str, file_path: str):
    """Upload the local file ``file_path`` to ``bucket_name/object_name``."""
    get_minio_client().fput_object(bucket_name, object_name, file_path)


# 4. 服务层组织 #
服务层负责业务逻辑处理,位于 API 路由层和数据访问层之间。
4.1 服务层结构 #
api/db/services/
├── dialog_service.py # 对话服务
├── knowledgebase_service.py # 知识库服务
├── document_service.py # 文档服务
└── file_service.py # 文件服务
management/server/services/
├── users/
│ └── service.py # 用户服务
├── teams/
│ └── service.py # 团队服务
└── knowledgebases/
└── service.py # 知识库管理服务
4.2 对话服务实现 #
# api/db/services/dialog_service.py
from api.db.database import get_db_cursor
from rag.llm.llm_bundle import LLMBundle
from rag.search.searcher import Searcher
class DialogService:
    """Retrieval-augmented question answering over one knowledge base.

    Collaborators may be injected (shared instances, test stubs). When they
    are omitted, a new ``LLMBundle()`` / ``Searcher()`` is created, exactly
    as before.
    """

    def __init__(self, llm_bundle=None, searcher=None):
        """
        Args:
            llm_bundle: object exposing ``chat(prompt)``. Defaults to a new
                ``LLMBundle()``.
            searcher: object exposing ``search(question, kb_id, top_k=...)``.
                Defaults to a new ``Searcher()``.
        """
        self.llm_bundle = llm_bundle if llm_bundle is not None else LLMBundle()
        self.searcher = searcher if searcher is not None else Searcher()

    def chat(self, kb_id: str, question: str, conversation_id: str = None,
             top_k: int = 5):
        """Answer ``question`` from the ``top_k`` most relevant chunks.

        Args:
            kb_id: knowledge base to search.
            question: the user's question.
            conversation_id: when truthy, the question/answer pair is stored
                via ``_save_message``.
            top_k: number of chunks to retrieve (default 5, as before).

        Returns:
            ``{'answer': <LLM response>, 'citations': [chunk.to_dict(), ...]}``.
        """
        # 1. Retrieve candidate chunks. list() because they are iterated
        #    twice (prompt + citations), and a generator would be exhausted
        #    after the first pass, silently dropping every citation.
        chunks = list(self.searcher.search(question, kb_id, top_k=top_k))
        # 2. Build the prompt from the chunk texts.
        context = "\n".join(chunk.content for chunk in chunks)
        prompt = self._build_prompt(question, context)
        # 3. Generate the answer.
        response = self.llm_bundle.chat(prompt)
        # 4. Persist history only when the caller identified a conversation.
        if conversation_id:
            self._save_message(conversation_id, question, response)
        return {
            'answer': response,
            'citations': [chunk.to_dict() for chunk in chunks],
        }

    def _build_prompt(self, question: str, context: str) -> str:
        """Render the question-answering prompt template."""
        return (
            "基于以下上下文回答问题:\n"
            "上下文:\n"
            f"{context}\n"
            f"问题:{question}\n"
            "回答:"
        )

    def _save_message(self, conversation_id: str, question: str, answer: str):
        """Insert one question/answer pair into ``dialog_message``."""
        with get_db_cursor() as cursor:
            cursor.execute(
                "INSERT INTO dialog_message (conversation_id, question, answer) VALUES (%s, %s, %s)",
                (conversation_id, question, answer),
            )


# 4.3 知识库服务实现 #
# api/db/services/knowledgebase_service.py
from api.db.database import get_db_cursor, execute_query, execute_update
from api.db.elasticsearch_client import get_es_client, create_index
class KnowledgebaseService:
    """Knowledge-base operations backed by a MySQL row and an ES index."""

    def create_knowledgebase(self, name: str, tenant_id: str,
                             embedding_model: str = None,
                             embedding_dims: int = 1024):
        """Create a knowledge-base row and its ``kb_<id>`` search index.

        The index is created while the INSERT's transaction is still open.
        If index creation raises, ``get_db_cursor`` rolls the INSERT back
        instead of leaving a knowledge base without an index. (If the final
        commit fails after the index was created, the index is left behind.)

        Args:
            name: display name.
            tenant_id: owning tenant.
            embedding_model: model name stored on the row. Defaults to
                ``'bge-m3'``.
            embedding_dims: ``dims`` of the ``embedding`` dense_vector field.
                It should match the embedding model's vector size (default
                1024, as before).

        Returns:
            ``{'id': <new row id>, 'name': name}``.
        """
        with get_db_cursor() as cursor:
            cursor.execute(
                "INSERT INTO knowledgebase (name, tenant_id, embedding_model) VALUES (%s, %s, %s)",
                (name, tenant_id, embedding_model or 'bge-m3'),
            )
            kb_id = cursor.lastrowid
            mapping = {
                "mappings": {
                    "properties": {
                        "content": {"type": "text"},
                        "embedding": {"type": "dense_vector", "dims": embedding_dims},
                    }
                }
            }
            create_index(f"kb_{kb_id}", mapping)
        return {'id': kb_id, 'name': name}

    def get_knowledgebase(self, kb_id: str):
        """Return the ``knowledgebase`` row with ``id = kb_id``, or None."""
        result = execute_query(
            "SELECT * FROM knowledgebase WHERE id = %s",
            (kb_id,),
        )
        return result[0] if result else None

    def list_knowledgebases(self, tenant_id: str):
        """Return the tenant's knowledge bases, newest ``create_time`` first."""
        return execute_query(
            "SELECT * FROM knowledgebase WHERE tenant_id = %s ORDER BY create_time DESC",
            (tenant_id,),
        )


# 5. API 路由架构 #
API 路由层负责 HTTP 请求处理和响应格式化。
5.1 Flask 应用结构 #
api/
├── apps/
│ ├── __init__.py # Flask 应用初始化
│ ├── dialog_app.py # 对话 API
│ ├── api_app.py # 核心 API
│ └── sdk_app.py # SDK API
└── routes/
└── knowledgebases/
└── routes.py # 知识库路由
5.2 Flask 应用初始化 #
# api/apps/__init__.py
from flask import Flask
from flask_cors import CORS
from flasgger import Swagger
def create_app():
    """Build the Flask application: CORS, Swagger UI and the API blueprints."""
    flask_app = Flask(__name__)

    # Cross-origin access to /api/* is open to every origin.
    cors_rules = {
        r"/api/*": {
            "origins": "*",
            "methods": ["GET", "POST", "PUT", "DELETE"],
            "allow_headers": ["Content-Type", "Authorization"],
        }
    }
    CORS(flask_app, resources=cors_rules)

    # Swagger 2.0 metadata served by flasgger.
    swagger_template = {
        "swagger": "2.0",
        "info": {"title": "Ragflow-Plus API", "version": "1.0.0"},
    }
    Swagger(flask_app, template=swagger_template)

    # Blueprints are imported inside the factory, presumably to avoid
    # circular imports at module load time (TODO confirm).
    from api.apps.dialog_app import dialog_bp
    from api.apps.api_app import api_bp
    from routes.knowledgebases.routes import kb_bp

    for blueprint in (dialog_bp, api_bp):
        flask_app.register_blueprint(blueprint, url_prefix='/api/v1')
    # kb_bp declares its own url_prefix on the Blueprint itself.
    flask_app.register_blueprint(kb_bp)
    return flask_app


app = create_app()


# 5.3 路由实现 #
# routes/knowledgebases/routes.py
from flask import Blueprint, request, jsonify
from api.db.services.knowledgebase_service import KnowledgebaseService
kb_bp = Blueprint('knowledgebases', __name__, url_prefix='/api/v1/knowledgebases')


@kb_bp.route('', methods=['POST'])
def create_knowledgebase():
    """
    创建知识库
    ---
    tags:
      - 知识库
    parameters:
      - name: body
        in: body
        required: true
        schema:
          type: object
          required:
            - name
            - tenant_id
          properties:
            name:
              type: string
            tenant_id:
              type: string
            embedding_model:
              type: string
    responses:
      201:
        description: 创建成功
      400:
        description: 参数错误
      500:
        description: 服务器错误
    """
    # The docstring above is parsed by flasgger into the Swagger spec, so it
    # keeps the API's original wording. This handler validates the JSON body,
    # creates the knowledge base, and reports every failure as JSON.
    #
    # get_json(silent=True) returns None for a missing or non-JSON body
    # instead of raising, so the problem can be reported as a JSON 400
    # rather than an HTML error page (or a 500 from a KeyError below).
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    missing = [field for field in ('name', 'tenant_id') if not data.get(field)]
    if missing:
        return jsonify({'code': 400, 'message': f"缺少必填参数: {', '.join(missing)}"}), 400

    service = KnowledgebaseService()
    try:
        kb = service.create_knowledgebase(
            name=data['name'],
            tenant_id=data['tenant_id'],
            embedding_model=data.get('embedding_model'),
        )
        return jsonify({'code': 0, 'data': kb}), 201
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)}), 500
@kb_bp.route('', methods=['GET'])
def list_knowledgebases():
    """列出知识库"""
    # GET ?tenant_id=... -> {'code': 0, 'data': {'list': [...]}}.
    tenant_id = request.args.get('tenant_id')
    # Without tenant_id the SQL would compare against NULL and silently
    # return an empty list, hiding the client error. Report it as 400.
    if not tenant_id:
        return jsonify({'code': 400, 'message': '缺少必填参数: tenant_id'}), 400
    service = KnowledgebaseService()
    try:
        kbs = service.list_knowledgebases(tenant_id)
        return jsonify({'code': 0, 'data': {'list': kbs}})
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)}), 500
@kb_bp.route('/<kb_id>', methods=['GET'])
def get_knowledgebase(kb_id: str):
    """获取知识库详情"""
    # Responds with the row for kb_id, 404 when it does not exist, or 500
    # carrying the error text when the lookup fails.
    service = KnowledgebaseService()
    try:
        found = service.get_knowledgebase(kb_id)
        if found:
            return jsonify({'code': 0, 'data': found})
        return jsonify({'code': 404, 'message': '知识库不存在'}), 404
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)}), 500


# 5.4 流式响应实现 #
# api/apps/dialog_app.py
from flask import Blueprint, Response, request, stream_with_context
from api.db.services.dialog_service import DialogService
import json
dialog_bp = Blueprint('dialog', __name__)


@dialog_bp.route('/completion', methods=['GET'])
def completion():
    """流式对话接口"""
    # Server-Sent Events endpoint: GET ?kb_id=...&question=...
    # Streams {"type": "content"} frames followed by one {"type": "done"}.
    kb_id = request.args.get('kb_id')
    question = request.args.get('question')
    # Reject missing parameters with an ordinary JSON 400 instead of opening
    # a stream that would search and generate with None values.
    if not kb_id or not question:
        return Response(
            json.dumps({'code': 400, 'message': '缺少必填参数: kb_id, question'}),
            status=400,
            mimetype='application/json',
        )
    service = DialogService()

    def generate():
        """Yield SSE frames for the streamed answer."""
        try:
            # Runs lazily, when the response body is first iterated.
            chunks = service.searcher.search(question, kb_id, top_k=5)
            for chunk in service.llm_bundle.chat_stream(question, chunks):
                yield f"data: {json.dumps({'type': 'content', 'content': chunk})}\n\n"
        except Exception as e:
            # The response status is already fixed at 200 by the time this
            # generator runs, so failures are reported in-band as a final
            # "error" frame rather than by silently cutting the stream.
            yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
            return
        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            # Tell nginx not to buffer the event stream.
            'X-Accel-Buffering': 'no',
        },
    )


# 6. 依赖管理 #
使用 requirements.txt 管理 Python 依赖。
6.1 核心依赖 #
# Web 框架
flask==3.0.0
flask-cors==4.0.0
flasgger==0.9.7.1
# 数据库
pymysql==1.1.0
sqlalchemy==2.0.0
# 搜索引擎
elasticsearch==8.11.0
# 对象存储
minio==7.2.0
# Redis
redis==5.0.0
# AI/ML
transformers==4.49.0
torch==2.1.0
sentence-transformers==2.2.2
# 工具库
python-dotenv==1.0.0
pydantic==2.5.0
6.2 依赖安装 #
# 安装依赖
pip install -r requirements.txt
# 开发依赖
pip install -r requirements-dev.txt
7. 配置管理 #
系统使用环境变量和配置文件管理应用配置。
7.1 配置加载 #
# settings.py
import os
from dotenv import load_dotenv
load_dotenv()
def _env_int(name, default):
    """Read environment variable ``name`` as an int (``default`` if unset)."""
    return int(os.getenv(name, default))


def _env_flag(name, default):
    """Read ``name`` as a boolean: true only when it equals 'true' (any case)."""
    return os.getenv(name, default).lower() == 'true'


class Settings:
    """Application settings, read from environment variables at import time.

    ``load_dotenv()`` is called before this class is defined, so values from a
    ``.env`` file are visible here. Unset variables fall back to the defaults
    below.
    """

    # --- application ---
    DEBUG = _env_flag('DEBUG', 'False')
    HOST = os.getenv('HOST', '0.0.0.0')
    PORT = _env_int('PORT', 9380)

    # --- MySQL ---
    MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
    MYSQL_PORT = _env_int('MYSQL_PORT', 3306)
    MYSQL_USER = os.getenv('MYSQL_USER', 'root')
    MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '')
    MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'ragflow')

    # --- Elasticsearch ---
    ES_HOST = os.getenv('ES_HOST', 'localhost')
    ES_PORT = _env_int('ES_PORT', 9200)
    ES_USER = os.getenv('ES_USER', 'elastic')
    ES_PASSWORD = os.getenv('ES_PASSWORD', '')

    # --- MinIO ---
    MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
    MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', '')
    MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', '')

    # --- Redis ---
    REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
    REDIS_PORT = _env_int('REDIS_PORT', 6379)
    REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', '')

    # --- LLM ---
    LLM_API_KEY = os.getenv('LLM_API_KEY', '')
    LLM_BASE_URL = os.getenv('LLM_BASE_URL', '')


settings = Settings()


# 7.2 环境变量示例 #
# .env
DEBUG=False
HOST=0.0.0.0
PORT=9380
MYSQL_HOST=mysql
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=infini_rag_flow
MYSQL_DATABASE=ragflow
ES_HOST=es01
ES_PORT=9200
ES_USER=elastic
ES_PASSWORD=infini_rag_flow
MINIO_ENDPOINT=minio:9000
MINIO_ACCESS_KEY=rag_flow
MINIO_SECRET_KEY=infini_rag_flow
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=infini_rag_flow
LLM_API_KEY=your_api_key
LLM_BASE_URL=https://api.openai.com/v1
8. 错误处理 #
8.1 统一异常处理 #
# api/utils/exceptions.py
class APIException(Exception):
    """Base class for errors that carry a status code and a message.

    Attributes:
        code: status code for the error (e.g. 404, 400).
        message: human-readable error message.
    """

    def __init__(self, code: int, message: str):
        # Exception.__init__ is intentionally not called with different
        # arguments. BaseException.__new__ already stores the constructor
        # arguments in ``self.args``, which is what pickling replays for
        # every subclass.
        self.code = code
        self.message = message

    def __str__(self) -> str:
        # Without this, str(exc) shows the raw args tuple, e.g.
        # "(404, '...')", or is empty for NotFoundError().
        return self.message


class NotFoundError(APIException):
    """404: the requested resource does not exist."""

    def __init__(self, message: str = "资源不存在"):
        super().__init__(404, message)


class ValidationError(APIException):
    """400: request parameters failed validation."""

    def __init__(self, message: str = "参数验证失败"):
        super().__init__(400, message)
# Global error handlers: every error response is {'code', 'message'} JSON.
@app.errorhandler(APIException)
def handle_api_exception(e: APIException):
    """Render an APIException using its own code as the HTTP status."""
    return jsonify({'code': e.code, 'message': e.message}), e.code


@app.errorhandler(Exception)
def handle_exception(e: Exception):
    """Render any other error as JSON.

    A handler registered for ``Exception`` also receives werkzeug
    ``HTTPException``s (unknown route -> 404, wrong method -> 405, ...).
    Those keep their own status instead of being reported as a 500.
    """
    from werkzeug.exceptions import HTTPException

    if isinstance(e, HTTPException):
        status = e.code or 500
        return jsonify({'code': status, 'message': e.description}), status
    # Unexpected failure: keep the traceback in the server log.
    app.logger.exception("Unhandled exception")
    return jsonify({'code': 500, 'message': str(e)}), 500


# 9. 日志管理 #
9.1 日志配置 #
# api/utils/logger.py
import logging
import sys
def setup_logger(name: str = 'ragflow'):
    """Return logger ``name``, configured to write INFO and above to stdout.

    Safe to call repeatedly. The stdout handler is attached only once per
    logger, so calling this from several modules does not print every
    record multiple times.

    Args:
        name: logger name passed to ``logging.getLogger``.

    Returns:
        The configured ``logging.Logger``.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Tag our handler by name so later calls can recognise it.
    handler_name = f"{name}-stdout"
    if not any(h.name == handler_name for h in logger.handlers):
        handler = logging.StreamHandler(sys.stdout)
        handler.name = handler_name
        handler.setLevel(logging.INFO)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        ))
        logger.addHandler(handler)
    return logger


logger = setup_logger()


# 10. 最佳实践 #
10.1 服务层设计 #
- 单一职责:每个服务类只负责一个业务领域
- 依赖注入:通过构造函数注入依赖
- 错误处理:在服务层处理业务异常
- 事务管理:使用上下文管理器管理事务
10.2 API 设计 #
- RESTful 规范:遵循 RESTful API 设计原则
- 统一响应格式:使用统一的响应格式
- 参数验证:验证请求参数
- 错误处理:提供清晰的错误信息
10.3 数据库操作 #
- 连接池:使用连接池管理数据库连接
- 事务管理:使用事务保证数据一致性
- SQL 注入防护:使用参数化查询
- 索引优化:为常用查询字段创建索引
11. 总结 #
本文档介绍了 Ragflow-Plus 后端服务架构的各个方面:
- 架构设计:分层架构和模块组织
- 数据库连接:MySQL、Elasticsearch、MinIO 连接管理
- 服务层:业务逻辑处理和服务实现
- API 路由:Flask 路由和流式响应
- 配置管理:环境变量和配置加载
- 错误处理:统一异常处理机制
通过遵循本文档的架构设计,您可以构建可维护、可扩展的后端服务。