pyobvector Python SDK API reference

pyobvector is the Python SDK for seekdb's vector storage feature. It provides two operating modes:

  • pymilvus-compatible mode: Operates the database using the MilvusLikeClient object, offering commonly used APIs compatible with the lightweight MilvusClient.

  • SQLAlchemy extension mode: Operates the database using the ObVecClient object, which extends SQLAlchemy, Python's SDK for relational databases.

This topic describes the APIs in the two modes and provides examples.

MilvusLikeClient

Constructor


def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
API | Description | Example
def create_schema(self, **kwargs) -> CollectionSchema:
    Creates a CollectionSchema object.
  • Parameters are optional, allowing the initialization of an empty schema definition.
  • Optional parameters include:
    • fields: A list of FieldSchema objects (see the add_field interface below for details).
    • partitions: Partitioning rules (see the section on defining partition rules using ObPartition).
    • description: Compatible with Milvus, but currently has no practical effect in seekdb.
def create_collection(
    self,
    collection_name: str,
    dimension: Optional[int] = None,
    primary_field_name: str = "id",
    id_type: Union[DataType, str] = DataType.INT64,
    vector_field_name: str = "vector",
    metric_type: str = "l2",
    auto_id: bool = False,
    timeout: Optional[float] = None,
    schema: Optional[CollectionSchema] = None,  # Used for custom setup
    index_params: Optional[IndexParams] = None,  # Used for custom setup
    max_length: int = 16384,
    **kwargs,
)
Creates a table:
  • collection_name: the table name
  • dimension: the vector data dimension
  • primary_field_name: the primary field name
  • id_type: the primary field data type (only supports VARCHAR and INT types)
  • vector_field_name: the vector field name
  • metric_type: not used in seekdb, but maintained for API compatibility (because the main table definition does not need to specify a vector distance function)
  • auto_id: specifies whether the primary field value increases automatically
  • timeout: not used in seekdb, but maintained for API compatibility
  • schema: the custom collection schema. When schema is not None, the parameters from dimension to metric_type will be ignored
  • index_params: the custom vector index parameters
  • max_length: the maximum varchar length when the primary field data type is VARCHAR and schema is not None
client.create_collection(
    collection_name=test_collection_name,
    schema=schema,
    index_params=idx_params,
)
def get_collection_stats(
    self, collection_name: str, timeout: Optional[float] = None  # pylint: disable=unused-argument
) -> Dict:
Queries the record count of a table.
  • collection_name: the table name
  • timeout: not used in seekdb, but maintained for API compatibility
def has_collection(self, collection_name: str, timeout: Optional[float] = None) -> bool
Verifies whether a table exists.
  • collection_name: the table name
  • timeout: not used in seekdb, but maintained for API compatibility
def drop_collection(self, collection_name: str) -> None
Drops a table.
  • collection_name: the table name
def load_table(self, collection_name: str)
Reads the metadata of a table to the SQLAlchemy metadata cache.
  • collection_name: the table name

CollectionSchema & FieldSchema

MilvusLikeClient describes the schema of a table by using a CollectionSchema. A CollectionSchema contains multiple FieldSchemas, and a FieldSchema describes the column schema of a table.

Create a CollectionSchema by using the create_schema method of the MilvusLikeClient

def __init__(
self,
fields: Optional[List[FieldSchema]] = None,
partitions: Optional[ObPartition] = None,
description: str = "", # ignored in oceanbase
**kwargs,
)

The parameters are described as follows:

  • fields: an optional parameter that specifies a list of FieldSchema objects.

  • partitions: partition rules (for more information, see the ObPartition section).

  • description: compatible with Milvus, but currently has no practical effect in seekdb.

Create a FieldSchema and register it to a CollectionSchema

def add_field(self, field_name: str, datatype: DataType, **kwargs)
  • field_name: the column name.

  • datatype: the column data type. For supported data types, see Compatibility reference.

  • kwargs: additional parameters for configuring column properties, as shown below:

    def __init__(
    self,
    name: str,
    dtype: DataType,
    description: str = "",
    is_primary: bool = False,
    auto_id: bool = False,
    nullable: bool = False,
    **kwargs,
    )

    The parameters are described as follows:

    • is_primary: specifies whether the column is a primary key.

    • auto_id: specifies whether the column value increases automatically.

    • nullable: specifies whether the column can be null.

Example

schema = self.client.create_schema()
schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(
field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=768
)
schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=512)
schema.add_field(field_name="reading_time", datatype=DataType.INT64)
schema.add_field(
field_name="publication", datatype=DataType.VARCHAR, max_length=512
)
schema.add_field(field_name="claps", datatype=DataType.INT64)
schema.add_field(field_name="responses", datatype=DataType.INT64)

self.client.create_collection(
collection_name="medium_articles_2020", schema=schema
)
API | Description | Example/Remarks
def create_index(
    self,
    collection_name: str,
    index_params: IndexParams,
    timeout: Optional[float] = None,
    **kwargs,
)
Creates a vector index table based on the constructed IndexParams (for more information about how to use IndexParams, see the prepare_index_params and add_index APIs).
  • collection_name: the table name
  • index_params: the index parameters
  • timeout: not used in seekdb, but maintained for API compatibility
  • kwargs: other parameters, currently not used, maintained for compatibility
def drop_index(
    self,
    collection_name: str,
    index_name: str,
    timeout: Optional[float] = None,
    **kwargs,
)
Drops an index table.
  • collection_name: the table name
  • index_name: the index name
def refresh_index(
    self,
    collection_name: str,
    index_name: str,
    trigger_threshold: int = 10000,
)
Refreshes a vector index table to improve read performance. It can be understood as a process of moving incremental data.
  • collection_name: the table name
  • index_name: the index name
  • trigger_threshold: the trigger threshold of the refresh action. A refresh is triggered when the data volume of the index table exceeds the threshold.
An API introduced by seekdb.
def rebuild_index(
    self,
    collection_name: str,
    index_name: str,
    trigger_threshold: float = 0.2,
)
Rebuilds a vector index table to improve read performance. It can be understood as a process of merging incremental data into baseline index data.
  • collection_name: the table name
  • index_name: the index name
  • trigger_threshold: the trigger threshold of the rebuild action. The value range is 0 to 1. A rebuild is triggered when the proportion of incremental data to full data reaches the threshold.
An API introduced by seekdb.
Not compatible with Milvus.
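The two trigger thresholds are of different kinds: refresh_index compares an absolute amount of data, while rebuild_index compares a ratio between 0 and 1. The following minimal sketch models the two conditions in plain Python. The function names and row-count arguments are hypothetical and exist only for illustration; this is not pyobvector's implementation, and the strict versus inclusive comparisons are assumptions read from the words "exceeds" and "reaches" above.

```python
def should_refresh(index_rows: int, trigger_threshold: int = 10000) -> bool:
    """Refresh rule (assumed): the index table's data volume exceeds the threshold."""
    return index_rows > trigger_threshold


def should_rebuild(incremental_rows: int, total_rows: int,
                   trigger_threshold: float = 0.2) -> bool:
    """Rebuild rule (assumed): incremental/full ratio reaches the threshold."""
    if not 0 <= trigger_threshold <= 1:
        raise ValueError("trigger_threshold must be between 0 and 1")
    if total_rows == 0:
        return False  # an empty table has nothing to rebuild
    return incremental_rows / total_rows >= trigger_threshold


print(should_refresh(12_000))         # True: 12000 > 10000
print(should_rebuild(1_500, 10_000))  # False: 0.15 < 0.2
print(should_rebuild(2_500, 10_000))  # True: 0.25 >= 0.2
```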
def search(
    self,
    collection_name: str,
    data: list,
    anns_field: str,
    with_dist: bool = False,
    filter=None,
    limit: int = 10,
    output_fields: Optional[List[str]] = None,
    search_params: Optional[dict] = None,
    timeout: Optional[float] = None,
    partition_names: Optional[List[str]] = None,
    **kwargs,
) -> List[dict]
Executes a vector approximate nearest neighbor search.
  • collection_name: the table name
  • data: the vector data to be searched
  • anns_field: the name of the vector column to be searched
  • with_dist: specifies whether to return results with vector distances
  • filter: uses vector approximate nearest neighbor search with filter conditions
  • limit: top K
  • output_fields: the output columns (also known as projection columns)
  • search_params: supports only the metric_type value of l2/neg_ip (for example: search_params = {"metric_type": "neg_ip"})
  • timeout: not used in seekdb, maintained for compatibility only
  • partition_names: limits the query to some partitions
    Return value:
    A list of records, where each record is a dictionary
    representing a mapping from column_name to column values.
res = self.client.search(
    collection_name=test_collection_name,
    data=[0, 0, 1],
    anns_field="embedding",
    limit=5,
    output_fields=["id"],
    search_params={"metric_type": "neg_ip"},
)
self.assertEqual(
    set([r['id'] for r in res]), set([12, 111, 11, 112, 10]))
def query(
    self,
    collection_name: str,
    flter=None,
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    partition_names: Optional[List[str]] = None,
    **kwargs,
) -> List[dict]
Reads data records using the specified filter condition.
  • collection_name: the table name
  • flter: the filter condition used to select records
  • output_fields: the output columns (also known as projection columns)
  • timeout: not used in seekdb, maintained for compatibility only
  • partition_names: limits the query to some partitions
    Return value:
    A list of records, where each record is a dictionary
    representing a mapping from column_name to column values.
table = self.client.load_table(collection_name=test_collection_name)
where_clause = [table.c["id"] < 100]
res = self.client.query(
    collection_name=test_collection_name,
    output_fields=["id"],
    flter=where_clause,
)
def get(
    self,
    collection_name: str,
    ids: Union[list, str, int],
    output_fields: Optional[List[str]] = None,
    timeout: Optional[float] = None,
    partition_names: Optional[List[str]] = None,
    **kwargs,
) -> List[dict]
Retrieves records based on the specified primary keys ids:
  • collection_name: the table name
  • ids: a single ID or a list of IDs. Note: The ids parameter of MilvusLikeClient get interface is different from ObVecClient get. For details, see ObVecClient get
  • output_fields: the output columns (also known as projection columns)
  • timeout: not used in seekdb, maintained for compatibility only
  • partition_names: limits the query to some partitions
Return value:
A list of records, where each record is a dictionary
representing a mapping from column_name to column values.
res = self.client.get(
    collection_name=test_collection_name,
    output_fields=["id", "meta"],
    ids=[80, 12, 112],
)
def delete(
    self,
    collection_name: str,
    ids: Optional[Union[list, str, int]] = None,
    timeout: Optional[float] = None,  # pylint: disable=unused-argument
    flter=None,
    partition_name: Optional[str] = "",
    **kwargs,  # pylint: disable=unused-argument
)
Deletes data in a collection.
  • collection_name: the table name
  • ids: a single ID or a list of IDs
  • timeout: not used in seekdb, maintained for compatibility only
  • flter: the filter condition that selects the records to delete
  • partition_name: limits the deletion operation to a partition
self.client.delete(
    collection_name=test_collection_name, ids=[12, 112], partition_name="p0"
)
def insert(
    self,
    collection_name: str,
    data: Union[Dict, List[Dict]],
    timeout: Optional[float] = None,
    partition_name: Optional[str] = ""
)
Inserts data into a table.
  • collection_name: the table name
  • data: the data to be inserted, described in Key-Value form
  • timeout: not used in seekdb, maintained for compatibility only
  • partition_name: limits the insertion operation to a partition
data = [
    {"id": 12, "embedding": [1, 2, 3], "meta": {"doc": "document 1"}},
    {
        "id": 90,
        "embedding": [0.13, 0.123, 1.213],
        "meta": {"doc": "document 1"},
    },
    {"id": 112, "embedding": [1, 2, 3], "meta": None},
    {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": None},
]
self.client.insert(collection_name=test_collection_name, data=data)
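Because each record is a dictionary keyed by column name, it can be useful to check the vector length on the client side before calling insert. The sketch below is a minimal pre-flight check and does not call pyobvector at all. The helper name check_rows is hypothetical, and the column name "embedding" and dimension 3 are taken from the insert example for illustration.

```python
from typing import Dict, List, Union

Row = Dict[str, object]


def check_rows(data: Union[Row, List[Row]], vector_field: str, dim: int) -> List[Row]:
    """Return data as a list of rows, raising if a vector has the wrong length."""
    # insert accepts either one dict or a list of dicts, so normalize first.
    rows = [data] if isinstance(data, dict) else list(data)
    for i, row in enumerate(rows):
        vec = row.get(vector_field)
        if not isinstance(vec, (list, tuple)) or len(vec) != dim:
            raise ValueError(f"row {i}: '{vector_field}' must have {dim} elements")
    return rows


rows = check_rows(
    [
        {"id": 12, "embedding": [1, 2, 3], "meta": {"doc": "document 1"}},
        {"id": 112, "embedding": [1, 2, 3], "meta": None},
    ],
    vector_field="embedding",
    dim=3,
)
print(len(rows))  # 2
```

The returned list can then be passed as the data argument of insert.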
def upsert(
    self,
    collection_name: str,
    data: Union[Dict, List[Dict]],
    timeout: Optional[float] = None,  # pylint: disable=unused-argument
    partition_name: Optional[str] = "",
) -> List[Union[str, int]]
Updates data in a table. If a primary key already exists, updates the corresponding record; otherwise, inserts a new record.
  • collection_name: the table name
  • data: the data to be inserted or updated, in the same format as the insert interface
  • timeout: not used in seekdb, maintained for compatibility only
  • partition_name: limits the operation to a specified partition
data = [
    {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}},
    {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}},
]
self.client.upsert(collection_name=test_collection_name, data=data)
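The upsert rule (update when the primary key exists, insert otherwise) can be modeled with an in-memory dictionary keyed by primary key. This is only a sketch of the semantics, not of how seekdb executes the statement. The function upsert_rows is hypothetical, and it assumes that an existing row is replaced as a whole, which the description above does not state explicitly.

```python
from typing import Dict, List


def upsert_rows(table: Dict[int, dict], data: List[dict], pk: str = "id") -> List[int]:
    """Model of upsert: replace the row when the key exists, otherwise add it."""
    keys = []
    for row in data:
        table[row[pk]] = dict(row)  # one assignment covers both update and insert
        keys.append(row[pk])
    return keys


# The dict stands in for a table that already contains the row with id 112.
table = {112: {"id": 112, "embedding": [1, 2, 3], "meta": None}}
upsert_rows(
    table,
    [
        {"id": 112, "embedding": [1, 2, 3], "meta": {"doc": "hhh1"}},
        {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {"doc": "hhh2"}},
    ],
)
print(sorted(table))       # [112, 190]
print(table[112]["meta"])  # {'doc': 'hhh1'}
```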
def perform_raw_text_sql(self, text_sql: str):
    return super().perform_raw_text_sql(text_sql)
Executes an SQL statement directly.
  • text_sql: the SQL statement to be executed
Return value:
Returns an iterator that provides result sets from SQLAlchemy.

ObVecClient

Constructor

def __init__(
self,
uri: str = "127.0.0.1:2881",
user: str = "root@test",
password: str = "",
db_name: str = "test",
**kwargs,
)
API | Description | Example/Remarks
def check_table_exists(self, table_name: str)
Checks whether a table exists.
  • table_name: the table name
def create_table(
    self,
    table_name: str,
    columns: List[Column],
    indexes: Optional[List[Index]] = None,
    partitions: Optional[ObPartition] = None,
)
Creates a table.
  • table_name: the table name
  • columns: the column schema of the table, defined using SQLAlchemy
  • indexes: a set of index schemas, defined using SQLAlchemy
  • partitions: optional partition rules (see the section on using ObPartition to define partition rules)
@classmethod
def prepare_index_params(cls)
Creates an IndexParams object to record the schema definition of a vector index table.
class IndexParams:
    """Vector index parameters for MilvusLikeClient"""
    def __init__(self):
        self._indexes = {}
The definition of IndexParams is very simple: it has a single dictionary member
that stores a mapping from a tuple of (column name, index name) to an IndexParam structure.
The constructor of the IndexParam class is:
def __init__(
    self,
    index_name: str,
    field_name: str,
    index_type: Union[VecIndexType, str],
    **kwargs
)
  • index_name: the vector index table name
  • field_name: the vector column name
  • index_type: an enumerated class for vector index algorithm types. Currently, only HNSW is supported.
After obtaining an IndexParams by calling prepare_index_params, you can register an IndexParam using the add_index interface:
def add_index(
    self,
    field_name: str,
    index_type: VecIndexType,
    index_name: str,
    **kwargs
)
The parameter meanings are the same as those in the IndexParam constructor.
Here is a usage example for creating a vector index:
idx_params = self.client.prepare_index_params()
idx_params.add_index(
    field_name="title_vector",
    index_type="HNSW",
    index_name="vidx_title_vector",
    metric_type="L2",
    params={"M": 16, "efConstruction": 256},
)
self.client.create_collection(
    collection_name=test_collection_name,
    schema=schema,
    index_params=idx_params,
)
Note that the prepare_index_params function is recommended for use in MilvusLikeClient, not in ObVecClient. In ObVecClient mode, you should use the create_index interface to define a vector index table. (For details, see the create_index interface.)
def create_table_with_index_params(
    self,
    table_name: str,
    columns: List[Column],
    indexes: Optional[List[Index]] = None,
    vidxs: Optional[IndexParams] = None,
    partitions: Optional[ObPartition] = None,
)
Creates a table and its vector indexes at the same time, using the optional vidxs parameter.
  • table_name: the table name
  • columns: the column schema of the table, defined using SQLAlchemy
  • indexes: a set of index schemas, defined using SQLAlchemy
  • vidxs: the vector index schema, specified using IndexParams
  • partitions: optional partition rules (see the section on using ObPartition to define partition rules)
Recommended for use in MilvusLikeClient, not recommended for use in ObVecClient
def create_index(
    self,
    table_name: str,
    is_vec_index: bool,
    index_name: str,
    column_names: List[str],
    vidx_params: Optional[str] = None,
    **kw,
)
Supports creating both normal indexes and vector indexes.
  • table_name: the table name
  • is_vec_index: specifies whether to create a normal index or a vector index
  • index_name: the index name
  • column_names: the columns on which to create the index
  • vidx_params: the vector index parameters, for example: "distance=l2, type=hnsw, lib=vsag"
Currently, seekdb supports only type=hnsw and lib=vsag. Please retain these settings. The distance can be set to l2 or inner_product.
self.client.create_index(
    test_collection_name,
    is_vec_index=True,
    index_name="vidx",
    column_names=["embedding"],
    vidx_params="distance=l2, type=hnsw, lib=vsag",
)
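Since vidx_params is a plain string, a small helper can keep type=hnsw and lib=vsag fixed and reject an unsupported distance before the statement reaches the database. The following is a minimal sketch; build_vidx_params is a hypothetical helper, not part of pyobvector, and the allowed distances are the two named above.

```python
ALLOWED_DISTANCES = {"l2", "inner_product"}


def build_vidx_params(distance: str = "l2") -> str:
    """Build a vidx_params string, keeping type=hnsw and lib=vsag fixed."""
    if distance not in ALLOWED_DISTANCES:
        raise ValueError(f"unsupported distance: {distance!r}")
    return f"distance={distance}, type=hnsw, lib=vsag"


print(build_vidx_params())                 # distance=l2, type=hnsw, lib=vsag
print(build_vidx_params("inner_product"))  # distance=inner_product, type=hnsw, lib=vsag
```

The result can be passed as the vidx_params argument of create_index.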
def create_vidx_with_vec_index_param(
    self,
    table_name: str,
    vidx_param: IndexParam,
)
Creates a vector index using vector index parameters.
  • table_name: the table name
  • vidx_param: the vector index parameters constructed using IndexParam
def drop_table_if_exist(self, table_name: str)
Drops a table if it exists.
  • table_name: the table name
def drop_index(self, table_name: str, index_name: str)
Drops an index.
  • table_name: the table name
  • index_name: the index name
def refresh_index(
    self,
    table_name: str,
    index_name: str,
    trigger_threshold: int = 10000,
)
Refreshes a vector index table to improve read performance. It can be understood as a process of moving incremental data.
  • table_name: the table name
  • index_name: the index name
  • trigger_threshold: the trigger threshold of the refresh action. A refresh is triggered when the data volume of the index table exceeds the threshold.
def rebuild_index(
    self,
    table_name: str,
    index_name: str,
    trigger_threshold: float = 0.2,
)
Rebuilds a vector index table to improve read performance. It can be understood as a process of merging incremental data into baseline index data.
  • table_name: the table name
  • index_name: the index name
  • trigger_threshold: the trigger threshold of the rebuild action. The value range is 0 to 1. A rebuild is triggered when the proportion of incremental data to full data reaches the threshold.

DML operations

API | Description | Example/Remarks
def insert(
    self,
    table_name: str,
    data: Union[Dict, List[Dict]],
    partition_name: Optional[str] = "",
)
Inserts data into a table.
  • table_name: the table name
  • data: the data to be inserted, described in Key-Value form
  • partition_name: limits the insertion operation to a partition
vector_value1 = [0.748479, 0.276979, 0.555195]
vector_value2 = [0, 0, 0]
data1 = [{"id": i, "embedding": vector_value1} for i in range(10)]
data1.extend([{"id": i, "embedding": vector_value2} for i in range(10, 13)])
data1.extend([{"id": i, "embedding": vector_value2} for i in range(111, 113)])
self.client.insert(test_collection_name, data=data1)
def upsert(
    self,
    table_name: str,
    data: Union[Dict, List[Dict]],
    partition_name: Optional[str] = "",
)
Inserts or updates data in a table. If a primary key already exists, updates the corresponding record; otherwise, inserts a new record.
  • table_name: the table name
  • data: the data to be inserted or updated, in Key-Value format
  • partition_name: limits the operation to a specified partition
def update(<br/>self,<br/>table_name: str,<br/>values_clause,<br/>where_clause=None,<br/>partition_name: Optional[str] = "",<br/>)Updates data in a table. If a primary key is repeated, it will be replaced.
  • table_name: the table name
  • values_clause: the values of the columns to be updated
  • where_clause: the condition for updating
  • partition_name: limits the update operation to some partitions
# text is assumed to be SQLAlchemy's text() construct: from sqlalchemy import text
data = [
    {"id": 112, "embedding": [1, 2, 3], "meta": {'doc':'hhh1'}},
    {"id": 190, "embedding": [0.13, 0.123, 1.213], "meta": {'doc':'hhh2'}},
]
client.insert(table_name=test_collection_name, data=data)
client.update(
    table_name=test_collection_name,
    values_clause=[{'meta':{'doc':'HHH'}}],
    where_clause=[text("id=112")]
)
def delete(
    self,
    table_name: str,
    ids: Optional[Union[list, str, int]] = None,
    where_clause=None,
    partition_name: Optional[str] = "",
)
Deletes data from a table.
  • table_name: the table name
  • ids: a single ID or a list of IDs
  • where_clause: the condition for deletion
  • partition_name: limits the deletion operation to some partitions
self.client.delete(test_collection_name, ids=["bcd", "def"])
def get(
    self,
    table_name: str,
    ids: Optional[Union[list, str, int]],
    where_clause = None,
    output_column_name: Optional[List[str]] = None,
    partition_names: Optional[List[str]] = None,
)
Retrieves records based on the specified primary keys ids.
  • table_name: the table name
  • ids: a single ID or a list of IDs. Optional parameter, can be ids=None if not provided. The ids parameter of ObVecClient get interface is different from MilvusLikeClient get. For details, see MilvusLikeClient get
  • where_clause: the condition for retrieval
  • output_column_name: a list of output column or projection column names
  • partition_names: limits the retrieval operation to some partitions
Return value:
Unlike MilvusLikeClient, the return value of ObVecClient is a tuple list, with each tuple representing a row of records.
res = self.client.get(
    test_collection_name,
    ids=["abc", "bcd", "cde", "def"],
    where_clause=[text("meta->'$.page' > 1")],
    output_column_name=['id']
)
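Because ObVecClient returns rows as tuples while MilvusLikeClient returns dictionaries, code that switches between the two modes may want to convert one shape into the other. The sketch below pairs each tuple with the requested column names. The helper rows_to_dicts is hypothetical, the sample rows are made-up stand-ins for a real result, and it assumes the rows have already been materialized as a list of tuples whose column order matches output_column_name.

```python
from typing import Iterable, List, Sequence


def rows_to_dicts(rows: Iterable[Sequence], column_names: List[str]) -> List[dict]:
    """Pair each tuple with the requested column names, in order."""
    return [dict(zip(column_names, row)) for row in rows]


# Stand-in for the tuples returned by get(..., output_column_name=['id']).
sample_rows = [("abc",), ("bcd",)]
print(rows_to_dicts(sample_rows, ["id"]))  # [{'id': 'abc'}, {'id': 'bcd'}]
```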
def set_ob_hnsw_ef_search(self, ob_hnsw_ef_search: int)
Sets the efSearch parameter of the HNSW index. This is a session-level variable. A larger ef_search value gives a higher recall rate but slower queries.
  • ob_hnsw_ef_search: the efSearch parameter of the HNSW index
def get_ob_hnsw_ef_search(self) -> int
Gets the efSearch parameter of the HNSW index.
def ann_search(
    self,
    table_name: str,
    vec_data: list,
    vec_column_name: str,
    distance_func,
    with_dist: bool = False,
    topk: int = 10,
    output_column_names: Optional[List[str]] = None,
    extra_output_cols: Optional[List] = None,
    where_clause=None,
    partition_names: Optional[List[str]] = None,
    **kwargs,
)
Executes a vector approximate nearest neighbor search.
  • table_name: the table name
  • vec_data: the vector data to be searched
  • vec_column_name: the name of the vector column to be searched
  • distance_func: the distance function. Provides an extension of SQLAlchemy func, with optional values: func.l2_distance/func.cosine_distance/func.inner_product/func.negative_inner_product, representing the l2 distance function, cosine distance function, inner product distance function, and negative inner product distance function, respectively
  • with_dist: specifies whether to return results with vector distances
  • topk: the number of nearest vectors to retrieve
  • output_column_names: a list of output column or projection column names
  • extra_output_cols: additional output columns that allow more complex output expressions
  • where_clause: the filter condition
  • partition_names: limits the query to some partitions
Return value:
Unlike MilvusLikeClient, the return value of ObVecClient is a tuple list, with each tuple representing a row of records.
res = self.client.ann_search(
    test_collection_name,
    vec_data=[0, 0, 0],
    vec_column_name="embedding",
    distance_func=func.l2_distance,
    with_dist=True,
    topk=5,
    output_column_names=["id"],
)
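To make the four distance_func options concrete, the sketch below computes each one in plain Python using the textbook definitions. These are reference formulas only, not the database's implementation. Whether seekdb's l2_distance returns the square-rooted value and how cosine_distance treats a zero vector are not stated here, so both are assumptions of this sketch.

```python
import math
from typing import Sequence


def l2_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """Euclidean distance (assumed to include the square root)."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def inner_product(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def negative_inner_product(a: Sequence[float], b: Sequence[float]) -> float:
    """Negated inner product, so that smaller means more similar."""
    return -inner_product(a, b)


def cosine_distance(a: Sequence[float], b: Sequence[float]) -> float:
    """1 - cosine similarity; undefined (division by zero) for a zero vector."""
    norm = math.sqrt(inner_product(a, a)) * math.sqrt(inner_product(b, b))
    return 1.0 - inner_product(a, b) / norm


print(l2_distance([0, 0, 0], [3, 4, 0]))             # 5.0
print(negative_inner_product([1, 2, 3], [1, 0, 1]))  # -4
print(cosine_distance([1, 0], [0, 1]))               # 1.0
```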
def precise_search(
    self,
    table_name: str,
    vec_data: list,
    vec_column_name: str,
    distance_func,
    topk: int = 10,
    output_column_names: Optional[List[str]] = None,
    where_clause=None,
    **kwargs,
)
Executes a precise (exact) nearest neighbor search.
  • table_name: the table name
  • vec_data: the query vector
  • vec_column_name: the vector column name
  • distance_func: the vector distance function. Provides an extension of SQLAlchemy func, with optional values: func.l2_distance/func.cosine_distance/func.inner_product/func.negative_inner_product, representing the l2 distance function, cosine distance function, inner product distance function, and negative inner product distance function, respectively
  • topk: the number of nearest vectors to retrieve
  • output_column_names: a list of output column or projection column names
  • where_clause: the filter condition
Return value:
Unlike MilvusLikeClient, the return value of ObVecClient is a tuple list, with each tuple representing a row of records.
def perform_raw_text_sql(self, text_sql: str)
Executes an SQL statement directly.
  • text_sql: the SQL statement to be executed
Return value:
Returns an iterator that provides result sets from SQLAlchemy.

Define partitioning rules by using ObPartition

pyobvector supports the following types for range/range columns, list/list columns, hash, key, and subpartitioning:

  • ObRangePartition: specifies to perform range partitioning. Set is_range_columns to True when you construct this object to create range column partitioning.

  • ObListPartition: specifies to perform list partitioning. Set is_list_columns to True when you construct this object to create list column partitioning.

  • ObHashPartition: specifies to perform hash partitioning.

  • ObKeyPartition: specifies to perform key partitioning.

  • ObSubRangePartition: specifies to perform sub-range partitioning. Set is_range_columns to True when you construct this object to create sub-range column partitioning.

  • ObSubListPartition: specifies to perform sub-list partitioning. Set is_list_columns to True when you construct this object to create sub-list column partitioning.

  • ObSubHashPartition: specifies to perform sub-hash partitioning.

  • ObSubKeyPartition: specifies to perform sub-key partitioning.

Example of range partitioning

range_part = ObRangePartition(
False,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", "maxvalue"),
],
range_expr="id",
)
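The following sketch illustrates how a row would be routed under the range rule above. It assumes MySQL-style VALUES LESS THAN semantics, where each bound is an exclusive upper limit and "maxvalue" catches everything else. The function route_range and the RANGE_PARTS list are hypothetical and written only for this illustration; the actual routing happens inside the database.

```python
import bisect
from typing import List, Tuple

# (partition name, exclusive upper bound); float("inf") stands in for "maxvalue".
RANGE_PARTS: List[Tuple[str, float]] = [("p0", 100), ("p1", float("inf"))]


def route_range(value: int, parts: List[Tuple[str, float]] = RANGE_PARTS) -> str:
    """Return the first partition whose upper bound is greater than value."""
    bounds = [upper for _, upper in parts]
    idx = bisect.bisect_right(bounds, value)
    return parts[idx][0]


print(route_range(99))   # p0
print(route_range(100))  # p1
```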

Example of list partitioning

list_part = ObListPartition(
False,
list_part_infos=[
RangeListPartInfo("p0", [1, 2, 3]),
RangeListPartInfo("p1", [5, 6]),
RangeListPartInfo("p2", "DEFAULT"),
],
list_expr="col1",
)

Example of hash partitioning

hash_part = ObHashPartition("col1", part_count=60)
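As a rough mental model of the hash rule above, an integer key can be thought of as landing in partition key mod part_count. This follows the MySQL convention for hash partitioning and is an assumption here; the hash function seekdb actually applies may differ. The function route_hash is hypothetical.

```python
def route_hash(value: int, part_count: int = 60) -> int:
    """Return the partition index for an integer key under modulo hashing."""
    return value % part_count


print(route_hash(125))  # 5
print(route_hash(60))   # 0
```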

Example of multi-level partitioning

# Perform range partitioning
range_columns_part = ObRangePartition(
True,
range_part_infos=[
RangeListPartInfo("p0", 100),
RangeListPartInfo("p1", 200),
RangeListPartInfo("p2", 300),
],
col_name_list=["col1"],
)
# Perform sub-range partitioning
range_sub_part = ObSubRangePartition(
False,
range_part_infos=[
RangeListPartInfo("mp0", 1000),
RangeListPartInfo("mp1", 2000),
RangeListPartInfo("mp2", 3000),
],
range_expr="col3",
)
range_columns_part.add_subpartition(range_sub_part)

Pure SQLAlchemy API mode

If you prefer to use a purely SQLAlchemy API for seekdb's vector retrieval functionality, you can obtain a synchronous database engine through the following methods:

  • Method 1: Use ObVecClient to create a database engine
from pyobvector import ObVecClient

client = ObVecClient(uri="127.0.0.1:2881", user="test@test")
engine = client.engine
# Proceed to create a session as usual with SQLAlchemy and use its API.
  • Method 2: Register the OceanBase dialect provided by pyobvector and call SQLAlchemy's create_engine interface to create a database engine
import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy import create_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.oceanbase", "pyobvector.schema.dialect", "OceanBaseDialect")
connection_str = (
# mysql+oceanbase indicates using the MySQL standard with seekdb's synchronous driver.
f"mysql+oceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_engine(connection_str)
# Proceed to create a session as usual with SQLAlchemy and use its API.

If you want to use asynchronous APIs of SQLAlchemy, you can use seekdb's asynchronous driver:

import pyobvector
from sqlalchemy.dialects import registry
from sqlalchemy.ext.asyncio import create_async_engine

uri: str = "127.0.0.1:2881"
user: str = "root@test"
password: str = ""
db_name: str = "test"
registry.register("mysql.aoceanbase", "pyobvector", "AsyncOceanBaseDialect")
connection_str = (
# mysql+aoceanbase indicates using the MySQL standard with seekdb's asynchronous driver.
f"mysql+aoceanbase://{user}:{password}@{uri}/{db_name}?charset=utf8mb4"
)
engine = create_async_engine(connection_str)
# Proceed to create a session as usual with SQLAlchemy and use its API.
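The connection strings above embed the password directly, which works for an empty password but can break when the password contains characters such as @ or /. The sketch below assembles the same URL shape for either driver and URL-encodes the password with urllib.parse.quote_plus. The helper build_connection_str and the sample password are hypothetical; the user name is left unescaped, exactly as in the examples above.

```python
from urllib.parse import quote_plus


def build_connection_str(user: str, password: str, uri: str, db_name: str,
                         use_async: bool = False) -> str:
    """Assemble the SQLAlchemy URL used above, escaping the password."""
    driver = "aoceanbase" if use_async else "oceanbase"
    return (
        f"mysql+{driver}://{user}:{quote_plus(password)}@{uri}/{db_name}"
        "?charset=utf8mb4"
    )


print(build_connection_str("root@test", "p@ss/word", "127.0.0.1:2881", "test"))
# mysql+oceanbase://root@test:p%40ss%2Fword@127.0.0.1:2881/test?charset=utf8mb4
```

The returned string can be passed to create_engine or create_async_engine after the matching dialect has been registered.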

More examples

For more examples, visit the pyobvector repository.