欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python实现商品价格监控,识破双十一的套路

程序员文章站 2022-05-22 20:42:47
一年一度的“双十一”又要来了,很多人已经开始摩拳擦掌,毕竟几天之后手还在不在就不好说了。 各种社交软件也是跟着遭殃,整天就是“来帮我一起盖楼”,各种字体绕过屏蔽,什么奇葩的脑洞也出来了: 不过也感谢这些电商平台,让多年未联系的好友、加过但没有对话的陌生人都找到了打破尴尬的话题。(让场面更加尴尬) 月 ......

一年一度的“双十一”又要来了,很多人已经开始摩拳擦掌,毕竟几天之后手还在不在就不好说了。

各种社交软件也是跟着遭殃,整天就是“来帮我一起盖楼”,各种字体绕过屏蔽,什么奇葩的脑洞也出来了: Python实现商品价格监控,识破双十一的套路 不过也感谢这些电商平台,让多年未联系的好友、加过但没有对话的陌生人都找到了打破尴尬的话题。(让场面更加尴尬)

月薪上万的白领们为了2块5毛钱的优惠券起早贪黑,也是堪称人类迷惑行为大赏了……

问题是,你以为自己真的赚到了?

商品“明降暗升”的传言早有耳闻:很多商品在双十一之前早早地把价格调高,加上优惠之后也不过就是跟以前的原价相当。让不知情的消费者在心理上感觉占了便宜。

这个传言是不是真的,很好判断,只要定期去访问商品页面,记录价格就可以。不过一般人也没闲工夫这么去做。于是,我们用 python 做了一个可以定时监控商品的小工具,可以帮你监控想要关注的商品。

工具完成之后,我们随机挑选了几个商品作为测试,结果就有一个中招了……(真的是随便选的): Python实现商品价格监控,识破双十一的套路 这款保暖背心产品,之前标价 39.9元,到11月之后却突然调价为 49.9元,并标注上了“双11狂欢价”,也就是原价…… Python实现商品价格监控,识破双十一的套路 Python实现商品价格监控,识破双十一的套路

商品价格监控

实现功能

  • 输入天猫、苏宁、京东、拼多多(网页页面 )任一商品链接,不是口令。请复制选择好商品配置的页面链接,即返回相应商品价格,并保存到文件。商品页面若有团购与单独购买两个价格,返回团购价格。

  • 使用 windows 任务计划或 linux 定时任务,定时执行程序。获取不同时段的商品价格信息。

  • 单独运行画图程序,可根据定时任务获取的数据,生成商品价格时间变化折线图。

  • 程序监测的两件商品截图如下,具体文件在 pic 文件夹下 bnbx.html、kyy.html,推荐本地查看。 Python实现商品价格监控,识破双十一的套路 Python实现商品价格监控,识破双十一的套路 简单的商品查看页面 https://htmlpreview.github.io/?https://raw.githubusercontent.com/spiderbeg/price_monitor/master/search/search.html 。输入查询商品关键词,选择商城,即可查看相应商城商品列表。默认为苏宁。效果图如下。注意:点击后请等待一段时间即可,请勿频繁刷新。 Python实现商品价格监控,识破双十一的套路

运行环境

  • python3.7

  • windows

  • jupyter notebook

运行依赖包

  • requests

  • pyecharts

  • beautifulsoup4

项目思路

部分问题回答

项目的大致思路流程:

  • 第一步:使用商品详细页链接获取商品信息与商品价格,并保存获取数据 时间、商品介绍,价格 到 csv 文件中;

  • 第二步:使用定时任务定时执行第一步完成的程序;

  • 第三步:读取前两步获取到的时间、商品介绍、价格数据。使用 pyecharts 绘制绘制商品价格时间变化折线图。

  • 为什么不使用 pc 端来调试网页,获取价格信息?

因为在未登录状态天猫的详细商品页的信息是虚假的,同时从移动端网页入手,可以降低调试难度。

谷歌浏览器如何开启手机调试模式?

f12 进入开发者模式,然后鼠标点击一下,具体见下图,包括后文的查找价格接口信息。

Python实现商品价格监控,识破双十一的套路

实现代码

test.py

  • 测试商品链接是否能够成功获取到商品价格。

import timing
"""
  1 调用 timing.py 中的 go 方法测试链接的可用性
  2 调用 timing.py 中的 go, get_url() 方法测试 goods.csv 文件中链接的可用性
python学习交流群:821460695更多学习资料可以加群获取
"""

# 链接测试
# urls = ['https://m.suning.com/product/0000000000/000000011210599174.html?utm_source=baidu&utm_midium=brand-wuxian&utm_content=&utm_campaign=title&safp=f73ee1cf.wapindex7.113464229882.4&safc=prd.1.rec_14-40_0_a_ab:a',
# 'https://m.suning.com/product/0070067092/000000000188392234.html?utm_source=baidu&utm_midium=brand-wuxian&utm_content=&utm_campaign=title&safp=f73ee1cf.wapindex7.113464229882.60&safc=prd.1.rec_5-5_1018c,1014c$c3ae37eafeb814a098d120647449da6f_h_ab:a',
# 'https://m.suning.com/product/0000000000/000000000107426461.html?src=snsxpd_none_recssxcnxhq_1-3_p_0000000000_000000000107426461_rec_21-65_3_a&safp=f73ee1cf.71jyzx.112079032536.4&safc=prd.1.rec_21-65_3_a',
# 'https://m.suning.com/product/0000000000/10606656136.html?safp=f73ee1cf.phone2019.121927933306.2&safc=prd.0.0']

# 输入文本的链接可用性测试
if __name__ == '__main__':
   urls = timing.get_url()
   for url in urls:
       try:
           timing.go(url) # 获取返回信息
       except baseexception as e:
           print(url,'\n',e)

timing.py

  • 进行定时抓取任务时,运行的文件。

# encoding:utf8
import time
import os
import re
import csv
from shop.jd import jd # 自定义
from shop.tm import tm
from shop.sn import sn
from shop.pdd import pdd
from apscheduler.schedulers.blocking import blockingscheduler
'''
python学习交流群:821460695更多学习资料可以加群获取
'''
# import logging
# formats = "%(asctime)s %(name)s %(levelname)s function:%(funcname)s -> :%(message)s"
# logging.basicconfig(format=formats, datefmt='%m/%d/%y %i:%m:%s %p') # ,handlers=[logging.filehandler(log_path, 'a+', 'utf-8')]
# logger = logging.getlogger(__name__)
# logger.setlevel(logging.info)

basepath = os.path.dirname(os.path.abspath(__file__)) # 当前文件夹

def get_date():
   """获取日期"""
   timestamp = int(time.time())
   time_local = time.localtime(timestamp) # #时间戳 转 时间数组
   dt = time.strftime("%y-%m-%d %h:%m:%s",time_local) # #时间数组 转 新的时间格式(2016-05-05 20:28:54)
   return dt

def get_url():
   """读取商品链接
  返回:图像名,商品名,商品链接 元组
  """
   urls = []
   with open(os.path.join(basepath, 'goods.csv'),'r',encoding='utf8') as f:
       f_csv = csv.reader(f)
       next(f_csv) # 返回标题,直接到内容
       for row in f_csv: # 内容
           if row:
               urls.append(row)
   return urls

def go(url):
   '''输入:链接
  输出:(时间,标题,商品价格), 文件路径 元组
  统一价格输出,以最低价格为标准,如有团购和单独购买以单独购买为准
  '''
   result = re.findall('://(.+?).com', url[2])
   if result:
       result = result[0]
       if 'yangkeduo' in result:
           pd = pdd(url[2])
           title,price = pd.main()
       elif 'suning' in result:
           sn = sn(url[2])
           title,price = sn.main()
       elif 'tmall' in result or 'taobao' in result:
           tm = tm(url[2]) # 605030977928:联想笔记本 ; 603330883901 华为 mate30 pro ; 523962011119: 酸奶
           title,price = tm.main()
       elif 'jd' in result:
           jd = jd(url[2]) # 测试 id:100009083152 商品:联想 y9000x 笔记本电脑 2 热水壶 or 薯条?
           title,price = jd.main()
       else:
           raise typeerror('请检查输入的网站链接')
       print('%s 标题 %s, 价格(多个价格以团购为准) %s. '%(result,title,price))
   else:
       raise typeerror('请检查输入是否为目标网站的商品详细页面链接')
   # 文件名
   replace_string = ['.',' ',r'/',r'\\']
   for rs in replace_string:
       url[1] = url[1].replace(rs,'_')
   path = os.path.join(os.path.join(basepath, 'data'), url[1]+'.csv')

   today = get_date() # 日期
   return (today, title, price),path

def adddata(row, path):
   """数据写入文件"""
   with open(path,'a+',encoding='utf8') as f:
       fieldnames = ['时间', '标题','价格']
       writer = csv.dictwriter(f, fieldnames=fieldnames)
       if f.tell() == 0: # 如果内容为空则添加标题
           writer.writeheader()
       writer.writerow({'时间': row[0], '标题': row[1],'价格':row[2]})

def main():
   """运行程序"""
   urls = get_url()
   for url in urls:
       try:
           row,path = go(url) # 获取返回信息
           adddata(row,path) # 写入文件
       except baseexception as e:
           print('请求问题?报错:%s'%e)


if __name__ == '__main__':
   print('时间',get_date())
   main()
   # scheduler = blockingscheduler()
   # scheduler.add_job(go,'cron', args=[url],hour='8-23', minute= '5,35' , second='15')
   # # scheduler.add_job(main,'cron', args=[3088512],hour='8-23', minute= 5 , second='15')
   # print('press ctrl+{0} to exit'.format('break' if os.name == 'nt' else 'c'))

   # try:
   #     scheduler.start()
   # except (keyboardinterrupt, systemexit):
   #     pass

draw.py

  • 图像文件生成在 pic 文件中。

# encoding: utf8

from pyecharts import options as opts
from pyecharts.charts import page, line
import os
import csv
'''
python学习交流群:821460695更多学习资料可以加群获取
'''
basepath = os.path.dirname(os.path.abspath(__file__)) # 当前文件夹

def line(title,checktime,price) -> line:
   """绘图函数"""
   c = (
       line()
      .add_xaxis(checktime)
      .add_yaxis(title, price, is_smooth=true)
      .set_global_opts(title_opts=opts.titleopts(title="商品价格"),
               yaxis_opts=opts.axisopts(name="元/台"),
               xaxis_opts=opts.axisopts(name=title,
               axislabel_opts=opts.labelopts(formatter="{value}", font_size=12, rotate=30,) # x,y 轴标签
                  )
              )
      )
   return c

def files():
   """
  输出字典,每一个键值代表一张图表
  """
   global basepath
   files = {}
   with open(os.path.join(basepath,'goods.csv'),'r',encoding='utf8') as f:
       f_csv = csv.reader(f)
       next(f_csv) # 标题
       for row in f_csv: # 内容
           if row:
               replace_string = ['.',' ',r'/',r'\\'] # 特殊字符处理
               for rs in replace_string:
                   row[1] = row[1].replace(rs,'_')
               files.setdefault(row[0],[]).append(row[1])
   return files

def draw(files):
   """绘制图形文件"""
   datapath = os.path.join(basepath,'data')
   picpath = os.path.join(basepath,'pic')
   for k,i in files.items():
       page = page()
       for n in i:
           try:
               with open(os.path.join(datapath, n +'.csv'),'r', encoding='utf8') as f:
                   f_csv = csv.dictreader(f)
                   price,checktime = [],[]
                   for row in f_csv:
                       checktime.append(row['时间'])
                       price.append(row['价格'])
                   title = n
               page.add(line(title,checktime,price)) # 24 发帖回帖变化图、近3月变化图、浏览、回复散点图
           except:
               print('未制图:',n)
       page.render(os.path.join(picpath, k +'.html'))


if __name__ == '__main__':
   draw(files())