麻豆小视频在线观看_中文黄色一级片_久久久成人精品_成片免费观看视频大全_午夜精品久久久久久久99热浪潮_成人一区二区三区四区

首頁 > 數據庫 > Redis > 正文

關于redigo中PubSub的一點小坑分析

2020-03-17 12:19:21
字體:
來源:轉載
供稿:網友

前言

最近在用 golang 做一些 redis 相關的操作,選用了 redigo 這個第三方庫。然后在使用 Pub/Sub 的時候,卻發現了一個小……

Redis Client

首先,我們來初始化一個帶連接池的 Redis Client:

import (	"github.com/gomodule/redigo/redis")type RedisClient struct {	pool *redis.Pool}func NewRedisClient(addr string, db int, passwd string) *RedisClient {	pool := &redis.Pool{		MaxIdle:  10,		IdleTimeout: 300 * time.Second,		Dial: func() (redis.Conn, error) {			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))			if err != nil {				return nil, err			}			return c, nil		},		TestOnBorrow: func(c redis.Conn, t time.Time) error {			if time.Since(t) < time.Minute {				return nil			}			_, err := c.Do("PING")			return err		},	}	log.Printf("new redis pool at %s", addr)	client := &RedisClient{		pool: pool,	}	return client}

Publish

然后我們可以簡單的實現一個 publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {	c := r.pool.Get()	defer c.Close()	n, err := redis.Int(c.Do("PUBLISH", channel, message))	if err != nil {		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)	}	return n, nil}

Subscribe

接下來就是一個稍微復雜點的帶有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {	psc := redis.PubSubConn{Conn: r.pool.Get()}	defer psc.Close()	log.Printf("redis pubsub subscribe channel: %v", channel)	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {		return err	}	done := make(chan error, 1)	// start a new goroutine to receive message	go func() {		for {			switch msg := psc.Receive().(type) {			case error:				done <- fmt.Errorf("redis pubsub receive err: %v", msg)				return			case redis.Message:				if err := consume(msg); err != nil {					done <- err					return				}			case redis.Subscription:				if msg.Count == 0 {					// all channels are unsubscribed					done <- nil					return				}			}		}	}()	// health check	tick := time.NewTicker(time.Minute)	defer tick.Stop()	for {		select {		case <-ctx.Done():			if err := psc.Unsubscribe(); err != nil {				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)			}			return nil		case err := <-done:			return err		case <-tick.C:			if err := psc.Ping(""); err != nil {				return err			}		}	}	return nil}

最后,我們寫一個簡單地 main 函數來調用 publish & subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {	psc := redis.PubSubConn{Conn: r.pool.Get()}	defer psc.Close()	log.Printf("redis pubsub subscribe channel: %v", channel)	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {		return err	}	done := make(chan error, 1)	// start a new goroutine to receive message	go func() {		for {			switch msg := psc.Receive().(type) {			case error:				done <- fmt.Errorf("redis pubsub receive err: %v", msg)				return			case redis.Message:				if err := consume(msg); err != nil {					done <- err					return				}			case redis.Subscription:				if msg.Count == 0 {					// all channels are unsubscribed					done <- nil					return				}			}		}	}()	// health check	tick := time.NewTicker(time.Minute)	defer tick.Stop()	for {		select {		case <-ctx.Done():			if err := psc.Unsubscribe(); err != nil {				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)			}			return nil		case err := <-done:			return err		case <-tick.C:			if err := psc.Ping(""); err != nil {				return err			}		}	}	return nil}

咋一看之下,好像并沒有什么異常?然而,如果我們這時候去看 redis 的 tcp 連接,就可以發現一些貓膩:

$sudo netstat -antp | grep redistcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一個連接,而 connection pool 似乎沒有什么作用。

更進一步地調試,我們發現在 defer psc.Close() 的時候就卡住了,也就是上面的 10 個 goroutine 其實并沒有正常退出。

Concurrent

排查許久之后,終于定位到了問題!引用 redigo 的說明

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是說,雖然一個連接可以在不同的 goroutine 并發調用 Receive() 和 Subscribe()(subscribe調用了send和flush) ,但是卻不能再有其他并發操作(比如 Close())。

其他相似的問題還可以參考 issue

Fix

知道了上面的原因之后,我們稍微修改一下 defer psc.Close() 的位置即可解決問題:

	// start a new goroutine to receive message	go func() {		// IMPORTANT!		defer psc.Close()		for {			switch msg := psc.Receive().(type) {			case error:

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VEVB武林網的支持。


注:相關教程知識閱讀請移步到Redis頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
主站蜘蛛池模板: 99精品国产一区二区三区 | 99re久久最新地址获取 | av在线免费看网址 | 日本在线观看视频网站 | 看免费黄色大片 | 色综合视频 | 欧美一级爱操视频 | 在线看91 | 国产成人精品一区二区三区电影 | 国产成人自拍小视频 | 懂色av懂色aⅴ精彩av | 在线免费观看毛片 | 有色视频在线观看 | 国产视频在线观看一区二区三区 | 看黄在线| 国产成人小视频在线观看 | 亚洲日韩精品欧美一区二区 | 久久精片 | 欧美毛片 | 中文字幕欧美一区二区三区 | 免费视频一区 | h视频在线免费看 | 蜜桃麻豆视频 | 久久伊人精品热在75 | 久久免费视频5 | 特级西西444www大精品视频免费看 | 日本s级毛片免费观看 | 色播久久 | 宅男视频在线观看免费 | 国产午夜电影 | 全黄裸片武则天一级第4季 偿还电影免费看 | 4p嗯啊巨肉寝室调教男男视频 | 黄色一级视频 | 九九热在线视频观看 | 亚洲天堂成人在线 | 国产免费久久久久 | 日本xxxx色视频在线观看免费, | 久久精品国产精品亚洲 | 久久性生活免费视频 | 亚洲精品成人av在线 | 久久手机在线视频 |