feat redis subscribe

This commit is contained in:
timerzz 2024-08-29 20:38:48 +08:00
parent 95e8ec378f
commit bb8dde6361
8 changed files with 212 additions and 1 deletions

5
go.mod
View File

@ -12,8 +12,10 @@ require (
github.com/golang/glog v1.2.1
github.com/metacubex/mihomo v1.18.4
github.com/pkg/errors v0.9.1
github.com/redis/go-redis/v9 v9.6.1
github.com/samber/lo v1.39.0
github.com/timerzz/proxypool v0.0.0-20240512142241-f74bcb3534c5
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/postgres v1.5.7
gorm.io/gorm v1.25.10
)
@ -34,11 +36,13 @@ require (
github.com/bytedance/go-tagexpr/v2 v2.9.2 // indirect
github.com/bytedance/gopkg v0.0.0-20220413063733-65bf48ffb3a7 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cloudflare/circl v1.3.6 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/cloudwego/netpoll v0.6.0 // indirect
github.com/coreos/go-iptables v0.7.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/ericlagergren/aegis v0.0.0-20230312195928-b4ce538b56f9 // indirect
github.com/ericlagergren/polyval v0.0.0-20220411101811-e25bc10ba391 // indirect
github.com/ericlagergren/siv v0.0.0-20220507050439-0b757b3aa5f1 // indirect
@ -155,6 +159,5 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/sourcemap.v1 v1.0.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.2 // indirect
)

10
go.sum
View File

@ -33,6 +33,10 @@ github.com/antchfx/xpath v1.3.0/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwq
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bytedance/go-tagexpr/v2 v2.9.2 h1:QySJaAIQgOEDQBLS3x9BxOWrnhqu5sQ+f6HaZIxD39I=
@ -48,6 +52,8 @@ github.com/bytedance/sonic v1.11.6/go.mod h1:LysEHSvpvDySVdC2f87zGWf6CIKJcAvqab1
github.com/bytedance/sonic/loader v0.1.1 h1:c+e5Pt1k/cy5wMveRDyk2X4B9hF4g7an8N3zCYjJFNM=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY=
github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@ -69,6 +75,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ericlagergren/aegis v0.0.0-20230312195928-b4ce538b56f9 h1:/5RkVc9Rc81XmMyVqawCiDyrBHZbLAZgTTCqou4mwj8=
@ -285,6 +293,8 @@ github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/qtls-go1-20 v0.4.1 h1:D33340mCNDAIKBqXuAvexTNMUByrYmFYVfKfDN5nfFs=
github.com/quic-go/qtls-go1-20 v0.4.1/go.mod h1:X9Nh97ZL80Z+bX/gUXMbipO6OxdiDi58b/fMC9mAL+k=
github.com/redis/go-redis/v9 v9.6.1 h1:HHDteefn6ZkTtY5fGUE8tj8uy85AHk6zP7CpzIAM0y4=
github.com/redis/go-redis/v9 v9.6.1/go.mod h1:0C0c6ycQsdpVNQpxb1njEQIqkx5UcsM8FJCQLgE9+RA=
github.com/robertkrimen/otto v0.2.1 h1:FVP0PJ0AHIjC+N4pKCG9yCDz6LHNPCwi/GKID5pGGF0=
github.com/robertkrimen/otto v0.2.1/go.mod h1:UPwtJ1Xu7JrLcZjNWN8orJaM5n5YEtqL//farB5FlRY=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=

16
pkg/redis/init.go Normal file
View File

@ -0,0 +1,16 @@
package redis
import "github.com/redis/go-redis/v9"
func InitDefaultRedis() (*redis.Client, error) {
opt, err := LoadDefaultConfig()
if err != nil {
return nil, err
}
rdb := redis.NewClient(&redis.Options{
Addr: opt.Addr,
Password: opt.Password,
DB: opt.DB,
})
return rdb, nil
}

35
pkg/redis/option.go Normal file
View File

@ -0,0 +1,35 @@
package redis
import (
"os"
"gopkg.in/yaml.v3"
)
const (
DefaultConfigPath = "/cfg/redis.yaml"
ConfigPathEnvKey = "REDIS_CONFIG_PATH"
)
type Option struct {
Addr string `yaml:"addr"`
Password string `yaml:"password"`
DB int `yaml:"db"`
}
func LoadDefaultConfig() (*Option, error) {
path := os.Getenv(ConfigPathEnvKey)
if path == "" {
path = DefaultConfigPath
}
return LoadConfig(path)
}
func LoadConfig(path string) (*Option, error) {
var opt Option
f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()
return &opt, yaml.NewDecoder(f).Decode(&opt)
}

33
pkg/redis/syb_test.go Normal file
View File

@ -0,0 +1,33 @@
package redis
import (
"context"
"testing"
)
func TestInitDefaultRedis(t *testing.T) {
rdb, err := InitDefaultRedis()
if err != nil {
t.Fatal(err)
}
chanel := "/provider/id/cfg"
ctx, cancel := context.WithCancel(context.Background())
pubsub := rdb.Subscribe(ctx, chanel)
c := make(chan struct{})
// 使用完毕,记得关闭
defer pubsub.Close()
go func() {
ch := pubsub.Channel()
for msg := range ch {
t.Log(msg.Channel, msg.Payload)
c <- struct{}{}
}
}()
err = rdb.Publish(ctx, chanel, "payload").Err()
if err != nil {
t.Fatal(err)
}
cancel()
<-c
}

21
pkg/subscribe/client.go Normal file
View File

@ -0,0 +1,21 @@
package subscribe
import (
"context"
"github.com/redis/go-redis/v9"
)
type Client struct {
rdb *redis.Client
}
func NewClient(rdb *redis.Client) *Client {
return &Client{
rdb: rdb,
}
}
func (c *Client) Publish(ctx context.Context, channel string, message string) error {
return c.rdb.Publish(ctx, channel, message).Err()
}

78
pkg/subscribe/server.go Normal file
View File

@ -0,0 +1,78 @@
package subscribe
import (
"context"
"fmt"
"sync"
"github.com/redis/go-redis/v9"
)
type Server struct {
rdb *redis.Client
ctx context.Context
pubSub *redis.PubSub
// 给fm加锁
lock sync.RWMutex
fm map[string]MessageWorker
errHandle func(err error)
}
type MessageWorker func(ctx context.Context, message string) error
func NewServer(ctx context.Context, rdb *redis.Client) *Server {
return &Server{
ctx: ctx,
rdb: rdb,
fm: make(map[string]MessageWorker),
}
}
// 订阅
func (s *Server) Subscribe(channel string, f MessageWorker) error {
if f == nil {
return fmt.Errorf("message worker is nil")
}
if s.pubSub == nil {
s.pubSub = s.rdb.Subscribe(s.ctx, channel)
} else if err := s.pubSub.Subscribe(s.ctx, channel); err != nil {
return err
}
s.lock.Lock()
defer s.lock.Unlock()
s.fm[channel] = f
return nil
}
// 取消订阅
func (s *Server) Unsubscribe(channel string) error {
if s.pubSub == nil {
return nil
}
s.lock.Lock()
defer s.lock.Unlock()
delete(s.fm, channel)
return s.pubSub.Unsubscribe(s.ctx, channel)
}
func (s *Server) Run() {
ch := s.pubSub.Channel()
defer s.pubSub.Close()
for {
select {
case <-s.ctx.Done():
return
case msg := <-ch:
s.lock.RLock()
f := s.fm[msg.Channel]
s.lock.RUnlock()
if f != nil {
if err := f(s.ctx, msg.Payload); err != nil {
s.errHandle(err)
}
}
}
}
}

View File

@ -0,0 +1,15 @@
package utils
import (
"fmt"
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
)
func ProviderConfigNotifyChannel(providerId v2.ProviderId) string {
return fmt.Sprintf("/provider/%s/cfg", providerId)
}
func SellerConfigNotifyChannel(sellerId v2.SellerId) string {
return fmt.Sprintf("/seller/%s/cfg", sellerId)
}