pyobvector Python SDK API reference
pyobvector is the Python SDK for seekdb's vector storage feature. It provides two operating modes:
- pymilvus-compatible mode: Operates the database using the MilvusLikeClient object, offering commonly used APIs compatible with the lightweight MilvusClient.
- SQLAlchemy extension mode: Operates the database using the ObVecClient object, serving as an extension of Python's SDK for relational databases.
This topic describes the APIs in the two modes and provides examples.
MilvusLikeClient
Constructor
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
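The constructor above can be exercised as follows. This is a minimal sketch based on the default values in the signature; the import is deferred inside a function so the snippet can be read (and the helper below tested) without pyobvector installed or a live seekdb instance, and `make_user` is a hypothetical helper, not part of pyobvector.

```python
# Minimal connection sketch using the constructor defaults shown above.
def connect():
    # requires `pip install pyobvector` and a reachable seekdb instance
    from pyobvector import MilvusLikeClient
    return MilvusLikeClient(
        uri="127.0.0.1:2881",  # host:port of the seekdb node
        user="root@test",      # written as user@tenant
        password="",
        db_name="test",
    )

def make_user(name: str, tenant: str) -> str:
    # hypothetical helper (not part of pyobvector): seekdb user strings
    # take the form "user@tenant", as in the default "root@test"
    return f"{name}@{tenant}"
```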
Collection-related APIs
| API | Description | Example |
|---|---|---|
def create_schema(self, **kwargs) -> CollectionSchema | Creates a CollectionSchema object. | |
def create_collection(<br/>self,<br/>collection_name: str,<br/>dimension: Optional[int] = None,<br/>primary_field_name: str = "id",<br/>id_type: Union[DataType, str] = DataType.INT64,<br/>vector_field_name: str = "vector",<br/>metric_type: str = "l2",<br/>auto_id: bool = False,<br/>timeout: Optional[float] = None,<br/>schema: Optional[CollectionSchema] = None, # Used for custom setup<br/>index_params: Optional[IndexParams] = None, # Used for custom setup<br/>max_length: int = 16384,<br/>**kwargs,<br/>) | Creates a table. | client.create_collection(<br/>collection_name=test_collection_name,<br/>schema=schema,<br/>index_params=idx_params,<br/>) |
def get_collection_stats(<br/>self, collection_name: str, timeout: Optional[float] = None # pylint: disable=unused-argument<br/>) -> Dict | Queries the record count of a table. | |
def has_collection(self, collection_name: str, timeout: Optional[float] = None) -> bool | Verifies whether a table exists. | |
def drop_collection(self, collection_name: str) -> None | Drops a table. | |
def load_table(self, collection_name: str) | Reads the metadata of a table into the SQLAlchemy metadata cache. | |
CollectionSchema & FieldSchema
MilvusLikeClient describes the schema of a table by using a CollectionSchema. A CollectionSchema contains multiple FieldSchemas, and a FieldSchema describes the column schema of a table.
Create a CollectionSchema by using the create_schema method of MilvusLikeClient:
def __init__(
self,
fields: Optional[List[FieldSchema]] = None,
partitions: Optional[ObPartition] = None,
description: str = "", # ignored in oceanbase
**kwargs,
)
The parameters are described as follows:
- fields: an optional parameter that specifies a list of FieldSchema objects.
- partitions: partition rules (for more information, see the ObPartition section).
- description: compatible with Milvus, but currently has no practical effect in seekdb.
Create a FieldSchema and register it to a CollectionSchema
def add_field(self, field_name: str, datatype: DataType, **kwargs)
- field_name: the column name.
- datatype: the column data type. For supported data types, see Compatibility reference.
- kwargs: additional parameters for configuring column properties, as shown below:
def __init__(
self,
name: str,
dtype: DataType,
description: str = "",
is_primary: bool = False,
auto_id: bool = False,
nullable: bool = False,
**kwargs,
)
The parameters are described as follows:
- is_primary: specifies whether the column is a primary key.
- auto_id: specifies whether the column value increases automatically.
- nullable: specifies whether the column can be null.
Example
schema = self.client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(
field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768
)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(
field_name="publication", datatype=DataType.VARCHAR, max_length=512
)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)
self.client.create_collection(
collection_name="medium_articles_2020", schema=schema
)
Index-related APIs
| API | Description | Example/Remarks |
|---|---|---|
def create_index(<br/>self,<br/>collection_name: str,<br/>index_params: IndexParams,<br/>timeout: Optional[float] = None,<br/>**kwargs,<br/>) | Creates a vector index table based on the constructed IndexParams (for more information about how to use IndexParams, see the prepare_index_params and add_index APIs). | |
def drop_index(<br/>self,<br/>collection_name: str,<br/>index_name: str,<br/>timeout: Optional[float] = None,<br/>**kwargs,<br/>) | Drops an index table. | |
def refresh_index(<br/>self,<br/>collection_name: str,<br/>index_name: str,<br/>trigger_threshold: int = 10000,<br/>) | Refreshes a vector index table to improve read performance. It can be understood as a process of moving incremental data. | An API introduced by seekdb. Not compatible with Milvus. |
def rebuild_index(<br/>self,<br/>collection_name: str,<br/>index_name: str,<br/>trigger_threshold: float = 0.2,<br/>) | Rebuilds a vector index table to improve read performance. It can be understood as a process of merging incremental data into baseline index data. | An API introduced by seekdb. Not compatible with Milvus. |
def search(<br/>self,<br/>collection_name: str,<br/>data: list,<br/>anns_field: str,<br/>with_dist: bool = False,<br/>filter=None,<br/>limit: int = 10,<br/>output_fields: Optional[List[str]] = None,<br/>search_params: Optional[dict] = None,<br/>timeout: Optional[float] = None,<br/>partition_names: Optional[List[str]] = None,<br/>**kwargs,<br/>) -> List[dict] | Executes a vector approximate nearest neighbor search.<br/>Returns a list of records, where each record is a dictionary mapping column names to column values. | res = self.client.search(<br/>collection_name=test_collection_name,<br/>data=[0, 0, 1],<br/>anns_field="embedding",<br/>limit=5,<br/>output_fields=["id"],<br/>search_params={"metric_type": "neg_ip"}<br/>)<br/>self.assertEqual(<br/> set([r['id'] for r in res]), set([12, 111, 11, 112, 10])) |
def query(<br/>self,<br/>collection_name: str,<br/>flter=None,<br/>output_fields: Optional[List[str]] = None,<br/>timeout: Optional[float] = None,<br/>partition_names: Optional[List[str]] = None,<br/>**kwargs,<br/>) -> List[dict] | Reads data records using the specified filter condition.<br/>Returns a list of records, where each record is a dictionary mapping column names to column values. | table = self.client.load_table(collection_name=test_collection_name)<br/>where_clause = [table.c["id"] < 100]<br/>res = self.client.query(<br/> collection_name=test_collection_name,<br/> output_fields=["id"],<br/> flter=where_clause,<br/>) |
def get(<br/>self,<br/>collection_name: str,<br/>ids: Union[list, str, int],<br/>output_fields: Optional[List[str]] = None,<br/>timeout: Optional[float] = None,<br/>partition_names: Optional[List[str]] = None,<br/>**kwargs,<br/>) -> List[dict] | Retrieves records based on the specified primary keys (ids).<br/>Returns a list of records, where each record is a dictionary mapping column names to column values. | res = self.client.get(<br/> collection_name=test_collection_name,<br/> output_fields=["id", "meta"],<br/> ids=[80, 12, 112],<br/>) |
def delete(<br/>self,<br/>collection_name: str,<br/>ids: Optional[Union[list, str, int]] = None,<br/>timeout: Optional[float] = None, # pylint: disable=unused-argument<br/>flter=None,<br/>partition_name: Optional[str] = "",<br/>**kwargs, # pylint: disable=unused-argument<br/>) | Deletes data in a collection. | self.client.delete(<br/> collection_name=test_collection_name, ids=[12, 112], partition_name="p0"<br/>) |
def insert(<br/> self, <br/> collection_name: str, <br/> data: Union[Dict, List[Dict]], <br/> timeout: Optional[float] = None, <br/> partition_name: Optional[str] = ""<br/>) | Inserts data into a table. | data = [<br/> {"id": 12, "embedding": [1, 2, 3], "meta": {"doc": "document 1"}},<br/> {<br/> "id": 90,<br/> "embedding": [0.13, 0.123, 1.213],<br/> "meta": {"doc": "document 1"},<br/> },<br/> {"id": 112, "embedding": [1, 2, 3], "meta": None},<br/> {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": None},<br/>]<br/>self.client.insert(collection_name=test_collection_name, data=data) |
def upsert(<br/>self,<br/>collection_name: str,<br/>data: Union[Dict, List[Dict]],<br/>timeout: Optional[float] = None, # pylint: disable=unused-argument<br/>partition_name: Optional[str] = "",<br/>) -> List[Union[str, int]] | Updates data in a table. If a primary key already exists, updates the corresponding record; otherwise, inserts a new record. | data = [<br/> {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}},<br/> {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}},<br/>]<br/>self.client.upsert(collection_name=test_collection_name, data=data) |
def perform_raw_text_sql(self, text_sql: str):<br/> return super().perform_raw_text_sql(text_sql) | Executes an SQL statement directly.<br/>Returns an iterator over SQLAlchemy result sets. | |
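The APIs above compose into a typical create-index-insert-search flow. The sketch below is illustrative, not the library's canonical example: the collection name `demo_docs` and column names are assumptions, and the client calls live inside `run_demo()` because they require pyobvector and a reachable seekdb instance. The small `build_rows` helper is hypothetical and only shapes insert payloads.

```python
# Hedged sketch tying the MilvusLikeClient APIs together:
# schema -> index -> insert -> search.

def build_rows(ids, vectors):
    # shape that insert()/upsert() expect: one dict per row, keyed by column
    return [{"id": i, "embedding": list(v)} for i, v in zip(ids, vectors)]

def run_demo():
    # requires pyobvector and a running seekdb instance
    from pyobvector import MilvusLikeClient, DataType

    client = MilvusLikeClient(uri="127.0.0.1:2881", user="root@test")

    schema = client.create_schema()
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="embedding", datatype=DataType.FLOAT_VECTOR, dim=3)

    idx_params = client.prepare_index_params()
    idx_params.add_index(
        field_name="embedding",
        index_type="HNSW",
        index_name="vidx_embedding",
        metric_type="L2",
        params={"M": 16, "efConstruction": 256},
    )

    client.create_collection("demo_docs", schema=schema, index_params=idx_params)
    client.insert("demo_docs", data=build_rows([1, 2], [[0, 0, 1], [1, 0, 0]]))

    # approximate nearest neighbor of [0, 0, 1] among the inserted rows
    return client.search(
        collection_name="demo_docs",
        data=[0, 0, 1],
        anns_field="embedding",
        limit=1,
        output_fields=["id"],
    )
```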
ObVecClient
Constructor
def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
Table schema-related operations
| API | Description | Example/Remarks |
|---|---|---|
def check_table_exists(self, table_name: str) | Checks whether a table exists. | |
def create_table(<br/>self,<br/>table_name: str,<br/>columns: List[Column],<br/>indexes: Optional[List[Index]] = None,<br/>partitions: Optional[ObPartition] = None,<br/>) | Creates a table. | |
@classmethod<br/>def prepare_index_params(cls) | Creates an IndexParams object to record the schema definition of a vector index table.<br/>class IndexParams:<br/> """Vector index parameters for MilvusLikeClient"""<br/> def __init__(self):<br/> self._indexes = {}<br/>The definition of IndexParams is very simple: it has a single internal dictionary member that maps a tuple of (column name, index name) to an IndexParam structure. The constructor of the IndexParam class is:<br/>def __init__(<br/> self,<br/> index_name: str,<br/> field_name: str,<br/> index_type: Union[VecIndexType, str],<br/> **kwargs<br/>)<br/>After calling prepare_index_params, you can register an IndexParam using the add_index interface:<br/>def add_index(<br/> self,<br/> field_name: str,<br/> index_type: VecIndexType,<br/> index_name: str,<br/> **kwargs<br/>)<br/>The parameter meanings are the same as those in the IndexParam constructor. | Here is a usage example for creating a vector index:<br/>idx_params = self.client.prepare_index_params()<br/>idx_params.add_index(<br/> field_name="title_vector",<br/> index_type="HNSW",<br/> index_name="vidx_title_vector",<br/> metric_type="L2",<br/> params={"M": 16, "efConstruction": 256},<br/>)<br/>self.client.create_collection(<br/> collection_name=test_collection_name,<br/> schema=schema,<br/> index_params=idx_params,<br/>)<br/>Note that the prepare_index_params function is recommended for use in MilvusLikeClient, not in ObVecClient. In ObVecClient mode, you should use the create_index interface to define a vector index table (for details, see the create_index interface). |
def create_table_with_index_params(<br/>self,<br/>table_name: str,<br/>columns: List[Column],<br/>indexes: Optional[List[Index]] = None,<br/>vidxs: Optional[IndexParams] = None,<br/>partitions: Optional[ObPartition] = None,<br/>) | Creates a table and, optionally, vector indexes at the same time using the vidxs parameter. | Recommended for use in MilvusLikeClient; not recommended for use in ObVecClient. |
def create_index(<br/>self,<br/>table_name: str,<br/>is_vec_index: bool,<br/>index_name: str,<br/>column_names: List[str],<br/>vidx_params: Optional[str] = None,<br/>**kw,<br/>) | Supports creating both normal indexes and vector indexes.<br/>For a vector index, vidx_params currently supports type=hnsw and lib=vsag; please retain these settings. The distance can be set to l2 or inner_product. | self.client.create_index(<br/> test_collection_name,<br/> is_vec_index=True,<br/> index_name="vidx",<br/> column_names=["embedding"],<br/> vidx_params="distance=l2, type=hnsw, lib=vsag",<br/>) |
def create_vidx_with_vec_index_param(<br/>self,<br/>table_name: str,<br/>vidx_param: IndexParam,<br/>) | Creates a vector index using vector index parameters. | |
def drop_table_if_exist(self, table_name: str) | Drops a table. | |
def drop_index(self, table_name: str, index_name: str) | Drops an index. | |
def refresh_index(<br/>self,<br/>table_name: str,<br/>index_name: str,<br/>trigger_threshold: int = 10000,<br/>) | Refreshes a vector index table to improve read performance. It can be understood as a process of moving incremental data. | |
def rebuild_index(<br/>self,<br/>table_name: str,<br/>index_name: str,<br/>trigger_threshold: float = 0.2,<br/>) | Rebuilds a vector index table to improve read performance. It can be understood as a process of merging incremental data into baseline index data. | |
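The table and index APIs above can be combined as follows. This is a hedged sketch: the table name `demo_items` is illustrative, the `VECTOR` column type is assumed to be exported by pyobvector for SQLAlchemy columns, and the `parse_vidx_params` helper is hypothetical (not part of pyobvector). The client calls are deferred into a function because they need a running seekdb instance.

```python
def parse_vidx_params(s: str) -> dict:
    # hypothetical helper: split a vidx_params string such as
    # "distance=l2, type=hnsw, lib=vsag" into a dict for inspection
    return dict(kv.strip().split("=", 1) for kv in s.split(","))

def create_table_demo():
    # requires pyobvector and a running seekdb instance
    from sqlalchemy import Column, Integer
    from pyobvector import ObVecClient, VECTOR  # VECTOR: vector column type

    client = ObVecClient(uri="127.0.0.1:2881", user="root@test")
    client.drop_table_if_exist("demo_items")
    client.create_table(
        "demo_items",
        columns=[
            Column("id", Integer, primary_key=True),
            Column("embedding", VECTOR(3)),  # 3-dimensional vector column
        ],
    )
    client.create_index(
        "demo_items",
        is_vec_index=True,
        index_name="vidx",
        column_names=["embedding"],
        vidx_params="distance=l2, type=hnsw, lib=vsag",
    )
```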
DML operations
| API | Description | Example/Remarks |
|---|---|---|
def insert(<br/>self,<br/>table_name: str,<br/>data: Union[Dict, List[Dict]],<br/>partition_name: Optional[str] = "",<br/>) | Inserts data into a table. | vector_value1 = [0.748479, 0.276979, 0.555195]<br/>vector_value2 = [0, 0, 0]<br/>data1 = [{"id": i, "embedding": vector_value1} for i in range(10)]<br/>data1.extend([{"id": i, "embedding": vector_value2} for i in range(10, 13)])<br/>data1.extend([{"id": i, "embedding": vector_value2} for i in range(111, 113)])<br/>self.client.insert(test_collection_name, data=data1) |
def upsert(<br/>self,<br/>table_name: str,<br/>data: Union[Dict, List[Dict]],<br/>partition_name: Optional[str] = "",<br/>) | Inserts or updates data in a table. If a primary key already exists, updates the corresponding record; otherwise, inserts a new record. | |
def update(<br/>self,<br/>table_name: str,<br/>values_clause,<br/>where_clause=None,<br/>partition_name: Optional[str] = "",<br/>) | Updates rows in a table that match the specified where clause. | data = [<br/> {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}},<br/> {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}},<br/>]<br/>client.insert(collection_name=test_collection_name, data=data)<br/>client.update(<br/> table_name=test_collection_name,<br/> values_clause=[{'meta':{'doc':'HHH'}}],<br/> where_clause=[text("id=112")]<br/>) |
def delete(<br/>self,<br/>table_name: str,<br/>ids: Optional[Union[list, str, int]] = None,<br/>where_clause=None,<br/>partition_name: Optional[str] = "",<br/>) | Deletes data from a table. | self.client.delete(test_collection_name, ids=["bcd", "def"]) |
def get(<br/>self,<br/>table_name: str,<br/>ids: Optional[Union[list, str, int]],<br/>where_clause = None,<br/>output_column_name: Optional[List[str]] = None,<br/>partition_names: Optional[List[str]] = None,<br/>) | Retrieves records based on the specified primary keys (ids).<br/>Unlike MilvusLikeClient, the return value of ObVecClient is a list of tuples, with each tuple representing a row of records. | res = self.client.get(<br/> test_collection_name,<br/> ids=["abc", "bcd", "cde", "def"],<br/> where_clause=[text("meta->'$.page' > 1")],<br/> output_column_name=['id']<br/>) |
def set_ob_hnsw_ef_search(self, ob_hnsw_ef_search: int) | Sets the efSearch parameter of the HNSW index. This is a session-level variable. The larger the value of ef_search, the higher the recall rate but the poorer the query performance. | |
def get_ob_hnsw_ef_search(self) -> int | Gets the efSearch parameter of the HNSW index. | |
def ann_search(<br/>self,<br/>table_name: str,<br/>vec_data: list,<br/>vec_column_name: str,<br/>distance_func,<br/>with_dist: bool = False,<br/>topk: int = 10,<br/>output_column_names: Optional[List[str]] = None,<br/>extra_output_cols: Optional[List] = None,<br/>where_clause=None,<br/>partition_names: Optional[List[str]] = None,<br/>**kwargs,<br/>) | Executes a vector approximate nearest neighbor search.<br/>Unlike MilvusLikeClient, the return value of ObVecClient is a list of tuples, with each tuple representing a row of records. | res = self.client.ann_search(<br/> test_collection_name,<br/> vec_data=[0, 0, 0],<br/> vec_column_name="embedding",<br/> distance_func=func.l2_distance,<br/> with_dist=True,<br/> topk=5,<br/> output_column_names=["id"],<br/>) |
def precise_search(<br/>self,<br/>table_name: str,<br/>vec_data: list,<br/>vec_column_name: str,<br/>distance_func,<br/>topk: int = 10,<br/>output_column_names: Optional[List[str]] = None,<br/>where_clause=None,<br/>**kwargs,<br/>) | Executes a precise (exact) nearest neighbor search.<br/>Unlike MilvusLikeClient, the return value of ObVecClient is a list of tuples, with each tuple representing a row of records. | |
def perform_raw_text_sql(self, text_sql: str) | Executes an SQL statement directly.<br/>Returns an iterator over SQLAlchemy result sets. | |
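Since ObVecClient's read APIs return a list of tuples rather than the MilvusLikeClient-style dictionaries, a small adapter can restore the dictionary shape when the output column order is known. The helper below is hypothetical (not part of pyobvector), but it is pure Python and works on any list of row tuples:

```python
def rows_to_dicts(column_names, rows):
    # pair each row tuple with the requested output column names,
    # e.g. the columns passed as output_column_names to ann_search()
    return [dict(zip(column_names, row)) for row in rows]
```

For example, `rows_to_dicts(["id", "distance"], res)` turns the tuple list returned by an `ann_search(..., with_dist=True, output_column_names=["id"])` call into a list of `{"id": ..., "distance": ...}` records.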
Define partitioning rules by using ObPartition
pyobvector supports the following partitioning types: range/range columns, list/list columns, hash, key, and the corresponding subpartitioning types:
- ObRangePartition: specifies to perform range partitioning. Set is_range_columns to True when you construct this object to create range column partitioning.
- ObListPartition: specifies to perform list partitioning. Set is_list_columns to True when you construct this object to create list column partitioning.
- ObHashPartition: specifies to perform hash partitioning.
- ObKeyPartition: specifies to perform key partitioning.
- ObSubRangePartition: specifies to perform sub-range partitioning. Set is_range_columns to True when you construct this object to create sub-range column partitioning.
- ObSubListPartition: specifies to perform sub-list partitioning. Set is_list_columns to True when you construct this object to create sub-list column partitioning.
- ObSubHashPartition: specifies to perform sub-hash partitioning.
- ObSubKeyPartition: specifies to perform sub-key partitioning.
Example of range partitioning
range_part = ObRangePartition(
False,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", "maxvalue"),
],
range_expr="id",
)
Example of list partitioning
list_part = ObListPartition(
False,
list_part_infos=[
RangeListPartInfo("p0", [1, 2, 3]),
RangeListPartInfo("p1", [5, 6]),
RangeListPartInfo("p2", "DEFAULT"),
],
list_expr="col1",
)
Example of hash partitioning
hash_part = ObHashPartition("col1", part_count=60)
Example of multi-level partitioning
# Perform range partitioning
range_columns_part = ObRangePartition(
True,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", 200),
RangeListPartInfo("p2", 300),
],
col_name_list=["col1"],
)
# Perform sub-range partitioning
range_sub_part = ObSubRangePartition(
False,
range_part_infos=[
RangeListPartInfo("mp0", 1000),
RangeListPartInfo("mp1", 2000),
RangeListPartInfo("mp2", 3000),
],
range_expr="col3",
)
range_columns_part.add_subpartition(range_sub_part)
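A constructed ObPartition object is attached when the table is created: pass it via the partitions argument of ObVecClient.create_table (or of the CollectionSchema constructor in MilvusLikeClient mode). The sketch below is illustrative, the table name `demo_ranged` is an assumption, and the client call is deferred into a function because it requires a running seekdb instance.

```python
def partitioned_table_demo():
    # requires pyobvector and a running seekdb instance
    from sqlalchemy import Column, Integer
    from pyobvector import ObVecClient, ObRangePartition, RangeListPartInfo

    # same range-partitioning rule as the example above
    range_part = ObRangePartition(
        False,
        range_part_infos=[
            RangeListPartInfo("p0", 100),
            RangeListPartInfo("p1", "maxvalue"),
        ],
        range_expr="id",
    )
    client = ObVecClient(uri="127.0.0.1:2881", user="root@test")
    client.create_table(
        "demo_ranged",
        columns=[Column("id", Integer, primary_key=True)],
        partitions=range_part,  # rows with id < 100 land in p0, the rest in p1
    )
```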
Pure SQLAlchemy API mode
If you prefer to use a purely SQLAlchemy API for seekdb's vector retrieval functionality, you can obtain a synchronous database engine in either of the following ways:
- Method 1: Use ObVecClient to create a database engine
from pyobvector import ObVecClient
client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
engine = client.engine
# Proceed to create a session as usual with SQLAlchemy and use its API.
- Method 2: Call the create_engine interface of SQLAlchemy to create a database engine
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
# mysql+oceanbase indicates using the MySQL standard with seekdb's synchronous driver.
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str)
# Proceed to create a session as usual with SQLAlchemy and use its API.
If you want to use asynchronous APIs of SQLAlchemy, you can use seekdb's asynchronous driver:
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine
uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
# mysql+aoceanbase indicates using the MySQL standard with seekdb's asynchronous driver.
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
# Proceed to create a session as usual with SQLAlchemy and use its API.
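The asynchronous engine above is used through SQLAlchemy's asyncio API. The sketch below shows a minimal round trip (a SELECT 1 ping); it is wrapped in a function because it needs pyobvector installed and a reachable seekdb instance, so nothing executes on import.

```python
def async_demo():
    # requires pyobvector and a running seekdb instance
    import asyncio
    from sqlalchemy import text
    from sqlalchemy.dialects import registry
    from sqlalchemy.ext.asyncio import create_async_engine

    registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
    engine = create_async_engine(
        # same connection string shape as above: user@tenant, empty password
        "mysql+aoceanbase://root@test:@127.0.0.1:2881/test?charset=utf8mb4"
    )

    async def ping():
        async with engine.connect() as conn:
            result = await conn.execute(text("SELECT 1"))
            return result.scalar()

    return asyncio.run(ping())
```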
More examples
For more examples, visit the pyobvector repository.