pusher/pushers/controller.go
timerzz c16e02a7ea
All checks were successful
Build image / build (push) Successful in 1m21s
feat push可以不传id
2024-11-29 21:53:59 +08:00

87 lines
1.9 KiB
Go

package pushers
import (
"context"
"fmt"
"sync"
"gitea.timerzz.com/kedaya_haitao/pusher/biz/dal/postgres"
"gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/config"
"github.com/bytedance/sonic"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
)
func Init() {
c = NewController()
}
func Push(ctx context.Context, ids []int64, title, content string) error {
return c.Push(ctx, ids, title, content)
}
type Pusher interface {
Push(ctx context.Context, title, content string) error
}
type Controller struct {
pushers map[int64]Pusher
l sync.RWMutex
}
func NewController() *Controller {
var pushers []config.PusherConfig
_ = postgres.DB.Find(&pushers)
ctl := &Controller{
pushers: make(map[int64]Pusher, len(pushers)),
}
for _, pusher := range pushers {
c.pushers[pusher.Id] = NewPusher(pusher)
}
fmt.Printf("共%d个pusher\n", len(pushers))
return ctl
}
var c *Controller
func (c *Controller) Push(ctx context.Context, ids []int64, title, content string) error {
wg, sub := errgroup.WithContext(ctx)
if len(ids) == 0 {
c.l.RLock()
ids = maps.Keys(c.pushers)
c.l.RUnlock()
}
for _, id := range ids {
var i = id
wg.Go(func() error {
if pusher, ok := c.pushers[i]; ok {
return pusher.Push(sub, title, content)
} else {
var cfg config.PusherConfig
if err := postgres.DB.Where("id = ?", i).Find(&cfg).Error; err != nil {
return err
}
c.l.Lock()
c.pushers[cfg.Id] = NewPusher(cfg)
c.l.Unlock()
return c.pushers[cfg.Id].Push(sub, title, content)
}
})
}
return wg.Wait()
}
func NewPusher(cfg config.PusherConfig) Pusher {
switch cfg.Type {
case config.PusherConfigType_AnPush:
var opt config.AnPush
_ = sonic.UnmarshalString(cfg.Option, &opt)
return NewAnPush(&opt)
case config.PusherConfigType_Email:
var opt config.EmailPush
_ = sonic.UnmarshalString(cfg.Option, &opt)
return NewEmailPusher(&opt)
}
return nil
}