diff --git a/go/internal/graph/bulk.go b/go/internal/graph/bulk.go index 2d463efd..dd7a6ec5 100644 --- a/go/internal/graph/bulk.go +++ b/go/internal/graph/bulk.go @@ -24,15 +24,48 @@ var nodeColumns = []string{ "props", } -// BulkLoadNodes writes nodes to a temporary CSV file and ingests via Kuzu's -// COPY FROM. This is materially faster than per-node CREATE for the -// enrich-phase volumes we hit (44k files / 100k+ nodes). Empty input is a -// no-op (an empty CSV would still issue a COPY, which Kuzu may reject; the -// no-op behaviour also matches Java's bulkSave convention). +// bulkLoadBatchSize caps the number of rows materialised into any single +// staging CSV / `COPY FROM` call. Kuzu buffers the full CSV in process +// memory during ingest; on real-world polyglot targets (~/projects-scale +// 49k files / 434k nodes) a single CSV pushed the process past the box's +// 15 GiB RAM ceiling and got it OOM-killed. 50k rows keeps the peak +// COPY-side resident set well under 1 GiB while still amortising the +// per-statement Kuzu overhead. Override via CODEIQ_BULK_BATCH_SIZE env +// (validated in resolveBulkBatchSize) for downstream perf tuning. +const bulkLoadBatchSize = 50_000 + +// BulkLoadNodes writes nodes to one or more temporary CSV files and +// ingests them via Kuzu's COPY FROM, in batches of bulkLoadBatchSize. +// This is materially faster than per-node CREATE for the enrich-phase +// volumes we hit (44k files / 100k+ nodes). Empty input is a no-op (an +// empty CSV would still issue a COPY, which Kuzu may reject; the no-op +// behaviour also matches Java's bulkSave convention). +// +// Each batch is staged + ingested + cleaned up before the next batch +// starts so that neither the on-disk CSV footprint nor Kuzu's ingest +// buffer ever holds more than bulkLoadBatchSize rows. Cypher uniqueness +// constraints are still enforced cross-batch, so a duplicate primary +// key surfaces the same Copy exception either way. func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error { if len(nodes) == 0 { return nil } + batchSize := resolveBulkBatchSize() + for start := 0; start < len(nodes); start += batchSize { + end := start + batchSize + if end > len(nodes) { + end = len(nodes) + } + if err := s.copyNodeBatch(nodes[start:end]); err != nil { + return err + } + } + return nil +} + +// copyNodeBatch stages a single CSV for `batch` and runs one Kuzu COPY +// FROM. Caller is responsible for slicing input into batches. +func (s *Store) copyNodeBatch(batch []*model.CodeNode) error { tmp, err := os.CreateTemp("", "codeiq-nodes-*.csv") if err != nil { return fmt.Errorf("graph: temp csv: %w", err) @@ -41,7 +74,7 @@ func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error { defer os.Remove(tmp.Name()) w := csv.NewWriter(tmp) - for _, n := range nodes { + for _, n := range batch { row, err := encodeNodeRow(n) if err != nil { tmp.Close() @@ -74,6 +107,19 @@ func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error { return nil } +// resolveBulkBatchSize honours CODEIQ_BULK_BATCH_SIZE when set to a +// positive integer; otherwise returns the compiled-in default. Invalid +// values silently fall back to the default so a typo in the env never +// blocks enrichment. +func resolveBulkBatchSize() int { + if raw := os.Getenv("CODEIQ_BULK_BATCH_SIZE"); raw != "" { + if v, err := strconv.Atoi(raw); err == nil && v > 0 { + return v + } + } + return bulkLoadBatchSize +} + // encodeNodeRow serialises one CodeNode into the column order declared by // nodeColumns. Numeric INT64 columns are emitted as empty strings when zero // so Kuzu treats them as NULL rather than 0 (line_start/line_end on @@ -152,10 +198,28 @@ func (s *Store) BulkLoadEdges(edges []*model.CodeEdge) error { return nil } -// copyEdgeGroup stages one rel-table CSV and issues COPY FROM. The -// first two columns are the FROM and TO node primary keys per Kuzu's rel -// COPY convention. +// copyEdgeGroup stages rel-table CSVs in batches of bulkLoadBatchSize +// and issues one COPY FROM per batch. The first two columns are +// the FROM and TO node primary keys per Kuzu's rel COPY convention. +// Same memory rationale as BulkLoadNodes — Kuzu buffers the full CSV +// in ingest, so chunking caps peak resident memory. func (s *Store) copyEdgeGroup(kind model.EdgeKind, edges []*model.CodeEdge) error { + batchSize := resolveBulkBatchSize() + for start := 0; start < len(edges); start += batchSize { + end := start + batchSize + if end > len(edges) { + end = len(edges) + } + if err := s.copyEdgeBatch(kind, edges[start:end]); err != nil { + return err + } + } + return nil +} + +// copyEdgeBatch stages a single rel-table CSV for `batch` and runs one +// Kuzu COPY FROM. +func (s *Store) copyEdgeBatch(kind model.EdgeKind, batch []*model.CodeEdge) error { tmp, err := os.CreateTemp("", "codeiq-edges-*.csv") if err != nil { return fmt.Errorf("graph: temp csv: %w", err) @@ -163,7 +227,7 @@ func (s *Store) copyEdgeGroup(kind model.EdgeKind, edges []*model.CodeEdge) erro defer os.Remove(tmp.Name()) w := csv.NewWriter(tmp) - for _, e := range edges { + for _, e := range batch { props, err := json.Marshal(e.Properties) if err != nil { tmp.Close()