Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agentex/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ services:
- RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200}
- RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20}
- RETENTION_CLEANUP_DRY_RUN=${RETENTION_CLEANUP_DRY_RUN:-true}
- RETENTION_CLEANUP_STALE_RUNNING_DAYS=${RETENTION_CLEANUP_STALE_RUNNING_DAYS:-0}
ports:
- "5003:5003"
volumes:
Expand Down Expand Up @@ -242,6 +243,7 @@ services:
- RETENTION_CLEANUP_PAGE_SIZE=${RETENTION_CLEANUP_PAGE_SIZE:-200}
- RETENTION_CLEANUP_MAX_IN_FLIGHT=${RETENTION_CLEANUP_MAX_IN_FLIGHT:-20}
- RETENTION_CLEANUP_DRY_RUN=${RETENTION_CLEANUP_DRY_RUN:-true}
- RETENTION_CLEANUP_STALE_RUNNING_DAYS=${RETENTION_CLEANUP_STALE_RUNNING_DAYS:-0}
volumes:
- .:/app:cached
depends_on:
Expand Down
45 changes: 38 additions & 7 deletions agentex/src/config/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class EnvVarKeys(str, Enum):
RETENTION_CLEANUP_PAGE_SIZE = "RETENTION_CLEANUP_PAGE_SIZE"
RETENTION_CLEANUP_MAX_IN_FLIGHT = "RETENTION_CLEANUP_MAX_IN_FLIGHT"
RETENTION_CLEANUP_DRY_RUN = "RETENTION_CLEANUP_DRY_RUN"
RETENTION_CLEANUP_STALE_RUNNING_DAYS = "RETENTION_CLEANUP_STALE_RUNNING_DAYS"


class Environment(str, Enum):
Expand All @@ -76,6 +77,30 @@ class Environment(str, Enum):
refreshed_environment_variables = None


def _parse_bool_env(key: EnvVarKeys, default: bool) -> bool:
"""
Strict boolean env parsing: accepts true/false/1/0 case-insensitively,
raises on anything else.

The previous pattern (`os.environ.get(key, ...) == "true"`) silently
coerced any unrecognized value to False. For RETENTION_CLEANUP_DRY_RUN
that failure mode is destructive: `DRY_RUN=True` (capital T, as YAML
tooling tends to render booleans) meant dry_run=False, i.e. live
deletion. Fail loud instead so a misconfigured worker refuses to run.
"""
raw = os.environ.get(key)
if raw is None:
return default
normalized = raw.strip().lower()
if normalized in ("true", "1"):
return True
if normalized in ("false", "0"):
return False
raise ValueError(
f"Invalid boolean for {key.value}: {raw!r} (expected true/false/1/0)"
)


class EnvironmentVariables(BaseModel):
ENVIRONMENT: str | None = Environment.DEV
OPENAI_API_KEY: str | None
Expand Down Expand Up @@ -128,6 +153,10 @@ class EnvironmentVariables(BaseModel):
RETENTION_CLEANUP_PAGE_SIZE: int = 200
RETENTION_CLEANUP_MAX_IN_FLIGHT: int = 20
RETENTION_CLEANUP_DRY_RUN: bool = True
# When > 0, tasks stuck in RUNNING with no interaction for this many days
# are treated as abandoned and become eligible for cleanup. 0 disables the
# override (RUNNING tasks are never cleaned), preserving prior behavior.
RETENTION_CLEANUP_STALE_RUNNING_DAYS: int = 0

@classmethod
def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
Expand Down Expand Up @@ -210,15 +239,14 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
AGENTEX_SERVER_TASK_QUEUE=os.environ.get(
EnvVarKeys.AGENTEX_SERVER_TASK_QUEUE
),
ENABLE_HEALTH_CHECK_WORKFLOW=(
os.environ.get(EnvVarKeys.ENABLE_HEALTH_CHECK_WORKFLOW, "false")
== "true"
ENABLE_HEALTH_CHECK_WORKFLOW=_parse_bool_env(
EnvVarKeys.ENABLE_HEALTH_CHECK_WORKFLOW, default=False
),
WEBHOOK_REQUEST_TIMEOUT=float(
os.environ.get(EnvVarKeys.WEBHOOK_REQUEST_TIMEOUT, "15.0")
),
RETENTION_CLEANUP_ENABLED=(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_ENABLED, "false") == "true"
RETENTION_CLEANUP_ENABLED=_parse_bool_env(
EnvVarKeys.RETENTION_CLEANUP_ENABLED, default=False
),
RETENTION_CLEANUP_AGENT_ALLOWLIST=[
name.strip()
Expand All @@ -239,8 +267,11 @@ def refresh(cls, force_refresh: bool = False) -> EnvironmentVariables | None:
RETENTION_CLEANUP_MAX_IN_FLIGHT=int(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_MAX_IN_FLIGHT, "20")
),
RETENTION_CLEANUP_DRY_RUN=(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, "true") == "true"
RETENTION_CLEANUP_DRY_RUN=_parse_bool_env(
EnvVarKeys.RETENTION_CLEANUP_DRY_RUN, default=True
),
RETENTION_CLEANUP_STALE_RUNNING_DAYS=int(
os.environ.get(EnvVarKeys.RETENTION_CLEANUP_STALE_RUNNING_DAYS, "0")
),
)
refreshed_environment_variables = environment_variables
Expand Down
132 changes: 108 additions & 24 deletions agentex/src/domain/services/task_retention_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ async def clean_task(
*,
enforce_idle_threshold: bool = True,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
Delete content-bearing rows for a stale task. Idempotent: re-running on a
Expand All @@ -189,9 +190,18 @@ async def clean_task(
scheduled Temporal sweep always sets True. The admin endpoint
accepts a force=true flag that flips this to False.
idle_days: Idle threshold in days (when enforce_idle_threshold=True).
stale_running_days: When > 0, a task whose status is RUNNING but
whose last interaction is at least this many days old is treated
as abandoned and may be cleaned. 0 (default) keeps the strict
behavior: RUNNING tasks are never cleaned. Tasks that hang in
RUNNING forever (agent crashed mid-run, workflow never reached a
terminal state) would otherwise be exempt from retention
indefinitely, defeating the policy for exactly the data most
likely to be forgotten.

Refuses (raises) if:
- task is currently active (status == RUNNING).
- task is currently active (status == RUNNING) and not stale per
stale_running_days.
- enforce_idle_threshold=True and the task is not idle long enough.
- unprocessed events exist past agent_task_tracker cursors.

Expand Down Expand Up @@ -235,20 +245,31 @@ async def clean_task(
events_deleted=0,
)

# Last interaction fetched once; both the stale-idle signal and the
# idle-threshold guard compare against it (no per-threshold re-query).
# is_stale_idle is the "abandoned" signal that relaxes both the RUNNING
# and unprocessed-events guards below.
last_interaction = await self._last_interaction_at(task)
is_stale_idle = stale_running_days > 0 and self._is_idle_since(
last_interaction, stale_running_days
)

# 2. Status + idle threshold guards.
if task.status == TaskStatus.RUNNING:
raise ClientError(
f"Cannot clean task {task_id}: status is RUNNING (active)"
)
if enforce_idle_threshold and not await self._is_task_idle(task, idle_days):
running_override = self._check_running_guard(
task, is_stale_idle, emit_forensics=True
)
if enforce_idle_threshold and not self._is_idle_since(
last_interaction, idle_days
):
raise ClientError(
f"Cannot clean task {task_id}: not idle for {idle_days} days "
f"(use force=true to override)"
)

# 3. Unprocessed-events guard.
if await self._has_unprocessed_events(task_id):
raise ClientError(f"Cannot clean task {task_id}: unprocessed events remain")
# 3. Unprocessed-events guard (relaxed only for the stale-RUNNING case).
await self._check_unprocessed_events_guard(
task_id, relax=running_override, emit_forensics=True
)

# 4-5. Mongo deletes.
messages_deleted = await self.task_message_service.delete_all_messages(task_id)
Expand Down Expand Up @@ -293,6 +314,7 @@ async def preview_clean_task(
*,
enforce_idle_threshold: bool = True,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
Run the same safety checks as clean_task without deleting or updating data.
Expand All @@ -306,19 +328,25 @@ async def preview_clean_task(
if task.cleaned_at is not None:
cleaned_at = task.cleaned_at
else:
if task.status == TaskStatus.RUNNING:
raise ClientError(
f"Cannot clean task {task_id}: status is RUNNING (active)"
)
if enforce_idle_threshold and not await self._is_task_idle(task, idle_days):
last_interaction = await self._last_interaction_at(task)
is_stale_idle = stale_running_days > 0 and self._is_idle_since(
last_interaction, stale_running_days
)
# Preview is a non-mutating audit, so it does not emit the forensic
# WARNING that a real cleanup does (keeps dry-run logs clean).
running_override = self._check_running_guard(
task, is_stale_idle, emit_forensics=False
)
if enforce_idle_threshold and not self._is_idle_since(
last_interaction, idle_days
):
raise ClientError(
f"Cannot clean task {task_id}: not idle for {idle_days} days "
f"(use force=true to override)"
)
if await self._has_unprocessed_events(task_id):
raise ClientError(
f"Cannot clean task {task_id}: unprocessed events remain"
)
await self._check_unprocessed_events_guard(
task_id, relax=running_override, emit_forensics=False
)
cleaned_at = datetime.now(UTC)

result = TaskCleanupResultEntity(
Expand Down Expand Up @@ -452,15 +480,66 @@ async def rehydrate_task(

# ---- internal helpers ----

async def _is_task_idle(self, task, idle_days: int) -> bool:
def _check_running_guard(
self, task, is_stale_idle: bool, emit_forensics: bool
) -> bool:
"""
Raise ClientError if `task` is RUNNING, unless it is stale-idle
(abandoned). A real cleanup logs the override at WARNING for forensics
(cleaning a RUNNING task declares its workflow abandoned); previews pass
emit_forensics=False so a dry-run audit does not produce log entries
indistinguishable from a live deletion.

Returns True iff the stale-RUNNING override fired (task was RUNNING and
abandoned). The caller uses this to decide whether to also relax the
unprocessed-events guard, scoping that relaxation to exactly the
stuck-RUNNING case this feature targets.
"""
if task.status != TaskStatus.RUNNING:
return False
if is_stale_idle:
if emit_forensics:
logger.warning(
"task_cleanup_stale_running_override",
extra={"task_id": task.id},
)
return True
raise ClientError(f"Cannot clean task {task.id}: status is RUNNING (active)")

async def _check_unprocessed_events_guard(
self, task_id: str, relax: bool, emit_forensics: bool
) -> None:
"""
Raise ClientError if events exist past the agent_task_tracker cursor,
unless `relax` (set only when the stale-RUNNING override fired). A task
stuck RUNNING and abandoned will never process those events, and
signal-driven agents never advance the cursor in the first place, so for
that case the events are deleted with the rest. Scoped to stale-RUNNING
on purpose: a terminal task with a lagging cursor keeps the strict guard
so the sweep never deletes genuinely pending events. Forensics on real
cleanups only (see _check_running_guard).
"""
if not await self._has_unprocessed_events(task_id):
return
if relax:
if emit_forensics:
logger.warning(
"task_cleanup_unprocessed_events_override",
extra={"task_id": task_id},
)
return
raise ClientError(f"Cannot clean task {task_id}: unprocessed events remain")

async def _last_interaction_at(self, task) -> datetime | None:
"""
True iff the task has no interaction within the idle window.
Most recent interaction timestamp for the task, or None if never.

Last-interaction = max(task.updated_at, latest message created_at).
`task.updated_at` alone would miss tasks where the only recent
activity is Mongo message writes (which don't bump the Postgres row).
`task.updated_at` alone would miss tasks where the only recent activity
is Mongo message writes (which don't bump the Postgres row). Issues one
Mongo message-fetch; callers compute idle-ness against any threshold
from the returned value rather than re-querying per threshold.
"""
cutoff = datetime.now(UTC) - timedelta(days=idle_days)
last_interaction = task.updated_at

latest_messages = await self.task_message_service.get_messages(
Expand All @@ -479,9 +558,14 @@ async def _is_task_idle(self, task, idle_days: int) -> bool:
if last_interaction is None or latest_at > last_interaction:
last_interaction = latest_at

return last_interaction

@staticmethod
def _is_idle_since(last_interaction: datetime | None, idle_days: int) -> bool:
"""Pure idle check against a precomputed last-interaction timestamp."""
if last_interaction is None:
return True
return last_interaction < cutoff
return last_interaction < datetime.now(UTC) - timedelta(days=idle_days)

async def _has_unprocessed_events(self, task_id: str) -> bool:
"""
Expand Down
8 changes: 8 additions & 0 deletions agentex/src/domain/use_cases/task_retention_use_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,30 @@ async def clean_task(
task_id: str,
force: bool = False,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
force=True is the admin escape hatch; it bypasses the idle-threshold
check (but NOT the active-workflow / unprocessed-events checks, which
protect correctness, not policy).

stale_running_days > 0 relaxes the active-workflow check for tasks that
have sat in RUNNING with no interaction for at least that many days
(abandoned runs that would otherwise be exempt from retention forever).
"""
return await self.retention_service.clean_task(
task_id=task_id,
enforce_idle_threshold=not force,
idle_days=idle_days,
stale_running_days=stale_running_days,
)

async def preview_clean_task(
self,
task_id: str,
force: bool = False,
idle_days: int = 7,
stale_running_days: int = 0,
) -> TaskCleanupResultEntity:
"""
Dry-run counterpart to clean_task: runs the same safety checks without
Expand All @@ -72,6 +79,7 @@ async def preview_clean_task(
task_id=task_id,
enforce_idle_threshold=not force,
idle_days=idle_days,
stale_running_days=stale_running_days,
)

async def rehydrate_task(
Expand Down
19 changes: 16 additions & 3 deletions agentex/src/temporal/activities/retention_cleanup_activities.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def load_cleanup_config(self) -> dict:
"page_size": env.RETENTION_CLEANUP_PAGE_SIZE,
"max_in_flight": env.RETENTION_CLEANUP_MAX_IN_FLIGHT,
"dry_run": env.RETENTION_CLEANUP_DRY_RUN,
"stale_running_days": env.RETENTION_CLEANUP_STALE_RUNNING_DAYS,
}

@activity.defn(name=FIND_CLEANUP_CANDIDATES_ACTIVITY)
Expand Down Expand Up @@ -129,7 +130,11 @@ async def find_multi_agent_cleanup_candidates(

@activity.defn(name=CLEAN_TASK_ACTIVITY)
async def clean_task(
self, task_id: str, idle_days: int, dry_run: bool = True
self,
task_id: str,
idle_days: int,
dry_run: bool = True,
stale_running_days: int = 0,
) -> CleanTaskOutcome:
"""
Delete the stored content (messages, states, events) for a single task.
Expand All @@ -139,6 +144,8 @@ async def clean_task(
idle_days: Passed through to the use case for policy checks.
dry_run: When omitted, preview only. Operators must pass False to
enable writes.
stale_running_days: When > 0, RUNNING tasks idle at least this many
days are treated as abandoned and cleaned instead of skipped.

Returns:
CleanTaskOutcome with ``status`` set to ``"cleaned"`` when content was
Expand All @@ -149,7 +156,10 @@ async def clean_task(
try:
if dry_run:
result = await self.use_case.preview_clean_task(
task_id=task_id, force=False, idle_days=idle_days
task_id=task_id,
force=False,
idle_days=idle_days,
stale_running_days=stale_running_days,
)
logger.info(
"task_cleanup_dry_run",
Expand All @@ -164,7 +174,10 @@ async def clean_task(
"events_deleted": 0,
}
result = await self.use_case.clean_task(
task_id=task_id, force=False, idle_days=idle_days
task_id=task_id,
force=False,
idle_days=idle_days,
stale_running_days=stale_running_days,
)
return {
"task_id": result.task_id,
Expand Down
Loading
Loading