跳到主要内容

OceanBase MCP Server 与 Cursor 集成

MCP(Model Context Protocol) 是 Anthropic 公司于 2024 年 11 月推出并开源,旨在实现大语言模型与外部工具或数据源交互的协议。通过 MCP,用户不需要将大模型的输出手动复制执行,大模型可以直接指挥工具执行相应的动作(Action)。

MCP Server 通过 MCP 协议提供了大模型与 OceanBase 数据库交互的能力,可以执行 SQL 语句。通过合适的客户端可以快速搭建项目原型,已在 github 上开源。

Cursor 是一款集成了 AI 技术的代码编辑器,支持多种操作系统,包括 Windows、macOS 和 Linux。

本文将展示如何使用 Cursor 与 OceanBase MCP Server 集成,快速构建后端应用程序。

前提条件

  • 您已完成部署 seekdb。

  • 安装 Python 3.11 及以上版本 和相应 pip。如果您的机器上 Python 版本较低,可以使用 Miniconda 来创建新的 Python 3.11 及以上的环境,具体可参考 Miniconda 安装指南

  • 根据所用的操作系统,安装 Git

  • 安装 Python 包管理器 uv。安装完成后,可使用 uv --version 命令验证安装是否成功:

    pip install uv
    uv --version
  • 下载 Cursor,根据自己的操作系统选择合适的版本进行安装。注意首次使用 Cursor 时,需要注册一个新账号或使用已有账号进行登录。登录后,可以创建新项目或打开已有项目。

步骤一:获取数据库连接信息

联系 seekdb 部署人员或者管理员获取相应的数据库连接串,例如:

obclient -h$host -P$port -u$user_name -p$password -D$database_name

参数说明:

  • $host:提供 seekdb 连接 IP 地址。

  • $port:提供 seekdb 连接端口,默认是 2881

  • $database_name:需要访问的数据库名称。

    提示

    连接的用户需要拥有该数据库的 CREATEINSERTDROPSELECT 权限。

  • $user_name:提供数据库连接账户。

  • $password:提供账户密码。

步骤二:配置 OceanBase MCP Server

克隆 OceanBase MCP Server 仓库

执行下面的命令将源代码下载到本地:

git clone https://github.com/oceanbase/mcp-oceanbase.git

进入源代码目录:

cd mcp-oceanbase

安装依赖

mcp-oceanbase 目录下执行下面的命令创建虚拟环境,并安装依赖:

uv venv
source .venv/bin/activate
uv pip install .

创建 Cursor 客户端的工作目录并配置 OceanBase MCP Server

手动创建一个 Cursor 的工作目录,并用 Cursor 打开,后面 Cursor 生成的文件将放在这个目录下,示例的目录名为 cursor

使用快捷键 Ctrl + L(Windows)或者 Command + L(MacOS)打开聊天对话框,点击右上角的齿轮,选择 MCP Tools

1

添加并配置 MCP Servers

  1. 点击 Add Custom MCP 填写配置文件。

    2

  2. 填写配置文件,点击确认。

    path/to/your/mcp-oceanbase/src/oceanbase_mcp_server 需要替换为 oceanbase_mcp_server 文件夹的绝对路径,OB_HOSTOB_PORTOB_USEROB_PASSWORDOB_DATABASE 需要替换为自己数据库的对应信息:

    {
    "mcpServers": {
    "oceanbase": {
    "command": "uv",
    "args": [
    "--directory",
    "/path/to/your/mcp-oceanbase/src/oceanbase_mcp_server",
    "run",
    "oceanbase_mcp_server"
    ],
    "env": {
    "OB_HOST": "***",
    "OB_PORT": "***",
    "OB_USER": "***",
    "OB_PASSWORD": "***",
    "OB_DATABASE": "***"
    }
    }
    }
    }
  3. 如配置成功,将显示 可使用 状态。

    3

测试 MCP Server

  1. 在对话框中输入提示:test 库中有多少张表?,Cursor 客户端会展示即将执行的 SQL 语句。确认无误后,点击 Run tool 按钮执行查询。Cursor 客户端会展示 test 库中的所有表名称,这表明我们已经成功连接到 seekdb。

    4

使用 FastAPI 快速创建 RESTful API 风格的项目

你可以使用 FastAPI 快速创建 RESTful API 风格的项目。FastAPI 是一个 Python 的 Web 框架,可以快速构建 RESTful API。

  1. 创建 customer 表

    在对话框中输入提示:创建一个 customer 表,主键是 ID,包含 name,age,telephone,location 字段,确认 SQL 语句后,点击 Run tool 按钮执行查询。

    5

  2. 插入测试数据

    在对话框中输入提示:向 customer 表中插入 10 条数据,确认 SQL 语句后,点击 Run tool 按钮执行查询。插入成功后,会有 已成功向 customer 表插入了 10 条测试数据... 的提示。

    6

  3. 创建 FastAPI 项目

    在对话框中输入提示:创建一个 FastAPI 项目,生成基于 customer 表的 RESTful API,确认 SQL 语句后,点击 Run tool 按钮执行查询。

    7

    此步骤将自动生成两个文件。建议首次使用时选择 全部接受,因为 AI 生成的文件内容可能具有不确定性,后续可根据实际需求进行调整。

  4. 创建虚拟环境并安装依赖

    执行如下命令,在当前目录下使用 uv 包管理工具创建虚拟环境,并安装依赖包:

    uv venv
    source .venv/bin/activate
    uv pip install -r requirements.txt
  5. 启动 FastAPI 项目

    执行如下命令,启动 FastAPI 项目:

    uvicorn main:app --reload
  6. 查看表中数据

    在命令行中运行如下命令,或者使用其他请求工具,查看表中的数据:

    curl http://127.0.0.1:8000/customers

    返回结果如下:

    [{"id":1,"name":"Alice","age":28,"telephone":"1234567890","location":"Beijing"},{"id":2,"name":"Bob","age":32,"telephone":"2345678901","location":"Shanghai"},{"id":3,"name":"Charlie","age":25,"telephone":"3456789012","location":"Guangzhou"},{"id":4,"name":"David","age":40,"telephone":"4567890123","location":"Shenzhen"},{"id":5,"name":"Eve","age":22,"telephone":"5678901234","location":"Chengdu"},{"id":6,"name":"Frank","age":35,"telephone":"6789012345","location":"Wuhan"},{"id":7,"name":"Grace","age":30,"telephone":"7890123456","location":"Hangzhou"},{"id":8,"name":"Heidi","age":27,"telephone":"8901234567","location":"Nanjing"},{"id":9,"name":"Ivan","age":29,"telephone":"9012345678","location":"Tianjin"},{"id":10,"name":"Judy","age":31,"telephone":"0123456789","location":"Chongqing"}]

    可以看到增、删、改、查的 RESTful API 已经成功生成:

    from fastapi import FastAPI, HTTPException, Depends
    from pydantic import BaseModel
    from typing import List
    from sqlalchemy import create_engine, Column, Integer, String
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker, Session

    # seekdb 连接配置(请根据实际情况修改)
    DATABASE_URL = "mysql://***:***@***:***/***"

    engine = create_engine(DATABASE_URL, echo=True)
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    Base = declarative_base()

    class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100))
    age = Column(Integer)
    telephone = Column(String(20))
    location = Column(String(100))

    class CustomerCreate(BaseModel):
    id: int
    name: str
    age: int
    telephone: str
    location: str

    class CustomerUpdate(BaseModel):
    name: str = None
    age: int = None
    telephone: str = None
    location: str = None

    class CustomerOut(BaseModel):
    id: int
    name: str
    age: int
    telephone: str
    location: str
    class Config:
    orm_mode = True

    def get_db():
    db = SessionLocal()
    try:
    yield db
    finally:
    db.close()

    app = FastAPI()

    @app.post("/customers/", response_model=CustomerOut)
    def create_customer(customer: CustomerCreate, db: Session = Depends(get_db)):
    db_customer = Customer(**customer.dict())
    db.add(db_customer)
    try:
    db.commit()
    db.refresh(db_customer)
    except Exception as e:
    db.rollback()
    raise HTTPException(status_code=400, detail=str(e))
    return db_customer

    @app.get("/customers/", response_model=List[CustomerOut])
    def read_customers(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    return db.query(Customer).offset(skip).limit(limit).all()

    @app.get("/customers/{customer_id}", response_model=CustomerOut)
    def read_customer(customer_id: int, db: Session = Depends(get_db)):
    customer = db.query(Customer).filter(Customer.id == customer_id).first()
    if customer is None:
    raise HTTPException(status_code=404, detail="Customer not found")
    return customer

    @app.put("/customers/{customer_id}", response_model=CustomerOut)
    def update_customer(customer_id: int, customer: CustomerUpdate, db: Session = Depends(get_db)):
    db_customer = db.query(Customer).filter(Customer.id == customer_id).first()
    if db_customer is None:
    raise HTTPException(status_code=404, detail="Customer not found")
    for var, value in vars(customer).items():
    if value is not None:
    setattr(db_customer, var, value)
    db.commit()
    db.refresh(db_customer)
    return db_customer

    @app.delete("/customers/{customer_id}")
    def delete_customer(customer_id: int, db: Session = Depends(get_db)):
    db_customer = db.query(Customer).filter(Customer.id == customer_id).first()
    if db_customer is None:
    raise HTTPException(status_code=404, detail="Customer not found")
    db.delete(db_customer)
    db.commit()
    return {"ok": True}