主题
Go语言并发编程
课程目标
通过本课程的学习,你将掌握Go语言的并发编程特性,包括goroutine、channel、同步原语等核心概念,以及并发编程的最佳实践和常见问题解决方案。
1. 并发编程基础
1.1 并发与并行的区别
- 并发:多个任务在同一时间段内交替执行(宏观上同时,微观上交替)
- 并行:多个任务在同一时刻同时执行(宏观和微观上都同时)
Go语言的并发模型支持这两种方式,通过goroutine和调度器实现。
1.2 Go语言的并发模型
Go语言采用**CSP(Communicating Sequential Processes)**并发模型:
- Goroutine:轻量级线程,由Go运行时管理
- Channel:用于goroutine之间的通信
- Select:用于监听多个channel的操作
这种模型的核心思想是:"不要通过共享内存来通信,而是通过通信来共享内存"。
1.3 并发编程的优势
- 提高程序性能:充分利用多核CPU
- 改善用户体验:避免阻塞主线程
- 简化复杂问题:将复杂任务分解为多个小任务
- 提高资源利用率:IO操作时可以处理其他任务
2. Goroutine
2.1 什么是Goroutine
Goroutine是Go语言中的轻量级线程,具有以下特点:
- 轻量:占用内存小(约2KB),启动速度快
- 由Go运行时管理:不是操作系统线程
- 多路复用:多个goroutine可以运行在一个操作系统线程上
- 调度灵活:Go调度器会自动在多个线程上分配goroutine
2.2 创建Goroutine
go
// 方法1:使用go关键字启动goroutine
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
go sayHello() // 启动goroutine
fmt.Println("Hello from main!")
time.Sleep(1 * time.Second) // 等待goroutine执行完成
}
// 方法2:启动匿名函数作为goroutine
func main() {
go func() {
fmt.Println("Hello from anonymous goroutine!")
}()
fmt.Println("Hello from main!")
time.Sleep(1 * time.Second)
}
// 方法3:带参数的goroutine
func printNumbers(n int) {
for i := 1; i <= n; i++ {
fmt.Println(i)
time.Sleep(100 * time.Millisecond)
}
}
func main() {
go printNumbers(5) // 启动带参数的goroutine
fmt.Println("Main function")
time.Sleep(1 * time.Second)
}
// 方法4:启动多个goroutine
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
for i := 1; i <= 3; i++ {
go worker(i)
}
fmt.Println("All workers started")
time.Sleep(2 * time.Second)
}2.3 Goroutine的生命周期
- 创建:使用
go关键字创建 - 运行:在调度器的管理下执行
- 阻塞:遇到channel操作、系统调用等时阻塞
- 唤醒:阻塞条件解除后被唤醒
- 结束:函数执行完成后自动销毁
2.4 Goroutine的调度
Go调度器采用G-M-P模型:
- G(Goroutine):goroutine对象
- M(Machine):操作系统线程
- P(Processor):处理器,负责管理goroutine队列
调度策略:
- 工作窃取:空闲P会从其他P的队列中窃取goroutine
- 本地队列:每个P都有自己的goroutine队列
- 全局队列:存储待调度的goroutine
2.5 Goroutine的注意事项
- Goroutine泄漏:goroutine如果不结束,会一直占用资源
- 内存占用:虽然每个goroutine占用内存小,但大量goroutine仍会消耗内存
- 调度开销:过多的goroutine会增加调度开销
- 竞态条件:多个goroutine访问共享资源时可能产生竞态条件
3. Channel
3.1 什么是Channel
Channel是Go语言中用于goroutine之间通信的管道,具有以下特点:
- 类型安全:只能传输指定类型的数据
- 同步机制:默认情况下,发送和接收操作会阻塞
- FIFO:先进先出的消息队列
- 可以关闭:关闭后不能再发送数据,但可以继续接收
3.2 创建Channel
go
// 创建无缓冲channel
ch := make(chan int)
// 创建缓冲channel
ch := make(chan int, 5) // 容量为5
// 创建带方向的channel
var sendCh chan<- int // 只能发送
var recvCh <-chan int // 只能接收3.3 Channel的基本操作
3.3.1 发送数据
go
ch := make(chan int)
go func() {
ch <- 10 // 发送数据到channel
}()3.3.2 接收数据
go
// 方式1:基本接收
value := <-ch
// 方式2:带接收状态
value, ok := <-ch
if !ok {
fmt.Println("Channel closed")
}
// 方式3:忽略接收值
<-ch3.3.3 关闭Channel
go
ch := make(chan int, 5)
// 发送数据
for i := 1; i <= 3; i++ {
ch <- i
}
// 关闭channel
close(ch)
// 尝试发送到已关闭的channel会panic
// ch <- 4 // panic: send on closed channel
// 从已关闭的channel接收数据
for value := range ch {
fmt.Println(value)
}3.4 无缓冲Channel
无缓冲Channel(也称为同步Channel):
- 发送操作会阻塞,直到有goroutine接收数据
- 接收操作会阻塞,直到有goroutine发送数据
- 适用于需要严格同步的场景
go
func main() {
ch := make(chan int) // 无缓冲channel
go func() {
fmt.Println("Goroutine: Sending 1")
ch <- 1 // 阻塞,直到main goroutine接收
fmt.Println("Goroutine: Sent 1")
}()
fmt.Println("Main: Waiting for data")
value := <-ch // 阻塞,直到goroutine发送
fmt.Println("Main: Received", value)
time.Sleep(1 * time.Second)
}3.5 缓冲Channel
缓冲Channel:
- 发送操作在缓冲区未满时不会阻塞
- 接收操作在缓冲区未空时不会阻塞
- 适用于生产者-消费者模式
go
func main() {
ch := make(chan int, 3) // 缓冲容量为3
// 发送数据到缓冲channel
fmt.Println("Sending 1")
ch <- 1
fmt.Println("Sending 2")
ch <- 2
fmt.Println("Sending 3")
ch <- 3
// 缓冲区已满,发送会阻塞
// fmt.Println("Sending 4")
// ch <- 4 // 阻塞
// 接收数据
fmt.Println("Receiving 1:", <-ch)
fmt.Println("Receiving 2:", <-ch)
fmt.Println("Receiving 3:", <-ch)
// 缓冲区已空,接收会阻塞
// fmt.Println("Receiving 4:", <-ch) // 阻塞
close(ch)
}3.6 带方向的Channel
带方向的Channel可以提高代码的安全性和可读性:
go
// 只发送的channel
func sendData(ch chan<- int) {
for i := 1; i <= 5; i++ {
ch <- i
time.Sleep(100 * time.Millisecond)
}
close(ch)
}
// 只接收的channel
func receiveData(ch <-chan int) {
for value := range ch {
fmt.Println("Received:", value)
}
}
func main() {
ch := make(chan int, 5)
go sendData(ch)
receiveData(ch)
}3.7 Channel的使用场景
- goroutine间通信:传递数据和信号
- 同步操作:等待goroutine完成
- 任务分发:将任务分发给多个worker
- 事件通知:通知其他goroutine事件发生
- 资源池:管理共享资源
4. Select语句
4.1 什么是Select语句
Select语句用于监听多个channel的操作,类似于switch语句,但专门用于channel操作。
4.2 基本用法
go
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "from channel 1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from channel 2"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received:", msg1)
case msg2 := <-ch2:
fmt.Println("Received:", msg2)
case <-time.After(1500 * time.Millisecond):
fmt.Println("Timeout")
}
}
}4.3 Select的特性
- 随机性:当多个case都可以执行时,随机选择一个
- 阻塞性:当所有case都不可执行时,select会阻塞
- default分支:当所有case都不可执行时,执行default分支
- 可以处理多个channel:同时监听多个channel的操作
4.4 常见用法
4.4.1 超时处理
go
func main() {
ch := make(chan string)
go func() {
time.Sleep(3 * time.Second)
ch <- "result"
}()
select {
case result := <-ch:
fmt.Println("Received:", result)
case <-time.After(2 * time.Second):
fmt.Println("Timeout")
}
}4.4.2 非阻塞操作
go
func main() {
ch := make(chan int, 1)
// 非阻塞发送
select {
case ch <- 10:
fmt.Println("Sent successfully")
default:
fmt.Println("Channel full")
}
// 非阻塞接收
select {
case value := <-ch:
fmt.Println("Received:", value)
default:
fmt.Println("Channel empty")
}
}4.4.3 退出信号
go
func worker(done chan bool) {
for {
select {
case <-done:
fmt.Println("Worker exiting")
return
default:
fmt.Println("Working...")
time.Sleep(1 * time.Second)
}
}
}
func main() {
done := make(chan bool)
go worker(done)
time.Sleep(3 * time.Second)
fmt.Println("Sending exit signal")
done <- true
time.Sleep(1 * time.Second)
}4.4.4 监听多个channel
go
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
go func() {
for i := 0; i < 5; i++ {
time.Sleep(500 * time.Millisecond)
ch1 <- i
}
}()
go func() {
for i := 0; i < 3; i++ {
time.Sleep(1 * time.Second)
ch2 <- i * 10
}
}()
go func() {
time.Sleep(2 * time.Second)
ch3 <- 100
}()
// 监听多个channel
count := 0
for count < 9 {
select {
case v := <-ch1:
fmt.Println("From ch1:", v)
count++
case v := <-ch2:
fmt.Println("From ch2:", v)
count++
case v := <-ch3:
fmt.Println("From ch3:", v)
count++
}
}
}5. 同步原语
5.1 Mutex(互斥锁)
Mutex用于保护共享资源,防止多个goroutine同时访问:
go
import "sync"
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := &Counter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Final value:", counter.Value())
}5.2 RWMutex(读写锁)
RWMutex允许多个读操作同时进行,但写操作需要独占:
go
import "sync"
type DataStore struct {
mu sync.RWMutex
data map[string]string
}
func (ds *DataStore) Get(key string) string {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.data[key]
}
func (ds *DataStore) Set(key, value string) {
ds.mu.Lock()
defer ds.mu.Unlock()
ds.data[key] = value
}
func main() {
ds := &DataStore{
data: make(map[string]string),
}
// 写入数据
ds.Set("key1", "value1")
// 多个读操作可以同时进行
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
value := ds.Get("key1")
fmt.Printf("Goroutine %d: %s\n", i, value)
}(i)
}
wg.Wait()
}5.3 WaitGroup
WaitGroup用于等待一组goroutine完成:
go
import "sync"
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Duration(id) * 100 * time.Millisecond)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go worker(i, &wg)
}
fmt.Println("Waiting for all workers to complete")
wg.Wait()
fmt.Println("All workers completed")
}5.4 Once
Once用于确保某个函数只执行一次:
go
import "sync"
var (
once sync.Once
instance *Database
)
type Database struct {
connection string
}
func GetDatabase() *Database {
once.Do(func() {
fmt.Println("Initializing database connection")
instance = &Database{
connection: "postgres://localhost:5432/mydb",
}
})
return instance
}
func main() {
for i := 0; i < 5; i++ {
go func(i int) {
db := GetDatabase()
fmt.Printf("Goroutine %d: %s\n", i, db.connection)
}(i)
}
time.Sleep(1 * time.Second)
}5.5 Cond
Cond用于等待或宣布事件的发生:
go
import "sync"
type Queue struct {
mu sync.Mutex
cond *sync.Cond
items []int
}
func NewQueue() *Queue {
q := &Queue{
items: make([]int, 0),
}
q.cond = sync.NewCond(&q.mu)
return q
}
func (q *Queue) Enqueue(item int) {
q.mu.Lock()
defer q.mu.Unlock()
q.items = append(q.items, item)
fmt.Printf("Enqueued: %d\n", item)
q.cond.Signal() // 通知一个等待的goroutine
}
func (q *Queue) Dequeue() int {
q.mu.Lock()
defer q.mu.Unlock()
for len(q.items) == 0 {
fmt.Println("Queue is empty, waiting...")
q.cond.Wait() // 等待直到队列非空
}
item := q.items[0]
q.items = q.items[1:]
fmt.Printf("Dequeued: %d\n", item)
return item
}
func main() {
q := NewQueue()
// 启动消费者
go func() {
for i := 0; i < 3; i++ {
q.Dequeue()
}
}()
// 生产者延迟生产
time.Sleep(1 * time.Second)
q.Enqueue(1)
time.Sleep(500 * time.Millisecond)
q.Enqueue(2)
time.Sleep(500 * time.Millisecond)
q.Enqueue(3)
time.Sleep(1 * time.Second)
}5.6 Atomic
Atomic包提供了原子操作,用于无锁编程:
go
import "sync/atomic"
func main() {
var counter int64
var wg sync.WaitGroup
// 多个goroutine并发增加计数器
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1)
}()
}
wg.Wait()
fmt.Println("Final counter:", counter)
// 原子读取
value := atomic.LoadInt64(&counter)
fmt.Println("Current value:", value)
// 原子写入
atomic.StoreInt64(&counter, 100)
fmt.Println("After store:", atomic.LoadInt64(&counter))
// 原子比较交换
oldValue := atomic.LoadInt64(&counter)
newValue := oldValue + 50
swapped := atomic.CompareAndSwapInt64(&counter, oldValue, newValue)
fmt.Printf("CAS result: %v, New value: %d\n", swapped, atomic.LoadInt64(&counter))
}6. 并发模式
6.1 生产者-消费者模式
go
func producer(ch chan<- int, done chan<- bool) {
for i := 1; i <= 10; i++ {
fmt.Printf("Producing: %d\n", i)
ch <- i
time.Sleep(200 * time.Millisecond)
}
close(ch)
done <- true
}
func consumer(ch <-chan int, id int, done chan<- bool) {
for value := range ch {
fmt.Printf("Consumer %d: Consumed %d\n", id, value)
time.Sleep(500 * time.Millisecond)
}
done <- true
}
func main() {
ch := make(chan int, 5)
done := make(chan bool, 3)
// 启动生产者
go producer(ch, done)
// 启动多个消费者
for i := 1; i <= 2; i++ {
go consumer(ch, i, done)
}
// 等待所有goroutine完成
for i := 0; i < 3; i++ {
<-done
}
fmt.Println("All done")
}6.2 工作池模式
go
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(1 * time.Second) // 模拟处理时间
results <- job * 2
}
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
// 启动工作池
for i := 1; i <= numWorkers; i++ {
go worker(i, jobs, results)
}
// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- i
}
close(jobs)
// 收集结果
for i := 1; i <= numJobs; i++ {
result := <-results
fmt.Printf("Result: %d\n", result)
}
close(results)
fmt.Println("All jobs completed")
}6.3 Fan Out-Fan In模式
go
func generator(n int) <-chan int {
ch := make(chan int)
go func() {
defer close(ch)
for i := 0; i < n; i++ {
ch <- i
}
}()
return ch
}
func worker(id int, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
time.Sleep(100 * time.Millisecond)
}
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 生成数据
in := generator(10)
// Fan Out: 多个worker处理数据
w1 := worker(1, in)
w2 := worker(2, in)
w3 := worker(3, in)
// Fan In: 合并结果
out := merge(w1, w2, w3)
// 收集结果
for n := range out {
fmt.Println(n)
}
}6.4 超时控制模式
go
func operationWithTimeout() error {
ch := make(chan error)
go func() {
// 模拟长时间操作
time.Sleep(3 * time.Second)
ch <- nil // 操作成功
}()
select {
case err := <-ch:
return err
case <-time.After(2 * time.Second):
return fmt.Errorf("operation timed out")
}
}
func main() {
err := operationWithTimeout()
if err != nil {
fmt.Println("Error:", err)
} else {
fmt.Println("Operation completed successfully")
}
}6.5 Context模式
go
import "context"
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d: Context cancelled\n", id)
return
default:
fmt.Printf("Worker %d: Working...\n", id)
time.Sleep(1 * time.Second)
}
}
}
func main() {
// 创建带超时的context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 启动多个worker
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
// 等待context超时
<-ctx.Done()
fmt.Println("Main: Context cancelled")
time.Sleep(1 * time.Second)
}7. 并发编程最佳实践
7.1 避免竞态条件
- 使用Channel通信:优先使用channel传递数据,而不是共享内存
- 使用同步原语:必要时使用Mutex、RWMutex等同步原语
- 使用Atomic操作:对于简单的计数器等,使用atomic包
- 使用Context:用于传递取消信号和截止时间
7.2 避免Goroutine泄漏
- 确保Goroutine能退出:每个goroutine都应该有明确的退出条件
- 使用context控制生命周期:通过context的Done通道接收取消信号
- 使用WaitGroup等待完成:确保所有goroutine都能完成
- 避免无限循环:循环中要有退出机制
7.3 性能优化
- 合理使用缓冲Channel:根据实际情况选择合适的缓冲区大小
- 控制Goroutine数量:避免创建过多的goroutine
- 使用工作池:对于大量任务,使用固定大小的工作池
- 减少锁竞争:使用细粒度锁,减少锁持有时间
- 使用RWMutex:对于读多写少的场景,使用RWMutex
7.4 错误处理
- 传递错误:通过channel传递错误信息
- 统一错误处理:集中处理错误
- 超时控制:为长时间操作设置超时
- 优雅退出:处理中断信号,确保程序优雅退出
7.5 代码组织
- 模块化设计:将并发逻辑封装到独立的模块中
- 明确的接口:定义清晰的接口,隐藏并发实现细节
- 文档注释:为并发代码添加详细的注释
- 测试覆盖:编写并发测试,确保代码正确性
8. 常见问题和解决方案
8.1 竞态条件
问题:多个goroutine同时修改共享资源,导致数据不一致
解决方案:
- 使用channel传递数据
- 使用Mutex或RWMutex保护共享资源
- 使用atomic包进行原子操作
8.2 Goroutine泄漏
问题:goroutine没有正确退出,导致资源泄漏
解决方案:
- 使用context控制goroutine生命周期
- 为goroutine提供退出通道
- 使用WaitGroup等待goroutine完成
- 避免在goroutine中使用无限循环
8.3 死锁
问题:两个或多个goroutine互相等待对方释放资源
解决方案:
- 避免嵌套锁
- 使用带超时的channel操作
- 确保锁的获取顺序一致
- 使用deadlock检测工具
8.4 活锁
问题:goroutine不断重试失败的操作,导致系统资源浪费
解决方案:
- 为重试操作添加退避策略
- 设置最大重试次数
- 使用随机延迟避免惊群效应
8.5 饥饿
问题:某些goroutine长时间无法获取资源
解决方案:
- 使用公平锁
- 避免长时间持有锁
- 合理设计并发算法
8.6 内存开销
问题:过多的goroutine导致内存占用过高
解决方案:
- 使用工作池控制goroutine数量
- 及时释放不再需要的资源
- 监控内存使用情况
9. 并发编程实战案例
9.1 并发Web服务器
go
package main
import (
"fmt"
"net/http"
"sync"
"time"
)
var (
requestCount int64
mu sync.Mutex
)
func helloHandler(w http.ResponseWriter, r *http.Request) {
// 模拟处理时间
time.Sleep(100 * time.Millisecond)
// 原子增加计数
atomic.AddInt64(&requestCount, 1)
fmt.Fprintf(w, "Hello, Concurrent Web Server!\n")
fmt.Fprintf(w, "Request count: %d\n", atomic.LoadInt64(&requestCount))
}
func statusHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Server status:\n")
fmt.Fprintf(w, "Request count: %d\n", atomic.LoadInt64(&requestCount))
fmt.Fprintf(w, "Time: %s\n", time.Now().Format(time.RFC3339))
}
func main() {
http.HandleFunc("/", helloHandler)
http.HandleFunc("/status", statusHandler)
fmt.Println("Server starting on port 8080...")
fmt.Println("Try: curl http://localhost:8080/")
fmt.Println("Try: curl http://localhost:8080/status")
if err := http.ListenAndServe(":8080", nil); err != nil {
fmt.Println("Error starting server:", err)
}
}9.2 并发文件处理
go
package main
import (
"bufio"
"fmt"
"os"
"path/filepath"
"sync"
)
func processFile(filePath string, wg *sync.WaitGroup, resultChan chan<- int) {
defer wg.Done()
file, err := os.Open(filePath)
if err != nil {
fmt.Printf("Error opening file %s: %v\n", filePath, err)
resultChan <- 0
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
if err := scanner.Err(); err != nil {
fmt.Printf("Error scanning file %s: %v\n", filePath, err)
resultChan <- 0
return
}
fmt.Printf("File %s: %d lines\n", filePath, lineCount)
resultChan <- lineCount
}
func main() {
rootDir := "."
fileExt := ".go"
var wg sync.WaitGroup
resultChan := make(chan int, 100)
// 遍历目录,处理符合条件的文件
err := filepath.Walk(rootDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() && filepath.Ext(path) == fileExt {
wg.Add(1)
go processFile(path, &wg, resultChan)
}
return nil
})
if err != nil {
fmt.Println("Error walking directory:", err)
return
}
// 等待所有文件处理完成
go func() {
wg.Wait()
close(resultChan)
}()
// 汇总结果
totalLines := 0
fileCount := 0
for lines := range resultChan {
if lines > 0 {
totalLines += lines
fileCount++
}
}
fmt.Printf("\nProcessed %d files\n", fileCount)
fmt.Printf("Total lines: %d\n", totalLines)
}9.3 并发爬虫
go
package main
import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"sync"
"time"
)
var (
visitedUrls = make(map[string]bool)
mu sync.Mutex
urlChan = make(chan string, 100)
wg sync.WaitGroup
)
func extractLinks(html string) []string {
var links []string
re := regexp.MustCompile(`href="(https?://[^"]+)"`)
matches := re.FindAllStringSubmatch(html, -1)
for _, match := range matches {
if len(match) > 1 {
links = append(links, match[1])
}
}
return links
}
func crawl(url string) {
defer wg.Done()
// 标记为已访问
mu.Lock()
if visitedUrls[url] {
mu.Unlock()
return
}
visitedUrls[url] = true
mu.Unlock()
fmt.Printf("Crawling: %s\n", url)
// 发送HTTP请求
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Get(url)
if err != nil {
fmt.Printf("Error crawling %s: %v\n", url, err)
return
}
defer resp.Body.Close()
// 读取响应体
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Printf("Error reading response: %v\n", err)
return
}
// 提取链接
links := extractLinks(string(body))
fmt.Printf("Found %d links on %s\n", len(links), url)
// 处理提取的链接
for _, link := range links {
mu.Lock()
if !visitedUrls[link] {
wg.Add(1)
go crawl(link)
}
mu.Unlock()
}
}
func main() {
startUrl := "https://example.com"
wg.Add(1)
go crawl(startUrl)
wg.Wait()
fmt.Printf("Crawling completed. Visited %d URLs\n", len(visitedUrls))
}10. 总结
本课程介绍了Go语言的并发编程特性,包括:
- 并发基础:并发与并行的区别,Go语言的CSP模型
- Goroutine:轻量级线程的创建、调度和注意事项
- Channel:无缓冲和缓冲channel,基本操作和使用场景
- Select:监听多个channel操作,超时处理和退出信号
- 同步原语:Mutex、RWMutex、WaitGroup、Once、Cond、Atomic
- 并发模式:生产者-消费者、工作池、Fan Out-Fan In、超时控制、Context
- 最佳实践:避免竞态条件、Goroutine泄漏、死锁等问题
- 常见问题:竞态条件、Goroutine泄漏、死锁、活锁、饥饿、内存开销
- 实战案例:并发Web服务器、并发文件处理、并发爬虫
通过本课程的学习,你已经掌握了Go语言并发编程的核心概念和实践技巧。在实际开发中,你可以根据具体场景选择合适的并发模式,编写高效、可靠的并发程序。
记住Go语言的并发哲学:"不要通过共享内存来通信,而是通过通信来共享内存"。这种方式可以帮助你避免许多传统并发编程中的问题,让并发代码更加清晰和可维护。