PolarDB Graph Database

Configuration and usage of the PolarDB graph database in the MemOS framework

MemOS supports PolarDB (based on the Apache AGE extension) as a graph database backend for storing and retrieving knowledge-graph-style memory data. PolarDB combines PostgreSQL's relational capabilities with graph queries, which makes it well suited to workloads that need both relational and graph access to the same data.

Features

  • Complete graph database operations: node CRUD, edge management
  • Vector embedding search: semantic retrieval with IVFFlat index support
  • Connection pool management: automatic database connection management with high concurrency support
  • Multi-tenant isolation: supports both physical and logical isolation modes
  • JSONB property storage: flexible metadata storage
  • Batch operations: supports batch insertion of nodes and edges
  • Automatic timestamps: automatically maintains created_at and updated_at
  • SQL injection protection: built-in parameterized queries and string escaping

Directory Structure

MemOS/
└── src/
    └── memos/
        ├── configs/
        │   └── graph_db.py              # PolarDBGraphDBConfig configuration class
        └── graph_dbs/
            ├── base.py                  # BaseGraphDB abstract base class
            ├── factory.py               # GraphDBFactory factory class
            └── polardb.py               # PolarDBGraphDB implementation

Quick Start

1. Install Dependencies

# Install psycopg2 driver (choose one)
pip install psycopg2-binary  # Recommended: pre-compiled version
# or
pip install psycopg2          # Requires PostgreSQL development libraries

# Install MemOS
pip install memos

2. Configure PolarDB

Method 1: JSON Configuration

{
  "graph_db_store": {
    "backend": "polardb",
    "config": {
      "host": "localhost",
      "port": 5432,
      "user": "postgres",
      "password": "your_password",
      "db_name": "memos_db",
      "user_name": "alice",
      "use_multi_db": true,
      "auto_create": false,
      "embedding_dimension": 1024,
      "maxconn": 100
    }
  }
}

Method 2: Code Initialization

from memos.configs.graph_db import PolarDBGraphDBConfig
from memos.graph_dbs.polardb import PolarDBGraphDB

# Create configuration
config = PolarDBGraphDBConfig(
    host="localhost",
    port=5432,
    user="postgres",
    password="your_password",
    db_name="memos_db",
    user_name="alice",
    use_multi_db=True,
    embedding_dimension=1024,
    maxconn=100
)

# Initialize database
graph_db = PolarDBGraphDB(config)
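
The JSON layout and the constructor arguments use the same keys, so one can be derived from the other instead of being maintained twice. The following is a minimal sketch, assuming the `graph_db_store` layout shown in this section; `load_polardb_settings` is a hypothetical helper and not part of MemOS.

```python
import json
from typing import Any


def load_polardb_settings(raw_json: str) -> dict[str, Any]:
    """Return the "config" mapping of a graph_db_store section (hypothetical helper)."""
    store = json.loads(raw_json)["graph_db_store"]
    if store.get("backend") != "polardb":
        raise ValueError(f"expected backend 'polardb', got {store.get('backend')!r}")
    return dict(store["config"])


raw = """
{
  "graph_db_store": {
    "backend": "polardb",
    "config": {"host": "localhost", "port": 5432, "user": "postgres",
               "password": "your_password", "db_name": "memos_db",
               "user_name": "alice", "use_multi_db": true}
  }
}
"""
settings = load_polardb_settings(raw)
print(settings["db_name"], settings["use_multi_db"])
# Assumption: the configuration class accepts these keys as keyword arguments,
# as the code initialization example suggests:
# config = PolarDBGraphDBConfig(**settings)
```

Checking the `backend` value first avoids passing another backend's settings to the PolarDB configuration class by mistake.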

3. Basic Operation Examples

# ========================================
# Step 1: Add Node
# ========================================
node_id = graph_db.add_node(
    label="Memory",
    properties={
        "content": "Python is a high-level programming language",
        "memory_type": "Knowledge",
        "tags": ["programming", "python"]
    },
    embedding=[0.1] * 1024,  # placeholder 1024-dimensional vector
    user_name="alice"
)
print(f"✓ Node created: {node_id}")

# ========================================
# Step 2: Update Node
# ========================================
graph_db.update_node(
    id=node_id,
    fields={
        "content": "Python is an interpreted, object-oriented high-level programming language",
        "updated": True
    },
    user_name="alice"
)
print("✓ Node updated")

# ========================================
# Step 3: Create Relationship
# ========================================
# First create a second node
node_id_2 = graph_db.add_node(
    label="Memory",
    properties={
        "content": "Django is a web framework for Python",
        "memory_type": "Knowledge"
    },
    embedding=[0.15] * 1024,  # placeholder 1024-dimensional vector
    user_name="alice"
)

# Create edge
edge_id = graph_db.add_edge(
    source_id=node_id,
    target_id=node_id_2,
    edge_type="RELATED_TO",
    properties={
        "relationship": "framework and language",
        "confidence": 0.95
    },
    user_name="alice"
)
print(f"✓ Relationship created: {edge_id}")

# ========================================
# Step 4: Vector Search
# ========================================
query_embedding = [0.12] * 1024  # placeholder query vector

results = graph_db.search_by_embedding(
    embedding=query_embedding,
    top_k=5,
    memory_type="Knowledge",
    user_name="alice"
)

print(f"\n🔍 Found {len(results)} similar nodes:")
for node in results:
    print(f"  - {node.get('content')} (similarity: {node.get('score', 'N/A')})")

# ========================================
# Step 5: Delete Node
# ========================================
graph_db.delete_node(id=node_id, user_name="alice")
print(f"✓ Node {node_id} deleted")
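
The vectors passed to `add_node` and `search_by_embedding` need to match the configured `embedding_dimension` (1024 in these examples). A small pre-flight check can catch mismatches before they reach the database. This is a sketch only: `check_embedding` is a hypothetical helper, and how the backend itself reacts to a wrong-sized vector is not covered here.

```python
def check_embedding(embedding: list[float], expected_dim: int = 1024) -> list[float]:
    """Raise ValueError unless the vector has exactly expected_dim numeric entries."""
    if len(embedding) != expected_dim:
        raise ValueError(
            f"embedding has {len(embedding)} dimensions, expected {expected_dim}"
        )
    # bool is a subclass of int, so it is excluded explicitly.
    if not all(isinstance(x, (int, float)) and not isinstance(x, bool) for x in embedding):
        raise ValueError("embedding must contain only numbers")
    return embedding


placeholder = check_embedding([0.1] * 1024)
print(len(placeholder))  # 1024
```

A literal `...` left inside a list is a valid Python object (`Ellipsis`) but not a number, so this check rejects it as well.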

Configuration Details

PolarDBGraphDBConfig Parameters

| Parameter | Type | Default | Required | Description |
|---|---|---|---|---|
| host | str | - | ✓ | Database host address |
| port | int | 5432 | ✗ | Database port |
| user | str | - | ✓ | Database username |
| password | str | - | ✓ | Database password |
| db_name | str | - | ✓ | Target database name |
| user_name | str | None | ✗ | Tenant identifier (for logical isolation) |
| use_multi_db | bool | True | ✗ | Whether to use multi-database physical isolation |
| auto_create | bool | False | ✗ | Whether to automatically create database |
| embedding_dimension | int | 1024 | ✗ | Vector embedding dimension |
| maxconn | int | 100 | ✗ | Maximum connections in connection pool |

Multi-Tenant Mode Comparison

| Feature | Physical Isolation (use_multi_db=True) | Logical Isolation (use_multi_db=False) |
|---|---|---|
| Isolation Level | Database level | Application layer tag filtering |
| Configuration Requirements | db_name typically equals user_name | Must provide user_name |
| Performance | Better (independent resources) | Good (shared resources) |
| Cost | High (independent DB per tenant) | Low (shared database) |
| Use Cases | Enterprise customers, high security requirements | SaaS multi-tenant, development testing |
| Data Migration | Convenient (full database export) | Requires filtering by tags |
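
The configuration requirement in the comparison can be checked before a connection is opened. The sketch below encodes only the rule stated in the comparison (logical isolation needs `user_name`) and the documented default of `use_multi_db=True`; `check_tenant_settings` is a hypothetical helper, not a MemOS function.

```python
def check_tenant_settings(settings: dict) -> str:
    """Return "physical" or "logical" after checking the user_name requirement."""
    # use_multi_db defaults to True, i.e. physical isolation.
    if settings.get("use_multi_db", True):
        return "physical"
    if not settings.get("user_name"):
        raise ValueError("logical isolation (use_multi_db=False) requires user_name")
    return "logical"


print(check_tenant_settings({"db_name": "customer_001", "use_multi_db": True}))
print(check_tenant_settings(
    {"db_name": "shared_memos", "user_name": "tenant_alice", "use_multi_db": False}
))
```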

Configuration Examples

Physical isolation (one database per tenant):

{
  "graph_db_store": {
    "backend": "polardb",
    "config": {
      "host": "prod-polardb.example.com",
      "port": 5432,
      "user": "admin",
      "password": "secure_password",
      "db_name": "customer_001",
      "user_name": null,
      "use_multi_db": true,
      "auto_create": false,
      "embedding_dimension": 1536,
      "maxconn": 200
    }
  }
}

Logical isolation (shared database, tenant identified by user_name):

{
  "graph_db_store": {
    "backend": "polardb",
    "config": {
      "host": "shared-polardb.example.com",
      "port": 5432,
      "user": "app_user",
      "password": "app_password",
      "db_name": "shared_memos",
      "user_name": "tenant_alice",
      "use_multi_db": false,
      "auto_create": false,
      "embedding_dimension": 768,
      "maxconn": 50
    }
  }
}

Advanced Features

1. Batch Insert Nodes

# Batch add nodes (high performance)
nodes_data = [
    {
        "label": "Memory",
        "properties": {"content": f"Node {i}", "memory_type": "Test"},
        "embedding": [0.1 * i] * 1024,
    }
    for i in range(100)
]

node_ids = graph_db.add_nodes_batch(
    nodes=nodes_data,
    user_name="alice"
)
print(f"✓ Batch created {len(node_ids)} nodes")
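
For very large inputs it can help to send nodes in fixed-size chunks rather than in one call, which keeps each request and transaction bounded. This is a sketch under assumptions: the source does not state any limit for `add_nodes_batch`, the chunk size of 100 is arbitrary, and `chunked` is a hypothetical helper.

```python
from collections.abc import Iterator
from typing import Any


def chunked(items: list[dict[str, Any]], size: int) -> Iterator[list[dict[str, Any]]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


nodes_data = [
    {"label": "Memory", "properties": {"content": f"Node {i}", "memory_type": "Test"}}
    for i in range(250)
]
for batch in chunked(nodes_data, size=100):
    # In real use: graph_db.add_nodes_batch(nodes=batch, user_name="alice")
    print(len(batch))  # 100, 100, then 50
```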

2. Complex Query Examples

# Find memories of a specific type for one tenant, most recently updated first
def get_recent_memories(graph_db, memory_type, user_name, limit=10):
    """Return up to `limit` Memory rows of the given type, newest first."""
    # The schema name is interpolated; all values are passed as parameters.
    query = f"""
        SELECT * FROM "{graph_db.db_name}_graph"."Memory"
        WHERE properties->>'memory_type' = %s
          AND properties->>'user_name' = %s
        ORDER BY updated_at DESC
        LIMIT %s
    """

    conn = graph_db._get_connection()
    try:
        with conn.cursor() as cursor:
            cursor.execute(query, [memory_type, user_name, limit])
            return cursor.fetchall()
    finally:
        graph_db._return_connection(conn)

# Usage example
recent = get_recent_memories(graph_db, "WorkingMemory", user_name="alice", limit=5)
print(f"Fetched {len(recent)} recent working memories")
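
Placeholders such as `%s` protect values only. The schema name in the query above is built with an f-string, so it is worth validating before it is interpolated. The sketch below uses a deliberately strict pattern (letters, digits, underscores); `graph_schema` is a hypothetical helper. psycopg2's `sql.Identifier` is an alternative when a live connection is available.

```python
import re

# Letters, digits and underscores only, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def graph_schema(db_name: str) -> str:
    """Return the double-quoted "<db_name>_graph" schema name, rejecting unsafe names."""
    if not _IDENTIFIER.fullmatch(db_name):
        raise ValueError(f"unsafe database name: {db_name!r}")
    return f'"{db_name}_graph"'


print(graph_schema("memos_db"))  # "memos_db_graph"
```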

3. Vector Index Optimization

# Create or update vector index
graph_db.create_index(
    label="Memory",
    vector_property="embedding",
    dimensions=1024,
    index_name="memory_vector_index"
)
print("✓ Vector index optimized")

4. Connection Pool Monitoring

# View connection pool status (for debugging only)
import logging
logging.basicConfig(level=logging.DEBUG)

# Detailed logs are emitted when a connection is acquired or returned
conn = graph_db._get_connection()
# [DEBUG] [_get_connection] Successfully acquired connection from pool
graph_db._return_connection(conn)
# [DEBUG] [_return_connection] Successfully returned connection to pool
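
Every example that borrows a connection has to return it, including when a query raises. A context manager keeps that pairing in one place. This is a sketch: `pooled_connection` is a hypothetical helper built on the `_get_connection` and `_return_connection` methods used in this section, and `FakeGraphDB` exists only so the example runs without a database.

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(graph_db):
    """Borrow a connection and always hand it back to the pool."""
    conn = graph_db._get_connection()
    try:
        yield conn
    finally:
        graph_db._return_connection(conn)


class FakeGraphDB:
    """Stand-in used only to demonstrate the helper without a database."""

    def __init__(self):
        self.borrowed = 0

    def _get_connection(self):
        self.borrowed += 1
        return object()

    def _return_connection(self, conn):
        self.borrowed -= 1


fake = FakeGraphDB()
with pooled_connection(fake):
    print(fake.borrowed)  # 1 while the connection is held
print(fake.borrowed)      # 0 after the block exits
```

Because both methods are private (underscore-prefixed), code relying on them may need adjusting if the implementation changes.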

BaseGraphDB Interface

PolarDBGraphDB implements all methods of the BaseGraphDB abstract class, which keeps it interchangeable with the other graph database backends.

Core Methods

| Method | Description | Parameters |
|---|---|---|
| add_node() | Add a single node | label, properties, embedding, user_name |
| add_nodes_batch() | Batch add nodes | nodes, user_name |
| update_node() | Update node properties | id, fields, user_name |
| delete_node() | Delete node | id, user_name |
| delete_node_by_params() | Delete nodes by conditions | params, user_name |
| add_edge() | Create relationship | source_id, target_id, edge_type, properties, user_name |
| update_edge() | Update relationship properties | edge_id, properties, user_name |
| delete_edge() | Delete relationship | edge_id, user_name |
| search_by_embedding() | Vector similarity search | embedding, top_k, memory_type, user_name |
| get_node() | Get a single node | id, user_name |
| get_memory_count() | Count nodes | memory_type, user_name |
| remove_oldest_memory() | Clean old memories | memory_type, keep_latest, user_name |

Complete Method Signature Examples

from typing import Any

# Add node
def add_node(
    self,
    label: str = "Memory",
    properties: dict[str, Any] | None = None,
    embedding: list[float] | None = None,
    user_name: str | None = None
) -> str:
    """Add a new node to the graph database"""
    pass

# Vector search
def search_by_embedding(
    self,
    embedding: list[float],
    top_k: int = 10,
    memory_type: str | None = None,
    user_name: str | None = None,
    filters: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """Perform similarity search based on vector embedding"""
    pass

# Batch operations
def add_nodes_batch(
    self,
    nodes: list[dict[str, Any]],
    user_name: str | None = None
) -> list[str]:
    """Batch add multiple nodes"""
    pass
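
Because callers depend only on these method names and parameters, application code can be exercised against a stand-in that needs no database. The sketch below is a hypothetical in-memory test double, not a MemOS class. It mirrors a few of the parameter lists above, while its storage layout, its return shapes (a UUID string from `add_node`, a dict from `get_node`) and its tenant filtering are assumptions.

```python
import uuid
from typing import Any, Optional


class InMemoryGraphDB:
    """Hypothetical test double mirroring a few BaseGraphDB method names."""

    def __init__(self) -> None:
        self._nodes: dict[str, dict[str, Any]] = {}

    def add_node(self, label: str = "Memory",
                 properties: Optional[dict[str, Any]] = None,
                 embedding: Optional[list[float]] = None,
                 user_name: Optional[str] = None) -> str:
        node_id = str(uuid.uuid4())
        self._nodes[node_id] = {"label": label, "properties": dict(properties or {}),
                                "embedding": embedding, "user_name": user_name}
        return node_id

    def get_node(self, id: str, user_name: Optional[str] = None) -> Optional[dict[str, Any]]:
        node = self._nodes.get(id)
        # Assumed tenant rule: a node is visible only to its own user_name.
        if node is None or node["user_name"] != user_name:
            return None
        return node

    def delete_node(self, id: str, user_name: Optional[str] = None) -> None:
        if self.get_node(id, user_name) is not None:
            del self._nodes[id]

    def get_memory_count(self, memory_type: str, user_name: Optional[str] = None) -> int:
        return sum(1 for node in self._nodes.values()
                   if node["user_name"] == user_name
                   and node["properties"].get("memory_type") == memory_type)


db = InMemoryGraphDB()
node_id = db.add_node(properties={"content": "hello", "memory_type": "Knowledge"},
                      user_name="alice")
print(db.get_memory_count("Knowledge", user_name="alice"))  # 1
```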

Extension Development Guide

To add custom functionality on top of PolarDB, subclass PolarDBGraphDB:

from memos.graph_dbs.polardb import PolarDBGraphDB
from memos.configs.graph_db import PolarDBGraphDBConfig

class CustomPolarDBGraphDB(PolarDBGraphDB):
    """Custom PolarDB graph database implementation"""
    
    def __init__(self, config: PolarDBGraphDBConfig):
        super().__init__(config)
        # Custom initialization logic
        self.custom_index_created = False
    
    def create_custom_index(self):
        """Create custom index"""
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(f"""
                    CREATE INDEX IF NOT EXISTS idx_custom_field
                    ON "{self.db_name}_graph"."Memory" 
                    ((properties->>'custom_field'));
                """)
                conn.commit()
                self.custom_index_created = True
                print("✓ Custom index created")
        except Exception as e:
            print(f"❌ Failed to create index: {e}")
            conn.rollback()
        finally:
            self._return_connection(conn)
    
    def search_by_custom_field(self, field_value: str):
        """Search based on custom field"""
        query = f"""
            SELECT * FROM "{self.db_name}_graph"."Memory"
            WHERE properties->>'custom_field' = %s
        """
        
        conn = self._get_connection()
        try:
            with conn.cursor() as cursor:
                cursor.execute(query, [field_value])
                results = cursor.fetchall()
                return results
        finally:
            self._return_connection(conn)

# Use custom implementation
config = PolarDBGraphDBConfig(
    host="localhost",
    port=5432,
    user="postgres",
    password="password",
    db_name="custom_db"
)

custom_db = CustomPolarDBGraphDB(config)
custom_db.create_custom_index()
results = custom_db.search_by_custom_field("special_value")
