Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 26 additions & 49 deletions agent_core/core/impl/context/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,73 +603,50 @@ def _build_memory_query(
) -> Optional[str]:
"""Build a semantic query for memory retrieval.

Combines task instruction with recent conversation messages (both user
and agent) to provide better context for memory search.

Args:
query: Optional explicit query string.
session_id: Optional session ID for session-specific state lookup.

Returns:
A query string suitable for semantic memory search, or None if no context.
Priority: latest user message → task instruction → explicit query.
Agent messages are deliberately excluded — they often restate or
drift to adjacent topics and were observed dominating the embedding
(a long proactive-tasks reply poisoned a follow-up MCP question).
"""
# Get task instruction as the base query
session = get_session_or_none(session_id)
if session and session.current_task:
task_instruction = session.current_task.instruction
else:
current_task = get_state().current_task
task_instruction = current_task.instruction if current_task else None

if not task_instruction:
# Fall back to explicit query if no task
return query if query else None

# Get recent conversation messages for additional context
recent_context = self._get_recent_conversation_for_memory(session_id, limit=5)
latest_user_message = self._get_latest_user_message(session_id)
if latest_user_message:
return latest_user_message

if recent_context:
return f"{task_instruction}\n\nRecent conversation:\n{recent_context}"
else:
return task_instruction
session = get_session_or_none(session_id)
current_task = (
session.current_task if session and session.current_task
else get_state().current_task
)
if current_task and current_task.instruction:
return current_task.instruction

def _get_recent_conversation_for_memory(
self, session_id: Optional[str], limit: int = 5
) -> str:
"""Get recent conversation messages for memory query context.
return query or None

Args:
session_id: Optional session ID for session-specific event stream.
limit: Maximum number of messages to include.
def _get_latest_user_message(self, session_id: Optional[str]) -> str:
"""Return the most recent user message text, or empty string if none.

Returns:
Formatted string of recent user and agent messages.
Walks the conversation-history buffer from newest to oldest and returns
the first event whose kind contains 'user message'. Agent messages are
skipped entirely.
"""
try:
event_stream_manager = self.state_manager.event_stream_manager
if not event_stream_manager:
return ""

# Get messages from conversation history (includes both user and agent)
recent_messages = event_stream_manager.get_recent_conversation_messages(
limit
limit=20
)
if not recent_messages:
return ""

# Format messages simply for semantic search
lines = []
for event in recent_messages:
# Simplify the kind label for the query
if "user message" in event.kind:
lines.append(f"User: {event.message}")
elif "agent message" in event.kind:
lines.append(f"Agent: {event.message}")

return "\n".join(lines)
for event in reversed(recent_messages):
if "user message" in event.kind and event.message:
return event.message.strip()
return ""

except Exception as e:
logger.warning(f"[MEMORY] Failed to get recent conversation: {e}")
logger.warning(f"[MEMORY] Failed to get latest user message: {e}")
return ""

def get_memory_context(
Expand Down
113 changes: 113 additions & 0 deletions agent_core/core/impl/memory/bm25_index.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
"""
In-memory BM25 keyword index for memory chunks.

Sits alongside ChromaDB to backstop semantic search on terms vector embeddings
struggle with (proper nouns, dates, IDs, code identifiers). The index is fully
rebuilt from the current chunk set on every refresh — at ~200 memory items it
costs <50ms and avoids the complexity of incremental BM25 updates.
"""

from __future__ import annotations

import re
import threading
from typing import Dict, List, Optional, Tuple

try:
from rank_bm25 import BM25Okapi
_HAS_BM25 = True
except ImportError:
BM25Okapi = None
_HAS_BM25 = False

from agent_core.utils.logger import logger


# A token is a maximal run of ASCII letters, digits and underscores, so
# identifiers such as ``chunk_id_42`` survive as a single token.
_TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")


def tokenize(text: str) -> List[str]:
    """Split ``text`` into lowercase word/number tokens.

    Args:
        text: Raw text to tokenize. Falsy input (``None`` or ``""``) is
            accepted and produces no tokens.

    Returns:
        The tokens in order of appearance, each lowercased. Anything that is
        not an ASCII letter, digit or underscore acts as a separator.
    """
    if text:
        return list(map(str.lower, _TOKEN_RE.findall(text)))
    return []


class BM25Index:
    """Thread-safe BM25 index keyed by chunk_id.

    On a fresh install or when ``rank_bm25`` is not installed, BM25 retrieval
    silently degrades to an empty result set. The MemoryManager then falls back
    to pure vector search, so retrieval keeps working — just without the
    keyword channel.

    All mutable state (``_chunk_ids``, ``_tokenized``, ``_bm25``) is read and
    written only while holding ``_lock``.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # chunk ids, positionally aligned with ``_tokenized`` and with the
        # score vector returned by the BM25 scorer
        self._chunk_ids: List[str] = []
        self._tokenized: List[List[str]] = []
        # None whenever the index is unusable: rank_bm25 missing, no
        # documents, or the last build failed
        self._bm25: Optional["BM25Okapi"] = None

    def rebuild(self, chunks: Dict[str, str]) -> None:
        """Rebuild the index from ``chunk_id -> raw text``.

        The previous index is discarded entirely. A build failure is logged
        and leaves the index unavailable rather than raising.

        Args:
            chunks: Mapping of chunk_id to the searchable text body.
        """
        with self._lock:
            self._chunk_ids = list(chunks)
            self._tokenized = [tokenize(chunks[cid]) for cid in self._chunk_ids]
            self._bm25 = None

            if not _HAS_BM25 or not self._tokenized:
                return

            # rank_bm25 raises on empty docs; replace with a single sentinel token
            sanitized = [doc or ["__empty__"] for doc in self._tokenized]
            try:
                self._bm25 = BM25Okapi(sanitized)
            except Exception as e:
                # Best effort: callers fall back to vector-only retrieval.
                logger.warning(f"[BM25Index] Failed to build index: {e}")

    def search(self, query: str, top_k: int = 20) -> List[Tuple[str, float]]:
        """Return ``[(chunk_id, score)]`` sorted high-to-low.

        Args:
            query: Free-text query; tokenized with the same tokenizer as the
                indexed documents.
            top_k: Maximum number of hits to return. Zero or negative values
                yield an empty list.

        Returns:
            Up to ``top_k`` chunks with a strictly positive score, best first.
            Empty when the query has no tokens, the index is unavailable, or
            scoring fails (the failure is logged).
        """
        if top_k <= 0 or not query or not query.strip():
            return []

        with self._lock:
            if self._bm25 is None or not self._chunk_ids:
                return []

            tokens = tokenize(query)
            if not tokens:
                return []

            try:
                scores = self._bm25.get_scores(tokens)
            except Exception as e:
                logger.warning(f"[BM25Index] Query failed: {e}")
                return []

            # Pair ids with scores while still holding the lock so a
            # concurrent rebuild cannot swap the id list underneath us.
            hits = [
                (chunk_id, float(score))
                for chunk_id, score in zip(self._chunk_ids, scores)
                if score > 0
            ]

        hits.sort(key=lambda hit: hit[1], reverse=True)
        return hits[:top_k]

    @property
    def size(self) -> int:
        """Number of chunks handed to the last ``rebuild`` call."""
        with self._lock:
            return len(self._chunk_ids)

    @property
    def is_available(self) -> bool:
        """True when a BM25 index was actually built and holds documents.

        Checks the built scorer rather than only the import flag, so a failed
        build (logged in ``rebuild``) is reported as unavailable instead of
        advertising an index whose ``search`` always returns nothing.
        """
        with self._lock:
            return self._bm25 is not None and bool(self._chunk_ids)
85 changes: 85 additions & 0 deletions agent_core/core/impl/memory/entity_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
"""
Lightweight heuristic entity extractor for memory chunks.

This is intentionally simple — Phase 1 just needs to surface proper-noun-like
tokens so they end up in chunk metadata (and in the BM25 corpus). Higher-quality
LLM-based NER is a future phase.

The extractor pulls:
- Capitalised multi-word sequences (proper nouns)
- Tokens that look like identifiers (CamelCase, snake_case with caps)
- Quoted strings

Stopword filtering trims common English starters that get capitalised at
sentence boundaries.
"""

from __future__ import annotations

import re
from typing import List

_STOP = {
"the", "a", "an", "and", "or", "but", "of", "in", "on", "at", "to", "for",
"with", "by", "from", "as", "is", "are", "was", "were", "be", "been", "being",
"have", "has", "had", "do", "does", "did", "will", "would", "should", "could",
"may", "might", "must", "can", "i", "you", "he", "she", "it", "we", "they",
"this", "that", "these", "those", "user", "agent", "task", "action", "event",
"memory", "system", "note", "today", "yesterday", "tomorrow", "monday",
"tuesday", "wednesday", "thursday", "friday", "saturday", "sunday",
"january", "february", "march", "april", "may", "june", "july", "august",
"september", "october", "november", "december",
}

# Capitalised words (incl. CamelCase), optionally chained: "Trading View",
# "OpenAI", "CraftBot", "John Doe"
_PROPER_NOUN_RE = re.compile(
r"\b[A-Z][A-Za-z0-9]*(?:[ \-_][A-Z][A-Za-z0-9]*)*\b"
)

# Quoted strings (single or double)
_QUOTED_RE = re.compile(r"\"([^\"]{2,40})\"|'([^']{2,40})'")


def extract_entities(text: str, max_entities: int = 12) -> List[str]:
"""Extract candidate entity strings from text.

Returns a deduplicated, order-preserving list. The cap exists so chunk
metadata stays compact (ChromaDB stores it for every chunk).
"""
if not text:
return []

seen: set[str] = set()
out: List[str] = []

for match in _PROPER_NOUN_RE.finditer(text):
candidate = match.group(0).strip()
if not candidate:
continue
lowered = candidate.lower()
if lowered in _STOP:
continue
# Drop single-letter or pure-numeric tokens
if len(candidate) < 2:
continue
if candidate.isdigit():
continue
if lowered in seen:
continue
seen.add(lowered)
out.append(candidate)
if len(out) >= max_entities:
return out

for match in _QUOTED_RE.finditer(text):
candidate = (match.group(1) or match.group(2) or "").strip()
if not candidate or candidate.lower() in seen:
continue
seen.add(candidate.lower())
out.append(candidate)
if len(out) >= max_entities:
break

return out
Loading