⭐sync
标准库是 Go 语言中用于实现同步原语的包例如互斥锁(sync.Mutex
)、读写锁(sync.RWMutex
)和同步组(sync.WaitGroup
)
使用sync.Cond
等待任务空闲
type Queue struct {
mu *sync.Mutex
cond *sync.Cond
jobs []func() error
limit int
}
func (q *Queue) push(j func() error) {
q.mu.Lock()
if len(q.jobs) >= q.limit {
fmt.Println("Job limit exceeded, waiting for idle")
q.cond.Wait()
}
q.jobs = append(q.jobs, j)
q.mu.Unlock()
}
func (q *Queue) pop() func() error {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.jobs) == 0 {
return nil
}
j := q.jobs[0]
q.cond.Signal()
return j
}
func New(limit int) Queue {
var mu sync.Mutex
return Queue{
mu: &mu,
cond: sync.NewCond(&mu),
jobs: nil,
limit: limit,
}
}
func main() {
queue := New(1)
wg1 := sync.WaitGroup{} //producer
for i := 0; i < 10; i++ {
wg1.Add(1)
go func() {
wg1.Done()
j := func() error {
fmt.Println(fmt.Sprintf("job run at %d", time.Now().Unix()))
return nil
}
queue.push(j) //add job
}()
}
wg2 := sync.WaitGroup{} //consumer
for i := 0; i < 10; i++ {
wg2.Add(1)
go func() {
defer wg2.Done()
f := queue.pop() //get job
f() //exec job
}()
}
wg1.Wait() //wait to exit
wg2.Wait() //wait to exit
}
push: job数量达到限制的最大值,调用cond.Wait()
使goroutine阻塞,等待条件满足.
pop: 在切片切片里面读取到一个job,此时job数量小于限制的最大值,调用cond.Signal
或cond.Broadcast
让push继续工作.
必须在独立的协程里面进程操作,否则可能导致死锁
使用sync.Map
实现线程安全map
func main() {
s := sync.Map{}
s.Store("key1", "hello") //写入key val
s.Store("key2", "world")
v, ok := s.Load("key1") //读取key的值,如果key不存在的时候ok = false
if ok {
fmt.Println(v)
}
_, ok = s.Load("key3")
if !ok {
s.LoadOrStore("key3", "val3") //存在则获取key的值,否则就创建并返回
}
fmt.Println(ok)
s.Range(func(key, value any) bool { //遍历map, 返回false的时候会中断range
fmt.Println(key, value)
return true
})
s.LoadAndDelete("key3") //读取key的值,如果存在就返回并删除,否则返回nil
}
map key,value 都是any(interface)结构可以传入任意类型
使用sync.Mutex
互斥锁
type visits struct {
mu sync.Mutex
count int
}
func (v *visits) Inc() {
v.mu.Lock()
v.count += 1
v.mu.Unlock()
}
func (v *visits) Dec() {
v.mu.Lock()
v.count -= 1
v.mu.Unlock()
v.mu.TryLock()
}
func (v *visits) Do(f func(v *visits) error) error {
if v.mu.TryLock() {
v.mu.Unlock()
err := f(v)
if err != nil {
return err
}
return nil
}
return errors.New("failed to acquire lock")
}
func main() {
vis := visits{}
_ = vis.Do(func(v *visits) error {
v.count += 1
return nil
})
vis.Inc()
vis.Dec()
fmt.Println(vis.count)
}
sync.Mutex.Lock
调用锁,如果已锁定协程会进入阻塞,需配合Unlock
使用,否则会造成死锁
sync.Mutex.Unlock
解锁,如果没有被锁定会抛出unlock of unlocked mutex异常
sync.Mutex.TryLock
尝试加锁,如果成功,则返回 true;否则返回 false,获取锁之后需使用Unlock
释放锁
sync.Once保证只执行一次
type Singleton struct {
argv []any
}
var instance *Singleton
var once sync.Once
func (s *Singleton) Greet() {
for _, v := range s.argv {
if reflect.String == reflect.TypeOf(v).Kind() {
fmt.Printf("%s", v)
}
}
}
func GetInstance(argv ...any) *Singleton {
once.Do(func() {
instance = &Singleton{
argv: argv,
}
})
return instance
}
func main() {
wg := sync.WaitGroup{}
wg.Add(2)
for i := 0; i < 2; i++ {
GetInstance("hello", " ", "world", "!", " ", fmt.Sprintf("%d", i), "\n").Greet()
}
}
使用sync.Once.Do
保证Singleton只初始化一次
使用sync.Pool
复用对象
type MyObject struct {
Timestamp time.Time
}
func createObject() *MyObject {
return &MyObject{
Timestamp: time.Now(),
}
}
func main() {
objPool := sync.Pool{
New: func() interface{} {
fmt.Println("Creating a new object...")
return createObject()
},
}
// Get an object from the pool
obj1 := objPool.Get().(*MyObject)
fmt.Println(obj1)
}
每次调用sync.Pool.Get
的时候会触发New方法
sync.RWMutex
读写锁
type User struct {
mu sync.RWMutex
point int
}
var person = User{}
func (u *User) GetPoint() int {
u.mu.RLock()
p := u.point
time.Sleep(time.Second * 1)
u.mu.RUnlock()
return p
}
func (u *User) IncPoint(point int) {
u.mu.Lock()
u.point += point
time.Sleep(time.Second * 1)
u.mu.Unlock()
}
func (u *User) Change(f func(u *User)) {
if ok := u.mu.TryLock(); ok {
f(u)
u.mu.Unlock()
}
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("point %d\n", person.GetPoint())
}()
}
for i := 1; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
person.IncPoint(i)
}(i)
}
wg.Wait()
person.Change(func(u *User) {
u.point = 100
})
fmt.Println(person.point)
}
sync.RWMutex.RLock
: 锁定读操作,会阻止写操作进行,但允许其他读操作同时进行
sync.RWMutex.RUnlock
: 解锁读操作,释放后等待的写操作获得锁并写入
sync.RWMutex.Lock
: 锁定写操作,会阻止其他读写读写操作
sync.RWMutex.Lock
: 解锁写操作,允许等待的读写操作获得锁并执行
sync.RWMutex.TryLock
: 尝试获得read、write锁,获取成功返回true,需要手动释放锁
sync.WaitGroup
func main() {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
time.Sleep(time.Second)
fmt.Println("co 1")
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 2)
fmt.Println("co 2")
}()
wg.Wait()
}