第12章 综合实战项目
12.1 新闻网站全文采集系统
新闻网站全文采集是爬虫的经典应用场景。这类网站通常具有清晰的结构,但也可能包含反爬机制。我们需要设计一个能够稳定运行、自动处理分页、提取正文内容并妥善存储的系统。
核心功能方法表
| 功能名称 | 调用方法 | 具体功能与注意事项 |
|---|---|---|
| 获取新闻列表页 | requests.get(list_url, headers=headers) | 需要处理分页参数,通常为page或offset |
| 提取新闻链接 | soup.find_all('a', class_='news-link') | 使用BeautifulSoup定位链接元素,注意相对路径转绝对路径 |
| 获取新闻详情 | requests.get(detail_url, headers=headers) | 可能需要添加Referer头模拟从列表页跳转 |
| 提取正文内容 | soup.find('div', class_='content').get_text() | 不同网站结构差异大,需针对性编写选择器 |
下面是一个完整的新闻采集示例:
python
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import time
import csv
import os
from urllib.parse import urljoin, urlparse
# 设置请求头,模拟真实浏览器访问
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def get_news_list(base_url, page_num=1):
"""
获取新闻列表页面的所有新闻链接
:param base_url: 列表页基础URL
:param page_num: 页码
:return: 新闻详情页URL列表
"""
# 构造完整的列表页URL(假设分页参数为page)
list_url = f"{base_url}?page={page_num}"
try:
# 发送GET请求获取列表页内容
response = requests.get(list_url, headers=headers, timeout=10)
response.raise_for_status() # 检查HTTP错误
# 使用BeautifulSoup解析HTML
soup = BeautifulSoup(response.text, 'html.parser')
# 查找所有新闻链接(这里使用通用的选择器,实际需要根据目标网站调整)
news_links = []
for link in soup.find_all('a', href=True):
href = link['href']
# 将相对路径转换为绝对路径
absolute_url = urljoin(base_url, href)
# 确保URL属于同一域名
if urlparse(absolute_url).netloc == urlparse(base_url).netloc:
news_links.append(absolute_url)
return news_links
except requests.RequestException as e:
print(f"获取列表页失败: {e}")
return []
def extract_news_content(news_url):
"""
提取单篇新闻的标题和正文内容
:param news_url: 新闻详情页URL
:return: 包含标题和正文的字典
"""
try:
# 发送请求获取新闻详情页
response = requests.get(news_url, headers=headers, timeout=10)
response.raise_for_status()
# 解析HTML
soup = BeautifulSoup(response.text, 'html.parser')
# 提取标题(需要根据实际网站结构调整选择器)
title_elem = soup.find('h1') or soup.find('title')
title = title_elem.get_text().strip() if title_elem else "未知标题"
# 提取正文内容(需要根据实际网站结构调整选择器)
# 这里提供几种常见的正文容器选择方式
content_selectors = [
'div.content',
'div.article-content',
'div.post-content',
'article',
'div.entry-content'
]
content = ""
for selector in content_selectors:
content_elem = soup.select_one(selector)
if content_elem:
content = content_elem.get_text().strip()
break
# 如果以上选择器都没找到,尝试获取所有段落
if not content:
paragraphs = soup.find_all('p')
content = '\n'.join([p.get_text().strip() for p in paragraphs])
return {
'url': news_url,
'title': title,
'content': content
}
except requests.RequestException as e:
print(f"获取新闻详情失败 {news_url}: {e}")
return None
except Exception as e:
print(f"解析新闻内容失败 {news_url}: {e}")
return None
def save_to_csv(news_data, filename='news_data.csv'):
"""
将新闻数据保存到CSV文件
:param news_data: 新闻数据列表
:param filename: 保存的文件名
"""
# 检查文件是否存在,决定是否写入表头
file_exists = os.path.isfile(filename)
with open(filename, 'a', newline='', encoding='utf-8-sig') as csvfile:
fieldnames = ['url', 'title', 'content']
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
# 如果文件不存在,先写入表头
if not file_exists:
writer.writeheader()
# 写入新闻数据
for news in news_data:
if news: # 确保数据不为空
writer.writerow(news)
def main():
"""
主函数:执行完整的新闻采集流程
"""
# 配置目标网站的列表页URL(这里以示例URL代替,实际使用时替换为真实URL)
base_list_url = "https://example-news-site.com/news"
# 存储所有采集到的新闻数据
all_news_data = []
# 采集前3页的新闻(可根据需要调整)
for page in range(1, 4):
print(f"正在采集第 {page} 页...")
# 获取当前页的新闻链接
news_urls = get_news_list(base_list_url, page)
print(f"找到 {len(news_urls)} 个新闻链接")
# 逐个采集新闻详情
for url in news_urls[:5]: # 限制每页只采集前5条,避免过于频繁
print(f"采集: {url}")
news_data = extract_news_content(url)
if news_data:
all_news_data.append(news_data)
# 添加延迟,避免请求过于频繁
time.sleep(1)
# 页面间添加稍长的延迟
time.sleep(2)
# 保存数据到CSV文件
if all_news_data:
save_to_csv(all_news_data)
print(f"成功保存 {len(all_news_data)} 条新闻数据")
else:
print("未采集到任何新闻数据")
if __name__ == "__main__":
main()注意事项:
- 网站结构差异:不同新闻网站的HTML结构千差万别,代码中的选择器需要根据目标网站实际结构调整
- 反爬机制:新闻网站通常有反爬措施,需要合理设置请求间隔,必要时使用代理IP
- 编码问题:确保正确处理中文编码,使用
utf-8-sig编码保存CSV可避免Excel打开乱码 - 法律合规:仅采集公开可访问的内容,遵守robots.txt协议,不要用于商业用途
- 错误处理:网络请求可能失败,必须包含完善的异常处理机制
这个新闻采集系统展示了如何构建一个完整的爬虫应用,包含了请求发送、内容解析、数据存储等核心环节,并且考虑了实际运行中的各种问题。
12.2 电商商品价格监控爬虫
电商价格监控是另一个热门的爬虫应用场景。通过定期抓取商品价格信息,可以帮助用户了解价格变化趋势,抓住最佳购买时机。
核心功能方法表
| 功能名称 | 调用方法 | 具体功能与注意事项 |
|---|---|---|
| 获取商品详情页 | requests.get(product_url, headers=headers) | 电商网站通常有复杂的反爬机制,需要更真实的请求头 |
| 提取价格信息 | soup.find('span', class_='price').get_text() | 价格可能在多个位置,需要尝试不同的选择器 |
| 提取商品基本信息 | soup.find('h1', class_='product-title').get_text() | 包括商品名称、品牌、规格等信息 |
| 定时监控 | schedule.every().hour.do(monitor_price) | 使用schedule库实现定时任务 |
电商价格监控爬虫示例:
python
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import json
import time
import sqlite3
from datetime import datetime
import re
# 更完整的请求头,模拟真实浏览器
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
def create_database():
"""
创建SQLite数据库和商品价格表
"""
conn = sqlite3.connect('price_monitor.db')
cursor = conn.cursor()
# 创建商品信息表
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT UNIQUE,
name TEXT,
url TEXT,
platform TEXT
)
''')
# 创建价格记录表
cursor.execute('''
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
price REAL,
currency TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (product_id)
)
''')
conn.commit()
conn.close()
def extract_price_from_text(text):
"""
从文本中提取价格数字
:param text: 包含价格的文本
:return: 提取到的价格数字,如果失败返回None
"""
if not text:
return None
# 移除所有非数字和小数点的字符,但保留负号(虽然价格不会是负的)
# 这个正则表达式匹配数字和小数点
price_match = re.search(r'[\d,]+\.?\d*', text.replace(',', ''))
if price_match:
try:
return float(price_match.group())
except ValueError:
return None
return None
def monitor_jd_product(product_url):
"""
监控京东商品价格
:param product_url: 京东商品URL
:return: 商品信息字典
"""
try:
# 发送请求获取商品页面
response = requests.get(product_url, headers=headers, timeout=15)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 提取商品ID(从URL中获取)
import re
product_id_match = re.search(r'/(\d+)\.html', product_url)
product_id = product_id_match.group(1) if product_id_match else "unknown"
# 提取商品名称
title_elem = soup.find('div', class_='sku-name') or soup.find('h1')
product_name = title_elem.get_text().strip() if title_elem else "未知商品"
# 尝试多种方式提取价格
price = None
# 方式1: 查找价格元素
price_elem = soup.find('span', class_='price')
if price_elem:
price = extract_price_from_text(price_elem.get_text())
# 方式2: 如果方式1失败,查找其他可能的价格元素
if price is None:
price_elem = soup.find('span', class_='p-price')
if price_elem:
price = extract_price_from_text(price_elem.get_text())
# 方式3: 查找包含"¥"符号的元素
if price is None:
for elem in soup.find_all(text=re.compile(r'¥')):
price = extract_price_from_text(elem)
if price is not None:
break
return {
'product_id': f"jd_{product_id}",
'name': product_name,
'url': product_url,
'platform': '京东',
'price': price,
'currency': 'CNY'
}
except requests.RequestException as e:
print(f"请求京东商品页面失败: {e}")
return None
except Exception as e:
print(f"解析京东商品信息失败: {e}")
return None
def monitor_taobao_product(product_url):
"""
监控淘宝商品价格(简化版,实际淘宝有复杂的反爬)
:param product_url: 淘宝商品URL
:return: 商品信息字典
"""
# 注意:淘宝有非常严格的反爬机制,此示例仅作演示
# 实际使用可能需要Selenium或其他高级技术
try:
response = requests.get(product_url, headers=headers, timeout=15)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 提取商品ID
import re
product_id_match = re.search(r'id=(\d+)', product_url)
product_id = product_id_match.group(1) if product_id_match else "unknown"
# 提取商品名称
title_elem = soup.find('h1', class_='tb-main-title') or soup.find('title')
product_name = title_elem.get_text().strip() if title_elem else "未知商品"
# 提取价格(淘宝价格结构复杂,这里简化处理)
price = None
price_elem = soup.find('em', class_='tb-rmb-num')
if price_elem:
price = extract_price_from_text(price_elem.get_text())
return {
'product_id': f"tb_{product_id}",
'name': product_name,
'url': product_url,
'platform': '淘宝',
'price': price,
'currency': 'CNY'
}
except Exception as e:
print(f"监控淘宝商品失败: {e}")
return None
def save_product_info(product_info):
"""
保存商品信息到数据库
:param product_info: 商品信息字典
"""
if not product_info or not product_info.get('price'):
return False
conn = sqlite3.connect('price_monitor.db')
cursor = conn.cursor()
try:
# 插入或更新商品基本信息
cursor.execute('''
INSERT OR IGNORE INTO products (product_id, name, url, platform)
VALUES (?, ?, ?, ?)
''', (
product_info['product_id'],
product_info['name'],
product_info['url'],
product_info['platform']
))
# 插入价格记录
cursor.execute('''
INSERT INTO price_history (product_id, price, currency)
VALUES (?, ?, ?)
''', (
product_info['product_id'],
product_info['price'],
product_info['currency']
))
conn.commit()
print(f"已记录 {product_info['name']} 价格: ¥{product_info['price']}")
return True
except sqlite3.Error as e:
print(f"保存商品信息失败: {e}")
return False
finally:
conn.close()
def get_price_history(product_id):
"""
获取指定商品的历史价格记录
:param product_id: 商品ID
:return: 价格历史记录列表
"""
conn = sqlite3.connect('price_monitor.db')
cursor = conn.cursor()
cursor.execute('''
SELECT price, timestamp FROM price_history
WHERE product_id = ?
ORDER BY timestamp DESC
''', (product_id,))
records = cursor.fetchall()
conn.close()
return [{'price': record[0], 'timestamp': record[1]} for record in records]
def main():
"""
主函数:监控多个商品的价格
"""
# 创建数据库
create_database()
# 配置要监控的商品URL列表
product_urls = [
# 京东商品示例(替换为真实URL)
"https://item.jd.com/100000000000.html",
# 淘宝商品示例(替换为真实URL)
"https://item.taobao.com/item.htm?id=10000000000"
]
print("开始价格监控...")
for url in product_urls:
print(f"监控商品: {url}")
# 根据URL判断平台
if 'jd.com' in url:
product_info = monitor_jd_product(url)
elif 'taobao.com' in url:
product_info = monitor_taobao_product(url)
else:
print(f"不支持的平台: {url}")
continue
# 保存商品信息
if product_info:
save_product_info(product_info)
# 添加延迟,避免请求过于频繁
time.sleep(3)
print("价格监控完成")
if __name__ == "__main__":
main()注意事项:
- 电商平台反爬:京东、淘宝等大型电商平台有严格的反爬机制,简单的requests可能无法获取到真实价格
- 动态加载:很多电商网站的价格是通过JavaScript动态加载的,可能需要使用Selenium
- 价格格式复杂:价格可能包含促销信息、会员价、满减等复杂情况,需要仔细分析
- 数据库设计:合理设计数据库结构,便于后续查询和分析价格趋势
- 定时任务:实际应用中应该使用定时任务调度器定期执行监控
这个价格监控爬虫展示了如何处理电商网站的特殊挑战,并提供了数据持久化的解决方案。
12.3 招聘信息聚合分析爬虫
招聘网站数据对于求职者和人力资源分析都非常有价值。通过爬取多个招聘网站的职位信息,可以进行薪资分析、技能需求统计等。
核心功能方法表
| 功能名称 | 调用方法 | 具体功能与注意事项 |
|---|---|---|
| 搜索职位列表 | requests.get(search_url, params=search_params) | 需要构造正确的搜索参数 |
| 提取职位信息 | soup.find('div', class_='job-item') | 包括职位名称、公司、薪资、地点等 |
| 处理薪资范围 | parse_salary_range(salary_text) | 将"10k-20k"转换为数值范围 |
| 数据标准化 | standardize_job_data(raw_data) | 统一不同网站的数据格式 |
招聘信息爬虫示例:
python
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import pandas as pd
import re
import time
from datetime import datetime
import json
# 请求头配置
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def parse_salary_range(salary_text):
"""
解析薪资范围文本,返回最小值和最大值
:param salary_text: 薪资文本,如"10k-20k"、"面议"、"15k以上"
:return: (min_salary, max_salary) 元/月
"""
if not salary_text or '面议' in salary_text or '保密' in salary_text:
return (None, None)
# 移除空格和特殊字符
salary_text = salary_text.replace(' ', '').replace('·', '')
# 匹配薪资模式
# 支持的格式:10k-20k, 10-20k, 15k以上, 15k以下, 15k
patterns = [
r'(\d+\.?\d*)[kK]?\s*-\s*(\d+\.?\d*)[kK]?', # 10k-20k 或 10-20k
r'(\d+\.?\d*)[kK]?以上', # 15k以上
r'(\d+\.?\d*)[kK]?以下', # 15k以下
r'(\d+\.?\d*)[kK]?' # 15k
]
for pattern in patterns:
match = re.search(pattern, salary_text)
if match:
groups = match.groups()
if len(groups) == 2:
# 范围格式
min_val = float(groups[0])
max_val = float(groups[1])
elif '以上' in salary_text:
# 最低薪资
min_val = float(groups[0])
max_val = min_val * 1.5 # 估算上限
elif '以下' in salary_text:
# 最高薪资
max_val = float(groups[0])
min_val = max_val * 0.7 # 估算下限
else:
# 单一薪资
min_val = max_val = float(groups[0])
# 转换为元/月(假设k表示千元/月)
min_salary = min_val * 1000 if 'k' in salary_text.lower() or 'K' in salary_text else min_val
max_salary = max_val * 1000 if 'k' in salary_text.lower() or 'K' in salary_text else max_val
return (min_salary, max_salary)
return (None, None)
def scrape_liepin_jobs(keyword, city="全国", pages=1):
"""
爬取猎聘网职位信息
:param keyword: 搜索关键词
:param city: 城市
:param pages: 爬取页数
:return: 职位信息列表
"""
jobs = []
for page in range(1, pages + 1):
print(f"爬取猎聘网第 {page} 页...")
# 构造搜索URL
search_url = "https://www.liepin.com/zhaopin/"
params = {
'key': keyword,
'curPage': page - 1 # 猎聘页码从0开始
}
if city != "全国":
params['dqs'] = city
try:
response = requests.get(search_url, headers=headers, params=params, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 查找职位列表项
job_items = soup.find_all('div', class_='job-info')
if not job_items:
print("未找到职位信息,可能已到达最后一页")
break
for item in job_items:
try:
# 提取职位名称
title_elem = item.find('h3')
title = title_elem.get_text().strip() if title_elem else "未知职位"
# 提取公司名称
company_elem = item.find('p', class_='company-name')
company = company_elem.get_text().strip() if company_elem else "未知公司"
# 提取薪资
salary_elem = item.find('span', class_='salary')
salary_text = salary_elem.get_text().strip() if salary_elem else ""
min_salary, max_salary = parse_salary_range(salary_text)
# 提取工作地点
location_elem = item.find('a', class_='area')
location = location_elem.get_text().strip() if location_elem else ""
# 提取职位链接
link_elem = item.find('a')
job_link = link_elem['href'] if link_elem and 'href' in link_elem.attrs else ""
job_info = {
'platform': '猎聘网',
'title': title,
'company': company,
'min_salary': min_salary,
'max_salary': max_salary,
'location': location,
'link': job_link,
'keyword': keyword,
'scraped_at': datetime.now().isoformat()
}
jobs.append(job_info)
except Exception as e:
print(f"解析单个职位信息失败: {e}")
continue
# 添加延迟
time.sleep(2)
except requests.RequestException as e:
print(f"请求猎聘网失败: {e}")
break
return jobs
def scrape_zhipin_jobs(keyword, city="全国", pages=1):
"""
爬取BOSS直聘职位信息(简化版)
:param keyword: 搜索关键词
:param city: 城市
:param pages: 爬取页数
:return: 职位信息列表
"""
# 注意:BOSS直聘有严格的反爬机制,此示例仅作演示
jobs = []
for page in range(1, pages + 1):
print(f"爬取BOSS直聘第 {page} 页...")
# BOSS直聘的实际API比较复杂,这里使用简化的方式
search_url = f"https://www.zhipin.com/c101010100/?query={keyword}&page={page}"
try:
response = requests.get(search_url, headers=headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# 查找职位列表
job_items = soup.find_all('div', class_='job-primary')
for item in job_items:
try:
# 提取职位信息
title_elem = item.find('span', class_='job-name')
title = title_elem.get_text().strip() if title_elem else "未知职位"
company_elem = item.find('h3', class_='name')
company = company_elem.get_text().strip() if company_elem else "未知公司"
salary_elem = item.find('span', class_='red')
salary_text = salary_elem.get_text().strip() if salary_elem else ""
min_salary, max_salary = parse_salary_range(salary_text)
info_primary = item.find('div', class_='info-primary')
location = ""
if info_primary:
p_tag = info_primary.find('p')
if p_tag:
location = p_tag.get_text().split('·')[0] if '·' in p_tag.get_text() else p_tag.get_text()
link_elem = item.find('a')
job_link = "https://www.zhipin.com" + link_elem['href'] if link_elem and 'href' in link_elem.attrs else ""
job_info = {
'platform': 'BOSS直聘',
'title': title,
'company': company,
'min_salary': min_salary,
'max_salary': max_salary,
'location': location,
'link': job_link,
'keyword': keyword,
'scraped_at': datetime.now().isoformat()
}
jobs.append(job_info)
except Exception as e:
print(f"解析BOSS直聘职位信息失败: {e}")
continue
time.sleep(3)
except requests.RequestException as e:
print(f"请求BOSS直聘失败: {e}")
break
return jobs
def analyze_job_data(jobs_data):
"""
分析职位数据
:param jobs_data: 职位数据列表
:return: 分析结果字典
"""
if not jobs_data:
return {}
df = pd.DataFrame(jobs_data)
# 计算平均薪资(排除None值)
valid_salaries = df[df['min_salary'].notna() & df['max_salary'].notna()]
if not valid_salaries.empty:
avg_min_salary = valid_salaries['min_salary'].mean()
avg_max_salary = valid_salaries['max_salary'].mean()
else:
avg_min_salary = avg_max_salary = 0
# 按平台统计
platform_stats = df.groupby('platform').size().to_dict()
# 按城市统计(取前5个城市)
location_stats = df['location'].value_counts().head(5).to_dict()
analysis = {
'total_jobs': len(jobs_data),
'avg_min_salary': round(avg_min_salary, 2),
'avg_max_salary': round(avg_max_salary, 2),
'platform_distribution': platform_stats,
'top_locations': location_stats
}
return analysis
def save_jobs_to_excel(jobs_data, filename='job_data.xlsx'):
"""
保存职位数据到Excel文件
:param jobs_data: 职位数据列表
:param filename: 文件名
"""
if not jobs_data:
print("没有数据可保存")
return
df = pd.DataFrame(jobs_data)
df.to_excel(filename, index=False, engine='openpyxl')
print(f"已保存 {len(jobs_data)} 条职位数据到 {filename}")
def main():
"""
主函数:执行招聘数据爬取和分析
"""
keyword = "Python开发工程师"
city = "北京"
pages = 2 # 每个网站爬取2页
all_jobs = []
# 爬取猎聘网数据
print("开始爬取猎聘网...")
liepin_jobs = scrape_liepin_jobs(keyword, city, pages)
all_jobs.extend(liepin_jobs)
# 爬取BOSS直聘数据
print("开始爬取BOSS直聘...")
zhipin_jobs = scrape_zhipin_jobs(keyword, city, pages)
all_jobs.extend(zhipin_jobs)
print(f"总共爬取到 {len(all_jobs)} 条职位信息")
# 保存数据
if all_jobs:
save_jobs_to_excel(all_jobs)
# 进行数据分析
analysis = analyze_job_data(all_jobs)
print("\n=== 数据分析结果 ===")
print(f"总职位数: {analysis.get('total_jobs', 0)}")
print(f"平均薪资范围: ¥{analysis.get('avg_min_salary', 0):,.0f} - ¥{analysis.get('avg_max_salary', 0):,.0 f}")
print(f"平台分布: {analysis.get('platform_distribution', {})}")
print(f"热门城市: {analysis.get('top_locations', {})}")
else:
print("未爬取到任何职位数据")
if __name__ == "__main__":
main()注意事项:
- 反爬机制:招聘网站通常有较强的反爬措施,可能需要更复杂的处理方式
- 薪资格式多样:不同网站的薪资显示格式差异很大,需要灵活的解析逻辑
- 数据质量:爬取的数据可能存在缺失或错误,需要进行数据清洗
- 法律合规:招聘数据涉及个人信息,需要遵守相关法律法规
- API限制:有些招聘网站提供官方API,优先考虑使用官方渠道
这个招聘信息爬虫展示了如何处理多源数据聚合,并提供了基本的数据分析功能。
12.4 社交媒体公开数据采集(遵守平台规则)
社交媒体数据对于舆情分析、市场调研等场景非常有价值。但必须严格遵守平台的使用条款和robots.txt协议,仅采集公开可访问的数据。
核心功能方法表
| 功能名称 | 调用方法 | 具体功能与注意事项 |
|---|---|---|
| 获取公开微博列表 | requests.get(weibo_api_url, params=params) | 使用微博开放平台API,需要申请开发者权限 |
| 提取微博内容 | json_response['data']['cards'] | 解析JSON格式的API响应 |
| 处理分页数据 | params['since_id'] = last_since_id | 微博API使用since_id进行分页 |
| 遵守API限制 | time.sleep(1) | 严格遵守API调用频率限制 |
社交媒体数据采集示例(以微博为例):
python
# -*- coding: utf-8 -*-
import requests
import json
import time
import csv
from datetime import datetime
import os
# 微博开放平台API配置
# 注意:需要先在 https://open.weibo.com/ 申请开发者账号和应用
APP_KEY = "your_app_key" # 替换为你的App Key
APP_SECRET = "your_app_secret" # 替换为你的App Secret
# 请求头
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
def get_weibo_access_token():
"""
获取微博API访问令牌(简化版,实际需要OAuth2.0授权流程)
:return: access_token字符串
"""
# 注意:这只是一个示意,实际的OAuth2.0流程更复杂
# 在生产环境中,你需要实现完整的授权流程
return "your_access_token" # 替换为实际的access token
def search_weibo_posts(keyword, access_token, pages=1):
"""
搜索微博公开帖子
:param keyword: 搜索关键词
:param access_token: API访问令牌
:param pages: 搜索页数
:return: 微博帖子列表
"""
posts = []
since_id = 0 # 分页参数
# 微博搜索API端点
api_url = "https://api.weibo.com/2/search/topics.json"
for page in range(pages):
print(f"搜索微博第 {page + 1} 页...")
params = {
'access_token': access_token,
'q': keyword,
'count': 20, # 每页数量
'page': page + 1
}
# 如果不是第一页,添加since_id参数
if since_id > 0:
params['since_id'] = since_id
try:
response = requests.get(api_url, headers=headers, params=params, timeout=10)
# 检查API响应
if response.status_code == 200:
data = response.json()
# 检查是否有错误
if 'error_code' in data:
print(f"微博API错误: {data.get('error')}")
break
# 提取微博数据
statuses = data.get('statuses', [])
if not statuses:
print("未找到更多微博数据")
break
for status in statuses:
post_info = {
'platform': '微博',
'id': status.get('idstr', ''),
'text': status.get('text', ''),
'user_name': status.get('user', {}).get('screen_name', ''),
'user_id': status.get('user', {}).get('idstr', ''),
'created_at': status.get('created_at', ''),
'reposts_count': status.get('reposts_count', 0),
'comments_count': status.get('comments_count', 0),
'attitudes_count': status.get('attitudes_count', 0),
'keyword': keyword,
'scraped_at': datetime.now().isoformat()
}
posts.append(post_info)
# 更新since_id用于下一页(如果有)
if 'since_id' in data:
since_id = data['since_id']
# 严格遵守API调用频率限制
time.sleep(1)
else:
print(f"微博API请求失败: {response.status_code}")
break
except requests.RequestException as e:
print(f"请求微博API失败: {e}")
break
except json.JSONDecodeError as e:
print(f"解析微博API响应失败: {e}")
break
return posts
def clean_weibo_text(text):
"""
清理微博文本,移除HTML标签和特殊字符
:param text: 原始微博文本
:return: 清理后的文本
"""
import re
# 移除HTML标签
clean_text = re.sub(r'<[^>]+>', '', text)
# 移除多余的空白字符
clean_text = re.sub(r'\s+', ' ', clean_text).strip()
return clean_text
def save_weibo_posts_to_csv(posts, filename='weibo_posts.csv'):
"""
保存微博数据到CSV文件
:param posts: 微博数据列表
:param filename: 文件名
"""
if not posts:
print("没有微博数据可保存")
return
# 确保目录存在
os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
file_exists = os.path.isfile(filename)
with open(filename, 'a', newline='', encoding='utf-8-sig') as csvfile:
fieldnames = [
'platform', 'id', 'text', 'user_name', 'user_id',
'created_at', 'reposts_count', 'comments_count',
'attitudes_count', 'keyword', 'scraped_at'
]
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if not file_exists:
writer.writeheader()
for post in posts:
# 清理文本内容
post['text'] = clean_weibo_text(post['text'])
writer.writerow(post)
print(f"已保存 {len(posts)} 条微博数据到 {filename}")
def get_user_public_timeline(user_id, access_token, count=20):
"""
获取指定用户的公开时间线(需要用户授权或公开账号)
:param user_id: 用户ID
:param access_token: 访问令牌
:param count: 获取的微博数量
:return: 用户微博列表
"""
# 注意:这需要用户的授权,或者用户账号是完全公开的
api_url = "https://api.weibo.com/2/statuses/user_timeline.json"
params = {
'access_token': access_token,
'uid': user_id,
'count': count,
'trim_user': 0
}
try:
response = requests.get(api_url, headers=headers, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
if 'error_code' in data:
print(f"获取用户时间线失败: {data.get('error')}")
return []
statuses = data.get('statuses', [])
posts = []
for status in statuses:
post_info = {
'platform': '微博',
'id': status.get('idstr', ''),
'text': status.get('text', ''),
'user_name': status.get('user', {}).get('screen_name', ''),
'user_id': status.get('user', {}).get('idstr', ''),
'created_at': status.get('created_at', ''),
'reposts_count': status.get('reposts_count', 0),
'comments_count': status.get('comments_count', 0),
'attitudes_count': status.get('attitudes_count', 0),
'scraped_at': datetime.now().isoformat()
}
posts.append(post_info)
return posts
except Exception as e:
print(f"获取用户时间线失败: {e}")
return []
def main():
"""
主函数:执行微博数据采集
"""
# 获取访问令牌
access_token = get_weibo_access_token()
if not access_token:
print("无法获取访问令牌,请检查开发者配置")
return
# 搜索关键词
keyword = "人工智能"
pages = 2
print(f"开始搜索微博关键词: {keyword}")
# 搜索微博帖子
posts = search_weibo_posts(keyword, access_token, pages)
if posts:
# 保存数据
save_weibo_posts_to_csv(posts)
# 显示统计信息
total_posts = len(posts)
total_reposts = sum(post['reposts_count'] for post in posts)
total_comments = sum(post['comments_count'] for post in posts)
print(f"\n=== 采集统计 ===")
print(f"总帖子数: {total_posts}")
print(f"总转发数: {total_reposts}")
print(f"总评论数: {total_comments}")
# 显示前3条微博预览
print(f"\n=== 前3条微博预览 ===")
for i, post in enumerate(posts[:3]):
print(f"{i+1}. @{post['user_name']}: {post['text'][:100]}...")
else:
print("未采集到任何微博数据")
if __name__ == "__main__":
main()注意事项:
- API授权:微博等社交媒体平台需要开发者账号和API授权,不能直接爬取
- 调用限制:严格遵守API的调用频率限制,避免被封禁
- 数据隐私:仅采集公开数据,不得采集用户私密信息
- 内容合规:采集的内容不得用于违法或违反平台条款的用途
- 错误处理:API可能返回各种错误码,需要完善的错误处理机制
这个社交媒体数据采集示例强调了合规性和API使用的正确方式,展示了如何在遵守平台规则的前提下获取公开数据。