一. 前言
之所以会做这个爬虫项目,是因为在一些账号交易平台,偶尔会有一些非常高性价比的账号出售,俗称“捡漏”,但这种商品几乎很快就会卖出,所以如果想要捡漏,就需要人工实时的盯着交易平台上架的商品数据,这么做非常消耗时间和精力,于是我便想着写一个爬虫放置到服务器上运行,代替人工进行监视,当遇到符合高性价的条件的商品时,自动发送邮件到我的邮箱,我就只需要坐等邮件即可。
二. 准备
1. 目标网站
众所周知,目前知名的账号交易平台有某猫,某蟹,某鱼等,从交易的安全性上来考虑,某蟹相较于其他的交易平台安全性会更有保障,所以这里选择某蟹进行爬取。
2. 网站特点
本想直接使用网站自带的筛选和商品列表功能,但由于某蟹的网页采用动态JS渲染,无法直接爬取静态页面内容,且使用了随机的cookie和头字段,每过一段时间之前的cookie和请求头都无法使用,如果一直使用同个请求服务器会直接报错。经过一段时间分析后,发现了一个可行的方法:由于某蟹的商品ID是连续的,且可通过url直接访问到商品页面,所以可以使用url+商品ID的方式来发送请求,而动态JS渲染可通过python库的playwright来解决,等待无头浏览器渲染并抓取最终的页面内容。
3. 需求分析
这里以鸣潮的账号作为例子,爬取全图鉴、星声大于8000、官服、且价格低于800的商品为例子,每隔1小时抓取最新的商品数据,当获取到符合条件的商品时,通过发送邮件来通知。
三. 技术栈
PyCharm Community Edition 2024.2.4
Python 3.10
Playwright 1.49.0
threading
Ubunut Server 22.04.4 LTS
docker 27.3.1
四. 具体实现
先放一个效果图:
核心代码:
这段代码使用了 Playwright 库来自动化浏览器操作,需要传入爬取的url和等待渲染的元素,这里以css选择器的方式来指定所需等待的元素。
- headless=True:使用无头浏览器操作,如果是部署到服务器上务必使用True。
- 如果传入了等待渲染的元素,则会等待页面渲染直到指定元素出现。
- 由于有些商品ID为已售出或下架,所以在加载页面时指定的元素可能会不存在,所以设置5秒的超时时间。
- 等待渲染完成后 ,抓取页面内容content并返回
判断部分:
def decide(url, wait_selector,current_id):
global TIMES
global ID_output
global MY_PRICE
global ROLES
global SERVER
global TAPTAP
global WEGAME
global XS
now = datetime.datetime.now()
formatted_time = now.strftime("%Y-%m-%d %H:%M:%S")
try:
content = fetch_full_page_content(url, wait_selector, timeout=6)
with condition:
while current_id != ID_output:
condition.wait()
sys.stdout.flush()
print(f"[{formatted_time}][{GAME_NAME}]正在获取编号:{current_id} 商品内容.....", end='')
sys.stdout.flush()
condition.notify_all()
except:
with condition:
while current_id != ID_output:
condition.wait()
print(f"[{formatted_time}][{GAME_NAME}]编号:{current_id} 商品内容获取时出错")
ID_output += 1
condition.notify_all()
return
pattern1 = r'class="price">¥([^<]+)</div>'
pattern2 = r'class="DI_productName_title">鸣潮</span>\s*([\s\S]*?)\s*</span>'
pattern3 = r'是否绑定tap:</div><div data-v-1f136876="" class="zone_content">是</div>'
pattern4 = r'是否绑定Wegame:</div><div data-v-1f136876="" class="zone_content">是</div>'
try:
match1 = re.search(pattern1, content)
match2 = re.search(pattern2, content)
match3 = re.search(pattern3, content)
match4 = re.search(pattern4, content)
if match1:
price = match1.group(1)
TIMES = 0
if int(price) > MY_PRICE:
with condition:
while current_id != ID_output:
condition.wait()
print('商品价格为:' + str(price) + f',大于{MY_PRICE}元')
ID_output += 1
condition.notify_all()
return False
else:
TIMES += 1
with condition:
while current_id != ID_output:
condition.wait()
print('商品似乎不存在,当前连续空次数:' + str(TIMES))
ID_output += 1
condition.notify_all()
return False
if match2:
title = match2.group(1)
notes_less = remove_notes(title)
if all(sub in notes_less for sub in ROLES):
pass
else:
with condition:
while current_id != ID_output:
condition.wait()
print('角色不满足要求')
ID_output += 1
condition.notify_all()
return False
if SERVER in notes_less:
pass
else:
with condition:
while current_id != ID_output:
condition.wait()
print(f'不是{SERVER}')
ID_output += 1
condition.notify_all()
return False
else:
TIMES += 1
with condition:
while current_id != ID_output:
condition.wait()
print('商品内容似乎不存在,当前连续空次数:' + str(TIMES))
ID_output += 1
condition.notify_all()
return False
if TAPTAP:
if match3:
with condition:
while current_id != ID_output:
condition.wait()
print('绑定了taptap')
ID_output += 1
condition.notify_all()
return False
if WEGAME:
if match4:
with condition:
while current_id != ID_output:
condition.wait()
print('绑定了Wegame')
ID_output += 1
condition.notify_all()
return False
pattern5 = r'星声:(.*?),'
match5 = re.search(pattern5, title)
if match5:
xs = int(match5.group(1))
if xs < XS:
with condition:
while current_id != ID_output:
condition.wait()
print(f'星声数量小于{XS}')
ID_output += 1
condition.notify_all()
return False
else:
xs = "未获取到星声数量"
except:
with condition:
while current_id != ID_output:
condition.wait()
print('匹配时错误')
ID_output += 1
condition.notify_all()
return False
return price,title,xs
这里使用正则表达式来匹配所需数据
自定义数据:
- TIMES:商品ID内容为空的次数,这里设置为50,当次数大于50相当于已经爬取完最新数据。当然这个次数可以自己定义。
- MY_PRICE、ROLES、SERVER、TAPTAP、WEGAME、XS:最高价格、角色、服务器、是否判断绑定TAPTAP和WEGAME、星声数量最低值
示例:
MY_PRICE = 800
ROLES = ["椿", "今汐", "忌炎", "长离", "吟霖", "守岸人", "相里要", "折枝"]
SERVER = '官服'
TAPTAP = False
WEGAME = False
XS = 8000
主函数部分:
while not stop_event.is_set():
with lock:
if TIMES >= 50:
stop_event.set()
with lock:
current_id = ID
ID += 1
url = "https://www.exmple.com/" + str(current_id)
wait_selector = ".DI_productName_hide"
result = decide(url, wait_selector, current_id)
if result:
price, title, xs = result
with condition:
while current_id != ID_output:
condition.wait()
mail_title = f"恭喜找到完美的高性价比商品!"
body = f"恭喜找到完美的高性价比商品!\n价格:{price}\n描述:{title}\n星声:{xs}\n"
print(body)
try:
with lock:
with open(f'{work_dir}win.txt', 'a', encoding='utf-8') as file:
file.write(body+"\n==============================================\n")
except:
print("win.txt文件写入失败!")
try:
mail_main(GAME_NAME+':'+str(current_id), mail_title, body+url)
except:
print("邮件发送失败!!!")
ID_output += 1
condition.notify_all()
- 当TIMES的次数大于50时,设置一个事件标志,停止所有线程工作
- wait_selector:等待渲染的元素
- mail_main:发送邮件的函数,这里就不放出来了,各位自己写
多线程部分:
try:
threads = []
for i in range(20):
thread = threading.Thread(target=main)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
except Exception as e:
print(f"发生未知错误: {e}")
finally:
print("==========本次爬取结束==========")
write_id()
sys.exit(0)
这里线程数务必根据自己服务器的配置写,否则会卡死,5个线程大约需要2G内存。
五. 部署爬虫脚本
Playwright有官方的容器化部署,所以这里选择使用docker来部署爬虫。
拉取Playwright官方镜像:
docker pull mcr.microsoft.com/playwright
docker run -it --restart always -v /opt/spider:/app mcr.microsoft.com/playwright /bin/bash
这里自己选择挂载目录,我的是/opt/spider,然后进到容器内部,安装容器对应版本的playwright,因为容器内只带了playwright的浏览器依赖,这里我的容器对应版本为1.46.0
pip3 install playwright==1.46.0 -i https://mirrors.aliyun.com/pypi/simple/
创建运行脚本run.sh:
#!/bin/bash
cd /app || exit 1
export PLAYWRIGHT_BROWSERS_PATH=/ms-playwright
export TERM=xterm
export SHLVL=1
export LC_ALL=C.UTF-8
export PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin
export OLDPWD=/ms-playwright/chromium-1129/chrome-linux
# 获取当前时间
current_time=$(date +"%Y-%m-%d %H:%M:%S")
# 检查上一个进程是否仍在运行
if pgrep -f "python3 mingchao.py" > /dev/null; then
echo "$current_time: 上一个任务仍在运行,跳过此执行" >> run.log
exit 0
fi
# 执行 Python 脚本
echo "$current_time: 开始执行 mingchao.py" >> run.log
python3 mingchao.py >> mingchao.log 2>&1
echo "$current_time: mingchao.py 已启动" >> run.log
注意,脚本需要导入执行环境变量,否则无法正常工作。
添加执行权限:
chmod a+x /opt/spider/run.sh
添加计划任务,每一小时执行一次爬虫:
crontab -l 2>/dev/null; echo "0 * * * * /opt/spider/run.sh" | crontab -
到此所有工作完美结束,爬取的日志输出在当前目录下的mingchao.log中,如果实现了邮件函数,便可坐等通知邮件。