// Package spider pulls seller article prices from the 得物 (dw) API and keeps
// the stored seller articles in sync with them.
package spider

import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"time"

	"gitea.timerzz.com/kedaya_haitao/common/pkg/cron"
	"gitea.timerzz.com/kedaya_haitao/common/pkg/subscribe"
	"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
	"gitea.timerzz.com/kedaya_haitao/common/structs/utils"
	v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
	dw_sdk "gitea.timerzz.com/kedaya_haitao/dw-sdk"
	"github.com/golang/glog"
	"github.com/pkg/errors"
	"github.com/redis/go-redis/v9"
	"github.com/samber/lo"
	"gorm.io/gorm"
)

// Controller drives the scheduled price crawl for a single seller and reacts
// to seller configuration changes published over redis.
type Controller struct {
	ctx     context.Context
	cron    *cron.Cron
	storage *storage.Storage
	client  *dw_sdk.Client
	rdb     *redis.Client
	// Subscription server.
	sub *subscribe.Server

	sellerId v2.SellerId
	seller   v2.Seller

	bidType   dw_sdk.BiddingType
	bidClient dw_sdk.BidClient

	// Subscription client, used to publish profit-rate recalculation requests.
	subscribeClient *subscribe.Client

	// ready is set once Run has finished its start-up steps.
	ready bool
}

// NewController wires a Controller for sellerId. The bid client is derived
// from client according to bidType. Nothing is started until Run is called.
func NewController(ctx context.Context, sellerId v2.SellerId, client *dw_sdk.Client, storage *storage.Storage, rdb *redis.Client, bidType dw_sdk.BiddingType) *Controller {
	return &Controller{
		ctx:             ctx,
		sellerId:        sellerId,
		client:          client,
		storage:         storage,
		cron:            cron.NewCron(),
		rdb:             rdb,
		bidType:         bidType,
		sub:             subscribe.NewServer(ctx, rdb),
		bidClient:       client.BidClientByBidType(bidType),
		subscribeClient: subscribe.NewClient(rdb),
	}
}

// Run migrates the tables, loads the seller record, subscribes to seller
// configuration changes, starts the scheduled crawl and then blocks until the
// controller context is done. It returns an error only when start-up fails.
func (c *Controller) Run() (err error) {
	if err = c.AutoMigrate(); err != nil {
		return err
	}
	if c.seller, err = c.storage.Seller().GetBySellerId(c.sellerId); err != nil {
		return fmt.Errorf("没有找到对应的销售商信息:%v", err)
	}

	if err = c.ListenSeller(c.ctx, c.rdb); err != nil {
		return errors.Wrap(err, "订阅失败")
	}

	// Scheduled crawl.
	go c.cron.SetTimeHHmm(c.seller.Config.Ticker).SetFunc(c.Crawl).Run(c.ctx)

	c.ready = true
	<-c.ctx.Done()
	glog.Infof("controller服务退出")
	return nil
}

// ListenSeller subscribes to the configuration-change channel of this seller
// and runs the subscription server in a background goroutine.
func (c *Controller) ListenSeller(ctx context.Context, rdb *redis.Client) error {
	server := subscribe.NewServer(ctx, rdb).SetErrorHandle(func(err error) {
		glog.Errorf("销售商信息更新处理失败: %v", err)
	})
	err := server.Subscribe(utils.SellerConfigNotifyChannel(c.sellerId), c.sellerChange)
	if err != nil {
		return errors.Wrap(err, "订阅失败")
	}
	go server.Run()
	return nil
}

// sellerChange handles a "seller changed" notification: it reloads the seller,
// restarts the cron when the ticker changed, and recomputes the sell price of
// every seller article when the seller-level calculate processes changed.
func (c *Controller) sellerChange(ctx context.Context, message string) error {
	seller, err := c.storage.Seller().GetBySellerId(c.sellerId)
	if err != nil {
		return err
	}
	// The configured pull time changed: restart the scheduled task.
	// NOTE(review): the restart does not call SetFunc again; presumably the
	// cron keeps the previously registered function across Stop — confirm.
	if seller.Config.Ticker != c.seller.Config.Ticker {
		c.cron.Stop()
		go c.cron.SetTimeHHmm(seller.Config.Ticker).Run(c.ctx)
	}

	// Decide whether all sellerArticle prices must be recalculated.
	var needUpdate = false
	oldProcess := make(map[uint]v2.CalculateProcess, len(c.seller.CalculateProcess))
	for _, process := range c.seller.CalculateProcess {
		oldProcess[process.ID] = process
	}
	for _, process := range seller.CalculateProcess {
		old, ok := oldProcess[process.ID]
		if !ok {
			// Not present before, so it was newly added.
			needUpdate = true
			break
		}
		if old.Condition != process.Condition || old.Process != process.Process {
			// Condition or calculation changed.
			needUpdate = true
		}
		delete(oldProcess, process.ID)
	}
	// Anything left over was removed from the seller.
	if len(oldProcess) > 0 {
		needUpdate = true
	}

	c.seller.Config = seller.Config
	c.seller.CalculateProcess = seller.CalculateProcess

	if needUpdate {
		// Cap the capacity so every append below allocates a fresh array
		// instead of writing into the backing array shared with c.seller.
		base := seller.CalculateProcess[:len(seller.CalculateProcess):len(seller.CalculateProcess)]
		var results = make([]v2.SellerArticle, 0, 20)
		err = c.storage.SellerArticle().FindInBatches(storage.NewGetSellerArticleQuery().SetSellerId(c.sellerId), &results, func(tx *gorm.DB, batch int) error {
			for idx := range results {
				results[idx].Sell = utils.CalculateSellerPrice(append(base, results[idx].CalculateProcess...), map[string]float64{
					"originalPrice": results[idx].Sell.OriginalPrice,
				})
			}
			return tx.Select("id", "sell").Save(&results).Error
		})
	}
	return err
}

// AutoMigrate migrates the article, seller and seller-article tables, in that
// order, stopping at the first error.
func (c *Controller) AutoMigrate() error {
	if err := c.storage.Article().AutoMigrate(); err != nil {
		return err
	}
	if err := c.storage.Seller().AutoMigrate(); err != nil {
		return err
	}
	return c.storage.SellerArticle().AutoMigrate()
}

// Crawl walks all non-excluded articles in batches of 20, pulls the current
// price of each one for this seller and publishes a profit-rate recalculation
// message for every article that was pulled successfully. The seller status is
// set to pulling at the start and to normal or error (with the collected
// messages) at the end.
func (c *Controller) Crawl() {
	glog.Infof("%s 开始抓取信息", time.Now())
	// Pull starts: update the status.
	c.setSellerStatus(v2.SellerStatus_Pulling)

	var msgs []string
	var results = make([]*v2.Article, 0, 20)
	err := c.storage.DB().Debug().
		Preload("Sellers", "seller_id = ?", c.sellerId).Preload("Sellers.CalculateProcess").
		Joins("LEFT JOIN seller_articles ON seller_articles.article_id=articles.id AND seller_articles.seller_id = ?", c.sellerId).
		Where("(seller_articles.exclude is null OR not seller_articles.exclude) AND (articles.exclude is null OR not articles.exclude)").
		FindInBatches(&results, 20, func(tx *gorm.DB, batch int) error {
			glog.Infof("正在抓取第%d批数据", batch)
			for _, article := range results {
				var sArticle v2.SellerArticle
				if len(article.Sellers) > 0 {
					sArticle = article.Sellers[0]
				} else {
					// No seller article yet: try to match one by pid.
					var err error
					sArticle, err = c.MatchDWSluId(v2.SellerArticle{
						ArticleID: article.ID,
						SellerId:  c.sellerId,
						Pid:       article.Pid,
						Brand:     article.Brand,
					})
					if err != nil {
						glog.Warning(err)
					}
					if sArticle.SkuID == "" {
						continue
					}
					// Reload the stored row that MatchDWSluId just upserted.
					sArticle, err = c.storage.SellerArticle().Get(storage.NewGetSellerArticleQuery().SetSellerId(c.sellerId).SetSkuId(sArticle.SkuID))
					if err != nil {
						glog.Warning(err)
						continue
					}
				}
				if err := c.PullArticle(sArticle); err != nil {
					msgs = append(msgs, err.Error())
					glog.Error(err.Error())
					continue
				}
				// Pull succeeded: request a profit-rate recalculation.
				if err := c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ID))); err != nil {
					glog.Warningf("发布消息失败: %v", err)
				}
			}
			glog.Infof("完成抓取第%d批数据", batch)
			return nil
		}).Error
	if err != nil {
		// A failed batch query must not be reported as a normal pull.
		glog.Errorf("批量查询商品失败: %v", err)
		msgs = append(msgs, err.Error())
	}

	// Pull finished: update the status.
	c.seller.PullAt = time.Now()
	if len(msgs) > 0 {
		c.setSellerStatus(v2.SellerStatus_Error, strings.Join(msgs, "\n"))
	} else {
		c.setSellerStatus(v2.SellerStatus_Normal)
	}
	glog.Infof("%s 抓取信息结束", time.Now())
}

// MatchDWSluId tries to (re)match the 得物 spu/sku ids for article by its pid.
// On success the ids are filled in, Exclude is cleared and the article is
// upserted. When the API reports an error, returns no spu, no spu of the
// expected brand, or a spu without skus, the article is upserted with
// Exclude=true and an error describing the reason is returned.
func (c *Controller) MatchDWSluId(article v2.SellerArticle) (v2.SellerArticle, error) {
	resp, err := c.client.ArticleService().BatchArticleNumber([]string{article.Pid})
	if err != nil {
		return article, fmt.Errorf("请求pid:%s出错:%v", article.Pid, err)
	}

	// exclude marks the article as excluded and persists it best-effort; a
	// failed save is logged and the original reason is still returned.
	exclude := func(reason error) (v2.SellerArticle, error) {
		article.Exclude = true
		if upsertErr := c.storage.SellerArticle().Upsert(article); upsertErr != nil {
			glog.Warningf("保存sellerArticle失败: %v", upsertErr)
		}
		return article, reason
	}

	if resp.Code != 200 {
		return exclude(fmt.Errorf("pid:%s 得物返回错误:%s", article.Pid, resp.Msg))
	}
	if len(resp.Data) == 0 {
		return exclude(fmt.Errorf("pid:%s 得物返回spu为0", article.Pid))
	}

	var spu *dw_sdk.Spu
	brandId, brandName := BrandIdMap[article.Brand], BrandNameMap[article.Brand]
	for _, s := range resp.Data {
		if s.BrandId == brandId || s.BrandName == brandName {
			// Copy before taking the address: before Go 1.22 the range
			// variable is shared across iterations, so &s would end up
			// pointing at the last element whether or not it matched.
			matched := s
			spu = &matched
		}
	}
	if spu == nil {
		return exclude(fmt.Errorf("未获取到pid : %s 的coach spu信息:%v", article.Pid, resp.Data))
	}
	if len(spu.Skus) == 0 {
		return exclude(fmt.Errorf("未获取到pid : %s 的coach sku信息:%v", article.Pid, spu))
	}

	article.SpuID, article.SkuID = strconv.Itoa(spu.SpuId), strconv.Itoa(spu.Skus[0].SkuId)
	article.Exclude = false
	return article, c.storage.SellerArticle().Upsert(article)
}

// PullArticle fetches the lowest price for sArticle's sku with the
// controller's bidding type, recalculates the sell price and upserts the
// article. When no price of that bidding type exists the article is upserted
// with Exclude=true.
func (c *Controller) PullArticle(sArticle v2.SellerArticle) error {
	// NOTE(review): a non-numeric SkuID is silently treated as sku 0 here;
	// kept as is because callers may rely on the request still being made.
	skuId, _ := strconv.Atoi(sArticle.SkuID)
	resp, err := c.bidClient.LowestPrice(skuId)
	if err != nil {
		return fmt.Errorf("请求sku:%s最低价失败:%v", sArticle.SkuID, err)
	}
	lowest, exist := lo.Find(resp.Items, func(item dw_sdk.LowestPriceItem) bool {
		return item.BiddingType == c.bidType
	})
	if !exist {
		// No price available: exclude the article. It has to be re-enabled
		// manually before it is pulled again.
		sArticle.Exclude = true
		return c.storage.SellerArticle().Upsert(sArticle)
	}

	// Cap the capacity so append cannot write into the backing array of the
	// seller-level process list shared by every article.
	base := c.seller.CalculateProcess[:len(c.seller.CalculateProcess):len(c.seller.CalculateProcess)]
	// NOTE(review): if LowestPrice is an integer type the division truncates
	// before the float conversion — confirm that whole units are intended.
	sell := utils.CalculateSellerPrice(append(base, sArticle.CalculateProcess...), map[string]float64{
		"originalPrice": float64(lowest.LowestPrice / 100),
	})
	// Record a history entry only when the original price actually moved.
	if sell.OriginalPrice != sArticle.Sell.OriginalPrice {
		sArticle.Sell = sell
		sArticle.HistoryPrice = append(sArticle.HistoryPrice, sArticle.Sell)
	}

	if err = c.storage.SellerArticle().Upsert(sArticle); err != nil {
		return fmt.Errorf("保存sellerArticle失败:%v", err)
	}
	return nil
}

// setSellerStatus stores status (and the first msg, when given) on the cached
// seller and persists it; a failed save is only logged.
// NOTE(review): Msg is left untouched when no msg is passed, so an earlier
// error text stays on the seller after the status returns to normal — confirm
// that this is intended.
func (c *Controller) setSellerStatus(status v2.SellerStatus, msg ...string) {
	c.seller.Status = status
	if len(msg) > 0 {
		c.seller.Msg = msg[0]
	}
	if err := c.storage.Seller().UpdateStatus(c.seller); err != nil {
		glog.Errorf("更新状态失败: %v", err)
	}
}
func (c *Controller) Ready() bool { return c.ready } func (c *Controller) FetchArticlePrice(id uint) error { article, err := c.storage.SellerArticle().Get(storage.NewGetSellerArticleQuery().SetID(id)) if err != nil { return err } err = c.PullArticle(article) if err != nil { return err } // 抓取成功,利润率重新计算 if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ArticleID))); err != nil { glog.Warningf("发布消息失败: %v", err) } return nil }