Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions dev/compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# query: <optional connection-string query string, without leading '?'>
#
# A service with no `x-test-target` is not a test target and is ignored by the
# registry.
# registry (e.g. the mongot sidecar, which is reached only through its mongod).
#
# Memory: each mongod caps its WiredTiger cache (--wiredTigerCacheSizeGB). By
# default a mongod sizes its cache to ~50% of the host/VM RAM; with several
Expand Down Expand Up @@ -60,7 +60,26 @@ services:
mongo-replset:
image: mongo:8.2.4
profiles: ["mongo-replset", "all"]
command: ["--replSet", "rs0", "--bind_ip_all", "--wiredTigerCacheSizeGB", "1.5"]
command:
- "--replSet"
- "rs0"
- "--bind_ip_all"
- "--wiredTigerCacheSizeGB"
- "1.5"
# Point at the mongot search sidecar so this replica set also serves the
# search surfaces. mongot is transparent to all other behavior, so the
# set behaves identically to a plain replica set apart from gaining
# search; it is one target, not two.
- "--setParameter"
- "mongotHost=mongot:27028"
- "--setParameter"
- "searchIndexManagementHostAndPort=mongot:27028"
- "--setParameter"
- "useGrpcForSearch=true"
- "--setParameter"
- "skipAuthenticationToMongot=true"
- "--setParameter"
- "skipAuthenticationToSearchIndexManagementServer=true"
ports:
- "27018:27017"
healthcheck:
Expand All @@ -71,3 +90,29 @@ services:
x-test-target:
engine: mongodb
query: directConnection=true

# mongot: the search sidecar for the mongo-replset target. Not a test target
# on its own; the suite reaches it only through mongo-replset. mongot is
# MongoDB Search Community Edition (SSPL, same license as the server). It
# replicates from the replica set as an authenticated sync source and reads
# its password from a file, so the entrypoint writes that file (a fixed
# local-dev secret, matched by the searchCoordinator user the harness creates
# on the replica set) with owner-only permissions before launching. It retries
# the connection until that user exists.
mongot:
# NOTE(review): this image floats on `latest`, whereas mongo-replset pins
# mongo:8.2.4. Consider pinning a specific tag so test runs are reproducible
# and the mongod/mongot pairing does not drift.
image: mongodb/mongodb-community-search:latest
# Same profiles as mongo-replset, so the sidecar starts whenever that target does.
profiles: ["mongo-replset", "all"]
# `$$` escapes Compose's own variable interpolation, so the container shell
# (not Compose) expands MONGOT_SYNC_PASSWORD when the command runs. `umask 077`
# comes first so the directory and file are created owner-only, and `exec`
# replaces the shell with the mongot process.
entrypoint:
- "sh"
- "-c"
- >
umask 077 &&
mkdir -p /mongot-secrets &&
printf '%s' "$$MONGOT_SYNC_PASSWORD" > /mongot-secrets/passwordFile &&
exec /mongot-community/mongot --config /mongot-config/mongot.yml
environment:
# Fixed local-dev secret shared with the searchCoordinator user the
# harness provisions on mongo-replset. Not a real credential.
MONGOT_SYNC_PASSWORD: "searchSyncPassword"
# Only the config file is mounted (read-only); the path matches the --config
# argument passed in the entrypoint above.
volumes:
- ./mongot.yml:/mongot-config/mongot.yml:ro
30 changes: 30 additions & 0 deletions dev/mongot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# mongot configuration for the mongo-replset target (dev/compose.yaml service
# "mongot"). mongot is MongoDB Search Community Edition (SSPL), the same license
# as the server. It runs alongside the replica set's mongod and serves the
# search and vector search surfaces.
#
# mongot replicates from the mongod replica set as a sync source. It requires an
# authenticated connection (it has no unauthenticated mode), so it logs in as a
# dedicated user holding the searchCoordinator role. The test harness creates
# that user on the replica set, and the mongot container's entrypoint writes the
# matching password file before launch (see dev/compose.yaml).
# The mongod that mongot replicates from. The host is the `mongo-replset`
# compose service name and 27017 is that mongod's in-container port (the
# compose file publishes it to the host as 27018).
syncSource:
replicaSet:
hostAndPort: "mongo-replset:27017"
username: "searchSyncUser"
# Written at container start by the mongot entrypoint in dev/compose.yaml.
passwordFile: "/mongot-secrets/passwordFile"
authSource: "admin"
tls: false
# NOTE(review): the compose service mounts only this config file, so this path
# presumably lives in the container's writable layer and is discarded with the
# container -- confirm that losing index data on recreate is acceptable for dev.
storage:
dataPath: "/var/lib/mongot"
server:
grpc:
# mongod reaches mongot here (see mongotHost / searchIndexManagementHostAndPort
# on the mongo-replset service). Bound on all interfaces so the mongod
# container can connect over the compose network.
address: "0.0.0.0:27028"
tls:
mode: "disabled"
# NOTE(review): the mongot compose service, as visible in dev/compose.yaml,
# declares no healthcheck and publishes no ports, so nothing appears to probe
# this endpoint yet -- confirm whether a compose healthcheck should use it.
healthCheck:
address: "0.0.0.0:8080"
logging:
verbosity: INFO
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
"""Tests for $vectorSearch pipeline position constraints and stage placement."""

from __future__ import annotations

import time

import pytest

from documentdb_tests.compatibility.tests.core.operator.stages.utils.stage_test_case import (
StageTestCase,
)
from documentdb_tests.framework import fixtures
from documentdb_tests.framework.assertions import assertResult
from documentdb_tests.framework.error_codes import (
FACET_PIPELINE_INVALID_STAGE_ERROR,
LOOKUP_SUB_PIPELINE_NOT_ALLOWED_ERROR,
NOT_FIRST_STAGE_ERROR,
)
from documentdb_tests.framework.executor import execute_command
from documentdb_tests.framework.parametrize import pytest_params
from documentdb_tests.framework.test_constants import DOUBLE_ZERO

pytestmark = pytest.mark.requires(search=True)

# Documents the position_collection fixture inserts into the shared collection.
# Against the query vector [1.0, DOUBLE_ZERO, DOUBLE_ZERO] used by the cases in
# this module, cosine similarity falls from _id 1 to _id 3 (assuming
# DOUBLE_ZERO is 0.0), which is the order the success cases expect.
_POSITION_CORPUS = [
    {"_id": 1, "vec": [1.0, DOUBLE_ZERO, DOUBLE_ZERO]},
    {"_id": 2, "vec": [0.8, 0.2, DOUBLE_ZERO]},
    {"_id": 3, "vec": [0.6, 0.4, DOUBLE_ZERO]},
]

# Upper bound, in seconds, on how long the fixture polls for the search index
# to report READY before it gives up with a TimeoutError.
_INDEX_READY_TIMEOUT_SECONDS = 120


@pytest.fixture(scope="module")
def position_collection(engine_client, worker_id):
    """Yield a module-scoped collection with a READY cosine vectorSearch index.

    The collection is shared read-only across the placement cases so the index
    is built and polled once rather than per test. It carries the fixed name
    ``position`` so the $unionWith/$lookup sub-pipeline cases can reference it
    as their source.

    The database is dropped on teardown and also when setup itself fails, so a
    failed index build does not leave the database behind.

    Raises:
        RuntimeError: the index reported the terminal ``FAILED`` status.
        TimeoutError: the index did not report ``READY`` within
            ``_INDEX_READY_TIMEOUT_SECONDS``.
    """
    index_name = "vs_position_index"
    db_name = fixtures.generate_database_name("stages_vectorSearch_position", worker_id)
    # Start from a clean slate in case an earlier run was interrupted.
    fixtures.cleanup_database(engine_client, db_name)
    try:
        db = engine_client[db_name]
        coll = db["position"]
        # Copy each document so the driver never touches the module constants.
        coll.insert_many([dict(doc) for doc in _POSITION_CORPUS])
        db.command(
            {
                "createSearchIndexes": coll.name,
                "indexes": [
                    {
                        "name": index_name,
                        "type": "vectorSearch",
                        "definition": {
                            "fields": [
                                {
                                    "type": "vector",
                                    "path": "vec",
                                    "numDimensions": 3,
                                    "similarity": "cosine",
                                },
                            ]
                        },
                    }
                ],
            }
        )

        deadline = time.monotonic() + _INDEX_READY_TIMEOUT_SECONDS
        status = None
        while time.monotonic() < deadline:
            # Match the index by name instead of trusting list position.
            status = next(
                (
                    index.get("status")
                    for index in coll.aggregate([{"$listSearchIndexes": {}}])
                    if index.get("name") == index_name
                ),
                None,
            )
            if status == "READY":
                break
            if status == "FAILED":
                # Terminal state: polling further would only burn the timeout.
                raise RuntimeError(f"vectorSearch index {index_name!r} failed to build")
            time.sleep(2)
        else:
            raise TimeoutError(
                f"vectorSearch index did not reach READY state (last status: {status!r})"
            )

        yield coll
    finally:
        # Runs on normal teardown and when any setup step above raises.
        fixtures.cleanup_database(engine_client, db_name)


def _vector_search_stage() -> dict:
    """Return a fresh ``$vectorSearch`` stage shared by every placement case.

    All cases issue the same query against the ``vs_position_index`` index; only
    the stage's position in the pipeline differs. A new dict is built on every
    call so no two test cases share mutable state.
    """
    return {
        "$vectorSearch": {
            "index": "vs_position_index",
            "path": "vec",
            "queryVector": [1.0, DOUBLE_ZERO, DOUBLE_ZERO],
            "numCandidates": 10,
            "limit": 3,
        }
    }


# Property [Stage Placement Allowed]: $vectorSearch succeeds as the first stage
# of the main pipeline and as the first stage of a $unionWith sub-pipeline.
VECTORSEARCH_PLACEMENT_TESTS: list[StageTestCase] = [
    StageTestCase(
        "first_stage_main_pipeline",
        pipeline=[
            _vector_search_stage(),
            {"$project": {"_id": 1}},
        ],
        expected=[{"_id": 1}, {"_id": 2}, {"_id": 3}],
        msg="$vectorSearch should succeed as the first stage of the main pipeline",
    ),
    StageTestCase(
        "first_stage_union_with_sub_pipeline",
        pipeline=[
            # Empty the outer stream so only the $unionWith branch contributes rows.
            {"$match": {"_id": {"$lt": 0}}},
            {
                "$unionWith": {
                    "coll": "position",
                    "pipeline": [
                        _vector_search_stage(),
                        {"$project": {"_id": 1}},
                    ],
                }
            },
        ],
        expected=[{"_id": 1}, {"_id": 2}, {"_id": 3}],
        msg="$vectorSearch should succeed as the first stage of a $unionWith sub-pipeline",
    ),
]

# Property [Stage Placement Errors]: $vectorSearch is rejected when it is not the
# first stage of a pipeline, when nested in a $facet sub-pipeline, and when
# nested in a $lookup sub-pipeline.
VECTORSEARCH_PLACEMENT_ERROR_TESTS: list[StageTestCase] = [
    StageTestCase(
        "not_first_stage",
        pipeline=[
            {"$match": {"_id": 1}},
            _vector_search_stage(),
        ],
        error_code=NOT_FIRST_STAGE_ERROR,
        msg="$vectorSearch should be rejected when it is not the first stage",
    ),
    StageTestCase(
        "inside_facet",
        pipeline=[
            {
                "$facet": {
                    "results": [
                        _vector_search_stage(),
                    ],
                }
            },
        ],
        error_code=FACET_PIPELINE_INVALID_STAGE_ERROR,
        msg="$vectorSearch should be rejected inside a $facet sub-pipeline",
    ),
    StageTestCase(
        "inside_lookup_sub_pipeline",
        pipeline=[
            {
                "$lookup": {
                    "from": "position",
                    "pipeline": [
                        _vector_search_stage(),
                    ],
                    "as": "matches",
                }
            },
        ],
        error_code=LOOKUP_SUB_PIPELINE_NOT_ALLOWED_ERROR,
        msg="$vectorSearch should be rejected inside a $lookup sub-pipeline",
    ),
]

# Success cases first, then rejection cases, as one parametrization source.
VECTORSEARCH_POSITION_TESTS: list[StageTestCase] = (
    VECTORSEARCH_PLACEMENT_TESTS + VECTORSEARCH_PLACEMENT_ERROR_TESTS
)


@pytest.mark.aggregate
@pytest.mark.parametrize("test_case", pytest_params(VECTORSEARCH_POSITION_TESTS))
def test_vectorSearch_position(position_collection, test_case: StageTestCase):
    """Run one placement case and check its documents or its error code."""
    # Issue the raw aggregate command so the case's pipeline is sent unchanged.
    aggregate_command = {
        "aggregate": position_collection.name,
        "pipeline": test_case.pipeline,
        "cursor": {},
    }
    outcome = execute_command(position_collection, aggregate_command)

    # Each case sets either `expected` or `error_code`; both are forwarded as given.
    assertResult(
        outcome,
        expected=test_case.expected,
        error_code=test_case.error_code,
        msg=test_case.msg,
    )
Loading
Loading