跳转到内容

Go语言并发编程

课程目标

通过本课程的学习,你将掌握Go语言的并发编程特性,包括goroutine、channel、同步原语等核心概念,以及并发编程的最佳实践和常见问题解决方案。

1. 并发编程基础

1.1 并发与并行的区别

  • 并发:多个任务在同一时间段内交替执行(宏观上同时,微观上交替)
  • 并行:多个任务在同一时刻同时执行(宏观和微观上都同时)

Go语言的并发模型支持这两种方式,通过goroutine和调度器实现。

1.2 Go语言的并发模型

Go语言采用**CSP(Communicating Sequential Processes)**并发模型:

  • Goroutine:轻量级线程,由Go运行时管理
  • Channel:用于goroutine之间的通信
  • Select:用于监听多个channel的操作

这种模型的核心思想是:"不要通过共享内存来通信,而是通过通信来共享内存"。

1.3 并发编程的优势

  • 提高程序性能:充分利用多核CPU
  • 改善用户体验:避免阻塞主线程
  • 简化复杂问题:将复杂任务分解为多个小任务
  • 提高资源利用率:IO操作时可以处理其他任务

2. Goroutine

2.1 什么是Goroutine

Goroutine是Go语言中的轻量级线程,具有以下特点:

  • 轻量:占用内存小(约2KB),启动速度快
  • 由Go运行时管理:不是操作系统线程
  • 多路复用:多个goroutine可以运行在一个操作系统线程上
  • 调度灵活:Go调度器会自动在多个线程上分配goroutine

2.2 创建Goroutine

go
// 方法1:使用go关键字启动goroutine
func sayHello() {
    fmt.Println("Hello from goroutine!")
}

func main() {
    go sayHello() // 启动goroutine
    fmt.Println("Hello from main!")
    time.Sleep(1 * time.Second) // 等待goroutine执行完成
}

// 方法2:启动匿名函数作为goroutine
func main() {
    go func() {
        fmt.Println("Hello from anonymous goroutine!")
    }()
    fmt.Println("Hello from main!")
    time.Sleep(1 * time.Second)
}

// 方法3:带参数的goroutine
func printNumbers(n int) {
    for i := 1; i <= n; i++ {
        fmt.Println(i)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    go printNumbers(5) // 启动带参数的goroutine
    fmt.Println("Main function")
    time.Sleep(1 * time.Second)
}

// 方法4:启动多个goroutine
func worker(id int) {
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(1 * time.Second)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    for i := 1; i <= 3; i++ {
        go worker(i)
    }
    fmt.Println("All workers started")
    time.Sleep(2 * time.Second)
}

2.3 Goroutine的生命周期

  1. 创建:使用go关键字创建
  2. 运行:在调度器的管理下执行
  3. 阻塞:遇到channel操作、系统调用等时阻塞
  4. 唤醒:阻塞条件解除后被唤醒
  5. 结束:函数执行完成后自动销毁

2.4 Goroutine的调度

Go调度器采用G-M-P模型:

  • G(Goroutine):goroutine对象
  • M(Machine):操作系统线程
  • P(Processor):处理器,负责管理goroutine队列

调度策略

  • 工作窃取:空闲P会从其他P的队列中窃取goroutine
  • 本地队列:每个P都有自己的goroutine队列
  • 全局队列:存储待调度的goroutine

2.5 Goroutine的注意事项

  1. Goroutine泄漏:goroutine如果不结束,会一直占用资源
  2. 内存占用:虽然每个goroutine占用内存小,但大量goroutine仍会消耗内存
  3. 调度开销:过多的goroutine会增加调度开销
  4. 竞态条件:多个goroutine访问共享资源时可能产生竞态条件

3. Channel

3.1 什么是Channel

Channel是Go语言中用于goroutine之间通信的管道,具有以下特点:

  • 类型安全:只能传输指定类型的数据
  • 同步机制:默认情况下,发送和接收操作会阻塞
  • FIFO:先进先出的消息队列
  • 可以关闭:关闭后不能再发送数据,但可以继续接收

3.2 创建Channel

go
// 创建无缓冲channel
ch := make(chan int)

// 创建缓冲channel
ch := make(chan int, 5) // 容量为5

// 创建带方向的channel
var sendCh chan<- int // 只能发送
var recvCh <-chan int // 只能接收

3.3 Channel的基本操作

3.3.1 发送数据

go
ch := make(chan int)
go func() {
    ch <- 10 // 发送数据到channel
}()

3.3.2 接收数据

go
// 方式1:基本接收
value := <-ch

// 方式2:带接收状态
value, ok := <-ch
if !ok {
    fmt.Println("Channel closed")
}

// 方式3:忽略接收值
<-ch

3.3.3 关闭Channel

go
ch := make(chan int, 5)
// 发送数据
for i := 1; i <= 3; i++ {
    ch <- i
}
// 关闭channel
close(ch)

// 尝试发送到已关闭的channel会panic
// ch <- 4 // panic: send on closed channel

// 从已关闭的channel接收数据
for value := range ch {
    fmt.Println(value)
}

3.4 无缓冲Channel

无缓冲Channel(也称为同步Channel):

  • 发送操作会阻塞,直到有goroutine接收数据
  • 接收操作会阻塞,直到有goroutine发送数据
  • 适用于需要严格同步的场景
go
func main() {
    ch := make(chan int) // 无缓冲channel
    
    go func() {
        fmt.Println("Goroutine: Sending 1")
        ch <- 1 // 阻塞,直到main goroutine接收
        fmt.Println("Goroutine: Sent 1")
    }()
    
    fmt.Println("Main: Waiting for data")
    value := <-ch // 阻塞,直到goroutine发送
    fmt.Println("Main: Received", value)
    
    time.Sleep(1 * time.Second)
}

3.5 缓冲Channel

缓冲Channel:

  • 发送操作在缓冲区未满时不会阻塞
  • 接收操作在缓冲区未空时不会阻塞
  • 适用于生产者-消费者模式
go
func main() {
    ch := make(chan int, 3) // 缓冲容量为3
    
    // 发送数据到缓冲channel
    fmt.Println("Sending 1")
    ch <- 1
    fmt.Println("Sending 2")
    ch <- 2
    fmt.Println("Sending 3")
    ch <- 3
    
    // 缓冲区已满,发送会阻塞
    // fmt.Println("Sending 4")
    // ch <- 4 // 阻塞
    
    // 接收数据
    fmt.Println("Receiving 1:", <-ch)
    fmt.Println("Receiving 2:", <-ch)
    fmt.Println("Receiving 3:", <-ch)
    
    // 缓冲区已空,接收会阻塞
    // fmt.Println("Receiving 4:", <-ch) // 阻塞
    
    close(ch)
}

3.6 带方向的Channel

带方向的Channel可以提高代码的安全性和可读性:

go
// 只发送的channel
func sendData(ch chan<- int) {
    for i := 1; i <= 5; i++ {
        ch <- i
        time.Sleep(100 * time.Millisecond)
    }
    close(ch)
}

// 只接收的channel
func receiveData(ch <-chan int) {
    for value := range ch {
        fmt.Println("Received:", value)
    }
}

func main() {
    ch := make(chan int, 5)
    
    go sendData(ch)
    receiveData(ch)
}

3.7 Channel的使用场景

  1. goroutine间通信:传递数据和信号
  2. 同步操作:等待goroutine完成
  3. 任务分发:将任务分发给多个worker
  4. 事件通知:通知其他goroutine事件发生
  5. 资源池:管理共享资源

4. Select语句

4.1 什么是Select语句

Select语句用于监听多个channel的操作,类似于switch语句,但专门用于channel操作。

4.2 基本用法

go
func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "from channel 1"
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "from channel 2"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received:", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received:", msg2)
        case <-time.After(1500 * time.Millisecond):
            fmt.Println("Timeout")
        }
    }
}

4.3 Select的特性

  1. 随机性:当多个case都可以执行时,随机选择一个
  2. 阻塞性:当所有case都不可执行时,select会阻塞
  3. default分支:当所有case都不可执行时,执行default分支
  4. 可以处理多个channel:同时监听多个channel的操作

4.4 常见用法

4.4.1 超时处理

go
func main() {
    ch := make(chan string)
    
    go func() {
        time.Sleep(3 * time.Second)
        ch <- "result"
    }()
    
    select {
    case result := <-ch:
        fmt.Println("Received:", result)
    case <-time.After(2 * time.Second):
        fmt.Println("Timeout")
    }
}

4.4.2 非阻塞操作

go
func main() {
    ch := make(chan int, 1)
    
    // 非阻塞发送
    select {
    case ch <- 10:
        fmt.Println("Sent successfully")
    default:
        fmt.Println("Channel full")
    }
    
    // 非阻塞接收
    select {
    case value := <-ch:
        fmt.Println("Received:", value)
    default:
        fmt.Println("Channel empty")
    }
}

4.4.3 退出信号

go
func worker(done chan bool) {
    for {
        select {
        case <-done:
            fmt.Println("Worker exiting")
            return
        default:
            fmt.Println("Working...")
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    done := make(chan bool)
    
    go worker(done)
    
    time.Sleep(3 * time.Second)
    fmt.Println("Sending exit signal")
    done <- true
    time.Sleep(1 * time.Second)
}

4.4.4 监听多个channel

go
func main() {
    ch1 := make(chan int)
    ch2 := make(chan int)
    ch3 := make(chan int)
    
    go func() {
        for i := 0; i < 5; i++ {
            time.Sleep(500 * time.Millisecond)
            ch1 <- i
        }
    }()
    
    go func() {
        for i := 0; i < 3; i++ {
            time.Sleep(1 * time.Second)
            ch2 <- i * 10
        }
    }()
    
    go func() {
        time.Sleep(2 * time.Second)
        ch3 <- 100
    }()
    
    // 监听多个channel
    count := 0
    for count < 9 {
        select {
        case v := <-ch1:
            fmt.Println("From ch1:", v)
            count++
        case v := <-ch2:
            fmt.Println("From ch2:", v)
            count++
        case v := <-ch3:
            fmt.Println("From ch3:", v)
            count++
        }
    }
}

5. 同步原语

5.1 Mutex(互斥锁)

Mutex用于保护共享资源,防止多个goroutine同时访问:

go
import "sync"

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &Counter{}
    var wg sync.WaitGroup
    
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }
    
    wg.Wait()
    fmt.Println("Final value:", counter.Value())
}

5.2 RWMutex(读写锁)

RWMutex允许多个读操作同时进行,但写操作需要独占:

go
import "sync"

type DataStore struct {
    mu   sync.RWMutex
    data map[string]string
}

func (ds *DataStore) Get(key string) string {
    ds.mu.RLock()
    defer ds.mu.RUnlock()
    return ds.data[key]
}

func (ds *DataStore) Set(key, value string) {
    ds.mu.Lock()
    defer ds.mu.Unlock()
    ds.data[key] = value
}

func main() {
    ds := &DataStore{
        data: make(map[string]string),
    }
    
    // 写入数据
    ds.Set("key1", "value1")
    
    // 多个读操作可以同时进行
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            value := ds.Get("key1")
            fmt.Printf("Goroutine %d: %s\n", i, value)
        }(i)
    }
    
    wg.Wait()
}

5.3 WaitGroup

WaitGroup用于等待一组goroutine完成:

go
import "sync"

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(time.Duration(id) * 100 * time.Millisecond)
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    var wg sync.WaitGroup
    
    for i := 1; i <= 5; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }
    
    fmt.Println("Waiting for all workers to complete")
    wg.Wait()
    fmt.Println("All workers completed")
}

5.4 Once

Once用于确保某个函数只执行一次:

go
import "sync"

var (
    once     sync.Once
    instance *Database
)

type Database struct {
    connection string
}

func GetDatabase() *Database {
    once.Do(func() {
        fmt.Println("Initializing database connection")
        instance = &Database{
            connection: "postgres://localhost:5432/mydb",
        }
    })
    return instance
}

func main() {
    for i := 0; i < 5; i++ {
        go func(i int) {
            db := GetDatabase()
            fmt.Printf("Goroutine %d: %s\n", i, db.connection)
        }(i)
    }
    
    time.Sleep(1 * time.Second)
}

5.5 Cond

Cond用于等待或宣布事件的发生:

go
import "sync"

type Queue struct {
    mu    sync.Mutex
    cond  *sync.Cond
    items []int
}

func NewQueue() *Queue {
    q := &Queue{
        items: make([]int, 0),
    }
    q.cond = sync.NewCond(&q.mu)
    return q
}

func (q *Queue) Enqueue(item int) {
    q.mu.Lock()
    defer q.mu.Unlock()
    q.items = append(q.items, item)
    fmt.Printf("Enqueued: %d\n", item)
    q.cond.Signal() // 通知一个等待的goroutine
}

func (q *Queue) Dequeue() int {
    q.mu.Lock()
    defer q.mu.Unlock()
    
    for len(q.items) == 0 {
        fmt.Println("Queue is empty, waiting...")
        q.cond.Wait() // 等待直到队列非空
    }
    
    item := q.items[0]
    q.items = q.items[1:]
    fmt.Printf("Dequeued: %d\n", item)
    return item
}

func main() {
    q := NewQueue()
    
    // 启动消费者
    go func() {
        for i := 0; i < 3; i++ {
            q.Dequeue()
        }
    }()
    
    // 生产者延迟生产
    time.Sleep(1 * time.Second)
    q.Enqueue(1)
    time.Sleep(500 * time.Millisecond)
    q.Enqueue(2)
    time.Sleep(500 * time.Millisecond)
    q.Enqueue(3)
    
    time.Sleep(1 * time.Second)
}

5.6 Atomic

Atomic包提供了原子操作,用于无锁编程:

go
import "sync/atomic"

func main() {
    var counter int64
    var wg sync.WaitGroup
    
    // 多个goroutine并发增加计数器
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            atomic.AddInt64(&counter, 1)
        }()
    }
    
    wg.Wait()
    fmt.Println("Final counter:", counter)
    
    // 原子读取
    value := atomic.LoadInt64(&counter)
    fmt.Println("Current value:", value)
    
    // 原子写入
    atomic.StoreInt64(&counter, 100)
    fmt.Println("After store:", atomic.LoadInt64(&counter))
    
    // 原子比较交换
    oldValue := atomic.LoadInt64(&counter)
    newValue := oldValue + 50
    swapped := atomic.CompareAndSwapInt64(&counter, oldValue, newValue)
    fmt.Printf("CAS result: %v, New value: %d\n", swapped, atomic.LoadInt64(&counter))
}

6. 并发模式

6.1 生产者-消费者模式

go
func producer(ch chan<- int, done chan<- bool) {
    for i := 1; i <= 10; i++ {
        fmt.Printf("Producing: %d\n", i)
        ch <- i
        time.Sleep(200 * time.Millisecond)
    }
    close(ch)
    done <- true
}

func consumer(ch <-chan int, id int, done chan<- bool) {
    for value := range ch {
        fmt.Printf("Consumer %d: Consumed %d\n", id, value)
        time.Sleep(500 * time.Millisecond)
    }
    done <- true
}

func main() {
    ch := make(chan int, 5)
    done := make(chan bool, 3)
    
    // 启动生产者
    go producer(ch, done)
    
    // 启动多个消费者
    for i := 1; i <= 2; i++ {
        go consumer(ch, i, done)
    }
    
    // 等待所有goroutine完成
    for i := 0; i < 3; i++ {
        <-done
    }
    fmt.Println("All done")
}

6.2 工作池模式

go
func worker(id int, jobs <-chan int, results chan<- int) {
    for job := range jobs {
        fmt.Printf("Worker %d processing job %d\n", id, job)
        time.Sleep(1 * time.Second) // 模拟处理时间
        results <- job * 2
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3
    
    jobs := make(chan int, numJobs)
    results := make(chan int, numJobs)
    
    // 启动工作池
    for i := 1; i <= numWorkers; i++ {
        go worker(i, jobs, results)
    }
    
    // 发送任务
    for i := 1; i <= numJobs; i++ {
        jobs <- i
    }
    close(jobs)
    
    // 收集结果
    for i := 1; i <= numJobs; i++ {
        result := <-results
        fmt.Printf("Result: %d\n", result)
    }
    close(results)
    
    fmt.Println("All jobs completed")
}

6.3 Fan Out-Fan In模式

go
func generator(n int) <-chan int {
    ch := make(chan int)
    go func() {
        defer close(ch)
        for i := 0; i < n; i++ {
            ch <- i
        }
    }()
    return ch
}

func worker(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
            time.Sleep(100 * time.Millisecond)
        }
    }()
    return out
}

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }
    
    go func() {
        wg.Wait()
        close(out)
    }()
    
    return out
}

func main() {
    // 生成数据
    in := generator(10)
    
    // Fan Out: 多个worker处理数据
    w1 := worker(1, in)
    w2 := worker(2, in)
    w3 := worker(3, in)
    
    // Fan In: 合并结果
    out := merge(w1, w2, w3)
    
    // 收集结果
    for n := range out {
        fmt.Println(n)
    }
}

6.4 超时控制模式

go
func operationWithTimeout() error {
    ch := make(chan error)
    
    go func() {
        // 模拟长时间操作
        time.Sleep(3 * time.Second)
        ch <- nil // 操作成功
    }()
    
    select {
    case err := <-ch:
        return err
    case <-time.After(2 * time.Second):
        return fmt.Errorf("operation timed out")
    }
}

func main() {
    err := operationWithTimeout()
    if err != nil {
        fmt.Println("Error:", err)
    } else {
        fmt.Println("Operation completed successfully")
    }
}

6.5 Context模式

go
import "context"

func worker(ctx context.Context, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d: Context cancelled\n", id)
            return
        default:
            fmt.Printf("Worker %d: Working...\n", id)
            time.Sleep(1 * time.Second)
        }
    }
}

func main() {
    // 创建带超时的context
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    
    // 启动多个worker
    for i := 1; i <= 3; i++ {
        go worker(ctx, i)
    }
    
    // 等待context超时
    <-ctx.Done()
    fmt.Println("Main: Context cancelled")
    time.Sleep(1 * time.Second)
}

7. 并发编程最佳实践

7.1 避免竞态条件

  1. 使用Channel通信:优先使用channel传递数据,而不是共享内存
  2. 使用同步原语:必要时使用Mutex、RWMutex等同步原语
  3. 使用Atomic操作:对于简单的计数器等,使用atomic包
  4. 使用Context:用于传递取消信号和截止时间

7.2 避免Goroutine泄漏

  1. 确保Goroutine能退出:每个goroutine都应该有明确的退出条件
  2. 使用context控制生命周期:通过context的Done通道接收取消信号
  3. 使用WaitGroup等待完成:确保所有goroutine都能完成
  4. 避免无限循环:循环中要有退出机制

7.3 性能优化

  1. 合理使用缓冲Channel:根据实际情况选择合适的缓冲区大小
  2. 控制Goroutine数量:避免创建过多的goroutine
  3. 使用工作池:对于大量任务,使用固定大小的工作池
  4. 减少锁竞争:使用细粒度锁,减少锁持有时间
  5. 使用RWMutex:对于读多写少的场景,使用RWMutex

7.4 错误处理

  1. 传递错误:通过channel传递错误信息
  2. 统一错误处理:集中处理错误
  3. 超时控制:为长时间操作设置超时
  4. 优雅退出:处理中断信号,确保程序优雅退出

7.5 代码组织

  1. 模块化设计:将并发逻辑封装到独立的模块中
  2. 明确的接口:定义清晰的接口,隐藏并发实现细节
  3. 文档注释:为并发代码添加详细的注释
  4. 测试覆盖:编写并发测试,确保代码正确性

8. 常见问题和解决方案

8.1 竞态条件

问题:多个goroutine同时修改共享资源,导致数据不一致

解决方案

  • 使用channel传递数据
  • 使用Mutex或RWMutex保护共享资源
  • 使用atomic包进行原子操作

8.2 Goroutine泄漏

问题:goroutine没有正确退出,导致资源泄漏

解决方案

  • 使用context控制goroutine生命周期
  • 为goroutine提供退出通道
  • 使用WaitGroup等待goroutine完成
  • 避免在goroutine中使用无限循环

8.3 死锁

问题:两个或多个goroutine互相等待对方释放资源

解决方案

  • 避免嵌套锁
  • 使用带超时的channel操作
  • 确保锁的获取顺序一致
  • 使用deadlock检测工具

8.4 活锁

问题:goroutine不断重试失败的操作,导致系统资源浪费

解决方案

  • 为重试操作添加退避策略
  • 设置最大重试次数
  • 使用随机延迟避免惊群效应

8.5 饥饿

问题:某些goroutine长时间无法获取资源

解决方案

  • 使用公平锁
  • 避免长时间持有锁
  • 合理设计并发算法

8.6 内存开销

问题:过多的goroutine导致内存占用过高

解决方案

  • 使用工作池控制goroutine数量
  • 及时释放不再需要的资源
  • 监控内存使用情况

9. 并发编程实战案例

9.1 并发Web服务器

go
package main

import (
    "fmt"
    "net/http"
    "sync"
    "time"
)

var (
    requestCount int64
    mu           sync.Mutex
)

func helloHandler(w http.ResponseWriter, r *http.Request) {
    // 模拟处理时间
    time.Sleep(100 * time.Millisecond)
    
    // 原子增加计数
    atomic.AddInt64(&requestCount, 1)
    
    fmt.Fprintf(w, "Hello, Concurrent Web Server!\n")
    fmt.Fprintf(w, "Request count: %d\n", atomic.LoadInt64(&requestCount))
}

func statusHandler(w http.ResponseWriter, r *http.Request) {
    fmt.Fprintf(w, "Server status:\n")
    fmt.Fprintf(w, "Request count: %d\n", atomic.LoadInt64(&requestCount))
    fmt.Fprintf(w, "Time: %s\n", time.Now().Format(time.RFC3339))
}

func main() {
    http.HandleFunc("/", helloHandler)
    http.HandleFunc("/status", statusHandler)
    
    fmt.Println("Server starting on port 8080...")
    fmt.Println("Try: curl http://localhost:8080/")
    fmt.Println("Try: curl http://localhost:8080/status")
    
    if err := http.ListenAndServe(":8080", nil); err != nil {
        fmt.Println("Error starting server:", err)
    }
}

9.2 并发文件处理

go
package main

import (
    "bufio"
    "fmt"
    "os"
    "path/filepath"
    "sync"
)

func processFile(filePath string, wg *sync.WaitGroup, resultChan chan<- int) {
    defer wg.Done()
    
    file, err := os.Open(filePath)
    if err != nil {
        fmt.Printf("Error opening file %s: %v\n", filePath, err)
        resultChan <- 0
        return
    }
    defer file.Close()
    
    scanner := bufio.NewScanner(file)
    lineCount := 0
    for scanner.Scan() {
        lineCount++
    }
    
    if err := scanner.Err(); err != nil {
        fmt.Printf("Error scanning file %s: %v\n", filePath, err)
        resultChan <- 0
        return
    }
    
    fmt.Printf("File %s: %d lines\n", filePath, lineCount)
    resultChan <- lineCount
}

func main() {
    rootDir := "."
    fileExt := ".go"
    
    var wg sync.WaitGroup
    resultChan := make(chan int, 100)
    
    // 遍历目录,处理符合条件的文件
    err := filepath.Walk(rootDir, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        
        if !info.IsDir() && filepath.Ext(path) == fileExt {
            wg.Add(1)
            go processFile(path, &wg, resultChan)
        }
        
        return nil
    })
    
    if err != nil {
        fmt.Println("Error walking directory:", err)
        return
    }
    
    // 等待所有文件处理完成
    go func() {
        wg.Wait()
        close(resultChan)
    }()
    
    // 汇总结果
    totalLines := 0
    fileCount := 0
    for lines := range resultChan {
        if lines > 0 {
            totalLines += lines
            fileCount++
        }
    }
    
    fmt.Printf("\nProcessed %d files\n", fileCount)
    fmt.Printf("Total lines: %d\n", totalLines)
}

9.3 并发爬虫

go
package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "regexp"
    "sync"
    "time"
)

var (
    visitedUrls = make(map[string]bool)
    mu          sync.Mutex
    urlChan     = make(chan string, 100)
    wg          sync.WaitGroup
)

func extractLinks(html string) []string {
    var links []string
    re := regexp.MustCompile(`href="(https?://[^"]+)"`)
    matches := re.FindAllStringSubmatch(html, -1)
    for _, match := range matches {
        if len(match) > 1 {
            links = append(links, match[1])
        }
    }
    return links
}

func crawl(url string) {
    defer wg.Done()
    
    // 标记为已访问
    mu.Lock()
    if visitedUrls[url] {
        mu.Unlock()
        return
    }
    visitedUrls[url] = true
    mu.Unlock()
    
    fmt.Printf("Crawling: %s\n", url)
    
    // 发送HTTP请求
    client := &http.Client{Timeout: 10 * time.Second}
    resp, err := client.Get(url)
    if err != nil {
        fmt.Printf("Error crawling %s: %v\n", url, err)
        return
    }
    defer resp.Body.Close()
    
    // 读取响应体
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        fmt.Printf("Error reading response: %v\n", err)
        return
    }
    
    // 提取链接
    links := extractLinks(string(body))
    fmt.Printf("Found %d links on %s\n", len(links), url)
    
    // 处理提取的链接
    for _, link := range links {
        mu.Lock()
        if !visitedUrls[link] {
            wg.Add(1)
            go crawl(link)
        }
        mu.Unlock()
    }
}

func main() {
    startUrl := "https://example.com"
    
    wg.Add(1)
    go crawl(startUrl)
    
    wg.Wait()
    fmt.Printf("Crawling completed. Visited %d URLs\n", len(visitedUrls))
}

10. 总结

本课程介绍了Go语言的并发编程特性,包括:

  1. 并发基础:并发与并行的区别,Go语言的CSP模型
  2. Goroutine:轻量级线程的创建、调度和注意事项
  3. Channel:无缓冲和缓冲channel,基本操作和使用场景
  4. Select:监听多个channel操作,超时处理和退出信号
  5. 同步原语:Mutex、RWMutex、WaitGroup、Once、Cond、Atomic
  6. 并发模式:生产者-消费者、工作池、Fan Out-Fan In、超时控制、Context
  7. 最佳实践:避免竞态条件、Goroutine泄漏、死锁等问题
  8. 常见问题:竞态条件、Goroutine泄漏、死锁、活锁、饥饿、内存开销
  9. 实战案例:并发Web服务器、并发文件处理、并发爬虫

通过本课程的学习,你已经掌握了Go语言并发编程的核心概念和实践技巧。在实际开发中,你可以根据具体场景选择合适的并发模式,编写高效、可靠的并发程序。

记住Go语言的并发哲学:"不要通过共享内存来通信,而是通过通信来共享内存"。这种方式可以帮助你避免许多传统并发编程中的问题,让并发代码更加清晰和可维护。

评论区

专业的Linux技术学习平台,从入门到精通的完整学习路径