// Package spider crawls the Coach catalogue for one provider and keeps the
// stored articles, provider articles and their computed costs up to date.
package spider

import (
	"context"
	"fmt"
	"net/url"
	"strconv"
	"strings"
	"time"

	coach_client "gitea.timerzz.com/kedaya_haitao/common/pkg/coach-client"
	"gitea.timerzz.com/kedaya_haitao/common/pkg/cron"
	"gitea.timerzz.com/kedaya_haitao/common/pkg/subscribe"
	"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
	"gitea.timerzz.com/kedaya_haitao/common/structs/utils"
	v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
	"github.com/golang/glog"
	"github.com/pkg/errors"
	"github.com/redis/go-redis/v9"
	"github.com/samber/lo"
	"gorm.io/gorm"
)

// Controller ties together the Coach client, the storage layer, the crawl
// scheduler and the redis pub/sub endpoints for a single provider.
type Controller struct {
	ctx             context.Context
	cron            *cron.Cron
	storage         *storage.Storage
	client          coach_client.USClient
	rdb             *redis.Client
	sub             *subscribe.Server
	providerId      v2.ProviderId
	provider        v2.Provider
	subscribeClient *subscribe.Client
	ready           bool
}

// NewController builds a Controller for providerId on top of the given Coach
// client, database handle and redis client. Call Run to start it.
func NewController(ctx context.Context, providerId v2.ProviderId, client coach_client.USClient, db *gorm.DB, rdb *redis.Client) *Controller {
	return &Controller{
		ctx:             ctx,
		providerId:      providerId,
		client:          client,
		storage:         storage.NewStorage(db),
		cron:            cron.NewCron(),
		rdb:             rdb,
		sub:             subscribe.NewServer(ctx, rdb),
		subscribeClient: subscribe.NewClient(rdb),
	}
}

// Run migrates the schema, loads the provider record, subscribes to provider
// configuration changes, starts the scheduled crawl and then blocks until the
// controller context is done. It returns an error only if start-up fails.
func (c *Controller) Run() (err error) {
	if err = c.AutoMigrate(); err != nil {
		return err
	}
	if c.provider, err = c.storage.Provider().GetByProvider(c.providerId); err != nil {
		return err
	}
	if err = c.ListenProvider(c.ctx, c.rdb); err != nil {
		return errors.Wrap(err, "订阅失败")
	}
	// Scheduled crawl at the provider's configured time.
	go c.cron.SetTimeHHmm(c.provider.Config.Ticker).SetFunc(c.Crawl).Run(c.ctx)
	c.ready = true
	<-c.ctx.Done()
	glog.Infof("controller服务退出")
	return nil
}

// ListenProvider subscribes to the provider's configuration-change channel
// and starts the subscription server in a background goroutine.
func (c *Controller) ListenProvider(ctx context.Context, rdb *redis.Client) error {
	server := subscribe.NewServer(ctx, rdb).SetErrorHandle(func(err error) {
		glog.Errorf("供应商信息更新处理失败: %v", err)
	})
	err := server.Subscribe(utils.ProviderConfigNotifyChannel(c.providerId), c.providerChange)
	if err != nil {
		return errors.Wrap(err, "订阅失败")
	}
	go server.Run()
	return nil
}

// joinProcesses returns base followed by extra. The three-index slice caps
// base at its length, so appending always copies instead of writing into
// spare capacity of the shared provider slice.
func joinProcesses(base, extra []v2.CalculateProcess) []v2.CalculateProcess {
	return append(base[:len(base):len(base)], extra...)
}

// providerChange handles a provider configuration notification. It reloads
// the provider, restarts the scheduler when the crawl time changed, and
// recomputes the cost of every provider article when the exchange rate,
// freight or calculate processes changed.
func (c *Controller) providerChange(ctx context.Context, message string) error {
	provider, err := c.storage.Provider().GetByProvider(c.providerId)
	if err != nil {
		return err
	}

	// The scheduled crawl time changed: restart the cron job.
	if provider.Config.Ticker != c.provider.Config.Ticker {
		c.cron.Stop()
		go c.cron.SetTimeHHmm(provider.Config.Ticker).Run(c.ctx)
	}

	// Decide whether every provider article price has to be recalculated.
	needUpdate := provider.Config.ExchangeRate != c.provider.Config.ExchangeRate ||
		provider.Config.Freight != c.provider.Config.Freight

	oldProcess := make(map[uint]v2.CalculateProcess, len(c.provider.CalculateProcess))
	for _, process := range c.provider.CalculateProcess {
		oldProcess[process.ID] = process
	}
	for _, process := range provider.CalculateProcess {
		old, ok := oldProcess[process.ID]
		if !ok {
			// Not present before: a process was added.
			needUpdate = true
			break
		}
		if old.Condition != process.Condition || old.Process != process.Process {
			// Condition or formula changed.
			needUpdate = true
		}
		delete(oldProcess, process.ID)
	}
	if len(oldProcess) > 0 {
		// Leftover entries: a process was removed (or the scan stopped early).
		needUpdate = true
	}

	c.provider.Config = provider.Config
	c.provider.CalculateProcess = provider.CalculateProcess

	if !needUpdate {
		return nil
	}

	results := make([]v2.ProviderArticle, 0, 20)
	return c.storage.ProviderArticle().FindInBatches(
		storage.NewGetProviderArticleQuery().SetProviderId(c.providerId),
		&results,
		func(tx *gorm.DB, batch int) error {
			for idx := range results {
				results[idx].Cost = utils.CalculateProviderPrice(
					joinProcesses(provider.CalculateProcess, results[idx].CalculateProcess),
					map[string]float64{
						"originalPrice": results[idx].Cost.OriginalPrice,
						"freight":       c.provider.Config.Freight,
						"exchangeRate":  c.provider.Config.ExchangeRate,
					})
			}
			return tx.Select("id", "cost").Save(&results).Error
		})
}

// AutoMigrate migrates the article, provider and provider-article tables,
// stopping at the first error.
func (c *Controller) AutoMigrate() error {
	if err := c.storage.Article().AutoMigrate(); err != nil {
		return err
	}
	if err := c.storage.Provider().AutoMigrate(); err != nil {
		return err
	}
	return c.storage.ProviderArticle().AutoMigrate()
}

// productsToArticles converts Coach listing products into articles. Every
// color of a product is treated as its own SKU and yields one article holding
// a single provider article for this controller's provider.
func (c *Controller) productsToArticles(products []coach_client.Product) (articles []v2.Article) {
	for _, product := range products {
		// One color is one SKU.
		for _, color := range product.Colors {
			article := v2.Article{
				Name:        product.Name,
				EnglishName: product.Name,
				Pid:         color.VgId,
				Brand:       v2.Brand_Coach,
				Image:       color.Media.Thumbnail.Src,
				Providers:   make([]v2.ProviderArticle, 0, 1),
			}
			// The join error is deliberately ignored: url.JoinPath returns an
			// empty string on failure, which simply leaves Link empty.
			link, _ := url.JoinPath(c.client.BaseUrl(), color.Url)
			pArticle := v2.ProviderArticle{
				ProviderId: c.providerId,
				Brand:      v2.Brand_Coach,
				Pid:        color.VgId,
				SkuID:      color.VgId,
				Available:  color.Orderable,
				Image:      color.Media.Thumbnail.Src,
				Link:       link,
			}
			// Current price of this color.
			// NOTE(review): when no variant matches, price is the zero value and
			// the cost is computed from an original price of 0 — confirm intended.
			price, _ := lo.Find(product.VariantsOnSale, func(item coach_client.Variant) bool {
				return item.Id == color.VgId
			})
			// Compute the cost.
			pArticle.Cost = utils.CalculateProviderPrice(
				joinProcesses(c.provider.CalculateProcess, pArticle.CalculateProcess),
				map[string]float64{
					"originalPrice": price.Price.Sales.Value,
					"freight":       c.provider.Config.Freight,
					"exchangeRate":  c.provider.Config.ExchangeRate,
				})
			pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
			article.Providers = append(article.Providers, pArticle)
			articles = append(articles, article)
		}
	}
	return articles
}

// Crawl walks every page of the Coach "view all bags" listing, saves the
// products of each page and records the outcome in the provider status.
// Pages that fail are logged and reported in the final status message.
//
// TODO: articles that have not been updated for a long time need refreshing.
func (c *Controller) Crawl() {
	glog.Infof("%s 开始抓取信息", time.Now())
	// Crawl started: mark the provider as pulling.
	c.setProviderStatus(v2.ProviderStatus_Pulling)

	var msgs []string
	// totalPage stays -1 until the first successful response reports it.
	for page, totalPage := 1, -1; page <= totalPage || totalPage == -1; page++ {
		resp, err := c.client.ViewAllBags(c.ctx, page)
		glog.Infof("开始处理第%d页数据", page)
		if err != nil {
			msg := fmt.Sprintf("访问coach第%d页失败: %v", page, err)
			msgs = append(msgs, msg)
			glog.Error(msg)
			if totalPage == -1 {
				// The page count is still unknown, so the loop has no upper
				// bound; give up rather than request pages forever.
				break
			}
			continue
		}
		totalPage = resp.PageData.TotalPages
		c.saveProducts(c.productsToArticles(resp.PageData.Products))
		glog.Infof("第%d页数据保存完成", page)
	}

	// Crawl finished: record the time and the final status.
	c.provider.PullAt = time.Now()
	if len(msgs) > 0 {
		c.setProviderStatus(v2.ProviderStatus_Error, strings.Join(msgs, "\n"))
	} else {
		c.setProviderStatus(v2.ProviderStatus_Normal)
	}
	glog.Infof("%s 抓取信息结束", time.Now())
}

// saveProducts persists the articles built from a Coach response. For each
// article it either creates the article, attaches this provider's article to
// an existing one, or updates the existing provider article, and then
// publishes the article ID on the profit-rate channel. Failures are logged
// and the article is skipped.
func (c *Controller) saveProducts(articles []v2.Article) {
	for _, article := range articles {
		oldArticle, err := c.storage.Article().Get(storage.NewGetArticleQuery().SetBrand(v2.Brand_Coach).SetPid(article.Pid))
		// Only "record not found" means the article has to be created; any
		// other lookup error is a real failure.
		if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) {
			glog.Errorf("获取商品信息失败: %v", err)
			continue
		}

		if err == nil {
			// The article already exists, so it is not recreated.
			article.ID = oldArticle.ID
			// Does it already have a provider article for this provider?
			oldProviderArticle, _, exist := lo.FindIndexOf(oldArticle.Providers, func(item v2.ProviderArticle) bool {
				return item.ProviderId == c.providerId
			})
			if !exist {
				// Create the provider article by saving it with the article.
				oldArticle.Providers = article.Providers
				if err = c.storage.Article().Upsert(oldArticle); err != nil {
					glog.Errorf("保存商品信息失败: %v", err)
					continue
				}
			} else {
				pArticle := article.Providers[0]
				// Extend the price history only when the original price changed.
				if oldProviderArticle.Cost.OriginalPrice != pArticle.Cost.OriginalPrice {
					oldProviderArticle.HistoryPrice = append(oldProviderArticle.HistoryPrice, pArticle.Cost)
				}
				oldProviderArticle.Cost = pArticle.Cost
				oldProviderArticle.Ats = pArticle.Ats
				oldProviderArticle.Available = pArticle.Available
				if err = c.storage.ProviderArticle().Upsert(oldProviderArticle); err != nil {
					glog.Errorf("保存供应商商品信息失败: %v", err)
					continue
				}
			}
		} else {
			// The article does not exist yet: save the whole article.
			if err = c.storage.Article().Upsert(article); err != nil {
				glog.Errorf("保存商品信息失败: %v", err)
				continue
			}
			// Read the ID back so it can be published below.
			if err = c.storage.DB().Model(&v2.Article{}).Where("pid = ? and brand = ?", article.Pid, article.Brand).Select("id").First(&article.ID).Error; err != nil {
				glog.Errorf("获取商品ID失败: %v", err)
				continue
			}
		}

		if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ID))); err != nil {
			glog.Errorf("通知商品利润率失败: %v", err)
		}
	}
}

// FetchArticleDetail fetches the product detail for pid from Coach and saves
// it as a new article when this provider has no provider article for pid yet.
// It is a no-op when the provider article already exists.
func (c *Controller) FetchArticleDetail(ctx context.Context, pid string) error {
	old, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
	if err == nil {
		// Already stored: nothing to do.
		return nil
	}
	if !errors.Is(err, gorm.ErrRecordNotFound) {
		return err
	}

	resp, err := c.client.RequestProductDetail(ctx, pid)
	if err != nil {
		return fmt.Errorf("请求商品信息失败: %w", err)
	}

	article := v2.Article{
		Name:        resp.Name,
		EnglishName: resp.Name,
		Pid:         resp.Id,
		Brand:       v2.Brand_Coach,
	}
	// Use the first image of the first group, when there is one.
	if len(resp.ImageGroups) > 0 && len(resp.ImageGroups[0].Images) > 0 {
		article.Image = resp.ImageGroups[0].Images[0].Src
	}
	// The join error is deliberately ignored: url.JoinPath returns an empty
	// string on failure, which simply leaves Link empty.
	link, _ := url.JoinPath(c.client.BaseUrl(), resp.Url)
	pArticle := v2.ProviderArticle{
		ProviderId: c.providerId,
		Brand:      v2.Brand_Coach,
		Pid:        pid,
		SkuID:      pid,
		Available:  resp.Inventory.Orderable,
		Ats:        resp.Inventory.Ats,
		Image:      article.Image,
		Link:       link,
		Cost: utils.CalculateProviderPrice(
			joinProcesses(c.provider.CalculateProcess, old.CalculateProcess),
			map[string]float64{
				"originalPrice": resp.Prices.CurrentPrice,
				"freight":       c.provider.Config.Freight,
				"exchangeRate":  c.provider.Config.ExchangeRate,
			}),
	}
	pArticle.HistoryPrice = append(pArticle.HistoryPrice, pArticle.Cost)
	article.Providers = append(article.Providers, pArticle)
	c.saveProducts([]v2.Article{article})
	return nil
}

// FetchArticleAts requests the inventory for pid from Coach and stores the
// returned Ats value on this provider's stored provider article.
func (c *Controller) FetchArticleAts(ctx context.Context, pid string) error {
	pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetPid(pid))
	if err != nil {
		return fmt.Errorf("获取商品信息失败: %w", err)
	}
	inv, err := c.client.RequestInventory(ctx, pid)
	if err != nil {
		return fmt.Errorf("请求商品库存失败: %w", err)
	}
	pArticle.Ats = inv.Ats
	return c.storage.ProviderArticle().Update(pArticle, "ats")
}

// GetArticleAts requests the inventory for pid from Coach and returns its Ats
// value without touching storage.
func (c *Controller) GetArticleAts(ctx context.Context, pid string) (int, error) {
	inv, err := c.client.RequestInventory(ctx, pid)
	if err != nil {
		return 0, fmt.Errorf("请求商品库存失败: %w", err)
	}
	return inv.Ats, nil
}

// FetchArticlePrice refreshes the cost and availability of the provider
// article with the given id from the Coach product detail, saves it and
// publishes its article ID on the profit-rate channel.
func (c *Controller) FetchArticlePrice(ctx context.Context, id uint) error {
	pArticle, err := c.storage.ProviderArticle().Get(storage.NewGetProviderArticleQuery().SetProviderId(c.providerId).SetID(id))
	if err != nil {
		return fmt.Errorf("获取商品信息失败: %w", err)
	}
	detail, err := c.client.RequestProductDetail(ctx, pArticle.Pid)
	if err != nil {
		return fmt.Errorf("请求商品信息失败: %w", err)
	}

	cost := utils.CalculateProviderPrice(
		joinProcesses(c.provider.CalculateProcess, pArticle.CalculateProcess),
		map[string]float64{
			"originalPrice": detail.Prices.CurrentPrice,
			"freight":       c.provider.Config.Freight,
			"exchangeRate":  c.provider.Config.ExchangeRate,
		})
	pArticle.Available = detail.Inventory.Orderable && detail.Inventory.Ats > 0
	// Extend the price history only when the original price changed.
	if pArticle.Cost.OriginalPrice != cost.OriginalPrice {
		pArticle.HistoryPrice = append(pArticle.HistoryPrice, cost)
	}
	pArticle.Cost = cost
	if err = c.storage.ProviderArticle().Upsert(pArticle); err != nil {
		return fmt.Errorf("保存商品信息失败: %w", err)
	}

	// Saved successfully: ask for the profit rate to be recalculated. A publish
	// failure is only logged because the price itself is already stored.
	if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(pArticle.ArticleID))); err != nil {
		glog.Warningf("发布消息失败: %v", err)
	}
	return nil
}

// setProviderStatus sets the provider status, replaces the status message
// with msg[0] when one is given, and persists the provider status. A storage
// failure is logged.
func (c *Controller) setProviderStatus(status v2.ProviderStatus, msg ...string) {
	c.provider.Status = status
	if len(msg) > 0 {
		c.provider.Msg = msg[0]
	}
	if err := c.storage.Provider().UpdateStatus(c.provider); err != nil {
		glog.Errorf("更新状态失败: %v", err)
	}
}

// Ready reports whether Run has completed start-up.
func (c *Controller) Ready() bool {
	return c.ready
}