跳到主要内容

pyobvector Python SDK 接口说明

pyobvector 是 seekdb 向量存储功能的 python SDK,它提供两种使用模式:

  • pymilvus 兼容模式:使用 MilvusLikeClient 对象操作数据库,提供与轻量级 MilvusClient 兼容的常用接口。

  • SQLAlchemy 扩展模式:使用 ObVecClient 对象操作数据库,提供关系型数据库的 python SDK 扩展。

本文分别介绍了这两种模式下的使用接口与示例。

MilvusLikeClient

构造函数


def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)

collection 相关接口

API 接口参数描述示例
def create_schema(self, **kwargs) -> CollectionSchema:
    构造一个 CollectionSchema 对象
  • 可以不传参数,即初始化一个空的模式定义。
  • 可选参数如下:
  • fields:一个 FieldSchema 列表(详见下文 add_field 接口)
  • partitions:分区规则(详见使用 ObPartition 定义分区规则章节)
  • description:与 Milvus 兼容用,在 seekdb 中暂无实际作用
def create_collection(self,collection_name: str,dimension: Optional[int] = None,primary_field_name: str = "id",id_type: Union[DataType, str] = DataType.INT64,vector_field_name: str = "vector", metric_type: str = "l2", auto_id: bool = False, timeout: Optional[float] = None, schema: Optional[CollectionSchema] = None, # Used for custom setup index_params: Optional[IndexParams] = None, # Used for custom setup max_length: int = 16384, **kwargs, )创建一个表:
  • collection_name : 表名称
  • dimension : 向量数据维度
  • primary_field_name: 主字段名称
  • id_type: 主字段数据类型(仅支持 VARCHAR 和 INT 类型)
  • vector_field_name : 向量字段名称
  • metric_type: seekdb 中未使用,但保持接口兼容 (因为主表定义不需要指定向量距离函数)
  • auto_id:主字段是否自动递增
  • timeout : seekdb 中未使用,但保持接口兼容
  • schema : 自定义集合架构,当 schema 不为 None 时上面从 dimension 到 metric_type 的参数将被忽略
  • index_params: 自定义向量索引参数
  • max_length: 当主字段数据类型为 VARCHAR 且 schema 不为 None 时最大 varchar 长度为 max_length
client.create_collection( collection_name=test_collection_name, schema=schema, index_params=idx_params, )
def get_collection_stats( self, collection_name: str, timeout: Optional[float] = None # pylint: disable=unused-argument ) -> Dict:获取表的记录数量
  • collection_name:表名称
  • timeout : seekdb 中未使用,但保持接口兼容
def has_collection(self, collection_name: str, timeout: Optional[float] = None) -> bool判断表是否存在
  • collection_name:表名称
  • timeout : seekdb 中未使用,但保持接口兼容
def drop_collection(self, collection_name: str) -> None删除表
  • collection_name:表名称
def load_table(self, collection_name: str,)读取表元数据到 SQLAlchemy 元数据缓存
  • collection_name:表名称

CollectionSchema & FieldSchema

MilvusLikeClient 通过 CollectionSchema 描述一个表的模式定义,一个 CollectionSchema 包含多个 FieldSchema, FieldSchema 描述一个表的列模式。

通过 MilvusLikeClient 的 create_schema 创建 CollectionSchema

def __init__(
self,
fields: Optional[List[FieldSchema]] = None,
partitions: Optional[ObPartition] = None,
description: str = "", # ignored in oceanbase
**kwargs,
)

参数说明如下:

  • fields:一组可选的 FieldSchema。

  • partitions:分区规则(详见使用 ObPartition 定义分区规则章节)。

  • description:与 Milvus 兼容用,在 seekdb 中暂无实际作用。

创建 FieldSchema 并注册到 CollectionSchema

def add_field(self, field_name: str, datatype: DataType, **kwargs)
  • field_name:列名称。

  • datatype:列数据类型(支持的数据类型请参见兼容性说明)。

  • kwargs:其他参数用于配置列属性,如下:

    def __init__(
    self,
    name: str,
    dtype: DataType,
    description: str = "",
    is_primary: bool = False,
    auto_id: bool = False,
    nullable: bool = False,
    **kwargs,
    )

    参数说明如下:

    • is_primary:是否是主键。

    • auto_id:是否是自增列。

    • nullable:是否允许为空。

使用示例

schema = self.client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(
field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768
)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(
field_name="publication", datatype=DataType.VARCHAR, max_length=512
)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)

self.client.create_collection(
collection_name="medium_articles_2020", schema=schema
)

索引相关

API 接口参数描述示例或备注
def create_index( self, collection_name: str, index_params: IndexParams, timeout: Optional[float] = None, **kwargs, )根据已构造的 IndexParams 创建向量索引表(此接口关于 IndexParams 的使用详见 prepare_index_params 和 add_index 接口)
  • collection_name:表名称
  • index_params:索引参数
  • timeout:seekdb 中未使用,但保持接口兼容
  • kwargs:其他参数,目前未使用,保持兼容
def drop_index( self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs, )删除索引表
  • collection_name:表名称
  • index_name:索引名
def refresh_index( self, collection_name: str, index_name: str, trigger_threshold: int = 10000, )刷新向量索引表以提升读取性能,可以理解为对增量数据的搬迁
  • collection_name:表名称
  • index_name:索引名
  • trigger_threshold:刷新动作的触发阈值,如果索引表数据量超过该阈值,则进行刷新
seekdb 额外引入的接口
非 Milvus 兼容
def rebuild_index( self, collection_name: str, index_name: str, trigger_threshold: float = 0.2, )重建向量索引表以提升读取性能,可以理解为将增量数据合并入基线索引数据
  • collection_name:表名称
  • index_name:索引名
  • trigger_threshold:重建动作的触发阈值,值域 0 到 1,增量数据占全量数据比例达到该阈值时触发重建
seekdb 额外引入的接口
非 Milvus 兼容
def search( self, collection_name: str, data: list, anns_field: str, with_dist: bool = False, filter=None,limit: int = 10,output_fields: Optional[List[str]] = None, search_params: Optional[dict] = None, timeout: Optional[float] = None, partition_names: Optional[List[str]] = None, **kwargs, ) -> List[dict]执行向量近似邻近搜索
  • collection_name:表名称
  • data:需要搜索的向量数据
  • anns_field: 需要搜索的向量列名
  • with_dist: 是否返回带向量距离的结果
  • filter : 使用带过滤条件的向量近似邻近搜索
  • limit :top K
  • output_fields: 输出列(或称为投影列)
  • search_params : 仅支持值为 l2/neg_ip 的 metric_type(例如:search_params = {"metric_type": "neg_ip"}
  • timeout : seekdb 中未使用,仅兼容作用
  • partition_names : 将查询限制在某些分区
    返回值:
    记录列表,每条记录都是一个字典
    表示从 column_name 到列值的映射。
res = self.client.search( collection_name=test_collection_name, data=[0, 0, 1], anns_field="embedding", limit=5, output_fields=["id"], search_params={"metric_type": "neg_ip"} ) self.assertEqual( set([r['id'] for r in res]), set([12, 111, 11, 112, 10]))`
def insert( self, collection_name: str, data: Union[Dict, List[Dict]], timeout: Optional[float] = None, partition_name: Optional[str] = "" )向表中插入数据
  • collection_name:表名称
  • data:以 Key-Value 形式描述的待插入数据
  • timeout : seekdb 中未使用,仅兼容作用
  • partition_name : 将插入操作限制在某个分区
data = [ {"id": 12, "embedding": [1, 2, 3], "meta": {"doc": "document 1"}}, { "id": 90, "embedding": [0.13, 0.123, 1.213], "meta": {"doc": "document 1"}, }, {"id": 112, "embedding": [1, 2, 3], "meta": None}, {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": None}, ] self.client.insert(collection_name=test_collection_name, data=data)
def upsert( self, collection_name: str, data: Union[Dict, List[Dict]], timeout: Optional[float] = None, # pylint: disable=unused-argument partition_name: Optional[str] = "", ) -> List[Union[str, int]]更新表中的数据。如果主键已存在,则更新对应记录;否则,插入新记录。
  • collection_name:表名称
  • data:待插入或更新的数据,格式与 insert 接口一致
  • timeout : seekdb 中未使用,仅兼容作用
  • partition_name : 将操作限制在指定分区
data = [ {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}}, {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}}, ] self.client.upsert(collection_name=test_collection_name, data=data)
def perform_raw_text_sql(self, text_sql: str): return super().perform_raw_text_sql(text_sql)直接执行 SQL 语句
  • text_sql: 待执行的 SQL
返回值:
返回 SQLAlchemy 提供的结果集合迭代器

ObVecClient

构造函数

def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)

表模式相关操作

API 接口参数描述示例或备注
def check_table_exists(self, table_name: str)检查表是否存在
  • table_name:表名称
def create_table( self, table_name: str, columns: List[Column], indexes: Optional[List[Index]] = None, partitions: Optional[ObPartition] = None, )创建表
  • table_name:表名称
  • columns:使用 SQLAlchemy 定义的的表的列模式
  • indexes:使用 SQLAlchemy 定义的一组索引表模式
  • partitions:可选的分区规则(详见使用 ObPartition 定义分区规则小节)
@classmethod def prepare_index_params(cls)创建一个 IndexParams 对象来记录向量索引表的模式定义class IndexParams: """Vector index parameters for MilvusLikeClient" def __init__(self): self._indexes = {}IndexParams 的定义非常简单,内部只有一个字典类型的成员
存放了 (列名,索引名) 的 tuple 到 IndexParam 结构的映射
IndexParam 类的构造函数为def __init__( self, index_name: str, field_name: str, index_type: Union[VecIndexType, str], **kwargs )
  • index_name:向量索引表名
  • field_name:向量列名
  • index_type:向量索引算法类型的一个枚举类,目前仅支持 HNSW
通过 prepare_index_params 获得一个 IndexParams 后,可以通过 add_index 接口来注册 IndexParam:def add_index( self, field_name: str, index_type: VecIndexType, index_name: str, **kwargs )参数含义通 IndexParam 的构造函数
给出一个创建向量索引的使用案例:idx_params = self.client.prepare_index_params() idx_params.add_index( field_name="title_vector", index_type="HNSW", index_name="vidx_title_vector", metric_type="L2", params={"M": 16, "efConstruction": 256}, ) self.client.create_collection( collection_name=test_collection_name, schema=schema, index_params=idx_params, )需要注意的是 prepare_index_params 函数建议在 MilvusLikeClient 下使用,不建议在 ObVecClient 中使用,ObVecClient 模式下应使用 create_index 接口来定义向量索引表。(详见 create_index 接口)
def create_table_with_index_params( self, table_name: str, columns: List[Column], indexes: Optional[List[Index]] = None, vidxs: Optional[IndexParams] = None, partitions: Optional[ObPartition] = None, )使用可选的 index_params 在创建表的同时创建向量索引
  • table_name:表名称
  • columns:使用 SQLAlchemy 定义的的表的列模式
  • indexes:使用 SQLAlchemy 定义的一组索引表模式
  • vidxs:通过 IndexParams 指定的向量索引表模式
  • partitions:可选的分区规则(详见使用 ObPartition 定义分区规则章节)
建议在 MilvusLikeClient 下使用,不建议在 ObVecClient 中使用
def drop_table_if_exist(self, table_name: str)删除表
  • table_name:表名称
def drop_index(self, table_name: str, index_name: str)删除索引
  • table_name:表名称
  • index_name:索引名称
def refresh_index( self, table_name: str, index_name: str, trigger_threshold: int = 10000, )刷新向量索引表以提升读取性能,可以理解为对增量数据的搬迁
  • table_name:表名称
  • index_name:索引名称
  • trigger_threshold:刷新动作的触发阈值,如果索引表数据量超过该阈值,则进行刷新
def rebuild_index( self, table_name: str, index_name: str, trigger_threshold: float = 0.2, )重建向量索引表以提升读取性能,可以理解为将增量数据合并入基线索引数据
  • table_name:表名称
  • index_name:索引名称
  • trigger_threshold:重建动作的触发阈值,值域 0 到 1,增量数据占全量数据比例达到该阈值时触发重建

DML 操作

API 接口参数描述示例或备注
def insert( self, table_name: str, data: Union[Dict, List[Dict]], partition_name: Optional[str] = "", )向表中插入数据
  • table_name:表名称
  • data:以 Key-Value 形式描述的待插入数据
  • partition_name:将插入操作限制在某个分区
vector_value1 = [0.748479, 0.276979, 0.555195] vector_value2 = [0, 0, 0] data1 = [{"id": i, "embedding": vector_value1} for i in range(10)] data1.extend([{"id": i, "embedding": vector_value2} for i in range(10, 13)]) data1.extend([{"id": i, "embedding": vector_value2} for i in range(111, 113)]) self.client.insert(test_collection_name, data=data1)
def upsert( self, table_name: str, data: Union[Dict, List[Dict]], partition_name: Optional[str] = "", )插入或更新表中的数据。如果主键已存在,则更新对应记录;否则,插入新记录。
  • table_name:表名称
  • data:待插入或更新的数据,Key-Value 格式
  • partition_name:将操作限制在指定分区
def update( self, table_name: str, values_clause, where_clause=None, partition_name: Optional[str] = "", )更新表中的数据。如果主键重复,则替换它。
  • table_name:表名称
  • values_clause:更新列的值
  • where_clause:更新条件
  • partition_name:将更新操作限制在某些分区
data = [ {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}}, {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}}, ] client.insert(collection_name=test_collection_name, data=data) client.update( table_name=test_collection_name, values_clause=[{'meta':{'doc':'HHH'}}], where_clause=[text("id=112")] )
def get( self, table_name: str, ids: Optional[Union[list, str, int]], where_clause = None, output_column_name: Optional[List[str]] = None, partition_names: Optional[List[str]] = None, )获取指定主键 ids 的记录。
  • table_name:表名称
  • ids:某个 id 或者一组 id 列表。可选参数,没有可填 ids=None。ObVecClient get 接口的 ids 参数和 MilvusLikeClient 的 get 不同,详见 MilvusLikeClient get
  • where_clause:获取条件
  • output_column_name:一组输出列或者投影列名称
  • partition_names:将获取操作限制在某些分区
返回值:
不同于 MilvusLikeClient,ObVecClient 的返回值为一个 tuple list,每个 tuple 代表一行记录
res = self.client.get( test_collection_name, ids=["abc", "bcd", "cde", "def"], where_clause=[text("meta->'$.page' > 1")], output_column_name=['id'] )
def get_ob_hnsw_ef_search(self) -> int获取 HNSW 索引的 efSearch 参数
def ann_search( self, table_name: str, vec_data: list, vec_column_name: str, distance_func, with_dist: bool = False, topk: int = 10, output_column_names: Optional[List[str]] = None, extra_output_cols: Optional[List] = None, where_clause=None, partition_names: Optional[List[str]] = None, **kwargs, )执行向量近似邻近搜索
  • table_name:表名称
  • vec_data:需要搜索的向量数据
  • vec_column_name:需要搜索的向量列名
  • distance_func:距离函数。提供了 SQLAlchemy func 的扩展,可选值有 func.l2_distance/func.cosine_distance/func.inner_product/func.negative_inner_product,分别表示 l2 距离函数、cosine 距离函数、内积距离函数、内积距离的负值
  • with_dist:是否返回带向量距离的结果
  • topk:取最近多少个向量
  • output_column_names:一组输出列或者投影列名称
  • extra_output_cols:额外输出列,可以提供更复杂的输出表达式
  • where_clause:过滤条件
  • partition_names:将查询限制在某些分区
返回值:
不同于 MilvusLikeClient,ObVecClient 的返回值为一个 tuple list,每个 tuple 代表一行记录
res = self.client.ann_search( test_collection_name, vec_data=[0, 0, 0], vec_column_name="embedding", distance_func=func.l2_distance, with_dist=True, topk=5, output_column_names=["id"], )

使用 ObPartition 定义分区规则

pyobvector 提供了以下类型来支持 range/range columns、list/list columns、hash、key 以及二级分区:

  • ObRangePartition:range 一级分区。构造时设置 is_range_columns = True 以创建 range columns 分区。

  • ObListPartition:list 一级分区。构造时设置 is_list_columns = True 以创建 list columns 分区。

  • ObHashPartition:hash 一级分区。

  • ObKeyPartition:key 一级分区。

  • ObSubRangePartition:二级 range 分区。构造时设置 is_range_columns = True 以创建 range columns 二级分区。

  • ObSubListPartition:list 二级分区。构造时设置 is_list_columns = True 以创建 list columns 二级分区。

  • ObSubHashPartition:hash 二级分区。

  • ObSubKeyPartition:key 二级分区。

range 分区示例

range_part = ObRangePartition(
False,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", "maxvalue"),
],
range_expr="id",
)

list 分区示例

list_part = ObListPartition(
False,
list_part_infos=[
RangeListPartInfo("p0", [1, 2, 3]),
RangeListPartInfo("p1", [5, 6]),
RangeListPartInfo("p2", "DEFAULT"),
],
list_expr="col1",
)

hash 分区示例

hash_part = ObHashPartition("col1", part_count=60)

多级分区示例

# 一级range分区
range_columns_part = ObRangePartition(
True,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", 200),
RangeListPartInfo("p2", 300),
],
col_name_list=["col1"],
)
# 二级range分区
range_sub_part = ObSubRangePartition(
False,
range_part_infos=[
RangeListPartInfo("mp0", 1000),
RangeListPartInfo("mp1", 2000),
RangeListPartInfo("mp2", 3000),
],
range_expr="col3",
)
range_columns_part.add_subpartition(range_sub_part)

纯 SQLAlchemy API 模式

如果你希望在 seekdb 的向量搜索功能下使用纯粹的 SQLAlchemy API,可以通过如下两种方式获取同步的数据库引擎:

  • 方式一:使用 ObVecClient 辅助创建数据库引擎
from pyobvector import ObVecClient

client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
engine = client.engine
# 接下来正常使用 SQLAlchemy 创建 session,使用 SQLAlchemy 的 API 即可
  • 方式二:使用 ObVecClient 的 create_engine 接口创建数据库引擎
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
# mysql+oceanbase 表示选择 mysql 标准并使用 seekdb 的同步驱动
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str, **kwargs)
# 接下来正常使用 SQLAlchemy 创建 session,使用 SQLAlchemy 的 API 即可

如果期望使用 SQLAlchemy 的异步接口,可以使用 seekdb 的异步驱动:

import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
# mysql+aoceanbase 表示选择 mysql 标准并使用 seekdb 的异步驱动
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
# 接下来正常使用 SQLAlchemy 创建 session,使用 SQLAlchemy 的 API 即可

更多示例

访问 pyobvector 代码仓库 获取更多示例。