欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

【Share & Backup】FreeCrawl

程序员文章站 2022-04-08 23:21:44
...

分享个一份临时写的代码,感兴趣的朋友可以自取,出于一些原因不多作解释,权当自娱。近期限于各种原因的制约暂时搁置,以后有条件再继续完善这个项目。

代码文件结构如下所示????

../
  > FC_crawl.py
  > FC_hparams.py
  > FC_utils.py
  > ../FC_music/
	> __init__.py
	> music_analysis.py
	> music_netease.py
	> music_qq.py
	> music_kuwo.py

除了__init__.py是个空文件外,其他七个文件都在下文中可以取得,目前使用时music_kuwo.pymusic_netease.pymusic_qq.py都可以单独运行,截至本文发布都可以正常运行,特别地,music_netease.pymusic_qq.py可能需要安装selenium(基于Firefox版本)和Crypto库(这个库安装的话请直接安装pycryptodome即可,如果安装Crypto会有些不友好的问题)。

代码注释很详细,本意在FC_music模块下准备做个音频分析,其他模块暂时还没有想法,但是转念一想PC机的磁盘上最多能存一万个的mp3音频文件,感觉也没什么意义,而且mp3格式的文件本来也不能直接进行音频分析,都必须要转成wav格式的波形声音,大小要翻十倍不止,实在是太不经济了。

FC_crawl.py

# -*- coding: UTF-8 -*-
# Author: 囚生CY
# 爬虫模块总父类

import os
import time

from FC_utils import *

class Crawl():

	def __init__(self,
		hp=None,														 # 超参数集
		user_agent=None,												 # 浏览器用户代理
	):
		# 类构造参数
		if hp is None: hp = get_hparams()
		self.hp = hp
		self.user_agent = hp.user_agent if user_agent is None else user_agent

		# 类常用参数
		self.workspace = os.getcwd()									 # 类工作目录
		self.date = time.strftime("%Y%m%d")								 # 类创建时间
		self.dir_log = hp.dir_log										 # 记录文件夹
		self.dir_temp = hp.dir_temp										 # 临时文件夹

		# 类初始化
		log_path = os.path.join(self.workspace,self.dir_log)
		temp_path = os.path.join(self.workspace,self.dir_temp)
		if not os.path.exists(log_path):
			print("正在创建{}文件夹...".format(self.dir_log))
			os.mkdir(log_path)
		if not os.path.exists(temp_path):
			print("正在创建{}文件夹...".format(self.dir_temp))
			os.mkdir(temp_path)

if __name__ == "__main__":
	c = Crawl()


FC_hparams.py

# -*- coding: UTF-8 -*-
# Author: 囚生CY
# 保存项目超参数

import argparse

class HyperParameters:
	parser = argparse.ArgumentParser("--")
	
	parser.add_argument("--user_agent",default="Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:76.0) Gecko/20100101 Firefox/76.0",help="浏览器用户代理",type=str)
	parser.add_argument("--dir_log",default="log",help="记录文件夹",type=str)
	parser.add_argument("--dir_temp",default="temp",help="临时文件夹",type=str)

	
if __name__ == "__main__":
	hyperparameters = HyperParameters()
	parser = hyperparameters.parser
	hp = parser.parse_args()
	print(hp)


FC_utils.py

# -*- coding: UTF-8 -*-
# Author: 囚生CY
# FC项目工具函数

import os
import json
import time

from FC_hparams import HyperParameters

def get_hparams():														 # 读取thesis_hyperparameters.py中的超参数集
	hyperparameters = HyperParameters()
	parser = hyperparameters.parser
	hp = parser.parse_args()
	return hp

def save_hparams(hp,save_path=None):									 # 导出路径: 默认为"hparams_20200521231856"(时间戳可变)																	 # 导出超参数外部文件
	if save_path is None: save_path = "hparams_{}.json".format(time.strftime("%Y%m%d%H%M%S"))
	with open(save_path,"w") as f: f.write(json.dumps(vars(hp)))


if __name__ == "__main__":
	hp = get_hparams()
	save_hparams(hp)


music_analysis.py

# -*- coding: UTF-8 -*-
# Author: 囚生CY
# 音频数据分析模块
import time
import numpy as np
from pydub import AudioSegment
from pydub.playback import play

t = time.time()
song = AudioSegment.from_file("是风动.m4a","m4a")
array = song.get_array_of_samples()
data_raw = np.array(array.tolist())
print(data_raw.shape)
print(time.time()-t)

from scipy.io import wavfile

# 从 wavfile 包中读取文件

t = time.time()
sampling_freq, audio = wavfile.read('是风动.wav')
print(audio.shape)
print(time.time()-t)

"""
# -*- coding:utf-8 -*-
'''
   音频数据的读取与绘制 
'''
import numpy as np
import matplotlib.pyplot as plt

#  读取语音文件
from scipy.io import wavfile

# 从 wavfile 包中读取文件
sampling_freq, audio = wavfile.read('input_freq.wav')

# 打印参数
print '\nShape : ',audio.shape
#
print ' Datatype :',audio.dtype
print 'Duration:',round(audio.shape[0]/float(sampling_freq),3),'seconds'

# 标准化数值
audio = audio/(2.**15)

# 提取前30个值画图
audio = audio[:30]

# 建立x轴为时间轴 将x轴按照采样频率因子进行缩放
x_values = np.arange(0, len(audio), 1) / float(sampling_freq)

# 将单位转换为秒
x_values *= 1000

# 画出声音信号图形
plt.plot(x_values,audio,color='black')

plt.xlabel('Time (ms)')

plt.ylabel('Amplitude')

plt.title('Audio signal')

plt.show()
"""


music_kuwo.py

# -*- coding: UTF-8 -*-
# Author: 囚生CY
# 酷我音乐爬虫模块

import os
import sys
import math
import time
import json
import random
import base64
import codecs

sys.path.append("../")													 # 导入上级目录

from requests import Session
from bs4 import BeautifulSoup
from Crypto.Cipher import AES											 # 这个库安装的话直接安装pycryptodome, 如果安装Crypto会有些不友好的问题
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.action_chains import ActionChains

from FC_crawl import Crawl
from FC_utils import *


class KuWo(Crawl):														 # 酷我音乐爬虫

	def __init__(self):
		
		Crawl.__init__(self)											 # 父类继承

		# 类常用参数
		self.url_main = "http://www.kuwo.cn/"							 # 网易云音乐首页
		self.headers = {"User-Agent": self.user_agent}					 # 请求头伪装信息
		self.url_api = self.url_main + "url"							 # 请求歌曲链接的接口
		self.api_params = {												 # 接口调用参数
			"format": "mp3",											 # 返回格式
			"rid": None,												 # 歌曲编号
			"response": "url",											 # 返回变量
			"type": "convert_url3",										 # 返回类型
			"br": "128kmp3",											 # 返回歌曲质量
			"from": "web",												 # 请求来源
			"t": None,													 # 时间戳
			"reqId": "",												 # 关于这个字段的生成我目前细究, 因为目前不带这个字段也是可行的
		}
		self.url_song = "http://www.kuwo.cn/play_detail/{}"				 # 歌曲页面链接
		
		# 类初始化操作
		self.renew_session()											 # 生成新的session对象

	def renew_session(self):											 # 重构
		self.session = Session()										 # 创建新的Session对象
		self.session.headers = self.headers.copy()						 # 伪装头部信息
		self.session.get(self.url_main)									 # 访问主页

	def search_for_song_id(self,song_name,driver,
		n_result=1,														 # 返回多少个查询结果
	):	
		pass

	def download_by_song_id(self,song_id,								 # 给定歌曲编号
		save_path=None,													 # 歌曲下载保存路径
		driver=None,
	):																	 # 通过歌曲编号下载歌曲
		song_url = self.request_for_song_url(song_id,driver=driver)		 # 获取歌曲链接
		r = self.session.get(song_url)									 # 访问歌曲链接
		if save_path is None: save_path = "kuwo_{}".format(song_id)		 # 默认的保存路径
		with open(save_path,"wb") as f: f.write(r.content)				 # 写入音乐文件

	def request_for_song_url(self,song_id,
		driver=None,
	):																	 # 请求歌曲链接
		params = self.api_params.copy()									 # 获取请求字符串
		params["rid"] = song_id											 # 设置歌曲编号
		params["t"] = int(time.time()*1000)								 # 设置时间戳							
		r = self.session.get(self.url_api,params=params)				 # 发出播放请求
		print(r.text)												
		song_url = json.loads(r.text)["url"]							 # 这里用eval不好使, 因为有python无法识别为缺失值的null
		return song_url

	def test(self):
		song_id = "80459394"
		r = self.download_by_song_id(
			song_id,
			save_path="kuwo_{}.mp3".format(song_id),
			driver=None,
		)

if __name__ == "__main__":

	kw = KuWo()
	kw.test()



music_netease.py

# -*- coding: UTF-8 -*-
# Author: 囚生CY
# 网易云音乐爬虫模块

import os
import sys
import math
import time
import json
import random
import base64
import codecs

sys.path.append("../")													 # 导入上级目录

from requests import Session
from bs4 import BeautifulSoup
from Crypto.Cipher import AES											 # 这个库安装的话直接安装pycryptodome, 如果安装Crypto会有些不友好的问题
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.action_chains import ActionChains

from FC_crawl import Crawl
from FC_utils import *

class NetEase(Crawl):													 # 网易云音乐爬虫

	def __init__(self):
		
		Crawl.__init__(self)											 # 父类继承

		# 类常用参数
		self.url_main = "https://music.163.com/"						 # 网易云音乐首页
		self.headers = {"User-Agent": self.user_agent}					 # 请求头伪装信息
		self.url_api = self.url_main + "weapi/song/enhance/player/url?csrf_token="
		self.url_song = self.url_main + "song?id={}"					 # 歌曲页面链接
		self.url_search = self.url_main + "search/m/?s={}"				 # 搜索歌曲的URL
		
		# 类初始化操作
		self.renew_session()											 # 生成新的session对象

	def renew_session(self):											 # 重构
		self.session = Session()										 # 创建新的Session对象
		self.session.headers = self.headers.copy()						 # 伪装头部信息
		self.session.get(self.url_main)									 # 访问主页

	def search_for_song_id(self,song_name,driver,
		n_result=1,														 # 返回多少个查询结果
	):																	 #
		driver.get(self.url_main)
		xpath_input_frame = "//input[@id='srch']"
		input_frame = driver.find_element_by_xpath(xpath_input_frame)
		input_frame.send_keys(song_name)								 # 
		input_frame.send_keys(Keys.ENTER)								 # 回车键查询
		driver.switch_to_frame("g_iframe")
		WebDriverWait(driver,15).until(lambda driver: driver.find_element_by_xpath("//div[@class='srchsongst']").is_displayed())
		html = driver.page_source
		soup = BeautifulSoup(html,"lxml")
		# 寻找song_id的逻辑以后如果页面发生变化可能要随之改变
		result_list = soup.find("div",class_="srchsongst")
		divs = list(result_list.children)[:n_result]
		song_ids = []
		for div in divs:
			div.find("div",class_="td")
			a = div.find("a")
			print(a)
			song_id = a.attrs["id"][5:]
			song_ids.append(song_id)
		driver.quit()
		return song_ids
			
			
	def download_by_song_id(self,song_id,								 # 给定歌曲编号
		save_path=None,													 # 歌曲下载保存路径
		driver=None,
	):																	 # 通过歌曲编号下载歌曲
		song_url = self.request_for_song_url(song_id,driver=driver)		 # 获取歌曲链接
		r = self.session.get(song_url)									 # 访问歌曲链接
		if save_path is None: save_path = "netease_{}".format(song_id)	 # 默认的保存路径
		with open(save_path,"wb") as f: f.write(r.content)				 # 写入音乐文件

	def request_for_song_url(self,song_id,
		driver=None,
	):																	 # 请求歌曲链接											
		formdata = self.encrypt_formdata(song_id,driver=driver)			 # 加密的表单数据
		r = self.session.post(self.url_api,data=formdata)				 # 发出播放请求
		song_url = json.loads(r.text)["data"][0]["url"]					 # 这里用eval不好使, 因为有python无法识别为缺失值的null
		return song_url
		
	def encrypt_formdata(self,song_id,									 # 需要确定歌曲的编号
		d='{"ids":"[%s]","br":128000,"csrf_token":""}',					 # 歌曲信息字典转字符串: JS中对应参数是JSON.stringify(i0x)
		e="010001",														 # 固定值: JS中对应参数是bqR1x(["流泪","强"]), 下面的f是一串固定的MD5码(bqR1x(QM6G.md))
		f="00e0b509f6259df8642dbc35662901477df22677ec152b5ff68ace615bb7b725152b3ab17a876aea8a5aa76d2e417629ec4ee341f56135fccf695280104e0312ecbda92557c93870114af6c9d05c4f7f0c3685b7a46bee255932575cce10b424d813cfe4875d3e82047b97ddef52741d546b8e289dc6935b3ece0462db0a22b8e7",
		g="0CoJUm6Qyw8W8jud",											 # 固定值: JS中对应参数是bqR1x(["爱心","女孩","惊恐","大笑"]
		driver=None,													 # 提供一个使用selenium驱动运行JS代码获得加密表单数据的接口, 因为我担心加密逻辑会变, 相对来说输入
	):																	 # 获取加密表单数据
		d %= song_id													 # 将歌曲编码信息添加到d中
		if driver is not None:											 # 如果传入了driver参数则使用浏览器驱动执行JS: 个人认为这样即便逻辑改变, 只要参数不变就不会报错, 相比于下面复现JS加密逻辑更鲁棒
			JS = "return window.asrsea('{}','{}','{}','{}')".format(d,e,f,g)
			driver.get(self.url_song.format(song_id))
			formdata = driver.execute_script(JS)						 # execute_script获取变量值一定是要return, 这跟在浏览器控制台里写代码是不一样的
			formdata = dict(params=formdata["encText"],encSecKey=formdata["encSecKey"])
			return formdata
			
		def _javascript2python_a(a):									 # function a(): 从给定的字符串b中随机挑选字符合成长度为a的字符串
			b = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
			c = str()
			for i in range(a): c += b[math.floor(random.random()*len(b))]
			return c

		def _javascript2python_b(a,b):									 # function b(): 用于加密params字段的AES算法, 密文e, **c, 偏移量d, 加密模式CBC
			pad = 16 - len(a.encode())%16								 # 两个坑点: 一是计算字符串长度必须是unicode长度, 二是字符串长度必须是16的倍数
			a += pad*chr(pad)											 # 我到现在还是不能理解为什么要这样padding
			encryptor = AES.new(b.encode("UTF-8"),AES.MODE_CBC,b"0102030405060708")	
			f = base64.b64encode(encryptor.encrypt(a.encode("UTF-8")))
			return f
			
		def _javascript2python_c(a,b,c):								 # function c(): 用于加密encSecKey字段的RSA算法, 加密指数b, 解密参数空字符串, 加密系数c 
			b = b[::-1]													 # 这个反转字符串我也没搞明白
			e = int(codecs.encode(b.encode("UTF-8"),"hex_codec"),16)**int(a,16)%int(c,16)
			return format(e,"x").zfill(256)								 # 将密文e转为字符串后再零填充到256位

		random_text = _javascript2python_a(16)							 # 目前是生成16位的随机字符串: AES**与
		params = _javascript2python_b(d,g)								 # params第一次AES加密
		params = _javascript2python_b(params.decode("UTF-8"),random_text)# params第二次AES加密
		encSecKey = _javascript2python_c(e,random_text,f)				 # encSecKey加密
		formdata = dict(params=params,encSecKey=encSecKey)				 # 生成POST表单: self.url_api
		return formdata													 # 返回字典

	def test(self):

		# 确定song_id
		options = webdriver.FirefoxOptions()							 # 设置配置
		options.add_argument("--headless")								 # 设定无头浏览器的配置
		driver = webdriver.Firefox(options=options)						 # 无头浏览器
		#driver = webdriver.Firefox()
		song_ids = self.search_for_song_id("燕归巢",driver,n_result=3)
		print(song_ids)
		driver.quit()


		# 下载歌曲
		song_id = "504686859"
		r = self.download_by_song_id(
			song_id,
			save_path="netease_{}.mp3".format(song_id),
			driver=None,
		)

if __name__ == "__main__":

	ne = NetEase()
	ne.test()



music_qq.py


# -*- coding: UTF-8 -*-
# Author: 囚生CY
# QQ音乐爬虫模块

import os
import sys
import math
import time
import json
import random
import base64
import codecs

sys.path.append("../")													 # 导入上级目录

from requests import Session
from bs4 import BeautifulSoup
from Crypto.Cipher import AES											 # 这个库安装的话直接安装pycryptodome, 如果安装Crypto会有些不友好的问题
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.action_chains import ActionChains

from FC_crawl import Crawl
from FC_utils import *

class QQ(Crawl):														 # 酷我音乐爬虫

	def __init__(self):
		
		Crawl.__init__(self)											 # 父类继承

		# 类常用参数
		self.url_main = "https://y.qq.com/"								 # QQ云音乐首页
		self.headers = {"User-Agent": self.user_agent}					 # 请求头伪装信息
		self.url_song = self.url_main + "n/yqq/song/{}.html"			 # 歌曲页面链接
		self.url_js = "https://y.gtimg.cn/music/portal/js/v4/player_d905eb5.js"
		self.url_link = "http://{}/amobile.music.tc.qq.com/{}"			 # 第一个参数是请求的IP地址, 无法确定只能用一些备用可行的IP了

		self.ips = [
			"180.153.119.147",
			"180.153.119.146",
			"180.153.119.144",
			"114.80.27.13",
		]

		# 类初始化操作
		self.renew_session()											 # 生成新的session对象

	def renew_session(self):											 # 重构
		self.session = Session()										 # 创建新的Session对象
		self.session.headers = self.headers.copy()						 # 伪装头部信息
		self.session.get(self.url_main)									 # 访问主页

	def search_for_song_id(self,song_name,driver,
		n_result=1,														 # 返回多少个查询结果
	):
		pass

	def download_by_song_id(self,song_id,								 # 给定歌曲编号
		save_path=None,													 # 歌曲下载保存路径
		driver=None,
	):																	 # 通过歌曲编号下载歌曲
		song_url = self.request_for_song_url(song_id,driver=driver)		 # 获取歌曲链接
		link_url = self.url_link.format(self.ips[0],song_url)
		print(link_url)
		r = self.session.get(link_url)									 # 访问歌曲链接
		if save_path is None: save_path = "qq_{}".format(song_id)		 # 默认的保存路径
		with open(save_path,"wb") as f: f.write(r.content)				 # 写入音乐文件

	def request_for_song_url(self,song_id,
		driver=None,
	):																	 # 请求歌曲链接
		JS = "return window.g_vkey['{}']".format(song_id)				 # 获取歌曲链接的JS
		xpath_play_button = "//a[@class='mod_btn_green js_all_play']"	 # 歌曲页面播放按钮xpath定位
		driver.get(self.url_song.format(song_id))						 # 访问歌曲页面
		time.sleep(2)
		driver.find_element_by_xpath(xpath_play_button).click()			 # 点击播放
		windows = driver.window_handles									 # 窗口管理对象: 这个一定要在需要切换的时候再去生成, 否则会出一些问题
		driver.switch_to.window(windows[-1])							 # 切换至歌曲播放页面: 这个很有意思, 播放点得快就会跳转页面, 点慢了就打开新的标签页
		xpath_hint = "//div[@id='divdialog_0']"							 # 确定一些问题
		if driver.find_elements_by_xpath(xpath_hint): raise Exception("该歌曲只能在客户端播放")
		while True:
			try:
				result = driver.execute_script(JS)						 # 页面可能还没有加载出window.g_vkey这个变量
				if result is None: continue
				break
			except: continue
		print(result)
		song_url = result["purl"]					
		return song_url

	def test(self):
		options = webdriver.FirefoxOptions()							 # 设置配置
		options.add_argument("--headless")								 # 设定无头浏览器的配置
		driver = webdriver.Firefox(options=options)						 # 无头浏览器
		song_id = "003eSjmB276n6J"										 # 倾尽天下的ID
		r = self.download_by_song_id(
			song_id,
			save_path="qq_{}.mp3".format(song_id),
			driver=driver,
		)
		driver.quit()

if __name__ == "__main__":
	qq = QQ()
	qq.test()


总之,就Netease,KuWo和QQ来说,显然QQ的JS加密是做得最好的,KuWo则是最差,JS加密与逆向确实是个很有趣的东西,但是要精通真的很难很难。