PolarDB 图数据库

MemOS 支持使用 PolarDB(基于 Apache AGE 扩展)作为图数据库后端,用于存储和检索知识图谱式的记忆数据。PolarDB 结合了 PostgreSQL 的强大功能和图数据库的灵活性,特别适合需要同时进行关系型和图数据查询的场景。

功能特性

  • 完整的图数据库操作:节点增删改查、边管理
  • 向量嵌入搜索:支持 IVFFlat 索引的语义检索
  • 连接池管理:自动管理数据库连接,支持高并发
  • 多租户隔离:支持物理和逻辑两种隔离模式
  • JSONB 属性存储:灵活的元数据存储
  • 批量操作:支持批量插入节点和边
  • 自动时间戳:自动维护 created_atupdated_at
  • SQL 注入防护:内置参数化查询和字符串转义

目录结构

MemOS/
└── src/
    └── memos/
        ├── configs/
        │   └── graph_db.py              # PolarDBGraphDBConfig 配置类
        └── graph_dbs/
            ├── base.py                  # BaseGraphDB 抽象基类
            ├── factory.py               # GraphDBFactory 工厂类
            └── polardb.py               # PolarDBGraphDB 实现

快速开始

1. 安装依赖

# 安装 psycopg2 驱动(二选一)
pip install psycopg2-binary  # 推荐:预编译版本
# 或
pip install psycopg2          # 需要 PostgreSQL 开发库

# 安装 MemOS
pip install MemoryOS -U

2. 配置 PolarDB

方式一:使用配置文件(推荐)

{
  "graph_db_store": {
    "backend": "polardb",
    "config": {
      "host": "localhost",
      "port": 5432,
      "user": "postgres",
      "password": "your_password",
      "db_name": "memos_db",
      "user_name": "alice",
      "use_multi_db": true,
      "auto_create": false,
      "embedding_dimension": 1024,
      "maxconn": 100
    }
  }
}

方式二:代码初始化

from memos.configs.graph_db import PolarDBGraphDBConfig
from memos.graph_dbs.polardb import PolarDBGraphDB

# 创建配置
config = PolarDBGraphDBConfig(
    host="localhost",
    port=5432,
    user="postgres",
    password="your_password",
    db_name="memos_db",
    user_name="alice",
    use_multi_db=True,
    embedding_dimension=1024,
    maxconn=100
)

# 初始化数据库
graph_db = PolarDBGraphDB(config)

3. 基本操作示例

# ========================================
# 步骤 1: 添加节点
# ========================================
node_id = graph_db.add_node(
    label="Memory",
    properties={
        "content": "Python 是一种高级编程语言",
        "memory_type": "Knowledge",
        "tags": ["programming", "python"]
    },
    embedding=[0.1, 0.2, 0.3, ...],  # 1024维向量
    user_name="alice"
)
print(f"✓ 节点已创建: {node_id}")

# ========================================
# 步骤 2: 更新节点
# ========================================
graph_db.update_node(
    id=node_id,
    fields={
        "content": "Python 是一种解释型、面向对象的高级编程语言",
        "updated": True
    },
    user_name="alice"
)
print("✓ 节点已更新")

# ========================================
# 步骤 3: 创建关系
# ========================================
# 先创建第二个节点
node_id_2 = graph_db.add_node(
    label="Memory",
    properties={
        "content": "Django 是 Python 的 Web 框架",
        "memory_type": "Knowledge"
    },
    embedding=[0.15, 0.25, 0.35, ...],
    user_name="alice"
)

# 创建边
edge_id = graph_db.add_edge(
    source_id=node_id,
    target_id=node_id_2,
    edge_type="RELATED_TO",
    properties={
        "relationship": "框架与语言",
        "confidence": 0.95
    },
    user_name="alice"
)
print(f"✓ 关系已创建: {edge_id}")

# ========================================
# 步骤 4: 向量搜索
# ========================================
query_embedding = [0.12, 0.22, 0.32, ...]  # 查询向量

results = graph_db.search_by_embedding(
    embedding=query_embedding,
    top_k=5,
    memory_type="Knowledge",
    user_name="alice"
)

print(f"\n🔍 找到 {len(results)} 个相似节点:")
for node in results:
    print(f"  - {node.get('content')} (相似度: {node.get('score', 'N/A')})")

# ========================================
# 步骤 5: 删除节点
# ========================================
graph_db.delete_node(id=node_id, user_name="alice")
print(f"✓ 节点 {node_id} 已删除")

配置详解

PolarDBGraphDBConfig 参数说明

参数类型默认值必填说明
hoststr-数据库主机地址
portint5432数据库端口
userstr-数据库用户名
passwordstr-数据库密码
db_namestr-目标数据库名称
user_namestrNone租户标识(用于逻辑隔离)
use_multi_dbboolTrue是否使用多数据库物理隔离
auto_createboolFalse是否自动创建数据库
embedding_dimensionint1024向量嵌入维度
maxconnint100连接池最大连接数

多租户模式对比

特性物理隔离
(use_multi_db=True)
逻辑隔离
(use_multi_db=False)
隔离级别数据库级别应用层标签过滤
配置要求db_name 通常等于 user_name必须提供 user_name
性能更好(独立资源)较好(共享资源)
成本高(每租户独立DB)低(共享数据库)
适用场景企业客户、高安全要求SaaS 多租户、开发测试
数据迁移方便(整库导出)需要按标签过滤

配置示例

示例 1:物理隔离(企业版推荐)

{
  "graph_db_store": {
    "backend": "polardb",
    "config": {
      "host": "prod-polardb.example.com",
      "port": 5432,
      "user": "admin",
      "password": "secure_password",
      "db_name": "customer_001",
      "user_name": null,
      "use_multi_db": true,
      "auto_create": false,
      "embedding_dimension": 1536,
      "maxconn": 200
    }
  }
}

示例 2:逻辑隔离(SaaS 推荐)

{
  "graph_db_store": {
    "backend": "polardb",
    "config": {
      "host": "shared-polardb.example.com",
      "port": 5432,
      "user": "app_user",
      "password": "app_password",
      "db_name": "shared_memos",
      "user_name": "tenant_alice",
      "use_multi_db": false,
      "auto_create": false,
      "embedding_dimension": 768,
      "maxconn": 50
    }
  }
}

高级特性

1. 批量插入节点

# 批量添加节点(高性能)
nodes_data = [
    {
        "label": "Memory",
        "properties": {"content": f"节点 {i}", "memory_type": "Test"},
        "embedding": [0.1 * i] * 1024,
    }
    for i in range(100)
]

node_ids = graph_db.add_nodes_batch(
    nodes=nodes_data,
    user_name="alice"
)
print(f"✓ 批量创建了 {len(node_ids)} 个节点")

2. 复杂查询示例

# 查找特定类型的记忆并按时间排序
def get_recent_memories(graph_db, memory_type, limit=10):
    """获取最近的记忆节点"""
    query = f"""
        SELECT * FROM "{graph_db.db_name}_graph"."Memory"
        WHERE properties->>'memory_type' = %s
          AND properties->>'user_name' = %s
        ORDER BY updated_at DESC
        LIMIT %s
    """
    
    conn = graph_db._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, [memory_type, "alice", limit])
            results = cursor.fetchall()
            return results
    finally:
        graph_db._return_connection(conn)

# 使用示例
recent = get_recent_memories(graph_db, "WorkingMemory", limit=5)
print(f"最近 5 条工作记忆: {len(recent)} 条")

3. 向量索引优化

# 创建或更新向量索引
graph_db.create_index(
    label="Memory",
    vector_property="embedding",
    dimensions=1024,
    index_name="memory_vector_index"
)
print("✓ 向量索引已优化")

4. 连接池监控

# 查看连接池状态(仅供调试)
import logging
logging.basicConfig(level=logging.DEBUG)

# 获取连接时会输出详细日志
conn = graph_db._get_connection()
# [DEBUG] [_get_connection] Successfully acquired connection from pool
graph_db._return_connection(conn)
# [DEBUG] [_return_connection] Successfully returned connection to pool

BaseGraphDB 接口

PolarDB 实现了 BaseGraphDB 抽象类的所有方法,确保与其他图数据库后端的互换性。

核心方法

方法说明参数
add_node()添加单个节点label, properties, embedding, user_name
add_nodes_batch()批量添加节点nodes, user_name
update_node()更新节点属性id, fields, user_name
delete_node()删除节点id, user_name
delete_node_by_params()按条件删除节点params, user_name
add_edge()创建关系source_id, target_id, edge_type, properties, user_name
update_edge()更新关系属性edge_id, properties, user_name
delete_edge()删除关系edge_id, user_name
search_by_embedding()向量相似度搜索embedding, top_k, memory_type, user_name
get_node()获取单个节点id, user_name
get_memory_count()统计节点数量memory_type, user_name
remove_oldest_memory()清理旧记忆memory_type, keep_latest, user_name

完整方法签名示例

from typing import Any

# 添加节点
def add_node(
    self,
    label: str = "Memory",
    properties: dict[str, Any] | None = None,
    embedding: list[float] | None = None,
    user_name: str | None = None
) -> str:
    """添加一个新节点到图数据库"""
    pass

# 向量搜索
def search_by_embedding(
    self,
    embedding: list[float],
    top_k: int = 10,
    memory_type: str | None = None,
    user_name: str | None = None,
    filters: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """基于向量嵌入进行相似度搜索"""
    pass

# 批量操作
def add_nodes_batch(
    self,
    nodes: list[dict[str, Any]],
    user_name: str | None = None
) -> list[str]:
    """批量添加多个节点"""
    pass

扩展开发指南

如果需要基于 PolarDB 实现自定义功能,可以继承 PolarDBGraphDB 类:

from memos.graph_dbs.polardb import PolarDBGraphDB
from memos.configs.graph_db import PolarDBGraphDBConfig

class CustomPolarDBGraphDB(PolarDBGraphDB):
    """自定义 PolarDB 图数据库实现"""
    
    def __init__(self, config: PolarDBGraphDBConfig):
        super().__init__(config)
        # 自定义初始化逻辑
        self.custom_index_created = False
    
    def create_custom_index(self):
        """创建自定义索引"""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(f"""
                    CREATE INDEX IF NOT EXISTS idx_custom_field
                    ON "{self.db_name}_graph"."Memory" 
                    ((properties->>'custom_field'));
                """)
                conn.commit()
                self.custom_index_created = True
                print("✓ 自定义索引已创建")
        except Exception as e:
            print(f"❌ 创建索引失败: {e}")
            conn.rollback()
        finally:
            self._return_connection(conn)
    
    def search_by_custom_field(self, field_value: str):
        """基于自定义字段搜索"""
        query = f"""
            SELECT * FROM "{self.db_name}_graph"."Memory"
            WHERE properties->>'custom_field' = %s
        """
        
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, [field_value])
                results = cursor.fetchall()
                return results
        finally:
            self._return_connection(conn)

# 使用自定义实现
config = PolarDBGraphDBConfig(
    host="localhost",
    port=5432,
    user="postgres",
    password="password",
    db_name="custom_db"
)

custom_db = CustomPolarDBGraphDB(config)
custom_db.create_custom_index()
results = custom_db.search_by_custom_field("special_value")

参考资源

下一步