sqlite
Python Client
查看所有的表结构信息
Copy import sqlite3
conn = sqlite3 . connect ( 'your_database_name.db' )
cursor = conn . cursor ()
# 获取数据库中所有的表格名
cursor . execute ( "SELECT name FROM sqlite_master WHERE type='table';" )
tables = cursor . fetchall ()
# 遍历每个表格,打印其schema, 并打印数据表里的数据
for table in tables :
print ( "=" * 50 )
table_name = table [ 0 ]
print ( f "Schema for table: { table_name } " )
cursor . execute ( "PRAGMA table_info( {} )" . format (table_name))
schema = cursor . fetchall ()
for column in schema :
print (column[ 1 ], column[ 2 ], end = ", " )
print ( "\n" )
print ( f "Data for table: { table_name } " )
cursor . execute ( f "select * from { table_name } " )
result = cursor . fetchall ()
for row in result :
print (row)
print ( "\n" )
# 关闭连接
conn . close ()
MySQL
部署: Docker (TODO)
Python Client (TODO)
sqlalchemy
sqlalchemy 有两套 API, 一套被称为 Core, 另一套被称为 ORM, 分别使用如下方式进行导入:
Copy from sqlalchemy import xxx # Core API
from sqlalchemy . orm import xxx # ORM API
其中 ORM 是对 Core 的上层封装, 也更 pythonic
建立连接
连接到数据库需要先建立 engine (连接池)
Copy from sqlalchemy import create_engine
# sqlite 数据库, 使用 pysqlite 包 (默认情况用 sqlite3 内置包), 在内存中建立数据库
engine = create_engine ( "sqlite+pysqlite:///:memory:" , echo = True ) # 在执行 sql 语句时会打印语句
engine = create_engine ( "sqlite:///example.db" )
# mysql 数据库, 使用 pymysql 包: 登录的用户名, 密码, 服务器地址及端口号, 数据库名
engine = create_engine ( "mysql+pymysql:// {user} : {password} @ {host} : {port} / {database_name} " )
engine . dispose () # 释放连接池
Core: connect
从连接池里获取一个连接, 以进行数据库操作
Copy connect = engine . connect ()
connect . close () # 关闭连接
# 或者使用 with 语法
with engine . connect () as connect : # __exit__ 时会触发 close
...
ORM: session
使用 ORM API 时, 与 connect 对应的概念是 Session
Copy from sqlalchemy . orm import Session
from sqlalchemy . orm import sessionmaker
# 方式 1
session = Session (engine)
# 方式 2
Session = sessionmaker (bind = engine)
session = Session ()
session . close () # 关闭连接
# 或者使用 with 语法
with Session (engine) as session : # __exit__ 时会触发 close
...
构建表
Core 构建表
使用 Core 的方式定义 Table 的方式被称为 Table constructor. 一般都是用多个 Table
共享一个 MetaData
实例.
Copy from sqlalchemy import Table , Column , Integer , String
from sqlalchemy import MetaData
from sqlalchemy import ForeignKey
metadata_obj = MetaData () # sqlalchemy.sql.schema.MetaData
user_table = Table (
"user_account" , # 数据库里的表名
metadata_obj,
Column ( "id" , Integer, primary_key = True ),
Column ( "name" , String ( 30 )),
Column ( "fullname" , String),
)
address_table = Table (
"address" ,
metadata_obj,
Column ( "id" , Integer, primary_key = True ),
Column ( "user_id" , ForeignKey ( "user_account.id" ), nullable = False ),
Column ( "email_address" , String, nullable = False ),
)
metadata_obj . create_all (engine)
ORM 构建表
使用 ORM 的方式定义 Table 的方式被称为: ORM Mapped classes / Declarative Forms, 具体如下:
Step 1: Base,可以用任意一种方式进行
Copy # 方法 1:
from sqlalchemy . orm import DeclarativeBase
class Base ( DeclarativeBase ):
pass
# 方法 2
from sqlalchemy . orm import declarative_base
Base = declarative_base ()
Base . metadata # sqlalchemy.sql.schema.MetaData 对象, 与 Core API 里显式用 metadata_obj = MetaData() 得到的 metadata_obj 对应
Step 2: 继承 Base
Copy from typing import List
from typing import Optional
from sqlalchemy . orm import Mapped
from sqlalchemy . orm import mapped_column
from sqlalchemy import Column
from sqlalchemy . orm import relationship
class User ( Base ):
__tablename__ = "user_account" # 数据库里的表名
id : Mapped [ int ] = mapped_column (primary_key = True )
# id = Column(Integer, primary_key=True) # 这种写法是类似于 Core API 的写法, 许多网上的资料会用这种写法
name : Mapped [ str ] = mapped_column ( String ( 30 ))
fullname : Mapped [ Optional [ str ]]
addresses : Mapped [ List [ "Address" ]] = relationship (back_populates = "user" ) # 不存储在数据库里
def __repr__ ( self ) -> str :
return f "User(id= { self . id !r } , name= { self . name !r } , fullname= { self . fullname !r } )"
class Address ( Base ):
__tablename__ = "address"
id : Mapped [ int ] = mapped_column (primary_key = True )
email_address : Mapped [ str ]
user_id = mapped_column ( ForeignKey ( "user_account.id" ))
user : Mapped [ User ] = relationship (back_populates = "addresses" )
def __repr__ ( self ) -> str :
return f "Address(id= { self . id !r } , email_address= { self . email_address !r } , user_id= { self . user_id !r } )"
relationship 是可选的, 实际的数据库存储里并不包含 user
及 addresses
这两列, 它与 ForeignKey
的关系以及给 ORM API 带来的便利性见后续
操作表: 增删改查
TODO: 下例其实有些杂糅, 操作语句用的 Core API, 而执行用的 ORM API, 本质上此例算作 Core API
Copy from sqlalchemy import create_engine
from sqlalchemy import select , text
from sqlalchemy . orm import sessionmaker
from sql_test import Address , User
engine = create_engine ( 'sqlite:///example.db' , echo = False )
Session = sessionmaker (bind = engine)
with Session () as session :
result = session . execute ( text ( "SELECT * FROM user_account" ))
for row in result :
print ( type (row), row) # (sqlalchemy.engine.row.Row, (1, 'Ask', 'AskBob'))
result1 = session . execute ( select (User))
for row1 in result1 :
print ( type (row), row1) # (sqlalchemy.engine.row.Row, (User(id=1, name='Ask', fullname='AskBob'),))
# 两种查询方式结果有所不同
# for item in result.mappings(): print(item)
# {'id': 1, 'name': 'Ask', 'fullname': 'AskBob'}
# for item in result1.mappings(): print(item)
# {'User': User(id=1, name='Ask', fullname='AskBob')}
Core
Copy from sqlalchemy import select , text
stmt1 = text ( "select * from xx" ) # stmt1 是一个 TextClause 对象, sqlalchemy.sql.elements.TextClause
stmt2 = select (User) # stmt2 是一个 Select 对象 (sqlalchemy.sql.selectable.Select)
# 打印 SQL 语句
print (stmt1) # "select * from xx"
print (stmt2) # "SELECT user_account.id, user_account.name, user_account.fullname FROM user_account"
# 备注: stmt2 转化为 SQL 语句字符串的过程会先经过 compile
# compiled = stmt2.compile()
result = connect . execute (stmt2)
ORM
杂记
Copy from sqlalchemy import select
from sqlalchemy . orm import query
# 注意 select(User) 只是语句, 可以使用 print(select(User)) 打印对应的语句
row = session . execute ( select (User)). first () [ 0 ] # User(...)
row = session . scalars ( select (User)). first () # User(...)
# 注意 query(User) 只是语句, 可以使用 print(query(User)) 打印对应的语句
rows = session . query (User). all () # [User(...), User(...)]
# 注意 query(User).filter(User.id==1) 只是语句
rows1 = session . query (User). filter (User.id == 1 ). all () # [User(...)]
user = session . get (User, 1 ) # 直接执行, user=User(...)
user_alias = session . get (User, 1 )
user is user_alias # True, 在当前会话里, 只会存一个副本
user is rows [ 0 ] # True, 在当前会话里, 只会存一个副本
faiss
faiss 只支持稠密向量的 IP(内积) 和 L2 距离
neo4j
Server
要启用 apoc (langchain
, llama_index
等一般都需要这个功能) 需要对官方镜像打上补丁:
Copy FROM neo4j:4.4.29
RUN cp /var/lib/neo4j/labs/apoc-* /var/lib/neo4j/plugins
build 镜像
Copy docker build -t neo4j:4.4.29.patch .
启动镜像 (可以多个版本独立部署):
Copy mkdir -p $HOME/temp/neo4j-data-4.4.29/data && sudo mkdir -p $HOME/temp/neo4j-data-4.4.29/logs && sudo chmod -R 777 temp/neo4j-data-4.4.29
# https://github.com/langchain-ai/langchain/issues/12901
docker run -it --rm -p 7474:7474 -p 7687:7687 -e NEO4J_AUTH=neo4j/12345678 -v $HOME/temp/neo4j-data-4.4.29/data:/data -v $HOME/temp/neo4j-data-4.4.29/logs:/logs -e NEO4J_dbms_security_procedures_unrestricted=apoc.* -e NEO4J_dbms_security_procedures_allowlist=apoc.* neo4j:4.4.29.patch
注意:
部署多个容器时, 使用 Neo4j browser 登录时要注意选择好正确的 bolt 地址
neo4j 社区版一个实例只能有一个图 (database)
Python Client (TODO)
Milvus 2.3.x
使用 Docker 启动服务, 并安装相应的 Python Client
Copy wget https://raw.githubusercontent.com/milvus-io/milvus/master/scripts/standalone_embed.sh
bash standalone_embed.sh start
pip install protobuf grpcio-tools pymilvus
database: 默认是 default
, 对应于关系型数据库管理系统 (RDBMS) 中的 database
collection: 对应于 RDBMS 中的 table
Copy from pymilvus import connections
from pymilvus import Collection
from pymilvus import list_collections
from pymilvus import db
from pymilvus import utility
# 1. 连接
connections . connect (
alias = "myconnect" , host = "127.0.0.1" , port = '19530' ,
)
connections . disconnect ( "myconnect" )
# MilvusClient() # 另一套 API
# 2. database
db . list_database () # ["default"]
database = db . create_database ( "book" )
db . using_database ( "book" )
db . drop_database ( "book" )
# 3. collection, schema, index
collection = Collection ( "LangChainCollection" )
collection . create_index (field_name = "vector" , index_name = "myindex" )
utility . list_indexes ( "LangChainCollection" ) # ["myindex"], 索引名
collection . load ()
collection . indexes # 所有索引
collection . indexes [ 0 ]. field_name # 索引建立在哪个字段上, "vector"
data = [ { "vector" : [ 0.1 , - 0.2 ] , "text" : "ss" , "source" : "A" } ]
collection . insert (data)
collection . flush () # 使插入数据生效
collection . num_entities # collection 的行数
collection . release () # 修改索引需要先对 collection 释放
collection . drop_index (index_name = "myindex" ) # 根据索引名删除
# 4. 向量相似度查询 与 根据字段过滤 混合检索
# search: 按向量相似度
import random
queries = [[random . random () for i in range ( 1024 ) ] for k in range ( 2 ) ]
search_params = {
"metric_type" : "L2" ,
# "offset": 0,
# "ignore_growing": False,
"params" : { "nprobe" : 10 }
}
results = collection . serach (
data = queries,
anns_field = "vector" , # 指定用来做向量相似度查询应用的字段
params = search_params,
limit = 3 , # 每个查询返回几条查询结果
expr = None , # 按字段进行过滤的语句
output_fields = [ "source" , "text" , "pk" , "vector" ], # 返回字段名, 这里假设pk是主键
consistency_level = "Strong"
)
assert len (result) == 2
assert len (result[ 0 ]) == 3
assert result [ 0 ] [ 0 ] . id == result [ 0 ] [ 0 ] . fields [ 'pk' ]
result [ 0 ] [ 0 ] . distance # 相似度
result [ 0 ] [ 0 ] . fields : Dict # {"source": xxx, "text": xxx, "pk": xxx, "vector": [0.1, ..., -0.3]}
Milvus 2.4.x 新特性
新特性说明可参考 release 信息: https://github.com/milvus-io/milvus/releases/tag/v2.4.0-rc.1
Embedding
以下用法来自上述官方文档
Copy # pip install pymilvus[model]
from pymilvus . model . hybrid import BGEM3EmbeddingFunction
docs = [
"Artificial intelligence was founded as an academic discipline in 1956." ,
"Alan Turing was the first person to conduct substantial research in AI." ,
"Born in Maida Vale, London, Turing was raised in southern England." ,
]
query = "Who started AI research?"
bge_m3_ef = BGEM3EmbeddingFunction (use_fp16 = False , device = "cpu" )
docs_embeddings = bge_m3_ef (docs)
query_embeddings = bge_m3_ef ([query])
备注: 实质上这里是对 FlagEmbedding 的简单封装, 因此算不上是 Milvus 的特性
sparse vector
备注: 关于 sparse vector 所使用的距离函数目前只支持内积 (IP), 不支持 L2 及 COSINE.
hybrid_search
备注: 所谓的 hybrid_search
, 实质上只是根据多个 vector 类型的字段进行独立召回, 然后再对多个召回结果 rerank, 而目前 milvus 支持加权与 RRF 的 rerank 方法. 因此实质上 hybrid_search
也算不上是新特性.
fuzzy match: prefixes, infixes, suffixes search
似乎之前版本的 Milvus 只支持前缀索引 (text like 'the%'
), 2.4 之后支持前缀, 中缀, 后缀索引. 以下代码参考自 https://github.com/milvus-io/pymilvus/blob/2.4/examples/fuzzy_match.py
Copy res = collection . query (expr = 'title like "The%"' , output_fields = [ "id" , "title" ])
res = collection . query (expr = 'title like "%the%"' , output_fields = [ "id" , "title" ])
res = collection . query (expr = 'title like "%Rye"' , output_fields = [ "id" , "title" ])
res = collection . query (expr = 'title like "Flip_ed"' , output_fields = [ "id" , "title" ]) # _ 代表一个任意字符
# you can create inverted index to accelerate the fuzzy match.
collection . release ()
collection . create_index (
"title" , { "index_type" : "INVERTED" })
collection . load ()
Grouping Search
也就是对 multi-vector retriever 的支持 (特别地: 可用于 ParentDocumentRetriever), 代码参考 https://github.com/milvus-io/pymilvus/blob/2.4/examples/example_group_by.py
Copy # 这里 vectors[:3] 是 3 个向量: List[List[float]]
# doc_id 字段只有 44 个取值 (可以理解为 44 篇文章), batch_size=100
result = collection . search (vectors[: 3 ], "float_vector" , search_params, limit = batch_size, timeout = 600 ,
output_fields = [ "chunk_id" , "doc_id" ], group_by_field = "doc_id" )
# 最终 result 里对于每个检索向量, 最终地检索结果只有 44 个 (小于预定义的 100)
# 实际的运作逻辑是: 按 doc_id 分组, 以同一个 doc_id 的所有向量里最相似的分数作为这个 doc_id 的相似度值, 然后排序 (注意返回结果里每个 doc_id 底下只返回最相似的那个 chunk)
备注: 这个特性仍然无法用作支持 BGE-M3 的 colbert 向量搜索
MilvusClient
在 python client 方面, 将 MilvusClient 这种用法做了进一步完善, 估计后续版本的主流用法应该会是用 MilvusClient.
Weaviate
Weaviate 是一个向量数据库, 支持混合检索. 根据下面的文章可以看出, 其实际上只是分别检索, 然后 rerank 实现的. 注意字面检索使用的是 BM25/BM25F, 而 rerank 可以选择加权重或者是 RRF. 官方比较推荐用加权重的方式 rerank.
这里简述下运作逻辑: 假设最终需要检索 k 个文本, 那么分别用字面检索和向量检索得到 k 个文本 (目前似乎不能设置为多于 k 个, 或者其内部有可能设置更高, 但似乎不对用户暴露), 当使用加权 rerank 时, 首先分别将字面检索/向量检索的分数按线性变换到 0-1 之间, 即最相似的文本的相似度为 1, 第 k 个文本的相似度为 0. 然后再加权重 (权重可以设置), 最后排序得到最终的 k 个文本.