Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 74 additions & 10 deletions go/internal/graph/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,48 @@ var nodeColumns = []string{
"props",
}

// BulkLoadNodes writes nodes to a temporary CSV file and ingests via Kuzu's
// COPY FROM. This is materially faster than per-node CREATE for the
// enrich-phase volumes we hit (44k files / 100k+ nodes). Empty input is a
// no-op (an empty CSV would still issue a COPY, which Kuzu may reject; the
// no-op behaviour also matches Java's bulkSave convention).
// bulkLoadBatchSize caps the number of rows materialised into any single
// staging CSV / `COPY FROM` call. Kuzu buffers the full CSV in process
// memory during ingest; on real-world polyglot targets (~/projects-scale
// 49k files / 434k nodes) a single CSV pushed the process past the box's
// 15 GiB RAM ceiling and got it OOM-killed. 50k rows keeps the peak
// COPY-side resident set well under 1 GiB while still amortising the
// per-statement Kuzu overhead. Override via CODEIQ_BULK_BATCH_SIZE env
// (validated in resolveBulkBatchSize) for downstream perf tuning.
const bulkLoadBatchSize = 50_000

// BulkLoadNodes writes nodes to one or more temporary CSV files and
// ingests them via Kuzu's COPY FROM, in batches of bulkLoadBatchSize.
// This is materially faster than per-node CREATE for the enrich-phase
// volumes we hit (44k files / 100k+ nodes). Empty input is a no-op (an
// empty CSV would still issue a COPY, which Kuzu may reject; the no-op
// behaviour also matches Java's bulkSave convention).
//
// Each batch is staged + ingested + cleaned up before the next batch
// starts so that neither the on-disk CSV footprint nor Kuzu's ingest
// buffer ever holds more than bulkLoadBatchSize rows. Cypher uniqueness
// constraints are still enforced cross-batch, so a duplicate primary
// key surfaces the same Copy exception either way.
func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error {
if len(nodes) == 0 {
return nil
}
batchSize := resolveBulkBatchSize()
for start := 0; start < len(nodes); start += batchSize {
end := start + batchSize
if end > len(nodes) {
end = len(nodes)
}
if err := s.copyNodeBatch(nodes[start:end]); err != nil {
return err
}
}
return nil
}

// copyNodeBatch stages a single CSV for `batch` and runs one Kuzu COPY
// FROM. Caller is responsible for slicing input into batches.
func (s *Store) copyNodeBatch(batch []*model.CodeNode) error {
tmp, err := os.CreateTemp("", "codeiq-nodes-*.csv")
if err != nil {
return fmt.Errorf("graph: temp csv: %w", err)
Expand All @@ -41,7 +74,7 @@ func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error {
defer os.Remove(tmp.Name())

w := csv.NewWriter(tmp)
for _, n := range nodes {
for _, n := range batch {
row, err := encodeNodeRow(n)
if err != nil {
tmp.Close()
Expand Down Expand Up @@ -74,6 +107,19 @@ func (s *Store) BulkLoadNodes(nodes []*model.CodeNode) error {
return nil
}

// resolveBulkBatchSize honours CODEIQ_BULK_BATCH_SIZE when set to a
// positive integer; otherwise returns the compiled-in default. Invalid
// values silently fall back to the default so a typo in the env never
// blocks enrichment.
func resolveBulkBatchSize() int {
if raw := os.Getenv("CODEIQ_BULK_BATCH_SIZE"); raw != "" {
if v, err := strconv.Atoi(raw); err == nil && v > 0 {
return v
}
}
return bulkLoadBatchSize
}

// encodeNodeRow serialises one CodeNode into the column order declared by
// nodeColumns. Numeric INT64 columns are emitted as empty strings when zero
// so Kuzu treats them as NULL rather than 0 (line_start/line_end on
Expand Down Expand Up @@ -152,18 +198,36 @@ func (s *Store) BulkLoadEdges(edges []*model.CodeEdge) error {
return nil
}

// copyEdgeGroup stages one rel-table CSV and issues COPY <REL> FROM. The
// first two columns are the FROM and TO node primary keys per Kuzu's rel
// COPY convention.
// copyEdgeGroup stages rel-table CSVs in batches of bulkLoadBatchSize
// and issues one COPY <REL> FROM per batch. The first two columns are
// the FROM and TO node primary keys per Kuzu's rel COPY convention.
// Same memory rationale as BulkLoadNodes — Kuzu buffers the full CSV
// in ingest, so chunking caps peak resident memory.
func (s *Store) copyEdgeGroup(kind model.EdgeKind, edges []*model.CodeEdge) error {
batchSize := resolveBulkBatchSize()
for start := 0; start < len(edges); start += batchSize {
end := start + batchSize
if end > len(edges) {
end = len(edges)
}
if err := s.copyEdgeBatch(kind, edges[start:end]); err != nil {
return err
}
}
return nil
}

// copyEdgeBatch stages a single rel-table CSV for `batch` and runs one
// Kuzu COPY FROM.
func (s *Store) copyEdgeBatch(kind model.EdgeKind, batch []*model.CodeEdge) error {
tmp, err := os.CreateTemp("", "codeiq-edges-*.csv")
if err != nil {
return fmt.Errorf("graph: temp csv: %w", err)
}
defer os.Remove(tmp.Name())

w := csv.NewWriter(tmp)
for _, e := range edges {
for _, e := range batch {
props, err := json.Marshal(e.Properties)
if err != nil {
tmp.Close()
Expand Down
Loading