package watcher import ( "context" "fmt" "sync" "time" "gitea.timerzz.com/kedaya_haitao/coach-spider/pkg/options" coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client" "gitea.timerzz.com/kedaya_haitao/common/structs/storage" v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2" "gitea.timerzz.com/kedaya_haitao/pusher/kitex_gen/push" "gitea.timerzz.com/kedaya_haitao/pusher/rpc/pusher" "github.com/golang/glog" "gorm.io/gorm" ) type Controller struct { ctx context.Context m sync.RWMutex // 要蹲货的ProviderArticle watchers *Watchers storage *storage.Storage client coach_client.USClient providerId v2.ProviderId interval time.Duration } func NewController(ctx context.Context, cfg *options.Config, client coach_client.USClient, db *gorm.DB) *Controller { return &Controller{ ctx: ctx, providerId: cfg.ProviderId, interval: cfg.WatchInterval, client: client, storage: storage.NewStorage(db), watchers: NewWatchers(storage.NewStorage(db), cfg.ProviderId), } } func (c *Controller) Run() (err error) { // 加载要蹲货的ProviderArticle if err = c.watchers.Load(); err != nil { return } ticker := time.NewTicker(c.interval) defer ticker.Stop() for { select { case <-c.ctx.Done(): glog.Info("watcher 退出") return case <-ticker.C: c.watchRange() } } } // 返回ready func (c *Controller) Ready() bool { return c.watchers.Ready() } // Add 添加一个要蹲货的ProviderArticle func (c *Controller) Add(skuID string) error { article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID)) if err != nil { return fmt.Errorf("获取商品信息失败: %v", err) } article.SetWatch(true) article.Available = false if err = c.storage.ProviderArticle().Update(article, "watch", "available"); err != nil { return fmt.Errorf("更新数据库失败:%v", err) } c.watchers.Add(&article) return nil } func (c *Controller) Delete(skuID string) error { article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID)) if err != nil { return fmt.Errorf("获取商品信息失败: %v", err) } article.Watch = nil if err = c.storage.ProviderArticle().Update(article, "watch"); err != nil { return fmt.Errorf("更新数据库失败:%v", err) } c.watchers.Remove(skuID) return nil } func (c *Controller) Stop(skuID string) error { article, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetSkuId(skuID)) if err != nil { return fmt.Errorf("获取商品信息失败: %v", err) } article.SetWatch(false) if err = c.storage.ProviderArticle().Update(article, "watch"); err != nil { return fmt.Errorf("更新数据库失败:%v", err) } c.watchers.Remove(skuID) return nil } func (c *Controller) watchRange() { c.watchers.Range(func(watcher *Watcher) bool { return !watcher.watching }, func(watcher *Watcher) { go func() { watcher.watching = true defer func() { watcher.watching = false }() if c.doWatch(watcher) { // 如果蹲到了,需要通知 resp, err := pusher.Push(c.ctx, &push.PushReq{ Title: "coach 补货", Content: fmt.Sprintf("coach 商品 %s 补货\n库存:%d\n链接:%s", watcher.pArticle.SkuID, watcher.pArticle.Ats, watcher.pArticle.Link), }) if err != nil { glog.Errorf("消息推送失败:%v", err) } if resp.Code != 0 { glog.Errorf("消息推送失败:%s", resp.Msg) } watcher.pArticle.SetWatch(false) _ = c.storage.ProviderArticle().Update(*watcher.pArticle, "watch") c.watchers.Remove(watcher.pArticle.SkuID) } }() }) } func (c *Controller) doWatch(watcher *Watcher) (available bool) { article := watcher.pArticle inventory, err := c.client.RequestInventory(c.ctx, article.SkuID) if err != nil { glog.Warningf("获取coach %s 库存失败:%v", article.SkuID, err) return } article.Available = inventory.Orderable && inventory.Ats > 0 article.Ats = inventory.Ats article.SetWatch(!article.Available) if err = c.storage.ProviderArticle().Update(*article, "available", "ats", "updated_at"); err != nil { glog.Errorf("更新数据库失败:%v", err) article.SetWatch(true) return } return article.Available }