Learn Go
advanced · 1 min read

Concurrency Patterns

Go's goroutines and channels compose naturally into higher-level patterns. Understanding these patterns lets you build scalable, readable concurrent programs.

Pipelines

A pipeline connects stages where each stage reads from its input channel, transforms values, and sends results to its output channel:

package main
 
import "fmt"
 
// generate is the source stage of a pipeline. It returns a receive-only
// channel that yields each of nums in order and is closed once the last
// value has been delivered. With no arguments the channel is closed
// without yielding anything.
func generate(nums ...int) <-chan int {
    stream := make(chan int)
    emit := func() {
        // Closing signals downstream stages that no more values will arrive.
        defer close(stream)
        for i := 0; i < len(nums); i++ {
            stream <- nums[i]
        }
    }
    go emit()
    return stream
}
 
// square is a pipeline stage: it reads every value from in, sends the
// value multiplied by itself on the returned channel, and closes that
// channel after in has been closed and drained.
func square(in <-chan int) <-chan int {
    squared := make(chan int)
    go func() {
        defer close(squared)
        for {
            v, open := <-in
            if !open {
                // Upstream is exhausted; the deferred close propagates that downstream.
                return
            }
            squared <- v * v
        }
    }()
    return squared
}
 
func main() {
    // pipeline: generate → square → print
    for v := range square(generate(2, 3, 4, 5)) {
        fmt.Println(v) // 4, 9, 16, 25
    }
}

Each stage is independent, enabling easy composition and testing.

Fan-Out and Fan-In

Fan-out: distribute work across multiple goroutines reading from a single channel.
Fan-in: merge multiple channels into one:

package main
 
import (
    "fmt"
    "sync"
)
 
// producer returns a channel that delivers each of nums in order and is
// closed after the final value has been sent.
func producer(nums ...int) <-chan int {
    feed := make(chan int)
    go func(values []int, sink chan<- int) {
        defer close(sink)
        for idx := range values {
            sink <- values[idx]
        }
    }(nums, feed)
    return feed
}
 
// worker consumes values from in and sends the square of each one on the
// returned channel, which is closed once in is closed and drained.
// Several workers may share the same in channel; each value is received
// by exactly one of them.
func worker(in <-chan int) <-chan int {
    squares := make(chan int)
    run := func() {
        defer close(squares)
        for {
            n, more := <-in
            if !more {
                return
            }
            squares <- n * n
        }
    }
    go run()
    return squares
}
 
// merge fans in any number of input channels. Every value received from
// any of channels is forwarded to the returned channel, which is closed
// once all inputs have been closed and drained. Values from different
// inputs are interleaved in no particular order.
func merge(channels ...<-chan int) <-chan int {
    combined := make(chan int)
    var pending sync.WaitGroup

    // One forwarding goroutine per input channel.
    for i := range channels {
        pending.Add(1)
        go func(src <-chan int) {
            defer pending.Done()
            for {
                v, open := <-src
                if !open {
                    return
                }
                combined <- v
            }
        }(channels[i])
    }

    // Close the output only after every forwarder has finished, so no
    // forwarder can send on a closed channel.
    go func() {
        defer close(combined)
        pending.Wait()
    }()
    return combined
}
 
// main demonstrates fan-out followed by fan-in: three workers share one
// producer channel, and their outputs are merged back into one stream.
func main() {
    const numWorkers = 3

    source := producer(1, 2, 3, 4, 5, 6, 7, 8)

    // Fan-out: every worker reads from the same source channel.
    outputs := make([]<-chan int, 0, numWorkers)
    for i := 0; i < numWorkers; i++ {
        outputs = append(outputs, worker(source))
    }

    // Fan-in: print the squares as they arrive from the merged stream.
    for sq := range merge(outputs...) {
        fmt.Println(sq)
    }
}

Worker Pool

A worker pool limits concurrency to a fixed number of goroutines, preventing resource exhaustion:

package main
 
import (
    "context"
    "fmt"
    "sync"
)
 
type (
    // Job is a unit of work submitted to the pool, identified by ID.
    Job struct {
        ID int
    }

    // Result carries the outcome of a single Job.
    Result struct {
        JobID  int // ID of the Job this result belongs to
        Output int // value computed for that job
    }
)
 
// workerPool starts numWorkers goroutines that receive jobs from jobs,
// compute the square of each job's ID, and send a Result on the returned
// channel.
//
// Workers stop when jobs is closed and drained or when ctx is cancelled,
// whichever happens first. The returned channel is closed once every
// worker has exited, so callers can range over it. If numWorkers is zero
// or negative, no workers are started and the channel is closed right away.
//
// Cancellation is honoured both while waiting for a job and while waiting
// to deliver a result, so a cancelled pool never leaves workers blocked on
// a results channel that nobody is reading.
func workerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
    results := make(chan Result)
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case job, ok := <-jobs:
                    if !ok {
                        return
                    }
                    res := Result{JobID: job.ID, Output: job.ID * job.ID}
                    // Guard the send as well: results is unbuffered, so
                    // without this a worker would block forever if the
                    // consumer stopped reading after cancellation.
                    select {
                    case results <- res:
                    case <-ctx.Done():
                        return
                    }
                }
            }
        }()
    }

    // Close results only after all workers are done, so no worker can
    // send on a closed channel.
    go func() {
        wg.Wait()
        close(results)
    }()

    return results
}
 
// main runs nine jobs through a pool of three workers and prints each
// result as it arrives. Results are printed in completion order, which
// is not necessarily the order the jobs were submitted in.
func main() {
    ctx := context.Background()
    // The buffer is larger than the number of jobs, so the submission loop
    // below never blocks even though results are not being read yet.
    jobs := make(chan Job, 10)
    results := workerPool(ctx, 3, jobs)

    for i := 1; i <= 9; i++ {
        jobs <- Job{ID: i}
    }
    // Closing jobs lets the workers exit once the queue is drained.
    close(jobs)

    for r := range results {
        // Separate the ID from the output; printed back to back the two
        // numbers are ambiguous (e.g. "job 11").
        fmt.Printf("job %d -> %d\n", r.JobID, r.Output)
    }
}

Semaphore with Buffered Channel

Limit concurrent operations without a full worker pool using a buffered channel as a semaphore:

package main
 
import (
    "fmt"
    "sync"
)
 
// limitedWork runs one task under a counting semaphore. It blocks until
// it can place a token in sem, prints the task id, then removes a token
// from sem and marks the task as done on wg.
func limitedWork(id int, sem chan struct{}, wg *sync.WaitGroup) {
    // Acquire: blocks while sem is full.
    sem <- struct{}{}
    defer func() {
        <-sem     // release the slot first
        wg.Done() // then report completion
    }()

    fmt.Printf("running task %d\n", id)
}
 
// main launches ten tasks but lets at most maxConcurrency of them hold a
// semaphore slot at once, then waits for all of them to finish.
func main() {
    const (
        maxConcurrency = 3
        totalTasks     = 10
    )

    var wg sync.WaitGroup
    // The buffer capacity is the number of slots available.
    slots := make(chan struct{}, maxConcurrency)

    wg.Add(totalTasks)
    for id := 1; id <= totalTasks; id++ {
        go limitedWork(id, slots, &wg)
    }
    wg.Wait()
}

Done Channel for Cancellation

Pair a pipeline with a done channel (or context) to allow early cancellation without goroutine leaks:

package main
 
import (
    "context"
    "fmt"
)
 
// naturals returns a channel that yields 0, 1, 2, ... for as long as a
// receiver keeps reading. The generating goroutine exits and closes the
// channel once ctx is cancelled.
func naturals(ctx context.Context) <-chan int {
    seq := make(chan int)
    done := ctx.Done()

    go func() {
        defer close(seq)
        next := 0
        for {
            select {
            case seq <- next:
                next++
            case <-done:
                // Cancelled: stop generating so this goroutine does not leak.
                return
            }
        }
    }()
    return seq
}
 
// main prints the first five natural numbers and then cancels the
// context so the generator goroutine stops.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // stop the generator goroutine

    stream := naturals(ctx)
    for remaining := 5; remaining > 0; remaining-- {
        fmt.Println(<-stream)
    }
}

Key Takeaways