sync 标准库是 Go 语言中用于实现同步原语的包例如互斥锁(sync.Mutex)、读写锁(sync.RWMutex)和同步组(sync.WaitGroup)

使用sync.Cond等待任务空闲

type Queue struct {
	mu    *sync.Mutex
	cond  *sync.Cond
	jobs  []func() error
	limit int
}

func (q *Queue) push(j func() error) {
	q.mu.Lock()
	if len(q.jobs) >= q.limit {
		fmt.Println("Job limit exceeded, waiting for idle")
		q.cond.Wait()
	}
	q.jobs = append(q.jobs, j)
	q.mu.Unlock()
}

func (q *Queue) pop() func() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.jobs) == 0 {
		return nil
	}
	j := q.jobs[0]
	q.cond.Signal()
	return j
}

func New(limit int) Queue {
	var mu sync.Mutex
	return Queue{
		mu:    &mu,
		cond:  sync.NewCond(&mu),
		jobs:  nil,
		limit: limit,
	}
}

func main() {
	queue := New(1)
	wg1 := sync.WaitGroup{} //producer
	for i := 0; i < 10; i++ {
		wg1.Add(1)
		go func() {
			wg1.Done()
			j := func() error {
				fmt.Println(fmt.Sprintf("job run at %d", time.Now().Unix()))
				return nil
			}
			queue.push(j) //add job
		}()
	}

	wg2 := sync.WaitGroup{} //consumer

	for i := 0; i < 10; i++ {
		wg2.Add(1)
		go func() {
			defer wg2.Done()
			f := queue.pop() //get job
			f()              //exec job
		}()
	}

	wg1.Wait() //wait to exit
	wg2.Wait() //wait to exit
}

push: job数量达到限制的最大值,调用cond.Wait()使goroutine阻塞,等待条件满足.

pop: 在切片切片里面读取到一个job,此时job数量小于限制的最大值,调用cond.Signalcond.Broadcast让push继续工作.

必须在独立的协程里面进程操作,否则可能导致死锁


使用sync.Map实现线程安全map

func main() {
	s := sync.Map{}

	s.Store("key1", "hello")    //写入key val

	s.Store("key2", "world")

	v, ok := s.Load("key1")     //读取key的值,如果key不存在的时候ok = false

	if ok {
		fmt.Println(v)
	}

	_, ok = s.Load("key3")

	if !ok {
		s.LoadOrStore("key3", "val3")   //存在则获取key的值,否则就创建并返回  
	}

	fmt.Println(ok)

	s.Range(func(key, value any) bool { //遍历map, 返回false的时候会中断range
		fmt.Println(key, value)
		return true
	})

	s.LoadAndDelete("key3") //读取key的值,如果存在就返回并删除,否则返回nil
}

map key,value 都是any(interface)结构可以传入任意类型


使用sync.Mutex互斥锁

type visits struct {
	mu    sync.Mutex
	count int
}

func (v *visits) Inc() {
	v.mu.Lock()
	v.count += 1
	v.mu.Unlock()
}

func (v *visits) Dec() {
	v.mu.Lock()
	v.count -= 1
	v.mu.Unlock()

	v.mu.TryLock()
}

func (v *visits) Do(f func(v *visits) error) error {
	if v.mu.TryLock() {
		v.mu.Unlock()
		err := f(v)
		if err != nil {
			return err
		}
		return nil
	}
	return errors.New("failed to acquire lock")
}

func main() {
	vis := visits{}

	_ = vis.Do(func(v *visits) error {
		v.count += 1
		return nil
	})

	vis.Inc()

	vis.Dec()

	fmt.Println(vis.count)
}

sync.Mutex.Lock 调用锁,如果已锁定协程会进入阻塞,需配合Unlock使用,否则会造成死锁

sync.Mutex.Unlock 解锁,如果没有被锁定会抛出unlock of unlocked mutex异常

sync.Mutex.TryLock 尝试加锁,如果成功,则返回 true;否则返回 false,获取锁之后需使用Unlock释放锁


sync.Once保证只执行一次

type Singleton struct {
	argv []any
}

var instance *Singleton

var once sync.Once

func (s *Singleton) Greet() {
	for _, v := range s.argv {
		if reflect.String == reflect.TypeOf(v).Kind() {
			fmt.Printf("%s", v)
		}
	}
}

func GetInstance(argv ...any) *Singleton {
	once.Do(func() {
		instance = &Singleton{
			argv: argv,
		}
	})

	return instance
}

func main() {

	wg := sync.WaitGroup{}

	wg.Add(2)

	for i := 0; i < 2; i++ {
		GetInstance("hello", " ", "world", "!", " ", fmt.Sprintf("%d", i), "\n").Greet()
	}
}

使用sync.Once.Do保证Singleton只初始化一次


使用sync.Pool复用对象

type MyObject struct {
	Timestamp time.Time
}

func createObject() *MyObject {
	return &MyObject{
		Timestamp: time.Now(),
	}
}

func main() {
	objPool := sync.Pool{
		New: func() interface{} {
			fmt.Println("Creating a new object...")
			return createObject()
		},
	}

	// Get an object from the pool
	obj1 := objPool.Get().(*MyObject)

	fmt.Println(obj1)
}

每次调用sync.Pool.Get的时候会触发New方法


sync.RWMutex读写锁

type User struct {
	mu    sync.RWMutex
	point int
}

var person = User{}

func (u *User) GetPoint() int {
	u.mu.RLock()
	p := u.point
	time.Sleep(time.Second * 1)
	u.mu.RUnlock()
	return p
}

func (u *User) IncPoint(point int) {
	u.mu.Lock()
	u.point += point
	time.Sleep(time.Second * 1)
	u.mu.Unlock()
}

func (u *User) Change(f func(u *User)) {
	if ok := u.mu.TryLock(); ok {
		f(u)
		u.mu.Unlock()
	}
}

func main() {
	wg := sync.WaitGroup{}

	for i := 0; i < 10; i++ {
		wg.Add(1)

		go func() {
			defer wg.Done()

			fmt.Printf("point %d\n", person.GetPoint())
		}()
	}

	for i := 1; i < 5; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			person.IncPoint(i)
		}(i)
	}

	wg.Wait()

	person.Change(func(u *User) {
		u.point = 100
	})

	fmt.Println(person.point)
}

sync.RWMutex.RLock: 锁定读操作,会阻止写操作进行,但允许其他读操作同时进行

sync.RWMutex.RUnlock: 解锁读操作,释放后等待的写操作获得锁并写入

sync.RWMutex.Lock: 锁定写操作,会阻止其他读写读写操作

sync.RWMutex.Lock: 解锁写操作,允许等待的读写操作获得锁并执行

sync.RWMutex.TryLock: 尝试获得read、write锁,获取成功返回true,需要手动释放锁


sync.WaitGroup

func main() {
	wg := sync.WaitGroup{}

	wg.Add(2)

	go func() {
		defer wg.Done()
		time.Sleep(time.Second)
		fmt.Println("co 1")
	}()

	go func() {
		defer wg.Done()
		time.Sleep(time.Second * 2)
		fmt.Println("co 2")
	}()

	wg.Wait()
}