package spider import ( "context" "fmt" "strconv" "strings" "time" coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client" "gitea.timerzz.com/kedaya_haitao/common/pkg/cron" "gitea.timerzz.com/kedaya_haitao/common/pkg/subscribe" "gitea.timerzz.com/kedaya_haitao/common/structs/storage" "gitea.timerzz.com/kedaya_haitao/common/structs/utils" v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2" "github.com/golang/glog" "github.com/pkg/errors" "github.com/redis/go-redis/v9" "github.com/samber/lo" "gorm.io/gorm" ) type Controller struct { ctx context.Context cron *cron.Cron storage *storage.Storage client *coach_client.US rdb *redis.Client sub *subscribe.Server providerId v2.ProviderId provider v2.Provider subscribeClient *subscribe.Client ready bool } func NewController(ctx context.Context, providerId v2.ProviderId, client *coach_client.US, db *gorm.DB, rdb *redis.Client) *Controller { return &Controller{ ctx: ctx, providerId: providerId, client: client, storage: storage.NewStorage(db), cron: cron.NewCron(), rdb: rdb, sub: subscribe.NewServer(ctx, rdb), subscribeClient: subscribe.NewClient(rdb), } } func (c *Controller) Run() (err error) { if err = c.AutoMigrate(); err != nil { return err } if c.provider, err = c.storage.Provider().GetByProvider(c.providerId); err != nil { return } if err = c.ListenProvider(c.ctx, c.rdb); err != nil { return errors.Wrap(err, "订阅失败") } // 定时抓取 go c.cron.SetTimeHHmm(c.provider.Config.Ticker).SetFunc(c.Crawl).Run(c.ctx) c.ready = true <-c.ctx.Done() glog.Infof("controller服务退出") return nil } // 监听配置变更 func (c *Controller) ListenProvider(ctx context.Context, rdb *redis.Client) error { server := subscribe.NewServer(ctx, rdb).SetErrorHandle(func(err error) { glog.Errorf("供应商信息更新处理失败: %v", err) }) err := server.Subscribe(utils.ProviderConfigNotifyChannel(c.providerId), c.providerChange) if err != nil { return errors.Wrap(err, "订阅失败") } go server.Run() return nil } // 供应商信息发生变化 func (c *Controller) providerChange(ctx context.Context, message string) error { provider, err := c.storage.Provider().GetByProvider(c.providerId) if err != nil { return err } // 定的拉取时间变了 if provider.Config.Ticker != c.provider.Config.Ticker { // 重新启动下定时任务 c.cron.Stop() go c.cron.SetTimeHHmm(provider.Config.Ticker).Run(c.ctx) } // 检查要不要重新计算所有providerArticle的价格 var needUpdate = false if provider.Config.ExchangeRate != c.provider.Config.ExchangeRate || provider.Config.Freight != c.provider.Config.Freight { needUpdate = true } oldProcess := make(map[uint]v2.CalculateProcess, len(c.provider.CalculateProcess)) for _, process := range c.provider.CalculateProcess { oldProcess[process.ID] = process } for _, process := range provider.CalculateProcess { old, ok := oldProcess[process.ID] if !ok { // 不存在,说明新增了 needUpdate = true break } if old.Condition != process.Condition || old.Process != process.Process { // 条件或者计算过程变了,需要更新 needUpdate = true } delete(oldProcess, process.ID) } if len(oldProcess) > 0 { needUpdate = true } c.provider.Config = provider.Config c.provider.CalculateProcess = provider.CalculateProcess if needUpdate { var results = make([]v2.ProviderArticle, 0, 20) err = c.storage.ProviderArticle().FindInBatches(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId), &results, func(tx *gorm.DB, batch int) error { for idx := range results { results[idx].Cost = utils.CalculateProviderPrice(append(provider.CalculateProcess, results[idx].CalculateProcess...), map[string]float64{ "originalPrice": results[idx].Cost.OriginalPrice, "freight": c.provider.Config.Freight, "exchangeRate": c.provider.Config.ExchangeRate, }) } return tx.Select("id", "cost").Save(&results).Error }) } return err } func (c *Controller) AutoMigrate() error { if err := c.storage.Article().AutoMigrate(); err != nil { return err } if err := c.storage.Provider().AutoMigrate(); err != nil { return err } return c.storage.ProviderArticle().AutoMigrate() } func (c *Controller) Crawl() { glog.Infof("%s 开始抓取信息", time.Now()) // 开始拉取,修改状态 c.setProviderStatus(v2.ProviderStatus_Pulling) var msgs = make([]string, 0) for page, totalPage := 1, -1; page <= totalPage || totalPage == -1; page++ { resp, err := c.client.ViewAllBags(c.ctx, page) glog.Infof("开始处理第%d页数据", page) if err != nil { msg := fmt.Sprintf("访问coach第%d页失败: %v", page, err) msgs = append(msgs, msg) glog.Error(msg) continue } totalPage = resp.PageData.TotalPages c.saveProducts(resp.PageData.Products) glog.Infof("第%d页数据保存完成", page) break } // 拉取结束,修改状态 c.provider.PullAt = time.Now() if len(msgs) > 0 { c.setProviderStatus(v2.ProviderStatus_Error, strings.Join(msgs, "\n")) } else { c.setProviderStatus(v2.ProviderStatus_Normal) } glog.Infof("%s 抓取信息结束", time.Now()) } // 对coach返回的数据进行处理保存 // TODO 如果供应商的价格比商品现在的最低价格低,那么需要通知计算商品的利润率 func (c *Controller) saveProducts(list []coach_client.Product) { for _, resp := range list { // 一个color是一个sku for _, color := range resp.Colors { article, err := c.storage.Article().Get(storage.NewGetArticleQuery().SetBrand(v2.Brand_Coach).SetPid(color.VgId)) if err != nil && !errors.As(err, &gorm.ErrRecordNotFound) { glog.Errorf("获取商品信息失败: %v", err) continue } // 如果没找到,说明没有标准商品,创建一个 if errors.As(err, &gorm.ErrRecordNotFound) { article = v2.Article{ Name: resp.Name, EnglishName: resp.Name, Pid: color.VgId, Brand: v2.Brand_Coach, Image: color.Media.Thumbnail.Src, Providers: make([]v2.ProviderArticle, 0, 1), } if err = c.storage.Article().Create(&article); err != nil { glog.Errorf("创建商品信息失败: %v", err) continue } } pArticle, idx, exist := lo.FindIndexOf(article.Providers, func(item v2.ProviderArticle) bool { return item.ProviderId == c.providerId }) // 创建供应商上坪 if !exist { pArticle = v2.ProviderArticle{ ProviderId: c.providerId, Brand: v2.Brand_Coach, Pid: color.VgId, SkuID: color.VgId, Available: color.Orderable, Link: fmt.Sprintf("%s/%s", "https://www.coachoutlet.com", color.Url), } } else { article.Providers = lo.DropByIndex(article.Providers, idx) } // 拿到现在的价格 price, _ := lo.Find(resp.VariantsOnSale, func(item coach_client.Variant) bool { return item.Id == color.VgId }) if originalPrice := price.Price.Sales.Value; originalPrice != pArticle.Cost.OriginalPrice { // 价格发生了改变 pArticle.Cost = utils.CalculateProviderPrice( append(c.provider.CalculateProcess, pArticle.CalculateProcess...), map[string]float64{ "originalPrice": originalPrice, "freight": c.provider.Config.Freight, "exchangeRate": c.provider.Config.ExchangeRate, }) pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost) } article.Providers = append(article.Providers, pArticle) // 保存商品信息 if err = c.storage.Article().Upsert(article); err != nil { glog.Errorf("保存商品信息失败: %v", err) continue } if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ID))); err != nil { glog.Errorf("通知商品利润率失败: %v", err) } } } } // 更新某个商品的价格 func (c *Controller) FetchArticlePrice(ctx context.Context, id uint) error { pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetID(id)) if err != nil { return fmt.Errorf("获取商品信息失败: %v", err) } detail, err := c.client.RequestProductDetail(ctx, pArticle.Pid) if err != nil { return fmt.Errorf("请求商品库存失败: %v", err) } cost := utils.CalculateProviderPrice( append(c.provider.CalculateProcess, pArticle.CalculateProcess...), map[string]float64{ "originalPrice": detail.Prices.CurrentPrice, "freight": c.provider.Config.Freight, "exchangeRate": c.provider.Config.ExchangeRate, }) pArticle.Available = detail.Inventory.Orderable && detail.Inventory.Ats > 0 // 价格发生变化了, if pArticle.Cost.OriginalPrice != cost.OriginalPrice { pArticle.HistoryPrice = append(pArticle.HistoryPrice, cost) } pArticle.Cost = cost if err = c.storage.ProviderArticle().Upsert(pArticle); err != nil { return fmt.Errorf("保存商品信息失败: %v", err) } // 抓取成功,利润率重新计算 if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(pArticle.ArticleID))); err != nil { glog.Warningf("发布消息失败: %v", err) } return nil } // 修改保存状态 func (c *Controller) setProviderStatus(status v2.ProviderStatus, msg ...string) { c.provider.Status = status if len(msg) > 0 { c.provider.Msg = msg[0] } if err := c.storage.Provider().UpdateStatus(c.provider); err != nil { glog.Errorf("更新状态失败: %v", err) } } func (c *Controller) Ready() bool { return c.ready }