Concurrency Patterns
Go's goroutines and channels compose naturally into higher-level patterns. Understanding these patterns lets you build scalable, readable concurrent programs.
Pipelines
A pipeline connects stages where each stage reads from its input channel, transforms values, and sends results to its output channel:
package main
import "fmt"
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// pipeline: generate → square → print
for v := range square(generate(2, 3, 4, 5)) {
fmt.Println(v) // 4, 9, 16, 25
}
}Each stage is independent, enabling easy composition and testing.
Fan-Out and Fan-In
Fan-out: distribute work across multiple goroutines reading from a single channel.
Fan-in: merge multiple channels into one:
package main
import (
"fmt"
"sync"
)
func producer(nums ...int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for _, n := range nums {
ch <- n
}
}()
return ch
}
func worker(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
forward := func(ch <-chan int) {
defer wg.Done()
for v := range ch {
merged <- v
}
}
wg.Add(len(channels))
for _, ch := range channels {
go forward(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func main() {
in := producer(1, 2, 3, 4, 5, 6, 7, 8)
// Fan-out to three workers
w1 := worker(in)
w2 := worker(in)
w3 := worker(in)
// Fan-in results
for result := range merge(w1, w2, w3) {
fmt.Println(result)
}
}Worker Pool
A worker pool limits concurrency to a fixed number of goroutines, preventing resource exhaustion:
package main
import (
"context"
"fmt"
"sync"
)
type Job struct{ ID int }
type Result struct {
JobID int
Output int
}
func workerPool(ctx context.Context, numWorkers int, jobs <-chan Job) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case job, ok := <-jobs:
if !ok {
return
}
results <- Result{JobID: job.ID, Output: job.ID * job.ID}
}
}
}()
}
go func() {
wg.Wait()
close(results)
}()
return results
}
func main() {
ctx := context.Background()
jobs := make(chan Job, 10)
results := workerPool(ctx, 3, jobs)
for i := 1; i <= 9; i++ {
jobs <- Job{ID: i}
}
close(jobs)
for r := range results {
fmt.Printf("job %d → %d\n", r.JobID, r.Output)
}
}Semaphore with Buffered Channel
Limit concurrent operations without a full worker pool using a buffered channel as a semaphore:
package main
import (
"fmt"
"sync"
)
func limitedWork(id int, sem chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
sem <- struct{}{} // acquire
defer func() { <-sem }() // release
fmt.Printf("running task %d\n", id)
}
func main() {
const maxConcurrency = 3
sem := make(chan struct{}, maxConcurrency)
var wg sync.WaitGroup
for i := 1; i <= 10; i++ {
wg.Add(1)
go limitedWork(i, sem, &wg)
}
wg.Wait()
}Done Channel for Cancellation
Pair a pipeline with a done channel (or context) to allow early cancellation without goroutine leaks:
package main
import (
"context"
"fmt"
)
func naturals(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for n := 0; ; n++ {
select {
case <-ctx.Done():
return
case ch <- n:
}
}
}()
return ch
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
nums := naturals(ctx)
for i := 0; i < 5; i++ {
fmt.Println(<-nums)
}
cancel() // stop the generator goroutine
}Key Takeaways
- Pipelines chain channel-connected stages; each stage runs concurrently.
- Fan-out distributes a single channel's work across multiple goroutines; fan-in merges results back.
- Worker pools fix the concurrency level — useful when unbounded goroutines would exhaust resources.
- A buffered channel of capacity N is a lightweight semaphore.
- Always provide goroutines with a way to stop — use context cancellation or a done channel.