309 lines
9.4 KiB
Go
309 lines
9.4 KiB
Go
package spider
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"gitea.timerzz.com/kedaya_haitao/common/pkg/cron"
|
||
"gitea.timerzz.com/kedaya_haitao/common/pkg/subscribe"
|
||
"gitea.timerzz.com/kedaya_haitao/common/structs/storage"
|
||
"gitea.timerzz.com/kedaya_haitao/common/structs/utils"
|
||
v2 "gitea.timerzz.com/kedaya_haitao/common/structs/v2"
|
||
dw_sdk "gitea.timerzz.com/kedaya_haitao/dw-sdk"
|
||
"github.com/golang/glog"
|
||
"github.com/pkg/errors"
|
||
"github.com/redis/go-redis/v9"
|
||
"github.com/samber/lo"
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
type Controller struct {
|
||
ctx context.Context
|
||
cron *cron.Cron
|
||
storage *storage.Storage
|
||
client *dw_sdk.Client
|
||
rdb *redis.Client
|
||
// 订阅服务
|
||
sub *subscribe.Server
|
||
sellerId v2.SellerId
|
||
seller v2.Seller
|
||
bidType dw_sdk.BiddingType
|
||
bidClient dw_sdk.BidClient
|
||
// 订阅客户端
|
||
subscribeClient *subscribe.Client
|
||
|
||
ready bool
|
||
}
|
||
|
||
func NewController(ctx context.Context, sellerId v2.SellerId, client *dw_sdk.Client, storage *storage.Storage, rdb *redis.Client, bidType dw_sdk.BiddingType) *Controller {
|
||
return &Controller{
|
||
ctx: ctx,
|
||
sellerId: sellerId,
|
||
client: client,
|
||
storage: storage,
|
||
cron: cron.NewCron(),
|
||
rdb: rdb,
|
||
bidType: bidType,
|
||
sub: subscribe.NewServer(ctx, rdb),
|
||
bidClient: client.BidClientByBidType(bidType),
|
||
subscribeClient: subscribe.NewClient(rdb),
|
||
}
|
||
}
|
||
|
||
func (c *Controller) Run() (err error) {
|
||
if err = c.AutoMigrate(); err != nil {
|
||
return err
|
||
}
|
||
if c.seller, err = c.storage.Seller().GetBySellerId(c.sellerId); err != nil {
|
||
return fmt.Errorf("没有找到对应的销售商信息:%v", err)
|
||
}
|
||
if err = c.ListenSeller(c.ctx, c.rdb); err != nil {
|
||
return errors.Wrap(err, "订阅失败")
|
||
}
|
||
// 定时抓取
|
||
go c.cron.SetTimeHHmm(c.seller.Config.Ticker).SetFunc(c.Crawl).Run(c.ctx)
|
||
c.ready = true
|
||
<-c.ctx.Done()
|
||
glog.Infof("controller服务退出")
|
||
return nil
|
||
}
|
||
|
||
// 监听配置变更
|
||
func (c *Controller) ListenSeller(ctx context.Context, rdb *redis.Client) error {
|
||
server := subscribe.NewServer(ctx, rdb).SetErrorHandle(func(err error) {
|
||
glog.Errorf("销售商信息更新处理失败: %v", err)
|
||
})
|
||
err := server.Subscribe(utils.SellerConfigNotifyChannel(c.sellerId), c.sellerChange)
|
||
if err != nil {
|
||
return errors.Wrap(err, "订阅失败")
|
||
}
|
||
go server.Run()
|
||
return nil
|
||
}
|
||
|
||
// 销售商信息发生变化
|
||
func (c *Controller) sellerChange(ctx context.Context, message string) error {
|
||
seller, err := c.storage.Seller().GetBySellerId(c.sellerId)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
// 定的拉取时间变了
|
||
if seller.Config.Ticker != c.seller.Config.Ticker {
|
||
// 重新启动下定时任务
|
||
c.cron.Stop()
|
||
go c.cron.SetTimeHHmm(seller.Config.Ticker).Run(c.ctx)
|
||
}
|
||
|
||
// 检查要不要重新计算所有sellerArticle的价格
|
||
var needUpdate = false
|
||
|
||
oldProcess := make(map[uint]v2.CalculateProcess, len(c.seller.CalculateProcess))
|
||
for _, process := range c.seller.CalculateProcess {
|
||
oldProcess[process.ID] = process
|
||
}
|
||
for _, process := range seller.CalculateProcess {
|
||
old, ok := oldProcess[process.ID]
|
||
if !ok {
|
||
// 不存在,说明新增了
|
||
needUpdate = true
|
||
break
|
||
}
|
||
if old.Condition != process.Condition || old.Process != process.Process {
|
||
// 条件或者计算过程变了,需要更新
|
||
needUpdate = true
|
||
}
|
||
delete(oldProcess, process.ID)
|
||
}
|
||
if len(oldProcess) > 0 {
|
||
needUpdate = true
|
||
}
|
||
c.seller.Config = seller.Config
|
||
c.seller.CalculateProcess = seller.CalculateProcess
|
||
|
||
if needUpdate {
|
||
var results = make([]v2.SellerArticle, 0, 20)
|
||
err = c.storage.SellerArticle().FindInBatches(storage.NewGetSellerArticleQuery().SetSellerId(c.sellerId), &results, func(tx *gorm.DB, batch int) error {
|
||
for idx := range results {
|
||
results[idx].Sell = utils.CalculateSellerPrice(append(seller.CalculateProcess, results[idx].CalculateProcess...), map[string]float64{
|
||
"originalPrice": results[idx].Sell.OriginalPrice,
|
||
})
|
||
}
|
||
return tx.Select("id", "sell").Save(&results).Error
|
||
})
|
||
}
|
||
return err
|
||
}
|
||
|
||
func (c *Controller) AutoMigrate() error {
|
||
if err := c.storage.Article().AutoMigrate(); err != nil {
|
||
return err
|
||
}
|
||
if err := c.storage.Seller().AutoMigrate(); err != nil {
|
||
return err
|
||
}
|
||
return c.storage.SellerArticle().AutoMigrate()
|
||
}
|
||
|
||
func (c *Controller) Crawl() {
|
||
glog.Infof("%s 开始抓取信息", time.Now())
|
||
// 开始拉取,修改状态
|
||
c.setSellerStatus(v2.SellerStatus_Pulling)
|
||
|
||
var msgs = make([]string, 0)
|
||
var results = make([]*v2.Article, 0, 20)
|
||
c.storage.DB().Debug().
|
||
Preload("Sellers", "seller_id = ?", c.sellerId).Preload("Sellers.CalculateProcess").
|
||
Joins("LEFT JOIN seller_articles ON seller_articles.article_id=articles.id AND seller_articles.seller_id = ?", c.sellerId).
|
||
Where("(seller_articles.exclude is null OR not seller_articles.exclude) AND (articles.exclude is null OR not articles.exclude)").
|
||
FindInBatches(&results, 20, func(tx *gorm.DB, batch int) error {
|
||
glog.Infof("正在抓取第%d批数据", batch)
|
||
for _, article := range results {
|
||
var sArticle v2.SellerArticle
|
||
if len(article.Sellers) > 0 {
|
||
sArticle = article.Sellers[0]
|
||
} else {
|
||
var err error
|
||
sArticle, err = c.MatchDWSluId(v2.SellerArticle{
|
||
ArticleID: article.ID,
|
||
SellerId: c.sellerId,
|
||
Pid: article.Pid,
|
||
Brand: article.Brand,
|
||
})
|
||
if err != nil {
|
||
glog.Warning(err)
|
||
}
|
||
|
||
if sArticle.SkuID == "" {
|
||
continue
|
||
}
|
||
sArticle, err = c.storage.SellerArticle().Get(storage.NewGetSellerArticleQuery().SetSellerId(c.sellerId).SetSkuId(sArticle.SkuID))
|
||
if err != nil {
|
||
glog.Warning(err)
|
||
continue
|
||
}
|
||
}
|
||
|
||
if err := c.PullArticle(sArticle); err != nil {
|
||
msgs = append(msgs, err.Error())
|
||
glog.Error(err.Error())
|
||
continue
|
||
}
|
||
// 抓取成功,利润率重新计算
|
||
if err := c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ID))); err != nil {
|
||
glog.Warningf("发布消息失败: %v", err)
|
||
}
|
||
}
|
||
glog.Infof("完成抓取第%d批数据", batch)
|
||
return nil
|
||
})
|
||
|
||
// 拉取结束,修改状态
|
||
c.seller.PullAt = time.Now()
|
||
if len(msgs) > 0 {
|
||
c.setSellerStatus(v2.SellerStatus_Error, strings.Join(msgs, "\n"))
|
||
} else {
|
||
c.setSellerStatus(v2.SellerStatus_Normal)
|
||
}
|
||
glog.Infof("%s 抓取信息结束", time.Now())
|
||
}
|
||
|
||
// 尝试重新匹配得物的skuid
|
||
func (c *Controller) MatchDWSluId(article v2.SellerArticle) (v2.SellerArticle, error) {
|
||
resp, err := c.client.ArticleService().BatchArticleNumber([]string{article.Pid})
|
||
if err != nil {
|
||
return article, fmt.Errorf("请求pid:%s出错:%v", article.Pid, err)
|
||
}
|
||
if resp.Code != 200 {
|
||
article.Exclude = true
|
||
_ = c.storage.SellerArticle().Upsert(article)
|
||
return article, fmt.Errorf("pid:%s 得物返回错误:%s", article.Pid, resp.Msg)
|
||
}
|
||
if len(resp.Data) == 0 {
|
||
article.Exclude = true
|
||
_ = c.storage.SellerArticle().Upsert(article)
|
||
return article, fmt.Errorf("pid:%s 得物返回spu为0", article.Pid)
|
||
}
|
||
var spu *dw_sdk.Spu
|
||
brandId, brandName := BrandIdMap[article.Brand], BrandNameMap[article.Brand]
|
||
for _, s := range resp.Data {
|
||
if s.BrandId == brandId || s.BrandName == brandName {
|
||
spu = &s
|
||
}
|
||
}
|
||
if spu == nil {
|
||
article.Exclude = true
|
||
_ = c.storage.SellerArticle().Upsert(article)
|
||
return article, fmt.Errorf("未获取到pid : %s 的coach spu信息:%v", article.Pid, resp.Data)
|
||
}
|
||
if len(spu.Skus) == 0 {
|
||
article.Exclude = true
|
||
_ = c.storage.SellerArticle().Upsert(article)
|
||
return article, fmt.Errorf("未获取到pid : %s 的coach sku信息:%v", article.Pid, spu)
|
||
}
|
||
article.SpuID, article.SkuID = strconv.Itoa(spu.SpuId), strconv.Itoa(spu.Skus[0].SkuId)
|
||
article.Exclude = false
|
||
return article, c.storage.SellerArticle().Upsert(article)
|
||
}
|
||
|
||
func (c *Controller) PullArticle(sArticle v2.SellerArticle) error {
|
||
skuId, _ := strconv.Atoi(sArticle.SkuID)
|
||
resp, err := c.bidClient.LowestPrice(skuId)
|
||
if err != nil {
|
||
return fmt.Errorf("请求sku:%s最低价失败:%v", sArticle.SkuID, err)
|
||
}
|
||
|
||
lowest, exist := lo.Find(resp.Items, func(item dw_sdk.LowestPriceItem) bool {
|
||
return item.BiddingType == c.bidType
|
||
})
|
||
if !exist {
|
||
// 没有拿到价格,将exclude设置为true,如果后面要拉取,需要手动打开
|
||
sArticle.Exclude = true
|
||
return c.storage.SellerArticle().Upsert(sArticle)
|
||
}
|
||
sell := utils.CalculateSellerPrice(append(c.seller.CalculateProcess, sArticle.CalculateProcess...), map[string]float64{
|
||
"originalPrice": float64(lowest.LowestPrice / 100),
|
||
})
|
||
if sell.OriginalPrice != sArticle.Sell.OriginalPrice {
|
||
sArticle.Sell = sell
|
||
sArticle.HistoryPrice = append(sArticle.HistoryPrice, sArticle.Sell)
|
||
}
|
||
if err = c.storage.SellerArticle().Upsert(sArticle); err != nil {
|
||
return fmt.Errorf("保存sellerArticle失败:%v", err)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// 修改保存状态
|
||
func (c *Controller) setSellerStatus(status v2.SellerStatus, msg ...string) {
|
||
c.seller.Status = status
|
||
if len(msg) > 0 {
|
||
c.seller.Msg = msg[0]
|
||
}
|
||
if err := c.storage.Seller().UpdateStatus(c.seller); err != nil {
|
||
glog.Errorf("更新状态失败: %v", err)
|
||
}
|
||
}
|
||
|
||
func (c *Controller) Ready() bool {
|
||
return c.ready
|
||
}
|
||
|
||
func (c *Controller) FetchArticlePrice(id uint) error {
|
||
article, err := c.storage.SellerArticle().Get(storage.NewGetSellerArticleQuery().SetID(id))
|
||
if err != nil {
|
||
return err
|
||
}
|
||
err = c.PullArticle(article)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
// 抓取成功,利润率重新计算
|
||
if err = c.subscribeClient.Publish(c.ctx, utils.ProfitRate_Channel, strconv.Itoa(int(article.ArticleID))); err != nil {
|
||
glog.Warningf("发布消息失败: %v", err)
|
||
}
|
||
return nil
|
||
}
|