pyobvector Python SDK 接口说明
pyobvector 是 seekdb 向量存储功能的 python SDK,它提供两种使用模式:
-
pymilvus 兼容模式:使用 MilvusLikeClient 对象操作数据库,提供与轻量级 MilvusClient 兼容的常用接口。
-
SQLAlchemy 扩展模式:使用 ObVecClient 对象操作数据库,提供关系型数据库的 python SDK 扩展。
本文分别介绍了这两种模式下的使用接口与示例。
MilvusLikeClient
构造函数
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
collection 相关接口
| API 接口 | 参数描述 | 示例 |
|---|---|---|
def create_schema(self, **kwargs) -> CollectionSchema: |
| |
def create_collection(self,collection_name: str,dimension: Optional[int] = None,primary_field_name: str = "id",id_type: Union[DataType, str] = DataType.INT64,vector_field_name: str = "vector", metric_type: str = "l2", auto_id: bool = False, timeout: Optional[float] = None, schema: Optional[CollectionSchema] = None, # Used for custom setup index_params: Optional[IndexParams] = None, # Used for custom setup max_length: int = 16384, **kwargs, ) | 创建一个表:
| client.create_collection( collection_name=test_collection_name, schema=schema, index_params=idx_params, ) |
def get_collection_stats( self, collection_name: str, timeout: Optional[float] = None # pylint: disable=unused-argument ) -> Dict: | 获取表的记录数量
| |
def has_collection(self, collection_name: str, timeout: Optional[float] = None) -> bool | 判断表是否存在
| |
def drop_collection(self, collection_name: str) -> None | 删除表
| |
def load_table(self, collection_name: str,) | 读取表元数据到 SQLAlchemy 元数据缓存
|
CollectionSchema & FieldSchema
MilvusLikeClient 通过 CollectionSchema 描述一个表的模式定义,一个 CollectionSchema 包含多个 FieldSchema, FieldSchema 描述一个表的列模式。
通过 MilvusLikeClient 的 create_schema 创建 CollectionSchema
def __init__(
self,
fields: Optional[List[FieldSchema]] = None,
partitions: Optional[ObPartition] = None,
description: str = "", # ignored in oceanbase
**kwargs,
)
参数说明如下:
-
fields:一组可选的 FieldSchema。
-
partitions:分区规则(详见使用 ObPartition 定义分区规则章节)。
-
description:与 Milvus 兼容用,在 seekdb 中暂无实际作用。
创建 FieldSchema 并注册到 CollectionSchema
def add_field(self, field_name: str, datatype: DataType, **kwargs)
-
field_name:列名称。
-
datatype:列数据类型(支持的数据类型请参见兼容性说明)。
-
kwargs:其他参数用于配置列属性,如下:
def __init__(
self,
name: str,
dtype: DataType,
description: str = "",
is_primary: bool = False,
auto_id: bool = False,
nullable: bool = False,
**kwargs,
)参数说明如下:
-
is_primary:是否是主键。
-
auto_id:是否是自增列。
-
nullable:是否允许为空。
-
使用示例
schema = self.client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(
field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768
)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(
field_name="publication", datatype=DataType.VARCHAR, max_length=512
)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)
self.client.create_collection(
collection_name="medium_articles_2020", schema=schema
)
索引相关
| API 接口 | 参数描述 | 示例或备注 |
|---|---|---|
def create_index( self, collection_name: str, index_params: IndexParams, timeout: Optional[float] = None, **kwargs, ) | 根据已构造的 IndexParams 创建向量索引表(此接口关于 IndexParams 的使用详见 prepare_index_params 和 add_index 接口)
| |
def drop_index( self, collection_name: str, index_name: str, timeout: Optional[float] = None, **kwargs, ) | 删除索引表
| |
def refresh_index( self, collection_name: str, index_name: str, trigger_threshold: int = 10000, ) | 刷新向量索引表以提升读取性能,可以理解为对增量数据的搬迁
| seekdb 额外引入的接口 非 Milvus 兼容 |
def rebuild_index( self, collection_name: str, index_name: str, trigger_threshold: float = 0.2, ) | 重建向量索引表以提升读取性能,可以理解为将增量数据合并入基线索引数据
| seekdb 额外引入的接口 非 Milvus 兼容 |
def search( self, collection_name: str, data: list, anns_field: str, with_dist: bool = False, filter=None,limit: int = 10,output_fields: Optional[List[str]] = None, search_params: Optional[dict] = None, timeout: Optional[float] = None, partition_names: Optional[List[str]] = None, **kwargs, ) -> List[dict] | 执行向量近似邻近搜索
记录列表,每条记录都是一个字典 表示从 column_name 到列值的映射。 | res = self.client.search( collection_name=test_collection_name, data=[0, 0, 1], anns_field="embedding", limit=5, output_fields=["id"], search_params={"metric_type": "neg_ip"} ) self.assertEqual( set([r['id'] for r in res]), set([12, 111, 11, 112, 10]))` |
def insert( self, collection_name: str, data: Union[Dict, List[Dict]], timeout: Optional[float] = None, partition_name: Optional[str] = "" ) | 向表中插入数据
| data = [ {"id": 12, "embedding": [1, 2, 3], "meta": {"doc": "document 1"}}, { "id": 90, "embedding": [0.13, 0.123, 1.213], "meta": {"doc": "document 1"}, }, {"id": 112, "embedding": [1, 2, 3], "meta": None}, {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": None}, ] self.client.insert(collection_name=test_collection_name, data=data) |
def upsert( self, collection_name: str, data: Union[Dict, List[Dict]], timeout: Optional[float] = None, # pylint: disable=unused-argument partition_name: Optional[str] = "", ) -> List[Union[str, int]] | 更新表中的数据。如果主键已存在,则更新对应记录;否则,插入新记录。
| data = [ {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}}, {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}}, ] self.client.upsert(collection_name=test_collection_name, data=data) |
def perform_raw_text_sql(self, text_sql: str): return super().perform_raw_text_sql(text_sql) | 直接执行 SQL 语句
返回 SQLAlchemy 提供的结果集合迭代器 |
ObVecClient
构造函数
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
表模式相关操作
| API 接口 | 参数描述 | 示例或备注 |
|---|---|---|
def check_table_exists(self, table_name: str) | 检查表是否存在
| |
def create_table( self, table_name: str, columns: List[Column], indexes: Optional[List[Index]] = None, partitions: Optional[ObPartition] = None, ) | 创建表
| |
@classmethod def prepare_index_params(cls) | 创建一个 IndexParams 对象来记录向量索引表的模式定义class IndexParams: """Vector index parameters for MilvusLikeClient" def __init__(self): self._indexes = {}IndexParams 的定义非常简单,内部只有一个字典类型的成员存放了 (列名,索引名) 的 tuple 到 IndexParam 结构的映射 IndexParam 类的构造函数为 def __init__( self, index_name: str, field_name: str, index_type: Union[VecIndexType, str], **kwargs )
prepare_index_params 获得一个 IndexParams 后,可以通过 add_index 接口来注册 IndexParam:def add_index( self, field_name: str, index_type: VecIndexType, index_name: str, **kwargs )参数含义通 IndexParam 的构造函数 | 给出一个创建向量索引的使用案例:idx_params = self.client.prepare_index_params() idx_params.add_index( field_name="title_vector", index_type="HNSW", index_name="vidx_title_vector", metric_type="L2", params={"M": 16, "efConstruction": 256}, ) self.client.create_collection( collection_name=test_collection_name, schema=schema, index_params=idx_params, )需要注意的是 prepare_index_params 函数建议在 MilvusLikeClient 下使用,不建议在 ObVecClient 中使用,ObVecClient 模式下应使用 create_index 接口来定义向量索引表。(详见 create_index 接口) |
def create_table_with_index_params( self, table_name: str, columns: List[Column], indexes: Optional[List[Index]] = None, vidxs: Optional[IndexParams] = None, partitions: Optional[ObPartition] = None, ) | 使用可选的 index_params 在创建表的同时创建向量索引
| 建议在 MilvusLikeClient 下使用,不建议在 ObVecClient 中使用 |
def drop_table_if_exist(self, table_name: str) | 删除表
| |
def drop_index(self, table_name: str, index_name: str) | 删除索引
| |
def refresh_index( self, table_name: str, index_name: str, trigger_threshold: int = 10000, ) | 刷新向量索引表以 提升读取性能,可以理解为对增量数据的搬迁
| |
def rebuild_index( self, table_name: str, index_name: str, trigger_threshold: float = 0.2, ) | 重建向量索引表以提升读取性能,可以理解为将增量数据合并入基线索引数据
|
DML 操作
| API 接口 | 参数描述 | 示例或备注 |
|---|---|---|
def insert( self, table_name: str, data: Union[Dict, List[Dict]], partition_name: Optional[str] = "", ) | 向表中插入数据
| vector_value1 = [0.748479, 0.276979, 0.555195] vector_value2 = [0, 0, 0] data1 = [{"id": i, "embedding": vector_value1} for i in range(10)] data1.extend([{"id": i, "embedding": vector_value2} for i in range(10, 13)]) data1.extend([{"id": i, "embedding": vector_value2} for i in range(111, 113)]) self.client.insert(test_collection_name, data=data1) |
def upsert( self, table_name: str, data: Union[Dict, List[Dict]], partition_name: Optional[str] = "", ) | 插入或更新表中的数据。如果 主键已存在,则更新对应记录;否则,插入新记录。
| |
def update( self, table_name: str, values_clause, where_clause=None, partition_name: Optional[str] = "", ) | 更新表中的数据。如果主键重复,则替换它。
| data = [ {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}}, {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}}, ] client.insert(collection_name=test_collection_name, data=data) client.update( table_name=test_collection_name, values_clause=[{'meta':{'doc':'HHH'}}], where_clause=[text("id=112")] ) |
def get( self, table_name: str, ids: Optional[Union[list, str, int]], where_clause = None, output_column_name: Optional[List[str]] = None, partition_names: Optional[List[str]] = None, ) | 获取指定主键 ids 的记录。
不同于 MilvusLikeClient,ObVecClient 的返回值为一个 tuple list,每个 tuple 代表一行记录 | res = self.client.get( test_collection_name, ids=["abc", "bcd", "cde", "def"], where_clause=[text("meta->'$.page' > 1")], output_column_name=['id'] ) |
def get_ob_hnsw_ef_search(self) -> int | 获取 HNSW 索引的 efSearch 参数 | |
def ann_search( self, table_name: str, vec_data: list, vec_column_name: str, distance_func, with_dist: bool = False, topk: int = 10, output_column_names: Optional[List[str]] = None, extra_output_cols: Optional[List] = None, where_clause=None, partition_names: Optional[List[str]] = None, **kwargs, ) | 执行向量近似邻近搜索
不同于 MilvusLikeClient,ObVecClient 的返回值为一个 tuple list,每个 tuple 代表一行记录 | res = self.client.ann_search( test_collection_name, vec_data=[0, 0, 0], vec_column_name="embedding", distance_func=func.l2_distance, with_dist=True, topk=5, output_column_names=["id"], ) |
使用 ObPartition 定义分区规则
pyobvector 提供了以下类型来支持 range/range columns、list/list columns、hash、key 以及二级分区:
-
ObRangePartition:range 一级分区。构造时设置
is_range_columns = True以创建 range columns 分区。 -
ObListPartition:list 一级分区。构造时设置
is_list_columns = True以创建 list columns 分区。 -
ObHashPartition:hash 一级分区。
-
ObKeyPartition:key 一级分区。
-
ObSubRangePartition:二级 range 分区。构造时设置
is_range_columns = True以创建 range columns 二级分区。 -
ObSubListPartition:list 二级分区。构造时设置
is_list_columns = True以创建 list columns 二级分区。 -
ObSubHashPartition:hash 二级分区。
-
ObSubKeyPartition:key 二级分区。
range 分区示例
range_part = ObRangePartition(
False,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", "maxvalue"),
],
range_expr="id",
)
list 分区示例
list_part = ObListPartition(
False,
list_part_infos=[
RangeListPartInfo("p0", [1, 2, 3]),
RangeListPartInfo("p1", [5, 6]),
RangeListPartInfo("p2", "DEFAULT"),
],
list_expr="col1",
)
hash 分区示例
hash_part = ObHashPartition("col1", part_count=60)