156 lines
4.3 KiB
Go
156 lines
4.3 KiB
Go
package watcher
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
"gitea.timerzz.com/kedaya_haitao/coach-spider/pkg/options"
|
||
coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
|
||
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
|
||
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
|
||
"gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/push"
|
||
"gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher"
|
||
"github.com/golang/glog"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
type Controller struct {
|
||
ctx context.Context
|
||
m sync.RWMutex
|
||
|
||
// 要蹲货的ProviderArticle
|
||
watchers *Watchers
|
||
|
||
storage *storage.Storage
|
||
client *coach_client.US
|
||
|
||
providerId v2.ProviderId
|
||
interval time.Duration
|
||
}
|
||
|
||
func NewController(ctx context.Context, cfg *options.Config, client *coach_client.US, db *gorm.DB) *Controller {
|
||
return &Controller{
|
||
ctx: ctx,
|
||
providerId: cfg.ProviderId,
|
||
interval: cfg.WatchInterval,
|
||
client: client,
|
||
storage: storage.NewStorage(db),
|
||
watchers: NewWatchers(storage.NewStorage(db), cfg.ProviderId),
|
||
}
|
||
}
|
||
|
||
func (c *Controller) Run() (err error) {
|
||
// 加载要蹲货的ProviderArticle
|
||
if err = c.watchers.Load(); err != nil {
|
||
return
|
||
}
|
||
ticker := time.NewTicker(c.interval)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-c.ctx.Done():
|
||
glog.Info("watcher 退出")
|
||
return
|
||
case <-ticker.C:
|
||
c.watchRange()
|
||
}
|
||
}
|
||
}
|
||
|
||
// 返回ready
|
||
func (c *Controller) Ready() bool {
|
||
return c.watchers.Ready()
|
||
}
|
||
|
||
// Add 添加一个要蹲货的ProviderArticle
|
||
func (c *Controller) Add(skuID string) error {
|
||
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
|
||
if err != nil {
|
||
return fmt.Errorf("获取商品信息失败: %v", err)
|
||
}
|
||
article.SetWatch(true)
|
||
article.Available = false
|
||
if err = c.storage.ProviderArticle().Update(article, "watch", "available"); err != nil {
|
||
return fmt.Errorf("更新数据库失败:%v", err)
|
||
}
|
||
c.watchers.Add(&article)
|
||
return nil
|
||
}
|
||
|
||
func (c *Controller) Delete(skuID string) error {
|
||
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
|
||
if err != nil {
|
||
return fmt.Errorf("获取商品信息失败: %v", err)
|
||
}
|
||
article.Watch = nil
|
||
if err = c.storage.ProviderArticle().Update(article, "watch"); err != nil {
|
||
return fmt.Errorf("更新数据库失败:%v", err)
|
||
}
|
||
c.watchers.Remove(skuID)
|
||
return nil
|
||
}
|
||
|
||
func (c *Controller) Stop(skuID string) error {
|
||
article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID))
|
||
if err != nil {
|
||
return fmt.Errorf("获取商品信息失败: %v", err)
|
||
}
|
||
article.SetWatch(false)
|
||
if err = c.storage.ProviderArticle().Update(article, "watch"); err != nil {
|
||
return fmt.Errorf("更新数据库失败:%v", err)
|
||
}
|
||
c.watchers.Remove(skuID)
|
||
return nil
|
||
}
|
||
|
||
func (c *Controller) watchRange() {
|
||
c.watchers.Range(func(watcher *Watcher) bool {
|
||
return !watcher.watching
|
||
}, func(watcher *Watcher) {
|
||
go func() {
|
||
watcher.watching = true
|
||
defer func() {
|
||
watcher.watching = false
|
||
}()
|
||
|
||
if c.doWatch(watcher) {
|
||
// 如果蹲到了,需要通知
|
||
resp, err := pusher.Push(c.ctx, &push.PushReq{
|
||
Title: "coach 补货",
|
||
Content: fmt.Sprintf("coach 商品 %s 补货\n库存:%d\n链接:%s", watcher.pArticle.SkuID, watcher.pArticle.Ats, watcher.pArticle.Link),
|
||
})
|
||
if err != nil {
|
||
glog.Errorf("消息推送失败:%v", err)
|
||
}
|
||
if resp.Code != 0 {
|
||
glog.Errorf("消息推送失败:%s", resp.Msg)
|
||
}
|
||
watcher.pArticle.SetWatch(false)
|
||
_ = c.storage.ProviderArticle().Update(*watcher.pArticle, "watch")
|
||
c.watchers.Remove(watcher.pArticle.SkuID)
|
||
}
|
||
}()
|
||
})
|
||
}
|
||
|
||
func (c *Controller) doWatch(watcher *Watcher) (available bool) {
|
||
article := watcher.pArticle
|
||
inventory, err := c.client.RequestInventory(c.ctx, article.SkuID)
|
||
if err != nil {
|
||
glog.Warningf("获取coach %s 库存失败:%v", article.SkuID, err)
|
||
return
|
||
}
|
||
article.Available = inventory.Orderable && inventory.Ats > 0
|
||
article.Ats = inventory.Ats
|
||
article.SetWatch(!article.Available)
|
||
if err = c.storage.ProviderArticle().Update(*article, "available", "ats", "updated_at"); err != nil {
|
||
glog.Errorf("更新数据库失败:%v", err)
|
||
article.SetWatch(true)
|
||
return
|
||
}
|
||
return article.Available
|
||
|
||
}
|