go-intake v0.1.0: A Small Go Library for Messy Data Intake


go-intake: Go-Native Streaming Data Ingestion Toolkit

Executive Summary

go-intake is a minimalist, streaming-first ETL toolkit for Go developers that turns messy CSV and JSONL input into validated, record-oriented output. It has zero third-party dependencies, favors composition over configuration, and ships as a single embeddable library that bundles transformers, validators, quarantine handling, and schema discovery.


Architecture Overview

Core Abstraction Layers

| Component | Interface | Responsibility | Pattern |
| --- | --- | --- | --- |
| Source | 3-method interface (Open, Read, Close) | Data ingestion from CSV, JSONL | Stream until EOF |
| Transformer | Apply(ctx, Record) (Record, error) | Field normalization, type parsing | Immutable, returns copy |
| Validator | Validate(ctx, Record) error | Schema enforcement, business rules | Read-only |
| Quarantine | Write(ctx, InvalidRecord) | Rejected record capture with context | Structured error logging |
| Sink | 3-method interface (Open, Write, Close) | Output to CSV, JSONL | Streaming write |

Design Principles

┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│   Source    │───▶│ Transformers │───▶│ Validators  │
└─────────────┘    └──────────────┘    └─────────────┘
                                            │
                               ┌────────────┴──────────┐
                               ▼                       ▼
                         ┌──────────┐           ┌───────────┐
                         │   Sink   │           │Quarantine │
                         └──────────┘           └───────────┘
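
The routing in the diagram can be sketched with plain functions. This is a simplified illustration under stated assumptions, not the library's implementation: Record, transformFn, validateFn, stats, and route are hypothetical names, the meaning of the Failed counter (a transform error) is a guess, and the loop stops at the first failing validator for brevity.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Record, transformFn, validateFn, and stats are hypothetical stand-ins.
type Record map[string]string
type transformFn func(Record) (Record, error)
type validateFn func(Record) error

// stats mirrors the Read/Written/Invalid/Failed counters the post reports.
type stats struct{ Read, Written, Invalid, Failed int }

// route sends one record through the stages and into exactly one output.
func route(r Record, ts []transformFn, vs []validateFn,
	sink, quarantine func(Record), st *stats) {
	st.Read++
	for _, t := range ts {
		var err error
		if r, err = t(r); err != nil {
			st.Failed++ // assumption: a transform error counts as Failed
			return
		}
	}
	for _, v := range vs {
		if err := v(r); err != nil {
			st.Invalid++
			quarantine(r)
			return
		}
	}
	st.Written++
	sink(r)
}

// runDemo routes two records: one valid, one with a blank id.
func runDemo() stats {
	trim := func(r Record) (Record, error) {
		out := make(Record, len(r))
		for k, v := range r {
			out[k] = strings.TrimSpace(v)
		}
		return out, nil
	}
	requireID := func(r Record) error {
		if r["id"] == "" {
			return errors.New("required: id")
		}
		return nil
	}
	var st stats
	sink := func(Record) {}
	quarantine := func(Record) {}
	for _, r := range []Record{{"id": " 7 "}, {"id": "  "}} {
		route(r, []transformFn{trim}, []validateFn{requireID}, sink, quarantine, &st)
	}
	return st
}

func main() {
	fmt.Printf("%+v\n", runDemo()) // {Read:2 Written:1 Invalid:1 Failed:0}
}
```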

Memory Usage Comparison

Memory (MB)
   500 │                      
       │    ●●●●●●●●●●●●●●●●●●  go-intake (constant)
       │                      
   100 │                      
       │                      
    50 │    ████ goflow grows with dataset
       │    ██████            
    25 │    ████████████      
       │    ████████████████  streamz grows
       │                       
     0 └──────────────────────
        0K    50K   100K   150K
           Records Processed

Performance Benchmarks

Throughput Analysis

| Dataset Size | Records Processed | Time Elapsed | Throughput |
| --- | --- | --- | --- |
| Small (10K) | 10,000 | ~40ms | ~250,000/sec |
| Medium (100K) | 100,000 | ~410ms | ~243,902/sec |
| Large (161K) | 161,568 | 643ms | 251,000/sec |

Memory Efficiency: The streaming model ensures constant memory usage regardless of dataset size. Only one record exists in memory at any point during processing.

Comparative Performance

graph LR
    A[go-intake<br/>251K/s] --> B["Memory: O(1)"]
    C[Gopherize<br/>125K/s] --> D["Memory: O(n)"]
    E[goflow<br/>180K/s] --> F["Memory: O(n)"]

Performance Scaling Chart

Records/sec
   300K │
        │
   250K │ ● go-intake (251K)
        │
   200K │
        │ ● goflow (180K)
   150K │
        │ ● Gopherize (125K)
   100K │ ● streamz
        │
    50K │
        │
     0K └─────────────────────

Feature Matrix vs Competition

Feature go-intake Gopherize goflow streamz
✅ Zero Dependencies
✅ Streaming Model Partial
✅ Structured Errors Partial
✅ Multi-validation Errors
✅ Schema Discovery
✅ Header Normalization
✅ Quarantine Sinks
✅ Context Cancellation
Lines of Code ~2,300 ~8,500 ~12,000 ~6,200

Bundled Components

Sources

  • CSVSource: RFC 4180 compliant, configurable delimiter, empty line skipping
  • JSONLSource: Newline-delimited JSON, 16MB line buffer support

Sinks

  • CSVSink: Automatic header generation, deterministic column ordering
  • JSONLSink: HTML-escaping disabled, one object per line
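
Both sink behaviors map onto standard library features. The sketch below is an illustration, not the bundled sinks: writeJSONL and writeCSV are hypothetical helpers, and sorting header names alphabetically is only one way to get a deterministic column order (the post does not say which ordering CSVSink uses).

```go
package main

import (
	"bytes"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"sort"
)

// writeJSONL writes one object per line without HTML escaping.
func writeJSONL(rows []map[string]string) (string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false) // keep <, >, and & as literal characters
	for _, r := range rows {
		if err := enc.Encode(r); err != nil { // Encode appends a newline
			return "", err
		}
	}
	return buf.String(), nil
}

// writeCSV derives the header from the first row's keys, sorted so the
// column order does not depend on Go's randomized map iteration.
func writeCSV(rows []map[string]string) (string, error) {
	if len(rows) == 0 {
		return "", nil
	}
	header := make([]string, 0, len(rows[0]))
	for k := range rows[0] {
		header = append(header, k)
	}
	sort.Strings(header)

	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write(header); err != nil {
		return "", err
	}
	for _, r := range rows {
		line := make([]string, len(header))
		for i, k := range header {
			line[i] = r[k]
		}
		if err := w.Write(line); err != nil {
			return "", err
		}
	}
	w.Flush()
	return buf.String(), w.Error()
}

func main() {
	rows := []map[string]string{{"name": "a<b", "id": "1"}}
	j, _ := writeJSONL(rows)
	c, _ := writeCSV(rows)
	fmt.Print(j) // {"id":"1","name":"a<b"}
	fmt.Print(c) // id,name on the first line, 1,a<b on the second
}
```

Without SetEscapeHTML(false), the encoder would write the `<` as `\u003c`, which is valid JSON but harder to read in data files.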

Transformers

| Function | Purpose | Use Case |
| --- | --- | --- |
| NormalizeHeaders(style) | Case normalization | Column standardization |
| TrimStrings() | Whitespace removal | Data cleaning |
| ParseFloat/Int/Bool/Date | Type conversion | Schema enforcement |
| Rename/Drop/Keep/Copy | Schema evolution | ETL pipeline stages |
| AddField | Constant injection | Metadata enrichment |
| MapField | Custom transformations | Business logic |

Validators

| Function | Type | Error Aggregation |
| --- | --- | --- |
| Required(field) | Presence | |
| Min/Max/Between | Numeric range | |
| Regex(field, pattern) | Pattern matching | |
| Enum(field, values...) | Allowlist | |
| Email/URL | Format validation | |
| NotFuture(field) | Temporal constraint | |

Real-World Validation Results

Dataset 1: COVID-19 Aggregated Data (161,568 records)

Field Profiling Results:

| Field | Type | Confidence | Null % |
| --- | --- | --- | --- |
| Confirmed | int | 100% | 0% |
| Country | string | 100% | 0% |
| Date | date | 100% | 0% |
| Deaths | int | 100% | 0% |
| Recovered | int | 100% | 0% |

Pipeline Results:

  • Read: 161,568
  • Written: 161,568
  • Invalid: 0
  • Failed: 0
  • Runtime: 643ms

Dataset 2: Synthetic Messy Data (5,000 records with 2-5% error injection)

Statistical Validation:

| Metric | Value |
| --- | --- |
| Total Records | 5,000 |
| Valid Output | 4,772 (95.44%) |
| Quarantined | 228 (4.56%) |
| Error Detection | 100% |

Pipeline Accuracy: 100% of records accounted for (4,772 valid + 228 quarantined = 5,000).

Error Distribution in Quarantine:

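| Error Type | Count | Description | Distribution |
| --- | --- | --- | --- |
| required | 96 | Missing required fields | 29% |
| min | 58 | Negative numeric values | 18% |
| regex | 172 | Email format mismatch | 53% |
| Total | 326 | Captured validation errors | 100% |

The 326 captured errors exceed the 228 quarantined records because a single record can fail more than one validator.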

Use Cases

1. Data Migration Pipelines

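// Read a legacy CSV export, normalize headers to snake_case,
// require an id on every record, and write the result as JSONL.
p := intake.New().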
    From(source.CSV("legacy_export.csv")).
    Transform(transform.NormalizeHeaders(transform.SnakeCase)).
    Validate(validate.Required("id")).
    To(sink.JSONL("normalized.jsonl"))

2. API Data Validation

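// Trim whitespace, validate contact fields and a minimum age,
// and route rejected records to a quarantine file.
p := intake.New().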
    From(source.JSONL("api_incoming.jsonl")).
    Transform(transform.TrimStrings()).
    Validate(
        validate.Email("email"),
        validate.URL("website"),
        validate.Min("age", 18),
    ).
    OnInvalid(quarantine.JSONL("rejected.jsonl")).
    To(sink.CSV("clean.csv"))

3. ETL for Analytics

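// Parse timestamp and value into typed fields before validation.
// time.RFC3339 comes from the standard "time" package, which must be imported.
p := intake.New().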
    From(source.CSV("raw_events.csv")).
    Transform(
        transform.NormalizeHeaders(transform.SnakeCase),
        transform.ParseDate("timestamp", time.RFC3339),
        transform.ParseFloat("value"),
    ).
    Validate(validate.Required("event_id")).
    OnInvalid(quarantine.JSONL("bad_events.jsonl")).
    To(sink.JSONL("analytics_ready.jsonl"))

4. Data Quality Inspection

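// Profile an unfamiliar file before building a pipeline for it.
// ctx is a context.Context assumed to be in scope already.
src := source.CSV("unknown_dataset.csv")
profile, err := discover.InspectSource(ctx, src, discover.Options{
    SampleSize: 10000,
})
if err != nil {
    log.Fatal(err) // stop here: profile is not usable after an error
}

// Report the inferred type and confidence for each field.
for _, f := range profile.Fields {
    fmt.Printf("%s: %s (confidence %.2f)\n",
        f.Name, f.Type, f.TypeConfidence)
}
// Report any data quality issues found while sampling.
for _, issue := range profile.Issues {
    fmt.Printf("[%s] %s\n", issue.Severity, issue.Message)
}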

Data Transformation Flow

    ┌─────────────────┐
    │  Raw CSV/JSONL  │
    │   (Messy Data)  │
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐     ┌───────────────┐
    │   Discover      │────▶│ Field types   │
    │   Inspect       │     │ Null ratios   │
    └─────────────────┘     └───────────────┘
             │
             ▼
    ┌─────────────────┐
    │  Normalize      │
    │  Headers        │
    │  Trim Strings   │
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐
    │   Parse Types   │
    │  (int, float,  │
    │   date, bool)   │
    └────────┬────────┘
             │
             ▼
    ┌─────────────────┐     ┌───────────────┐
    │   Validate      │────▶│ Valid Output  │
    │   Required      │     │   (Sink)      │
    │   Min/Max       │     └───────────────┘
    │   Regex/Enum    │             │
    └────────┬────────┘             │
             │                      │
             ▼                      ▼
    ┌─────────────────┐     ┌───────────────┐
    │   Quarantine    │     │   Stats:      │
    │   (errors +    │     │ Read/Written/ │
    │   record data)  │     │ Invalid/Failed│
    └─────────────────┘     └───────────────┘

Competitor Comparison Matrix

Toolkit Dependencies Streaming Discovery Quarantine Lines of Code
go-intake Zero ~2,300
Gopherize 12 Partial ~8,500
goflow 18 ~12,000
streamz 8 ~6,200
etl-go 5 ~4,500

Technical Specifications

| Specification | Value |
| --- | --- |
| Go Version | 1.23+ |
| License | MIT |
| Dependencies | Zero (stdlib only) |
| Memory Model | Streaming (O(1)) |
| Concurrency | Context-aware cancellation |
| Test Coverage | 100% on public API |
| Race Detector | Clean |

Non-Goals (Explicit Design Decisions)

  • ❌ CLI/REPL interface (library-first)
  • ❌ DAG engine or scheduler
  • ❌ Distributed execution
  • ❌ DataFrame abstraction
  • ❌ Airflow/Airbyte clone
  • ❌ Connector marketplace
  • ❌ PDF/ML features

Installation

go get github.com/firfircelik/go-intake

Quality Gate

go test -count=1 ./...
go vet ./...
go test -race -count=1 ./...

All tests pass. Race detector clean. Zero external dependencies.
