diff --git a/admin/slices/reins/components/knowledgeSources/AddFromArchiveForm.vue b/admin/slices/reins/components/knowledgeSources/AddFromArchiveForm.vue new file mode 100644 index 0000000..23d768f --- /dev/null +++ b/admin/slices/reins/components/knowledgeSources/AddFromArchiveForm.vue @@ -0,0 +1,99 @@ + + + diff --git a/admin/slices/reins/components/knowledgeSources/Provider.vue b/admin/slices/reins/components/knowledgeSources/Provider.vue index 45939d1..4033e28 100644 --- a/admin/slices/reins/components/knowledgeSources/Provider.vue +++ b/admin/slices/reins/components/knowledgeSources/Provider.vue @@ -63,6 +63,11 @@ async function onAdded() { @added="onAdded" /> + +
Loading…
diff --git a/admin/slices/reins/stores/knowledge.ts b/admin/slices/reins/stores/knowledge.ts index c9f4eed..8613e15 100644 --- a/admin/slices/reins/stores/knowledge.ts +++ b/admin/slices/reins/stores/knowledge.ts @@ -12,6 +12,14 @@ function isSitemapResult( return typeof obj.added === 'number' && typeof obj.discovered === 'number'; } +function isArchiveResult( + value: unknown, +): value is { detected: number; started: boolean } { + if (typeof value !== 'object' || value === null) return false; + const obj = value as Record; + return typeof obj.detected === 'number' && typeof obj.started === 'boolean'; +} + export type IndexStatus = 'idle' | 'indexing' | 'ready' | 'failed'; export type SourceType = 'file' | 'url' | 'text'; @@ -229,11 +237,30 @@ export const useKnowledgeStore = defineStore('reins-knowledge', () => { form.append('type', 'file'); form.append('name', file.name); form.append('file', file); - const res = await $fetch(`/api/knowledges/${id}/sources`, { - method: 'POST', - body: form, - }); - return unwrap(res); + // Multipart can't go through the generated SDK, so post on its axios + // instance directly: that reuses the configured apiUrl base and the + // Bearer-token interceptor (setup/api apiBaseUrl.ts). A bare '/api/...' + // URL would instead hit the admin origin with no auth. 
+ const res = await apiClient.instance.post( + `/knowledges/${id}/sources`, + form, + ); + return unwrap(res.data); + } + + async function addSourcesFromArchive( + id: string, + file: File, + ): Promise<{ detected: number; started: boolean }> { + const form = new FormData(); + form.append('file', file); + const res = await apiClient.instance.post( + `/knowledges/${id}/sources/from-archive`, + form, + ); + const data = unwrap(res.data); + if (isArchiveResult(data)) return data; + return { detected: 0, started: false }; } async function addSourcesFromSitemap( @@ -243,11 +270,11 @@ export const useKnowledgeStore = defineStore('reins-knowledge', () => { ): Promise<{ added: number; discovered: number }> { const body: { sitemapUrl: string; urlPrefix?: string } = { sitemapUrl }; if (urlPrefix) body.urlPrefix = urlPrefix; - const res = await $fetch( - `/api/knowledges/${id}/sources/from-sitemap`, - { method: 'POST', body }, + const res = await apiClient.instance.post( + `/knowledges/${id}/sources/from-sitemap`, + body, ); - const data = unwrap(res); + const data = unwrap(res.data); if (isSitemapResult(data)) return data; return { added: 0, discovered: 0 }; } @@ -297,6 +324,7 @@ export const useKnowledgeStore = defineStore('reins-knowledge', () => { addUrlSource, addFileSource, addSourcesFromSitemap, + addSourcesFromArchive, removeSource, getGraphLabels, getGraph, diff --git a/api/src/slices/reins/lightrag/data/lightragHttp.client.ts b/api/src/slices/reins/lightrag/data/lightragHttp.client.ts index 6d191a5..f0f85e5 100644 --- a/api/src/slices/reins/lightrag/data/lightragHttp.client.ts +++ b/api/src/slices/reins/lightrag/data/lightragHttp.client.ts @@ -139,13 +139,17 @@ export class LightragHttpClient extends ILightragClient { new Blob([new Uint8Array(input.content)], { type: input.mimeType }), input.filename, ); - const res = await this.fetchImpl(`${cfg.baseUrl}/documents/file`, { + // LightRAG renamed /documents/file -> /documents/upload (the old path + // now 404s, same 
drift that killed /documents/url). Upload saves the + // file to the input dir and processes it in the background, returning a + // track_id like the text endpoints. + const res = await this.fetchImpl(`${cfg.baseUrl}/documents/upload`, { method: 'POST', headers: this.headers(cfg.apiKey), body: form, }); - await this.ensureOk(res, '/documents/file'); - return this.extractDocId(res, '/documents/file'); + await this.ensureOk(res, '/documents/upload'); + return this.extractDocId(res, '/documents/upload'); } async query(input: IQueryInput): Promise { diff --git a/api/src/slices/reins/source/data/archive.extractor.ts b/api/src/slices/reins/source/data/archive.extractor.ts new file mode 100644 index 0000000..e088a6f --- /dev/null +++ b/api/src/slices/reins/source/data/archive.extractor.ts @@ -0,0 +1,111 @@ +// @scope:api +// @slice:reins/source +// @layer:data +// @type:utility + +import * as path from 'path'; +import * as unzipper from 'unzipper'; +import { Readable } from 'stream'; + +export interface ArchiveEntry { + path: string; + size: number; + openStream: () => Readable; +} + +/** + * Reads the central directory of a zip (cheap, no extraction) and returns + * the entries worth ingesting. openStream is lazy - the actual bytes are + * only read when the caller pipes the stream, so memory stays flat while + * iterating a 500MB archive. The zip file on disk must outlive all + * openStream calls. + */ +export async function listIngestableEntries( + zipPath: string, +): Promise { + const directory = await unzipper.Open.file(zipPath); + return directory.files + .filter( + (f) => f.type === 'File' && isIngestableEntry(f.path, f.uncompressedSize), + ) + .map((f) => ({ + path: f.path, + size: f.uncompressedSize, + openStream: (): Readable => f.stream(), + })); +} + +// Extensions we can extract usable text from downstream (LightRAG ingest). +// Everything else in an archive (images, video, fonts, audio) is skipped. 
+const SUPPORTED_EXTENSIONS = new Set([ + '.pdf', + '.docx', + '.doc', + '.pptx', + '.ppt', + '.xlsx', + '.xls', + '.txt', + '.md', + '.markdown', + '.html', + '.htm', + '.xml', + '.csv', + '.rtf', + '.json', +]); + +const CONTENT_TYPE_BY_EXT: Record = { + '.pdf': 'application/pdf', + '.docx': + 'application/vnd.openxmlformats-officedocument.wordprocessingml.document', + '.doc': 'application/msword', + '.pptx': + 'application/vnd.openxmlformats-officedocument.presentationml.presentation', + '.ppt': 'application/vnd.ms-powerpoint', + '.xlsx': + 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet', + '.xls': 'application/vnd.ms-excel', + '.txt': 'text/plain', + '.md': 'text/markdown', + '.markdown': 'text/markdown', + '.html': 'text/html', + '.htm': 'text/html', + '.xml': 'application/xml', + '.csv': 'text/csv', + '.rtf': 'application/rtf', + '.json': 'application/json', +}; + +/** + * Decide whether an archive entry should become a knowledge source. + * Filters out macOS resource forks, directories, dot-files, and any + * extension we can't extract text from. `size` is the uncompressed size; + * zero-byte entries are skipped (usually directory placeholders). + */ +export function isIngestableEntry(entryPath: string, size: number): boolean { + if (size <= 0) return false; + if (entryPath.endsWith('/')) return false; + + const base = path.basename(entryPath); + if (entryPath.includes('__MACOSX/')) return false; + if (base === '.DS_Store') return false; + if (base.startsWith('._')) return false; + + const ext = path.extname(base).toLowerCase(); + return SUPPORTED_EXTENSIONS.has(ext); +} + +export function contentTypeForEntry(entryPath: string): string { + const ext = path.extname(entryPath).toLowerCase(); + return CONTENT_TYPE_BY_EXT[ext] ?? 'application/octet-stream'; +} + +// Archive entries keep their folder structure (e.g. "Content/foo.pdf"). 
+// Source names use just the basename for readability, but two files with the +// same basename in different folders would collide on dedup - so callers +// dedup on the full entry path, not this display name. +export function displayNameForEntry(entryPath: string): string { + return path.basename(entryPath); +} diff --git a/api/src/slices/reins/source/data/source.gateway.ts b/api/src/slices/reins/source/data/source.gateway.ts index 7700a63..cb714d7 100644 --- a/api/src/slices/reins/source/data/source.gateway.ts +++ b/api/src/slices/reins/source/data/source.gateway.ts @@ -14,6 +14,7 @@ import { ISourceData, ICreateSourceData, IUploadSourceFileInput, + IUploadSourceStreamInput, IUploadedSourceFile, } from '../domain/source.types'; import { SourceMapper } from './source.mapper'; @@ -113,6 +114,33 @@ export class SourceGateway extends ISourceGateway { return { url: stored.uri }; } + async uploadFileStream( + input: IUploadSourceStreamInput, + ): Promise { + const bucket = await this.requireBucket(); + const key = `${input.knowledgeId}/${crypto.randomUUID()}-${input.filename}`; + // S3Repository uploads buffers via PutObject; the custom/MinIO endpoint + // doesn't accept unbounded streaming bodies. Archive entries are + // processed one at a time, so materializing a single entry keeps peak + // memory bounded to that one file - the same profile as uploadFile. + const chunks: Buffer[] = []; + for await (const chunk of input.body) { + if (!(chunk instanceof Uint8Array)) { + throw new BadRequestException( + 'archive entry stream emitted a non-binary chunk', + ); + } + chunks.push(Buffer.isBuffer(chunk) ? 
chunk : Buffer.from(chunk)); + } + const stored = await this.s3.upload({ + bucket, + key, + body: Buffer.concat(chunks), + contentType: input.contentType, + }); + return { url: stored.uri }; + } + async deleteFile(url: string): Promise { const location = S3Repository.parseUri(url); await this.s3.delete(location); diff --git a/api/src/slices/reins/source/domain/source.gateway.ts b/api/src/slices/reins/source/domain/source.gateway.ts index d1658c8..15ba447 100644 --- a/api/src/slices/reins/source/domain/source.gateway.ts +++ b/api/src/slices/reins/source/domain/source.gateway.ts @@ -2,6 +2,7 @@ import { ISourceData, ICreateSourceData, IUploadSourceFileInput, + IUploadSourceStreamInput, IUploadedSourceFile, } from './source.types'; @@ -15,6 +16,9 @@ export abstract class ISourceGateway { abstract uploadFile( input: IUploadSourceFileInput, ): Promise; + abstract uploadFileStream( + input: IUploadSourceStreamInput, + ): Promise; abstract deleteFile(url: string): Promise; abstract indexSource(source: ISourceData): Promise; diff --git a/api/src/slices/reins/source/domain/source.service.ts b/api/src/slices/reins/source/domain/source.service.ts index c96aa0b..7be3f60 100644 --- a/api/src/slices/reins/source/domain/source.service.ts +++ b/api/src/slices/reins/source/domain/source.service.ts @@ -4,12 +4,19 @@ import { Logger, NotFoundException, } from '@nestjs/common'; +import { promises as fs } from 'fs'; import { ISourceGateway } from './source.gateway'; -import { ISourceData } from './source.types'; +import { IArchiveImportResult, ISourceData } from './source.types'; import { fetchSitemapUrls, SitemapError, } from '../data/sitemap.fetcher'; +import { + ArchiveEntry, + contentTypeForEntry, + displayNameForEntry, + listIngestableEntries, +} from '../data/archive.extractor'; export interface IAddFromSitemapResult { added: number; @@ -105,6 +112,104 @@ export class SourceService { await this.gateway.delete(id); } + /** + * Accepts an already-saved zip on disk, lists its 
ingestable entries, and + * kicks off a background import (one file-source per entry, streamed to + * S3). Returns immediately with the detected count so the HTTP request + * doesn't hang for the minutes a large archive takes. The caller-owned + * zip at zipPath is deleted once the background pass finishes. Indexing + * into LightRAG is NOT triggered here - that stays the explicit Index + * action. + */ + async addFromArchive( + knowledgeId: string, + zipPath: string, + ): Promise { + let entries: ArchiveEntry[]; + try { + entries = await listIngestableEntries(zipPath); + } catch (err) { + await this.safeUnlink(zipPath); + throw new BadRequestException( + `Could not read archive: ${errorMessage(err)}`, + ); + } + if (entries.length === 0) { + await this.safeUnlink(zipPath); + throw new BadRequestException( + 'Archive contains no ingestable files (pdf, docx, xlsx, txt, html, ...).', + ); + } + void this.runArchiveImport(knowledgeId, zipPath, entries); + return { detected: entries.length, started: true }; + } + + private async runArchiveImport( + knowledgeId: string, + zipPath: string, + entries: ArchiveEntry[], + ): Promise { + let added = 0; + let skipped = 0; + let failed = 0; + try { + const existing = await this.gateway.findByKnowledgeId(knowledgeId); + const existingNames = new Set( + existing.filter((s) => s.type === 'file').map((s) => s.name), + ); + const seenPaths = new Set(); + + for (const entry of entries) { + const name = displayNameForEntry(entry.path); + if (seenPaths.has(entry.path) || existingNames.has(name)) { + skipped += 1; + continue; + } + seenPaths.add(entry.path); + const contentType = contentTypeForEntry(entry.path); + try { + const stored = await this.gateway.uploadFileStream({ + knowledgeId, + filename: name, + body: entry.openStream(), + contentType, + }); + await this.gateway.create({ + knowledgeId, + type: 'file', + name, + url: stored.url, + mimeType: contentType, + sizeBytes: entry.size, + }); + added += 1; + } catch (err) { + failed 
+= 1; + this.logger.warn( + `archive entry failed ${entry.path}: ${errorMessage(err)}`, + ); + } + } + this.logger.log( + `archive import for ${knowledgeId}: added=${added} skipped=${skipped} failed=${failed}`, + ); + } catch (err) { + this.logger.error( + `archive import crashed for ${knowledgeId}: ${errorMessage(err)}`, + ); + } finally { + await this.safeUnlink(zipPath); + } + } + + private async safeUnlink(filePath: string): Promise { + try { + await fs.unlink(filePath); + } catch (err) { + this.logger.warn(`failed to remove temp archive ${filePath}: ${errorMessage(err)}`); + } + } + indexSource(source: ISourceData): Promise { return this.gateway.indexSource(source); } diff --git a/api/src/slices/reins/source/domain/source.types.ts b/api/src/slices/reins/source/domain/source.types.ts index da36d69..80957d6 100644 --- a/api/src/slices/reins/source/domain/source.types.ts +++ b/api/src/slices/reins/source/domain/source.types.ts @@ -1,3 +1,5 @@ +import { Readable } from 'stream'; + export type SourceTypes = 'file' | 'url' | 'text'; export interface ISourceData { @@ -31,6 +33,18 @@ export interface IUploadSourceFileInput { contentType: string; } +export interface IUploadSourceStreamInput { + knowledgeId: string; + filename: string; + body: Readable; + contentType: string; +} + export interface IUploadedSourceFile { url: string; } + +export interface IArchiveImportResult { + detected: number; + started: boolean; +} diff --git a/api/src/slices/reins/source/dtos/addFromArchive.dto.ts b/api/src/slices/reins/source/dtos/addFromArchive.dto.ts new file mode 100644 index 0000000..aa042ee --- /dev/null +++ b/api/src/slices/reins/source/dtos/addFromArchive.dto.ts @@ -0,0 +1,13 @@ +import { ApiProperty } from '@nestjs/swagger'; + +export class AddFromArchiveResultDto { + @ApiProperty({ + example: 288, + description: + 'Number of ingestable files detected in the archive. 
Import runs in the background; refresh the sources list to watch them appear.', + }) + detected: number; + + @ApiProperty({ example: true }) + started: boolean; +} diff --git a/api/src/slices/reins/source/dtos/index.ts b/api/src/slices/reins/source/dtos/index.ts index ad37ce4..560f42c 100644 --- a/api/src/slices/reins/source/dtos/index.ts +++ b/api/src/slices/reins/source/dtos/index.ts @@ -1,3 +1,4 @@ export * from './source.dto'; export * from './createSource.dto'; export * from './addFromSitemap.dto'; +export * from './addFromArchive.dto'; diff --git a/api/src/slices/reins/source/source.controller.ts b/api/src/slices/reins/source/source.controller.ts index eb33156..3bf4ee3 100644 --- a/api/src/slices/reins/source/source.controller.ts +++ b/api/src/slices/reins/source/source.controller.ts @@ -11,9 +11,30 @@ import { BadRequestException, } from '@nestjs/common'; import { FileInterceptor } from '@nestjs/platform-express'; -import { ApiTags, ApiOperation, ApiConsumes, ApiResponse } from '@nestjs/swagger'; +import { + ApiTags, + ApiOperation, + ApiConsumes, + ApiResponse, +} from '@nestjs/swagger'; +import { promises as fs } from 'fs'; +import * as os from 'os'; +import * as path from 'path'; +import { randomUUID } from 'crypto'; import { SourceService } from './domain/source.service'; -import { AddFromSitemapDto, AddFromSitemapResultDto, CreateSourceDto } from './dtos'; +import { + AddFromArchiveResultDto, + AddFromSitemapDto, + AddFromSitemapResultDto, + CreateSourceDto, +} from './dtos'; + +// Cap archive uploads at 1 GiB. The upload is buffered in memory (multer's +// default storage) and then written to a temp file for extraction. We can't +// use multer's diskStorage here: under the Bun workspace `multer` is a phantom +// dependency of @nestjs/platform-express, so `import { diskStorage } from +// 'multer'` does not resolve at runtime. 
+const ONE_GIB = 1024 * 1024 * 1024; interface UploadedFileLike { originalname: string; @@ -97,6 +118,33 @@ export class SourceController { ); } + @Post('from-archive') + @ApiOperation({ + summary: 'Bulk-import sources from a zip archive', + operationId: 'addKnowledgeSourcesFromArchive', + description: + 'Accepts a .zip, extracts every ingestable file (pdf, docx, xlsx, txt, html, ...), and creates one file-type source per entry. Upload runs in the background and streams each entry to S3; the response returns immediately with the detected file count. Indexing into LightRAG happens through the normal reindex flow.', + }) + @ApiConsumes('multipart/form-data') + @ApiResponse({ status: 201, type: AddFromArchiveResultDto }) + @UseInterceptors(FileInterceptor('file', { limits: { fileSize: ONE_GIB } })) + async addFromArchive( + @Param('knowledgeId') knowledgeId: string, + @UploadedFile() file?: UploadedFileLike, + ): Promise { + if (!file) { + throw new BadRequestException('zip file is required (field "file")'); + } + // The service consumes and then deletes a zip already saved on disk (see + // its doc comment), so persist the in-memory upload to a temp file first. + const zipPath = path.join( + os.tmpdir(), + `ranch-knowledge-archive-${randomUUID()}.zip`, + ); + await fs.writeFile(zipPath, file.buffer); + return this.service.addFromArchive(knowledgeId, zipPath); + } + @Delete(':sourceId') @ApiOperation({ summary: 'Delete source', diff --git a/terraform/modules/storage/main.tf b/terraform/modules/storage/main.tf index e7af427..41857df 100644 --- a/terraform/modules/storage/main.tf +++ b/terraform/modules/storage/main.tf @@ -9,7 +9,7 @@ terraform { variable "environment" { type = string - description = "Environment name — used as suffix on bucket + IAM user (e.g. dreamvention)" + description = "Environment name - used as suffix on bucket + IAM user (e.g. 
dreamvention)" } variable "bucket_name" { @@ -31,8 +31,9 @@ variable "secret_name_prefix" { } locals { - bucket_name = var.bucket_name != "" ? var.bucket_name : "ranch-agent-data-${var.environment}" - iam_user = "ranch-agent-${var.environment}" + bucket_name = var.bucket_name != "" ? var.bucket_name : "ranch-agent-data-${var.environment}" + reins_bucket_name = "ranch-reins-sources-${var.environment}" + iam_user = "ranch-agent-${var.environment}" } # --------------------------------------------------------------------- @@ -71,23 +72,26 @@ resource "aws_s3_bucket_public_access_block" "agent_data" { } # --------------------------------------------------------------------- -# S3 bucket for Postgres backups (CNPG barmanObjectStore) +# S3 bucket for Reins / Knowledge source files (PDFs, docs, archives +# uploaded by operators via the knowledge admin UI). ranch-api stores +# files here under <knowledgeId>/<uuid>-<filename>; LightRAG ingests +# them through ranch-api which fetches the bytes back from S3. # --------------------------------------------------------------------- -resource "aws_s3_bucket" "pg_backups" { - bucket = "ranch-pg-backups-${var.environment}" +resource "aws_s3_bucket" "reins_sources" { + bucket = local.reins_bucket_name } -resource "aws_s3_bucket_versioning" "pg_backups" { - bucket = aws_s3_bucket.pg_backups.id +resource "aws_s3_bucket_versioning" "reins_sources" { + bucket = aws_s3_bucket.reins_sources.id versioning_configuration { status = "Enabled" } } -resource "aws_s3_bucket_server_side_encryption_configuration" "pg_backups" { - bucket = aws_s3_bucket.pg_backups.id +resource "aws_s3_bucket_server_side_encryption_configuration" "reins_sources" { + bucket = aws_s3_bucket.reins_sources.id rule { apply_server_side_encryption_by_default { @@ -96,8 +100,8 @@ resource "aws_s3_bucket_server_side_encryption_configuration" "pg_backups" { } } -resource "aws_s3_bucket_public_access_block" "pg_backups" { - bucket = aws_s3_bucket.pg_backups.id +resource "aws_s3_bucket_public_access_block" 
"reins_sources" { + bucket = aws_s3_bucket.reins_sources.id block_public_acls = true ignore_public_acls = true @@ -105,6 +109,47 @@ resource "aws_s3_bucket_public_access_block" "pg_backups" { restrict_public_buckets = true } +# --------------------------------------------------------------------- +# Postgres backups bucket retired. +# +# The original lightrag-postgres lived on CloudNativePG with +# barmanObjectStore writing to ranch-pg-backups-<environment>. We migrated to a +# plain Postgres Deployment that does not back up; the bucket is now +# unused. The `removed` block instructs terraform to drop these +# resources from state without deleting the AWS bucket itself, so any +# residual backup archive stays around for manual cleanup. +# Requires terraform >= 1.7. On older versions, replace with +# `terraform state rm` before deleting the resource blocks. +# --------------------------------------------------------------------- + +removed { + from = aws_s3_bucket.pg_backups + lifecycle { + destroy = false + } +} + +removed { + from = aws_s3_bucket_versioning.pg_backups + lifecycle { + destroy = false + } +} + +removed { + from = aws_s3_bucket_server_side_encryption_configuration.pg_backups + lifecycle { + destroy = false + } +} + +removed { + from = aws_s3_bucket_public_access_block.pg_backups + lifecycle { + destroy = false + } +} + # --------------------------------------------------------------------- # IAM user scoped to this bucket (optional) # --------------------------------------------------------------------- @@ -149,13 +194,13 @@ data "aws_iam_policy_document" "agent_bucket_access" { } statement { - sid = "PgBackupsBucketLevel" + sid = "ReinsSourcesBucketLevel" actions = ["s3:ListBucket", "s3:GetBucketLocation"] - resources = [aws_s3_bucket.pg_backups.arn] + resources = [aws_s3_bucket.reins_sources.arn] } statement { - sid = "PgBackupsObjectLevel" + sid = "ReinsSourcesObjectLevel" actions = [ "s3:GetObject", "s3:PutObject", @@ -163,7 +208,7 @@ data 
"aws_iam_policy_document" "agent_bucket_access" { "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts", ] - resources = ["${aws_s3_bucket.pg_backups.arn}/*"] + resources = ["${aws_s3_bucket.reins_sources.arn}/*"] } } @@ -205,10 +250,10 @@ output "secret_access_key" { sensitive = true } -output "pg_backups_bucket_name" { - value = aws_s3_bucket.pg_backups.id +output "reins_sources_bucket_name" { + value = aws_s3_bucket.reins_sources.id } -output "pg_backups_bucket_region" { - value = aws_s3_bucket.pg_backups.region +output "reins_sources_bucket_region" { + value = aws_s3_bucket.reins_sources.region }