OceanBase MCP Server 与 Cursor 集成
MCP(Model Context Protocol) 是 Anthropic 公司于 2024 年 11 月推出并开源,旨在实现大语言模型与外部工具或数据源交互的协议。通过 MCP,用户不需要将大模型的输出手动复制执行,大模型可以直接指挥工具执行相应的动作(Action)。
MCP Server 通过 MCP 协议提供了大模型与 OceanBase 数据库交互的能力,可以执行 SQL 语句。通过合适的客户端可以快速搭建项目原型,已在 github 上开源。
Cursor 是一款集成了 AI 技术的代码编辑器,支持多种操作系统,包括 Windows、macOS 和 Linux。
本文将展示如何使用 Cursor 与 OceanBase MCP Server 集成,快速构建后端应用程序。
前提条件
-
您已完成部署 seekdb。
-
安装 Python 3.11 及以上版本 和相应 pip。如果您的机器上 Python 版本较低,可以使用 Miniconda 来创建新的 Python 3.11 及以上的环境,具体可参考 Miniconda 安装指南。
-
根据所用的操作系统,安装 Git。
-
安装 Python 包管理器 uv。安装完成后,可使用
uv --version命令验证安装是否成功:pip install uv
uv --version -
下载 Cursor,根据自己的操作系统选择合适的版本进行安装。注意首次使用 Cursor 时,需要注册一个新账号或使用已有账号进行登录。登录后,可以创建新项目或打开已有项目。
步骤一:获取数据库连接信息
联系 seekdb 部署人员或者管理员获取相应的数据库连接串,例如:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
参数说明:
-
$host:提供 seekdb 连接 IP 地址。 -
$port:提供 seekdb 连接端口,默认是2881。 -
$database_name:需要访问的数据库名称。提示连接的用户需要拥有该数据库的
CREATE、INSERT、DROP和SELECT权限。 -
$user_name:提供数据库连接账户。 -
$password:提供账户密码。
步骤二:配置 OceanBase MCP Server
克隆 OceanBase MCP Server 仓库
执行下面的命令将源代码下载到本地:
git clone https://github.com/oceanbase/mcp-oceanbase.git
进入源代码目录:
cd mcp-oceanbase
安装依赖
在 mcp-oceanbase 目录下执行下面的命令创建虚拟环境,并安装依赖:
uv venv
source .venv/bin/activate
uv pip install .
创建 Cursor 客户端的工作目录并配置 OceanBase MCP Server
手动创建一个 Cursor 的工作目录,并用 Cursor 打开,后面 Cursor 生成的文件将放在这个目录下,示例的目录名为 cursor。
使用快捷键 Ctrl + L(Windows)或者 Command + L(MacOS)打开聊天对话框,点击右上角的齿轮,选择 MCP Tools。

添加并配置 MCP Servers
-
点击
Add Custom MCP填写配置文件。
-
填写配置文件,点击确认。
path/to/your/mcp-oceanbase/src/oceanbase_mcp_server需要替换为oceanbase_mcp_server文件夹的绝对路径,OB_HOST、OB_PORT、OB_USER、OB_PASSWORD、OB_DATABASE需要替换为自己数据库的对应信息:{
"mcpServers": {
"oceanbase": {
"command": "uv",
"args": [
"--directory",
"/path/to/your/mcp-oceanbase/src/oceanbase_mcp_server",
"run",
"oceanbase_mcp_server"
],
"env": {
"OB_HOST": "***",
"OB_PORT": "***",
"OB_USER": "***",
"OB_PASSWORD": "***",
"OB_DATABASE": "***"
}
}
}
} -
如配置成功,将显示
可使用状态。
测试 MCP Server
-
在对话框中输入提示:
test 库中有多少张表?,Cursor 客户端会展示即将执行的 SQL 语句。确认无误后,点击Run tool按钮执行查询。Cursor 客户端会展示test库中的所有表名称,这表明我们已经成功连接到 seekdb。
使用 FastAPI 快速创建 RESTful API 风格的项目
你可以使用 FastAPI 快速创建 RESTful API 风格的项目。FastAPI 是一个 Python 的 Web 框架,可以快速构建 RESTful API。
-
创建 customer 表
在对话框中输入提示:
创建一个 customer 表,主键是 ID,包含 name,age,telephone,location 字段,确认 SQL 语句后,点击Run tool按钮执行查询。
-
插入测试数据
在对话框中输入提示:
向 customer 表中插入 10 条数据,确认 SQL 语句后,点击Run tool按钮执行查询。插入成功后,会有已成功向 customer 表插入了 10 条测试数据...的提示。
-
创建 FastAPI 项目
在对话框中输入提示:
创建一个 FastAPI 项目,生成基于 customer 表的 RESTful API,确认 SQL 语句后,点击Run tool按钮执行查询。
此步骤将自动生成两个文件。建议首次使用时选择
全部接受,因为 AI 生成的文件内容可能具有不确定性,后续可根据实际需求进行调整。 -
创建虚拟环境并安装依赖
执行如下命令,在当前目录下使用 uv 包管理工具创建虚拟环境,并安装依赖包:
uv venv
source .venv/bin/activate
uv pip install -r requirements.txt -
启动 FastAPI 项目
执行如下命令,启动 FastAPI 项目:
uvicorn main:app --reload -
查看表中数据
在命令行中运行如下命令,或者使用其他请求工具,查看表中的数据:
curl http://127.0.0.1:8000/customers返回结果如下:
[{"id":1,"name":"Alice","age":28,"telephone":"1234567890","location":"Beijing"},{"id":2,"name":"Bob","age":32,"telephone":"2345678901","location":"Shanghai"},{"id":3,"name":"Charlie","age":25,"telephone":"3456789012","location":"Guangzhou"},{"id":4,"name":"David","age":40,"telephone":"4567890123","location":"Shenzhen"},{"id":5,"name":"Eve","age":22,"telephone":"5678901234","location":"Chengdu"},{"id":6,"name":"Frank","age":35,"telephone":"6789012345","location":"Wuhan"},{"id":7,"name":"Grace","age":30,"telephone":"7890123456","location":"Hangzhou"},{"id":8,"name":"Heidi","age":27,"telephone":"8901234567","location":"Nanjing"},{"id":9,"name":"Ivan","age":29,"telephone":"9012345678","location":"Tianjin"},{"id":10,"name":"Judy","age":31,"telephone":"0123456789","location":"Chongqing"}]可以看到增、删、改、查的 RESTful API 已经成功生成:
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
# seekdb 连接配置(请根据实际情况修改)
DATABASE_URL = "mysql://***:***@***:***/***"
engine = create_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
class Customer(Base):
__tablename__ = "customer"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100))
age = Column(Integer)
telephone = Column(String(20))
location = Column(String(100))
class CustomerCreate(BaseModel):
id: int
name: str
age: int
telephone: str
location: str
class CustomerUpdate(BaseModel):
name: str = None
age: int = None
telephone: str = None
location: str = None
class CustomerOut(BaseModel):
id: int
name: str
age: int
telephone: str
location: str
class Config:
orm_mode = True
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI()
@app.post("/customers/", response_model=CustomerOut)
def create_customer(customer: CustomerCreate, db: Session = Depends(get_db)):
db_customer = Customer(**customer.dict())
db.add(db_customer)
try:
db.commit()
db.refresh(db_customer)
except Exception as e:
db.rollback()
raise HTTPException(status_code=400, detail=str(e))
return db_customer
@app.get("/customers/", response_model=List[CustomerOut])
def read_customers(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
return db.query(Customer).offset(skip).limit(limit).all()
@app.get("/customers/{customer_id}", response_model=CustomerOut)
def read_customer(customer_id: int, db: Session = Depends(get_db)):
customer = db.query(Customer).filter(Customer.id == customer_id).first()
if customer is None:
raise HTTPException(status_code=404, detail="Customer not found")
return customer
@app.put("/customers/{customer_id}", response_model=CustomerOut)
def update_customer(customer_id: int, customer: CustomerUpdate, db: Session = Depends(get_db)):
db_customer = db.query(Customer).filter(Customer.id == customer_id).first()
if db_customer is None:
raise HTTPException(status_code=404, detail="Customer not found")
for var, value in vars(customer).items():
if value is not None:
setattr(db_customer, var, value)
db.commit()
db.refresh(db_customer)
return db_customer
@app.delete("/customers/{customer_id}")
def delete_customer(customer_id: int, db: Session = Depends(get_db)):
db_customer = db.query(Customer).filter(Customer.id == customer_id).first()
if db_customer is None:
raise HTTPException(status_code=404, detail="Customer not found")
db.delete(db_customer)
db.commit()
return {"ok": True}