Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ Two active binaries under `cmd/`:
| Service | Port | Role |
|---|---|---|
| `cmd/api` | 8080 | REST API — reads/writes places and accessibility data |
| `cmd/ingestion` | — | Placeholder — future OpenStreetMap sync |
| `cmd/ingestion` | — | Batch worker: reads OSM .osm.pbf files, filters POIs, upserts to the places table |

**Data flow:**
1. Client submits accessibility data via API
Expand All @@ -53,6 +53,15 @@ The API is a pure data layer. It stores and returns accessibility facts; it neve

**`internal/geo`** — PostGIS spatial queries (`ST_DWithin`, `ST_MakeEnvelope`)

**`internal/osm`** — OpenStreetMap ingestion:
- `Evaluate(tags)` — allowlist-based POI filter; returns matched category or excludes
- `TransformNode(...)` — converts an OSM node into a `models.Place`
- `DeriveRank(...)` — maps category + tags to RankLandmark / RankEstablishment / RankFeature
- `StreamNodes(ctx, r, sink)` — streams OSM nodes from a `.osm.pbf` reader via paulmach/osm

**`internal/place`** — Repository for the places table:
- `UpsertBatch(ctx, places)` — bulk insert/update using `(osm_id, osm_type)` conflict key

**`internal/db`** — GORM setup, AutoMigrate, PostGIS spatial index creation

## Patterns
Expand Down
67 changes: 67 additions & 0 deletions cmd/ingestion/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (C) 2026 InWheel Contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/

package main

import (
"fmt"
"strconv"
"strings"
)

const defaultDBPort = 5432

// config holds the configuration for cmd/ingestion. All values come from environment variables.
type config struct {
DBHost string
DBPort int
DBUser string
DBPassword string
DBName string
DBSSLMode string
OSMPBFPath string
}

// loadConfig reads configuration from the given map (typically os.Environ wrapped as a map).
// Required variables that are absent or malformed produce a single error listing every problem.
func loadConfig(env map[string]string) (config, error) {
var errs []string

osmPath := env["OSM_PBF_PATH"]
if strings.TrimSpace(osmPath) == "" {
errs = append(errs, "OSM_PBF_PATH is required but was not set")
}

port := defaultDBPort
if raw, ok := env["DB_PORT"]; ok && raw != "" {
parsed, err := strconv.Atoi(raw)
if err != nil {
errs = append(errs, fmt.Sprintf("DB_PORT must be an integer, got: %q", raw))
} else {
port = parsed
}
}

if len(errs) > 0 {
return config{}, fmt.Errorf("invalid configuration:\n - %s", strings.Join(errs, "\n - "))
}

return config{
DBHost: valueOr(env, "DB_HOST", "localhost"),
DBPort: port,
DBUser: valueOr(env, "DB_USER", "postgres"),
DBPassword: valueOr(env, "DB_PASSWORD", "postgres"),
DBName: valueOr(env, "DB_NAME", "inwheel"),
DBSSLMode: valueOr(env, "DB_SSLMODE", "disable"),
OSMPBFPath: osmPath,
}, nil
}

func valueOr(env map[string]string, key, fallback string) string {
if v, ok := env[key]; ok && v != "" {
return v
}
return fallback
}
85 changes: 85 additions & 0 deletions cmd/ingestion/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (C) 2026 InWheel Contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/

package main

import (
"strings"
"testing"
)

func TestLoadConfig_AllValuesProvided(t *testing.T) {
env := map[string]string{
"DB_HOST": "db.example.com",
"DB_PORT": "6543",
"DB_USER": "inwheel_user",
"DB_PASSWORD": "s3cret",
"DB_NAME": "inwheel_prod",
"DB_SSLMODE": "require",
"OSM_PBF_PATH": "/data/finland.osm.pbf",
}

cfg, err := loadConfig(env)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if cfg.DBHost != "db.example.com" || cfg.DBPort != 6543 || cfg.DBUser != "inwheel_user" {
t.Errorf("unexpected DB config: %+v", cfg)
}
if cfg.OSMPBFPath != "/data/finland.osm.pbf" {
t.Errorf("OSM_PBF_PATH not honored: %q", cfg.OSMPBFPath)
}
}

func TestLoadConfig_AppliesDefaults(t *testing.T) {
env := map[string]string{"OSM_PBF_PATH": "/tmp/a.pbf"}

cfg, err := loadConfig(env)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if cfg.DBHost != "localhost" || cfg.DBPort != 5432 || cfg.DBUser != "postgres" ||
cfg.DBPassword != "postgres" || cfg.DBName != "inwheel" || cfg.DBSSLMode != "disable" {
t.Errorf("defaults not applied: %+v", cfg)
}
}

func TestLoadConfig_MissingOSMPath(t *testing.T) {
_, err := loadConfig(map[string]string{})
if err == nil {
t.Fatal("expected error for missing OSM_PBF_PATH, got nil")
}
if !strings.Contains(err.Error(), "OSM_PBF_PATH") {
t.Errorf("error should mention OSM_PBF_PATH: %v", err)
}
}

func TestLoadConfig_MalformedPort(t *testing.T) {
env := map[string]string{
"OSM_PBF_PATH": "/tmp/a.pbf",
"DB_PORT": "not-a-number",
}
_, err := loadConfig(env)
if err == nil {
t.Fatal("expected error for malformed DB_PORT, got nil")
}
if !strings.Contains(err.Error(), "DB_PORT") {
t.Errorf("error should mention DB_PORT: %v", err)
}
}

func TestLoadConfig_AccumulatesMultipleErrors(t *testing.T) {
env := map[string]string{"DB_PORT": "bad"}

_, err := loadConfig(env)
if err == nil {
t.Fatal("expected error, got nil")
}
msg := err.Error()
if !strings.Contains(msg, "OSM_PBF_PATH") || !strings.Contains(msg, "DB_PORT") {
t.Errorf("expected both errors, got: %v", err)
}
}
147 changes: 147 additions & 0 deletions cmd/ingestion/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (C) 2026 InWheel Contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/

package main

import (
"context"
"fmt"
"log/slog"
"os"

"github.com/InWheelOrg/inwheel-api/internal/db"
"github.com/InWheelOrg/inwheel-api/internal/osm"
"github.com/InWheelOrg/inwheel-api/internal/place"
"github.com/InWheelOrg/inwheel-api/pkg/models"
)

const batchSize = 1000

func main() {
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})))

if len(os.Args) < 2 {
fmt.Fprintln(os.Stderr, "usage: inwheel-ingestion <full-import|diff-sync>")
os.Exit(1)
}

cfg, err := loadConfig(environAsMap())
if err != nil {
slog.Error("config load failed", "error", err)
os.Exit(1)
}
slog.Info("config loaded",
"db.host", cfg.DBHost,
"db.port", cfg.DBPort,
"db.user", cfg.DBUser,
"db.name", cfg.DBName,
"db.sslmode", cfg.DBSSLMode,
"osm.pbf_path", cfg.OSMPBFPath,
)

switch os.Args[1] {
case "full-import":
if err := runFullImport(context.Background(), cfg); err != nil {
slog.Error("full-import failed", "error", err)
os.Exit(1)
}
case "diff-sync":
slog.Error("diff-sync not implemented yet")
os.Exit(1)
default:
fmt.Fprintf(os.Stderr, "unknown command: %s\n", os.Args[1])
os.Exit(1)
}
}

func environAsMap() map[string]string {
out := make(map[string]string, len(os.Environ()))
for _, kv := range os.Environ() {
for i := 0; i < len(kv); i++ {
if kv[i] == '=' {
out[kv[:i]] = kv[i+1:]
break
}
}
}
return out
}

func runFullImport(ctx context.Context, cfg config) error {
gormDB, err := db.Connect(db.Config{
Host: cfg.DBHost,
Port: cfg.DBPort,
User: cfg.DBUser,
Password: cfg.DBPassword,
Name: cfg.DBName,
SSLMode: cfg.DBSSLMode,
})
if err != nil {
return fmt.Errorf("db connect: %w", err)
}
if err := db.Migrate(gormDB); err != nil {
return fmt.Errorf("db migrate: %w", err)
}

repo := place.NewRepository(gormDB)

f, err := os.Open(cfg.OSMPBFPath)
if err != nil {
return fmt.Errorf("open pbf: %w", err)
}
defer f.Close() //nolint:errcheck

var (
buffer []models.Place
processed int
written int
)

flush := func() error {
if len(buffer) == 0 {
return nil
}
if err := repo.UpsertBatch(ctx, buffer); err != nil {
return err
}
written += len(buffer)
buffer = buffer[:0]
return nil
}

err = osm.StreamNodes(ctx, f, func(node osm.Node) error {
processed++
if processed%10000 == 0 {
slog.Info("progress", "processed", processed, "written", written)
}

category, ok := osm.Evaluate(node.Tags)
if !ok {
return nil
}

p, err := osm.TransformNode(node.ID, node.Lat, node.Lng, node.Tags, category)
if err != nil {
slog.Warn("skipping node, transform error", "node_id", node.ID, "error", err)
return nil
}

buffer = append(buffer, *p)
if len(buffer) >= batchSize {
return flush()
}
return nil
})
if err != nil {
return fmt.Errorf("stream: %w", err)
}

if err := flush(); err != nil {
return fmt.Errorf("final flush: %w", err)
}

slog.Info("full-import complete", "processed", processed, "written", written)
return nil
}
48 changes: 48 additions & 0 deletions cmd/ingestion/main_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//go:build integration

/*
* Copyright (C) 2026 InWheel Contributors
* SPDX-License-Identifier: AGPL-3.0-only
*/

package main

import (
"context"
"testing"

"github.com/InWheelOrg/inwheel-api/internal/testhelpers"
"github.com/InWheelOrg/inwheel-api/pkg/models"
)

func TestRunFullImport_AgainstFixturePBF(t *testing.T) {
ctx := context.Background()
db, connInfo, cleanup, err := testhelpers.StartPostgresWithConnInfo(ctx)
if err != nil {
t.Fatalf("start postgres: %v", err)
}
defer cleanup()

cfg := config{
DBHost: connInfo.Host,
DBPort: connInfo.Port,
DBUser: connInfo.User,
DBPassword: connInfo.Password,
DBName: connInfo.Name,
DBSSLMode: connInfo.SSLMode,
OSMPBFPath: "../../testdata/andorra-sample.osm.pbf",
}

if err := runFullImport(ctx, cfg); err != nil {
t.Fatalf("runFullImport: %v", err)
}

var count int64
if err := db.Model(&models.Place{}).Count(&count).Error; err != nil {
t.Fatalf("count: %v", err)
}
t.Logf("imported %d places from the Andorra fixture", count)
if count == 0 {
t.Fatal("expected at least one place row, got zero")
}
}
Loading
Loading