导航菜单

  • 1.概述
  • 2.功能与能力
  • 3.系统架构
  • 4.部署与配置
  • 5.Docker 部署
  • 6.环境配置
  • 7.外部服务设置
  • 8.AI模型与LLM配置
  • 9.核心系统
  • 10.文档处理流水线
  • 11.RAG引擎与搜索
  • 12.知识库管理系统
  • 13.对话与对话系统
  • 14.翻译与跨语言支持
  • 15.用户界面
  • 16.主应用界面
  • 17.管理仪表盘
  • 18.文档编写界面
  • 19.知识库内容管理
  • 20.国际化与本地化
  • 21.管理功能
  • 22.用户与团队管理
  • 23.文件和存储管理
  • 24.知识库管理
  • 25.系统监控与健康状态
  • 26.API 参考
  • 27.知识库API
  • 28.对话与聊天API
  • 29.文件管理API
  • 30.管理与Admin API
  • 31.开发指南
  • 32.前端开发
  • 33.后端服务架构
  • 34.数据库模式与模型
  • 35.基础设施与文档
  • 36.快速入门指南
  • 1. 目的与范围
  • 2. 服务架构概述
    • 2.1 架构层次
    • 2.2 核心服务模块
  • 3. 数据库连接架构
    • 3.1 MySQL 连接管理
    • 3.2 Elasticsearch 连接管理
    • 3.3 MinIO 连接管理
  • 4. 服务层组织
    • 4.1 服务层结构
    • 4.2 对话服务实现
    • 4.3 知识库服务实现
  • 5. API 路由架构
    • 5.1 Flask 应用结构
    • 5.2 Flask 应用初始化
    • 5.3 路由实现
    • 5.4 流式响应实现
  • 6. 依赖管理
    • 6.1 核心依赖
    • 6.2 依赖安装
  • 7. 配置管理
    • 7.1 配置加载
    • 7.2 环境变量示例
  • 8. 错误处理
    • 8.1 统一异常处理
  • 9. 日志管理
    • 9.1 日志配置
  • 10. 最佳实践
    • 10.1 服务层设计
    • 10.2 API 设计
    • 10.3 数据库操作
  • 11. 总结

# Ragflow-Plus 后端服务架构教程

1. 目的与范围 #

本文档详细介绍了 Ragflow-Plus 后端服务的架构设计、服务层组织、API 路由架构、数据库连接管理和配置管理。后端服务采用 Flask 框架,遵循分层架构设计原则。

有关系统架构概述,请参阅 系统架构。有关开发指南,请参阅 开发指南。

2. 服务架构概述 #

Ragflow-Plus 后端服务采用分层架构,包含 API 层、服务层、数据访问层和存储层。

2.1 架构层次 #

┌─────────────────────────────────────────┐
│      API 路由层 (API Route Layer)       │
│  ┌──────────┐  ┌──────────┐  ┌──────┐  │
│  │ Flask    │  │ Blueprint│  │路由  │  │
│  │ Routes   │  │          │  │处理  │  │
│  └──────────┘  └──────────┘  └──────┘  │
└─────────────────────────────────────────┘
                 ↕
┌─────────────────────────────────────────┐
│      服务层 (Service Layer)              │
│  ┌──────────┐  ┌──────────┐  ┌──────┐  │
│  │ 对话服务 │  │ 知识库服务│  │文档服务│  │
│  │          │  │          │  │      │  │
│  └──────────┘  └──────────┘  └──────┘  │
└─────────────────────────────────────────┘
                 ↕
┌─────────────────────────────────────────┐
│      数据访问层 (Data Access Layer)      │
│  ┌──────────┐  ┌──────────┐  ┌──────┐  │
│  │ MySQL    │  │Elasticsearch│  │MinIO│  │
│  │ 连接     │  │ 连接     │  │连接  │  │
│  └──────────┘  └──────────┘  └──────┘  │
└─────────────────────────────────────────┘
                 ↕
┌─────────────────────────────────────────┐
│      存储层 (Storage Layer)              │
│  ┌──────────┐  ┌──────────┐  ┌──────┐  │
│  │ MySQL    │  │Elasticsearch│  │MinIO│  │
│  │ (元数据)  │  │ (索引)    │  │(文件)│  │
│  └──────────┘  └──────────┘  └──────┘  │
└─────────────────────────────────────────┘

2.2 核心服务模块 #

  • 对话服务:api/db/services/dialog_service.py
  • 知识库服务:api/db/services/knowledgebase_service.py
  • 文档服务:api/db/services/document_service.py
  • 用户服务:management/server/services/users/service.py
  • 团队服务:management/server/services/teams/service.py

3. 数据库连接架构 #

系统使用连接池和上下文管理器管理数据库连接,确保资源有效利用和连接安全。

3.1 MySQL 连接管理 #

# api/db/database.py
import pymysql
from contextlib import contextmanager
from settings import settings

_connection_pool = None

def get_db_connection():
    """获取 MySQL 数据库连接"""
    global _connection_pool
    if _connection_pool is None:
        _connection_pool = pymysql.connect(
            host=settings.MYSQL_HOST,
            port=settings.MYSQL_PORT,
            user=settings.MYSQL_USER,
            password=settings.MYSQL_PASSWORD,
            database=settings.MYSQL_DATABASE,
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
            autocommit=False
        )
    return _connection_pool

@contextmanager
def get_db_cursor():
    """获取数据库游标(上下文管理器)"""
    conn = get_db_connection()
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()

def execute_query(sql: str, params: tuple = None):
    """执行查询"""
    with get_db_cursor() as cursor:
        cursor.execute(sql, params)
        return cursor.fetchall()

def execute_update(sql: str, params: tuple = None):
    """执行更新"""
    with get_db_cursor() as cursor:
        cursor.execute(sql, params)
        return cursor.rowcount

3.2 Elasticsearch 连接管理 #

# api/db/elasticsearch_client.py
from elasticsearch import Elasticsearch
from settings import settings

_es_client = None

def get_es_client():
    """获取 Elasticsearch 客户端"""
    global _es_client
    if _es_client is None:
        _es_client = Elasticsearch(
            [f"{settings.ES_HOST}:{settings.ES_PORT}"],
            http_auth=(settings.ES_USER, settings.ES_PASSWORD),
            verify_certs=False,
            request_timeout=30
        )
    return _es_client

def create_index(index_name: str, mapping: dict = None):
    """创建索引"""
    es = get_es_client()
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, body=mapping or {})

3.3 MinIO 连接管理 #

# api/db/minio_client.py
from minio import Minio
from settings import settings

_minio_client = None

def get_minio_client():
    """获取 MinIO 客户端"""
    global _minio_client
    if _minio_client is None:
        _minio_client = Minio(
            settings.MINIO_ENDPOINT,
            access_key=settings.MINIO_ACCESS_KEY,
            secret_key=settings.MINIO_SECRET_KEY,
            secure=False
        )
    return _minio_client

def upload_file(bucket_name: str, object_name: str, file_path: str):
    """上传文件到 MinIO"""
    client = get_minio_client()
    client.fput_object(bucket_name, object_name, file_path)

4. 服务层组织 #

服务层负责业务逻辑处理,位于 API 路由层和数据访问层之间。

4.1 服务层结构 #

api/db/services/
├── dialog_service.py          # 对话服务
├── knowledgebase_service.py   # 知识库服务
├── document_service.py        # 文档服务
└── file_service.py            # 文件服务

management/server/services/
├── users/
│   └── service.py             # 用户服务
├── teams/
│   └── service.py             # 团队服务
└── knowledgebases/
    └── service.py             # 知识库管理服务

4.2 对话服务实现 #

# api/db/services/dialog_service.py
from api.db.database import get_db_cursor
from rag.llm.llm_bundle import LLMBundle
from rag.search.searcher import Searcher

class DialogService:
    def __init__(self):
        self.llm_bundle = LLMBundle()
        self.searcher = Searcher()

    def chat(self, kb_id: str, question: str, conversation_id: str = None):
        """处理对话请求"""
        # 1. 检索相关文档
        chunks = self.searcher.search(question, kb_id, top_k=5)

        # 2. 构建提示词
        context = "\n".join([chunk.content for chunk in chunks])
        prompt = self._build_prompt(question, context)

        # 3. 调用 LLM 生成回答
        response = self.llm_bundle.chat(prompt)

        # 4. 保存对话历史
        if conversation_id:
            self._save_message(conversation_id, question, response)

        return {
            'answer': response,
            'citations': [chunk.to_dict() for chunk in chunks]
        }

    def _build_prompt(self, question: str, context: str) -> str:
        """构建提示词"""
        return f"""基于以下上下文回答问题:

上下文:
{context}

问题:{question}

回答:"""

    def _save_message(self, conversation_id: str, question: str, answer: str):
        """保存消息到数据库"""
        with get_db_cursor() as cursor:
            cursor.execute(
                "INSERT INTO dialog_message (conversation_id, question, answer) VALUES (%s, %s, %s)",
                (conversation_id, question, answer)
            )

4.3 知识库服务实现 #

# api/db/services/knowledgebase_service.py
from api.db.database import get_db_cursor, execute_query, execute_update
from api.db.elasticsearch_client import get_es_client, create_index

class KnowledgebaseService:
    def create_knowledgebase(self, name: str, tenant_id: str, embedding_model: str = None):
        """创建知识库"""
        # 1. 数据库操作
        with get_db_cursor() as cursor:
            cursor.execute(
                "INSERT INTO knowledgebase (name, tenant_id, embedding_model) VALUES (%s, %s, %s)",
                (name, tenant_id, embedding_model or 'bge-m3')
            )
            kb_id = cursor.lastrowid

        # 2. 创建 Elasticsearch 索引
        es = get_es_client()
        index_name = f"kb_{kb_id}"
        mapping = {
            "mappings": {
                "properties": {
                    "content": {"type": "text"},
                    "embedding": {"type": "dense_vector", "dims": 1024}
                }
            }
        }
        create_index(index_name, mapping)

        return {'id': kb_id, 'name': name}

    def get_knowledgebase(self, kb_id: str):
        """获取知识库信息"""
        result = execute_query(
            "SELECT * FROM knowledgebase WHERE id = %s",
            (kb_id,)
        )
        return result[0] if result else None

    def list_knowledgebases(self, tenant_id: str):
        """列出知识库列表"""
        return execute_query(
            "SELECT * FROM knowledgebase WHERE tenant_id = %s ORDER BY create_time DESC",
            (tenant_id,)
        )

5. API 路由架构 #

API 路由层负责 HTTP 请求处理和响应格式化。

5.1 Flask 应用结构 #

api/
├── apps/
│   ├── __init__.py           # Flask 应用初始化
│   ├── dialog_app.py          # 对话 API
│   ├── api_app.py             # 核心 API
│   └── sdk_app.py             # SDK API
└── routes/
    └── knowledgebases/
        └── routes.py          # 知识库路由

5.2 Flask 应用初始化 #

# api/apps/__init__.py
from flask import Flask
from flask_cors import CORS
from flasgger import Swagger

def create_app():
    """创建 Flask 应用"""
    app = Flask(__name__)

    # CORS 配置
    CORS(app, resources={
        r"/api/*": {
            "origins": "*",
            "methods": ["GET", "POST", "PUT", "DELETE"],
            "allow_headers": ["Content-Type", "Authorization"]
        }
    })

    # Swagger 文档
    Swagger(app, template={
        "swagger": "2.0",
        "info": {
            "title": "Ragflow-Plus API",
            "version": "1.0.0"
        }
    })

    # 注册蓝图
    from api.apps.dialog_app import dialog_bp
    from api.apps.api_app import api_bp
    from routes.knowledgebases.routes import kb_bp

    app.register_blueprint(dialog_bp, url_prefix='/api/v1')
    app.register_blueprint(api_bp, url_prefix='/api/v1')
    app.register_blueprint(kb_bp)

    return app

app = create_app()

5.3 路由实现 #

# routes/knowledgebases/routes.py
from flask import Blueprint, request, jsonify
from api.db.services.knowledgebase_service import KnowledgebaseService

kb_bp = Blueprint('knowledgebases', __name__, url_prefix='/api/v1/knowledgebases')

@kb_bp.route('', methods=['POST'])
def create_knowledgebase():
    """
    创建知识库
    ---
    tags:
      - 知识库
    parameters:
      - name: body
        in: body
        required: true
        schema:
          type: object
          properties:
            name:
              type: string
            tenant_id:
              type: string
            embedding_model:
              type: string
    responses:
      201:
        description: 创建成功
      500:
        description: 服务器错误
    """
    data = request.json
    service = KnowledgebaseService()

    try:
        kb = service.create_knowledgebase(
            name=data['name'],
            tenant_id=data['tenant_id'],
            embedding_model=data.get('embedding_model')
        )
        return jsonify({'code': 0, 'data': kb}), 201
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)}), 500

@kb_bp.route('', methods=['GET'])
def list_knowledgebases():
    """列出知识库"""
    tenant_id = request.args.get('tenant_id')
    service = KnowledgebaseService()

    try:
        kbs = service.list_knowledgebases(tenant_id)
        return jsonify({'code': 0, 'data': {'list': kbs}})
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)}), 500

@kb_bp.route('/<kb_id>', methods=['GET'])
def get_knowledgebase(kb_id: str):
    """获取知识库详情"""
    service = KnowledgebaseService()

    try:
        kb = service.get_knowledgebase(kb_id)
        if not kb:
            return jsonify({'code': 404, 'message': '知识库不存在'}), 404
        return jsonify({'code': 0, 'data': kb})
    except Exception as e:
        return jsonify({'code': 500, 'message': str(e)}), 500

5.4 流式响应实现 #

# api/apps/dialog_app.py
from flask import Blueprint, Response, request, stream_with_context
from api.db.services.dialog_service import DialogService
import json

dialog_bp = Blueprint('dialog', __name__)

@dialog_bp.route('/completion', methods=['GET'])
def completion():
    """流式对话接口"""
    kb_id = request.args.get('kb_id')
    question = request.args.get('question')

    service = DialogService()

    def generate():
        """生成器函数"""
        # 检索相关文档
        chunks = service.searcher.search(question, kb_id, top_k=5)

        # 流式生成回答
        for chunk in service.llm_bundle.chat_stream(question, chunks):
            yield f"data: {json.dumps({'type': 'content', 'content': chunk})}\n\n"

        yield f"data: {json.dumps({'type': 'done'})}\n\n"

    return Response(
        stream_with_context(generate()),
        mimetype='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no'
        }
    )

6. 依赖管理 #

使用 requirements.txt 管理 Python 依赖。

6.1 核心依赖 #

# Web 框架
flask==3.0.0
flask-cors==4.0.0
flasgger==0.9.7.1

# 数据库
pymysql==1.1.0
sqlalchemy==2.0.0

# 搜索引擎
elasticsearch==8.11.0

# 对象存储
minio==7.2.0

# Redis
redis==5.0.0

# AI/ML
transformers==4.49.0
torch==2.1.0
sentence-transformers==2.2.2

# 工具库
python-dotenv==1.0.0
pydantic==2.5.0

6.2 依赖安装 #

# 安装依赖
pip install -r requirements.txt

# 开发依赖
pip install -r requirements-dev.txt

7. 配置管理 #

系统使用环境变量和配置文件管理应用配置。

7.1 配置加载 #

# settings.py
import os
from dotenv import load_dotenv

load_dotenv()

class Settings:
    # 应用配置
    DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
    HOST = os.getenv('HOST', '0.0.0.0')
    PORT = int(os.getenv('PORT', 9380))

    # 数据库配置
    MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
    MYSQL_PORT = int(os.getenv('MYSQL_PORT', 3306))
    MYSQL_USER = os.getenv('MYSQL_USER', 'root')
    MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', '')
    MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'ragflow')

    # Elasticsearch 配置
    ES_HOST = os.getenv('ES_HOST', 'localhost')
    ES_PORT = int(os.getenv('ES_PORT', 9200))
    ES_USER = os.getenv('ES_USER', 'elastic')
    ES_PASSWORD = os.getenv('ES_PASSWORD', '')

    # MinIO 配置
    MINIO_ENDPOINT = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
    MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY', '')
    MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY', '')

    # Redis 配置
    REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
    REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
    REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', '')

    # LLM 配置
    LLM_API_KEY = os.getenv('LLM_API_KEY', '')
    LLM_BASE_URL = os.getenv('LLM_BASE_URL', '')

settings = Settings()

7.2 环境变量示例 #

# .env
DEBUG=False
HOST=0.0.0.0
PORT=9380

MYSQL_HOST=mysql
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=infini_rag_flow
MYSQL_DATABASE=ragflow

ES_HOST=es01
ES_PORT=9200
ES_USER=elastic
ES_PASSWORD=infini_rag_flow

MINIO_ENDPOINT=minio:9000
MINIO_ACCESS_KEY=rag_flow
MINIO_SECRET_KEY=infini_rag_flow

REDIS_HOST=redis
REDIS_PORT=6379
REDIS_PASSWORD=infini_rag_flow

LLM_API_KEY=your_api_key
LLM_BASE_URL=https://api.openai.com/v1

8. 错误处理 #

8.1 统一异常处理 #

# api/utils/exceptions.py
class APIException(Exception):
    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message

class NotFoundError(APIException):
    def __init__(self, message: str = "资源不存在"):
        super().__init__(404, message)

class ValidationError(APIException):
    def __init__(self, message: str = "参数验证失败"):
        super().__init__(400, message)

# 全局异常处理
@app.errorhandler(APIException)
def handle_api_exception(e: APIException):
    return jsonify({'code': e.code, 'message': e.message}), e.code

@app.errorhandler(Exception)
def handle_exception(e: Exception):
    return jsonify({'code': 500, 'message': str(e)}), 500

9. 日志管理 #

9.1 日志配置 #

# api/utils/logger.py
import logging
import sys

def setup_logger(name: str = 'ragflow'):
    """设置日志"""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.INFO)

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    return logger

logger = setup_logger()

10. 最佳实践 #

10.1 服务层设计 #

  1. 单一职责:每个服务类只负责一个业务领域
  2. 依赖注入:通过构造函数注入依赖
  3. 错误处理:在服务层处理业务异常
  4. 事务管理:使用上下文管理器管理事务

10.2 API 设计 #

  1. RESTful 规范:遵循 RESTful API 设计原则
  2. 统一响应格式:使用统一的响应格式
  3. 参数验证:验证请求参数
  4. 错误处理:提供清晰的错误信息

10.3 数据库操作 #

  1. 连接池:使用连接池管理数据库连接
  2. 事务管理:使用事务保证数据一致性
  3. SQL 注入防护:使用参数化查询
  4. 索引优化:为常用查询字段创建索引

11. 总结 #

本文档介绍了 Ragflow-Plus 后端服务架构的各个方面:

  • 架构设计:分层架构和模块组织
  • 数据库连接:MySQL、Elasticsearch、MinIO 连接管理
  • 服务层:业务逻辑处理和服务实现
  • API 路由:Flask 路由和流式响应
  • 配置管理:环境变量和配置加载
  • 错误处理:统一异常处理机制

通过遵循本文档的架构设计,您可以构建可维护、可扩展的后端服务。

访问验证

请输入访问令牌

Token不正确,请重新输入