Skip to content

第12章 综合实战项目

12.1 新闻网站全文采集系统

新闻网站全文采集是爬虫的经典应用场景。这类网站通常具有清晰的结构,但也可能包含反爬机制。我们需要设计一个能够稳定运行、自动处理分页、提取正文内容并妥善存储的系统。

核心功能方法表

功能名称调用方法具体功能与注意事项
获取新闻列表页requests.get(list_url, headers=headers)需要处理分页参数,通常为page或offset
提取新闻链接soup.find_all('a', class_='news-link')使用BeautifulSoup定位链接元素,注意相对路径转绝对路径
获取新闻详情requests.get(detail_url, headers=headers)可能需要添加Referer头模拟从列表页跳转
提取正文内容soup.find('div', class_='content').get_text()不同网站结构差异大,需针对性编写选择器

下面是一个完整的新闻采集示例:

python
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import time
import csv
import os
from urllib.parse import urljoin, urlparse

# 设置请求头,模拟真实浏览器访问
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

def get_news_list(base_url, page_num=1):
    """
    获取新闻列表页面的所有新闻链接
    :param base_url: 列表页基础URL
    :param page_num: 页码
    :return: 新闻详情页URL列表
    """
    # 构造完整的列表页URL(假设分页参数为page)
    list_url = f"{base_url}?page={page_num}"
    
    try:
        # 发送GET请求获取列表页内容
        response = requests.get(list_url, headers=headers, timeout=10)
        response.raise_for_status()  # 检查HTTP错误
        
        # 使用BeautifulSoup解析HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 查找所有新闻链接(这里使用通用的选择器,实际需要根据目标网站调整)
        news_links = []
        for link in soup.find_all('a', href=True):
            href = link['href']
            # 将相对路径转换为绝对路径
            absolute_url = urljoin(base_url, href)
            # 确保URL属于同一域名
            if urlparse(absolute_url).netloc == urlparse(base_url).netloc:
                news_links.append(absolute_url)
        
        return news_links
    
    except requests.RequestException as e:
        print(f"获取列表页失败: {e}")
        return []

def extract_news_content(news_url):
    """
    提取单篇新闻的标题和正文内容
    :param news_url: 新闻详情页URL
    :return: 包含标题和正文的字典
    """
    try:
        # 发送请求获取新闻详情页
        response = requests.get(news_url, headers=headers, timeout=10)
        response.raise_for_status()
        
        # 解析HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 提取标题(需要根据实际网站结构调整选择器)
        title_elem = soup.find('h1') or soup.find('title')
        title = title_elem.get_text().strip() if title_elem else "未知标题"
        
        # 提取正文内容(需要根据实际网站结构调整选择器)
        # 这里提供几种常见的正文容器选择方式
        content_selectors = [
            'div.content',
            'div.article-content', 
            'div.post-content',
            'article',
            'div.entry-content'
        ]
        
        content = ""
        for selector in content_selectors:
            content_elem = soup.select_one(selector)
            if content_elem:
                content = content_elem.get_text().strip()
                break
        
        # 如果以上选择器都没找到,尝试获取所有段落
        if not content:
            paragraphs = soup.find_all('p')
            content = '\n'.join([p.get_text().strip() for p in paragraphs])
        
        return {
            'url': news_url,
            'title': title,
            'content': content
        }
    
    except requests.RequestException as e:
        print(f"获取新闻详情失败 {news_url}: {e}")
        return None
    except Exception as e:
        print(f"解析新闻内容失败 {news_url}: {e}")
        return None

def save_to_csv(news_data, filename='news_data.csv'):
    """
    将新闻数据保存到CSV文件
    :param news_data: 新闻数据列表
    :param filename: 保存的文件名
    """
    # 检查文件是否存在,决定是否写入表头
    file_exists = os.path.isfile(filename)
    
    with open(filename, 'a', newline='', encoding='utf-8-sig') as csvfile:
        fieldnames = ['url', 'title', 'content']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        
        # 如果文件不存在,先写入表头
        if not file_exists:
            writer.writeheader()
        
        # 写入新闻数据
        for news in news_data:
            if news:  # 确保数据不为空
                writer.writerow(news)

def main():
    """
    主函数:执行完整的新闻采集流程
    """
    # 配置目标网站的列表页URL(这里以示例URL代替,实际使用时替换为真实URL)
    base_list_url = "https://example-news-site.com/news"
    
    # 存储所有采集到的新闻数据
    all_news_data = []
    
    # 采集前3页的新闻(可根据需要调整)
    for page in range(1, 4):
        print(f"正在采集第 {page} 页...")
        
        # 获取当前页的新闻链接
        news_urls = get_news_list(base_list_url, page)
        print(f"找到 {len(news_urls)} 个新闻链接")
        
        # 逐个采集新闻详情
        for url in news_urls[:5]:  # 限制每页只采集前5条,避免过于频繁
            print(f"采集: {url}")
            news_data = extract_news_content(url)
            if news_data:
                all_news_data.append(news_data)
            
            # 添加延迟,避免请求过于频繁
            time.sleep(1)
        
        # 页面间添加稍长的延迟
        time.sleep(2)
    
    # 保存数据到CSV文件
    if all_news_data:
        save_to_csv(all_news_data)
        print(f"成功保存 {len(all_news_data)} 条新闻数据")
    else:
        print("未采集到任何新闻数据")

if __name__ == "__main__":
    main()

注意事项:

  1. 网站结构差异:不同新闻网站的HTML结构千差万别,代码中的选择器需要根据目标网站实际结构调整
  2. 反爬机制:新闻网站通常有反爬措施,需要合理设置请求间隔,必要时使用代理IP
  3. 编码问题:确保正确处理中文编码,使用utf-8-sig编码保存CSV可避免Excel打开乱码
  4. 法律合规:仅采集公开可访问的内容,遵守robots.txt协议,不要用于商业用途
  5. 错误处理:网络请求可能失败,必须包含完善的异常处理机制

这个新闻采集系统展示了如何构建一个完整的爬虫应用,包含了请求发送、内容解析、数据存储等核心环节,并且考虑了实际运行中的各种问题。

12.2 电商商品价格监控爬虫

电商价格监控是另一个热门的爬虫应用场景。通过定期抓取商品价格信息,可以帮助用户了解价格变化趋势,抓住最佳购买时机。

核心功能方法表

功能名称调用方法具体功能与注意事项
获取商品详情页requests.get(product_url, headers=headers)电商网站通常有复杂的反爬机制,需要更真实的请求头
提取价格信息soup.find('span', class_='price').get_text()价格可能在多个位置,需要尝试不同的选择器
提取商品基本信息soup.find('h1', class_='product-title').get_text()包括商品名称、品牌、规格等信息
定时监控schedule.every().hour.do(monitor_price)使用schedule库实现定时任务

电商价格监控爬虫示例:

python
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import json
import time
import sqlite3
from datetime import datetime
import re

# 更完整的请求头,模拟真实浏览器
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
}

def create_database():
    """
    创建SQLite数据库和商品价格表
    """
    conn = sqlite3.connect('price_monitor.db')
    cursor = conn.cursor()
    
    # 创建商品信息表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS products (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT UNIQUE,
            name TEXT,
            url TEXT,
            platform TEXT
        )
    ''')
    
    # 创建价格记录表
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS price_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            product_id TEXT,
            price REAL,
            currency TEXT,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (product_id) REFERENCES products (product_id)
        )
    ''')
    
    conn.commit()
    conn.close()

def extract_price_from_text(text):
    """
    从文本中提取价格数字
    :param text: 包含价格的文本
    :return: 提取到的价格数字,如果失败返回None
    """
    if not text:
        return None
    
    # 移除所有非数字和小数点的字符,但保留负号(虽然价格不会是负的)
    # 这个正则表达式匹配数字和小数点
    price_match = re.search(r'[\d,]+\.?\d*', text.replace(',', ''))
    if price_match:
        try:
            return float(price_match.group())
        except ValueError:
            return None
    return None

def monitor_jd_product(product_url):
    """
    监控京东商品价格
    :param product_url: 京东商品URL
    :return: 商品信息字典
    """
    try:
        # 发送请求获取商品页面
        response = requests.get(product_url, headers=headers, timeout=15)
        response.raise_for_status()
        
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 提取商品ID(从URL中获取)
        import re
        product_id_match = re.search(r'/(\d+)\.html', product_url)
        product_id = product_id_match.group(1) if product_id_match else "unknown"
        
        # 提取商品名称
        title_elem = soup.find('div', class_='sku-name') or soup.find('h1')
        product_name = title_elem.get_text().strip() if title_elem else "未知商品"
        
        # 尝试多种方式提取价格
        price = None
        
        # 方式1: 查找价格元素
        price_elem = soup.find('span', class_='price')
        if price_elem:
            price = extract_price_from_text(price_elem.get_text())
        
        # 方式2: 如果方式1失败,查找其他可能的价格元素
        if price is None:
            price_elem = soup.find('span', class_='p-price')
            if price_elem:
                price = extract_price_from_text(price_elem.get_text())
        
        # 方式3: 查找包含"¥"符号的元素
        if price is None:
            for elem in soup.find_all(text=re.compile(r'¥')):
                price = extract_price_from_text(elem)
                if price is not None:
                    break
        
        return {
            'product_id': f"jd_{product_id}",
            'name': product_name,
            'url': product_url,
            'platform': '京东',
            'price': price,
            'currency': 'CNY'
        }
    
    except requests.RequestException as e:
        print(f"请求京东商品页面失败: {e}")
        return None
    except Exception as e:
        print(f"解析京东商品信息失败: {e}")
        return None

def monitor_taobao_product(product_url):
    """
    监控淘宝商品价格(简化版,实际淘宝有复杂的反爬)
    :param product_url: 淘宝商品URL
    :return: 商品信息字典
    """
    # 注意:淘宝有非常严格的反爬机制,此示例仅作演示
    # 实际使用可能需要Selenium或其他高级技术
    try:
        response = requests.get(product_url, headers=headers, timeout=15)
        response.raise_for_status()
        
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 提取商品ID
        import re
        product_id_match = re.search(r'id=(\d+)', product_url)
        product_id = product_id_match.group(1) if product_id_match else "unknown"
        
        # 提取商品名称
        title_elem = soup.find('h1', class_='tb-main-title') or soup.find('title')
        product_name = title_elem.get_text().strip() if title_elem else "未知商品"
        
        # 提取价格(淘宝价格结构复杂,这里简化处理)
        price = None
        price_elem = soup.find('em', class_='tb-rmb-num')
        if price_elem:
            price = extract_price_from_text(price_elem.get_text())
        
        return {
            'product_id': f"tb_{product_id}",
            'name': product_name,
            'url': product_url,
            'platform': '淘宝',
            'price': price,
            'currency': 'CNY'
        }
    
    except Exception as e:
        print(f"监控淘宝商品失败: {e}")
        return None

def save_product_info(product_info):
    """
    保存商品信息到数据库
    :param product_info: 商品信息字典
    """
    if not product_info or not product_info.get('price'):
        return False
    
    conn = sqlite3.connect('price_monitor.db')
    cursor = conn.cursor()
    
    try:
        # 插入或更新商品基本信息
        cursor.execute('''
            INSERT OR IGNORE INTO products (product_id, name, url, platform)
            VALUES (?, ?, ?, ?)
        ''', (
            product_info['product_id'],
            product_info['name'],
            product_info['url'],
            product_info['platform']
        ))
        
        # 插入价格记录
        cursor.execute('''
            INSERT INTO price_history (product_id, price, currency)
            VALUES (?, ?, ?)
        ''', (
            product_info['product_id'],
            product_info['price'],
            product_info['currency']
        ))
        
        conn.commit()
        print(f"已记录 {product_info['name']} 价格: ¥{product_info['price']}")
        return True
        
    except sqlite3.Error as e:
        print(f"保存商品信息失败: {e}")
        return False
    finally:
        conn.close()

def get_price_history(product_id):
    """
    获取指定商品的历史价格记录
    :param product_id: 商品ID
    :return: 价格历史记录列表
    """
    conn = sqlite3.connect('price_monitor.db')
    cursor = conn.cursor()
    
    cursor.execute('''
        SELECT price, timestamp FROM price_history 
        WHERE product_id = ? 
        ORDER BY timestamp DESC
    ''', (product_id,))
    
    records = cursor.fetchall()
    conn.close()
    
    return [{'price': record[0], 'timestamp': record[1]} for record in records]

def main():
    """
    主函数:监控多个商品的价格
    """
    # 创建数据库
    create_database()
    
    # 配置要监控的商品URL列表
    product_urls = [
        # 京东商品示例(替换为真实URL)
        "https://item.jd.com/100000000000.html",
        # 淘宝商品示例(替换为真实URL)
        "https://item.taobao.com/item.htm?id=10000000000"
    ]
    
    print("开始价格监控...")
    
    for url in product_urls:
        print(f"监控商品: {url}")
        
        # 根据URL判断平台
        if 'jd.com' in url:
            product_info = monitor_jd_product(url)
        elif 'taobao.com' in url:
            product_info = monitor_taobao_product(url)
        else:
            print(f"不支持的平台: {url}")
            continue
        
        # 保存商品信息
        if product_info:
            save_product_info(product_info)
        
        # 添加延迟,避免请求过于频繁
        time.sleep(3)
    
    print("价格监控完成")

if __name__ == "__main__":
    main()

注意事项:

  1. 电商平台反爬:京东、淘宝等大型电商平台有严格的反爬机制,简单的requests可能无法获取到真实价格
  2. 动态加载:很多电商网站的价格是通过JavaScript动态加载的,可能需要使用Selenium
  3. 价格格式复杂:价格可能包含促销信息、会员价、满减等复杂情况,需要仔细分析
  4. 数据库设计:合理设计数据库结构,便于后续查询和分析价格趋势
  5. 定时任务:实际应用中应该使用定时任务调度器定期执行监控

这个价格监控爬虫展示了如何处理电商网站的特殊挑战,并提供了数据持久化的解决方案。

12.3 招聘信息聚合分析爬虫

招聘网站数据对于求职者和人力资源分析都非常有价值。通过爬取多个招聘网站的职位信息,可以进行薪资分析、技能需求统计等。

核心功能方法表

功能名称调用方法具体功能与注意事项
搜索职位列表requests.get(search_url, params=search_params)需要构造正确的搜索参数
提取职位信息soup.find('div', class_='job-item')包括职位名称、公司、薪资、地点等
处理薪资范围parse_salary_range(salary_text)将"10k-20k"转换为数值范围
数据标准化standardize_job_data(raw_data)统一不同网站的数据格式

招聘信息爬虫示例:

python
# -*- coding: utf-8 -*-
import requests
from bs4 import BeautifulSoup
import pandas as pd
import re
import time
from datetime import datetime
import json

# 请求头配置
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

def parse_salary_range(salary_text):
    """
    解析薪资范围文本,返回最小值和最大值
    :param salary_text: 薪资文本,如"10k-20k"、"面议"、"15k以上"
    :return: (min_salary, max_salary) 元/月
    """
    if not salary_text or '面议' in salary_text or '保密' in salary_text:
        return (None, None)
    
    # 移除空格和特殊字符
    salary_text = salary_text.replace(' ', '').replace('·', '')
    
    # 匹配薪资模式
    # 支持的格式:10k-20k, 10-20k, 15k以上, 15k以下, 15k
    patterns = [
        r'(\d+\.?\d*)[kK]?\s*-\s*(\d+\.?\d*)[kK]?',  # 10k-20k 或 10-20k
        r'(\d+\.?\d*)[kK]?以上',                      # 15k以上
        r'(\d+\.?\d*)[kK]?以下',                      # 15k以下
        r'(\d+\.?\d*)[kK]?'                          # 15k
    ]
    
    for pattern in patterns:
        match = re.search(pattern, salary_text)
        if match:
            groups = match.groups()
            if len(groups) == 2:
                # 范围格式
                min_val = float(groups[0])
                max_val = float(groups[1])
            elif '以上' in salary_text:
                # 最低薪资
                min_val = float(groups[0])
                max_val = min_val * 1.5  # 估算上限
            elif '以下' in salary_text:
                # 最高薪资
                max_val = float(groups[0])
                min_val = max_val * 0.7  # 估算下限
            else:
                # 单一薪资
                min_val = max_val = float(groups[0])
            
            # 转换为元/月(假设k表示千元/月)
            min_salary = min_val * 1000 if 'k' in salary_text.lower() or 'K' in salary_text else min_val
            max_salary = max_val * 1000 if 'k' in salary_text.lower() or 'K' in salary_text else max_val
            
            return (min_salary, max_salary)
    
    return (None, None)

def scrape_liepin_jobs(keyword, city="全国", pages=1):
    """
    爬取猎聘网职位信息
    :param keyword: 搜索关键词
    :param city: 城市
    :param pages: 爬取页数
    :return: 职位信息列表
    """
    jobs = []
    
    for page in range(1, pages + 1):
        print(f"爬取猎聘网第 {page} 页...")
        
        # 构造搜索URL
        search_url = "https://www.liepin.com/zhaopin/"
        params = {
            'key': keyword,
            'curPage': page - 1  # 猎聘页码从0开始
        }
        
        if city != "全国":
            params['dqs'] = city
        
        try:
            response = requests.get(search_url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 查找职位列表项
            job_items = soup.find_all('div', class_='job-info')
            
            if not job_items:
                print("未找到职位信息,可能已到达最后一页")
                break
            
            for item in job_items:
                try:
                    # 提取职位名称
                    title_elem = item.find('h3')
                    title = title_elem.get_text().strip() if title_elem else "未知职位"
                    
                    # 提取公司名称
                    company_elem = item.find('p', class_='company-name')
                    company = company_elem.get_text().strip() if company_elem else "未知公司"
                    
                    # 提取薪资
                    salary_elem = item.find('span', class_='salary')
                    salary_text = salary_elem.get_text().strip() if salary_elem else ""
                    min_salary, max_salary = parse_salary_range(salary_text)
                    
                    # 提取工作地点
                    location_elem = item.find('a', class_='area')
                    location = location_elem.get_text().strip() if location_elem else ""
                    
                    # 提取职位链接
                    link_elem = item.find('a')
                    job_link = link_elem['href'] if link_elem and 'href' in link_elem.attrs else ""
                    
                    job_info = {
                        'platform': '猎聘网',
                        'title': title,
                        'company': company,
                        'min_salary': min_salary,
                        'max_salary': max_salary,
                        'location': location,
                        'link': job_link,
                        'keyword': keyword,
                        'scraped_at': datetime.now().isoformat()
                    }
                    
                    jobs.append(job_info)
                    
                except Exception as e:
                    print(f"解析单个职位信息失败: {e}")
                    continue
            
            # 添加延迟
            time.sleep(2)
            
        except requests.RequestException as e:
            print(f"请求猎聘网失败: {e}")
            break
    
    return jobs

def scrape_zhipin_jobs(keyword, city="全国", pages=1):
    """
    爬取BOSS直聘职位信息(简化版)
    :param keyword: 搜索关键词
    :param city: 城市
    :param pages: 爬取页数
    :return: 职位信息列表
    """
    # 注意:BOSS直聘有严格的反爬机制,此示例仅作演示
    jobs = []
    
    for page in range(1, pages + 1):
        print(f"爬取BOSS直聘第 {page} 页...")
        
        # BOSS直聘的实际API比较复杂,这里使用简化的方式
        search_url = f"https://www.zhipin.com/c101010100/?query={keyword}&page={page}"
        
        try:
            response = requests.get(search_url, headers=headers, timeout=10)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 查找职位列表
            job_items = soup.find_all('div', class_='job-primary')
            
            for item in job_items:
                try:
                    # 提取职位信息
                    title_elem = item.find('span', class_='job-name')
                    title = title_elem.get_text().strip() if title_elem else "未知职位"
                    
                    company_elem = item.find('h3', class_='name')
                    company = company_elem.get_text().strip() if company_elem else "未知公司"
                    
                    salary_elem = item.find('span', class_='red')
                    salary_text = salary_elem.get_text().strip() if salary_elem else ""
                    min_salary, max_salary = parse_salary_range(salary_text)
                    
                    info_primary = item.find('div', class_='info-primary')
                    location = ""
                    if info_primary:
                        p_tag = info_primary.find('p')
                        if p_tag:
                            location = p_tag.get_text().split('·')[0] if '·' in p_tag.get_text() else p_tag.get_text()
                    
                    link_elem = item.find('a')
                    job_link = "https://www.zhipin.com" + link_elem['href'] if link_elem and 'href' in link_elem.attrs else ""
                    
                    job_info = {
                        'platform': 'BOSS直聘',
                        'title': title,
                        'company': company,
                        'min_salary': min_salary,
                        'max_salary': max_salary,
                        'location': location,
                        'link': job_link,
                        'keyword': keyword,
                        'scraped_at': datetime.now().isoformat()
                    }
                    
                    jobs.append(job_info)
                    
                except Exception as e:
                    print(f"解析BOSS直聘职位信息失败: {e}")
                    continue
            
            time.sleep(3)
            
        except requests.RequestException as e:
            print(f"请求BOSS直聘失败: {e}")
            break
    
    return jobs

def analyze_job_data(jobs_data):
    """
    分析职位数据
    :param jobs_data: 职位数据列表
    :return: 分析结果字典
    """
    if not jobs_data:
        return {}
    
    df = pd.DataFrame(jobs_data)
    
    # 计算平均薪资(排除None值)
    valid_salaries = df[df['min_salary'].notna() & df['max_salary'].notna()]
    if not valid_salaries.empty:
        avg_min_salary = valid_salaries['min_salary'].mean()
        avg_max_salary = valid_salaries['max_salary'].mean()
    else:
        avg_min_salary = avg_max_salary = 0
    
    # 按平台统计
    platform_stats = df.groupby('platform').size().to_dict()
    
    # 按城市统计(取前5个城市)
    location_stats = df['location'].value_counts().head(5).to_dict()
    
    analysis = {
        'total_jobs': len(jobs_data),
        'avg_min_salary': round(avg_min_salary, 2),
        'avg_max_salary': round(avg_max_salary, 2),
        'platform_distribution': platform_stats,
        'top_locations': location_stats
    }
    
    return analysis

def save_jobs_to_excel(jobs_data, filename='job_data.xlsx'):
    """
    保存职位数据到Excel文件
    :param jobs_data: 职位数据列表
    :param filename: 文件名
    """
    if not jobs_data:
        print("没有数据可保存")
        return
    
    df = pd.DataFrame(jobs_data)
    df.to_excel(filename, index=False, engine='openpyxl')
    print(f"已保存 {len(jobs_data)} 条职位数据到 {filename}")

def main():
    """
    主函数:执行招聘数据爬取和分析
    """
    keyword = "Python开发工程师"
    city = "北京"
    pages = 2  # 每个网站爬取2页
    
    all_jobs = []
    
    # 爬取猎聘网数据
    print("开始爬取猎聘网...")
    liepin_jobs = scrape_liepin_jobs(keyword, city, pages)
    all_jobs.extend(liepin_jobs)
    
    # 爬取BOSS直聘数据
    print("开始爬取BOSS直聘...")
    zhipin_jobs = scrape_zhipin_jobs(keyword, city, pages)
    all_jobs.extend(zhipin_jobs)
    
    print(f"总共爬取到 {len(all_jobs)} 条职位信息")
    
    # 保存数据
    if all_jobs:
        save_jobs_to_excel(all_jobs)
        
        # 进行数据分析
        analysis = analyze_job_data(all_jobs)
        print("\n=== 数据分析结果 ===")
        print(f"总职位数: {analysis.get('total_jobs', 0)}")
        print(f"平均薪资范围: ¥{analysis.get('avg_min_salary', 0):,.0f} - ¥{analysis.get('avg_max_salary', 0):,.0 f}")
        print(f"平台分布: {analysis.get('platform_distribution', {})}")
        print(f"热门城市: {analysis.get('top_locations', {})}")
    
    else:
        print("未爬取到任何职位数据")

if __name__ == "__main__":
    main()

注意事项:

  1. 反爬机制:招聘网站通常有较强的反爬措施,可能需要更复杂的处理方式
  2. 薪资格式多样:不同网站的薪资显示格式差异很大,需要灵活的解析逻辑
  3. 数据质量:爬取的数据可能存在缺失或错误,需要进行数据清洗
  4. 法律合规:招聘数据涉及个人信息,需要遵守相关法律法规
  5. API限制:有些招聘网站提供官方API,优先考虑使用官方渠道

这个招聘信息爬虫展示了如何处理多源数据聚合,并提供了基本的数据分析功能。

12.4 社交媒体公开数据采集(遵守平台规则)

社交媒体数据对于舆情分析、市场调研等场景非常有价值。但必须严格遵守平台的使用条款和robots.txt协议,仅采集公开可访问的数据。

核心功能方法表

功能名称调用方法具体功能与注意事项
获取公开微博列表requests.get(weibo_api_url, params=params)使用微博开放平台API,需要申请开发者权限
提取微博内容json_response['data']['cards']解析JSON格式的API响应
处理分页数据params['since_id'] = last_since_id微博API使用since_id进行分页
遵守API限制time.sleep(1)严格遵守API调用频率限制

社交媒体数据采集示例(以微博为例):

python
# -*- coding: utf-8 -*-
import requests
import json
import time
import csv
from datetime import datetime
import os

# 微博开放平台API配置
# 注意:需要先在 https://open.weibo.com/ 申请开发者账号和应用
APP_KEY = "your_app_key"  # 替换为你的App Key
APP_SECRET = "your_app_secret"  # 替换为你的App Secret

# 请求头
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

def get_weibo_access_token():
    """
    获取微博API访问令牌(简化版,实际需要OAuth2.0授权流程)
    :return: access_token字符串
    """
    # 注意:这只是一个示意,实际的OAuth2.0流程更复杂
    # 在生产环境中,你需要实现完整的授权流程
    return "your_access_token"  # 替换为实际的access token

def search_weibo_posts(keyword, access_token, pages=1):
    """
    搜索微博公开帖子
    :param keyword: 搜索关键词
    :param access_token: API访问令牌
    :param pages: 搜索页数
    :return: 微博帖子列表
    """
    posts = []
    since_id = 0  # 分页参数
    
    # 微博搜索API端点
    api_url = "https://api.weibo.com/2/search/topics.json"
    
    for page in range(pages):
        print(f"搜索微博第 {page + 1} 页...")
        
        params = {
            'access_token': access_token,
            'q': keyword,
            'count': 20,  # 每页数量
            'page': page + 1
        }
        
        # 如果不是第一页,添加since_id参数
        if since_id > 0:
            params['since_id'] = since_id
        
        try:
            response = requests.get(api_url, headers=headers, params=params, timeout=10)
            
            # 检查API响应
            if response.status_code == 200:
                data = response.json()
                
                # 检查是否有错误
                if 'error_code' in data:
                    print(f"微博API错误: {data.get('error')}")
                    break
                
                # 提取微博数据
                statuses = data.get('statuses', [])
                if not statuses:
                    print("未找到更多微博数据")
                    break
                
                for status in statuses:
                    post_info = {
                        'platform': '微博',
                        'id': status.get('idstr', ''),
                        'text': status.get('text', ''),
                        'user_name': status.get('user', {}).get('screen_name', ''),
                        'user_id': status.get('user', {}).get('idstr', ''),
                        'created_at': status.get('created_at', ''),
                        'reposts_count': status.get('reposts_count', 0),
                        'comments_count': status.get('comments_count', 0),
                        'attitudes_count': status.get('attitudes_count', 0),
                        'keyword': keyword,
                        'scraped_at': datetime.now().isoformat()
                    }
                    posts.append(post_info)
                
                # 更新since_id用于下一页(如果有)
                if 'since_id' in data:
                    since_id = data['since_id']
                
                # 严格遵守API调用频率限制
                time.sleep(1)
                
            else:
                print(f"微博API请求失败: {response.status_code}")
                break
                
        except requests.RequestException as e:
            print(f"请求微博API失败: {e}")
            break
        except json.JSONDecodeError as e:
            print(f"解析微博API响应失败: {e}")
            break
    
    return posts

def clean_weibo_text(text):
    """
    清理微博文本,移除HTML标签和特殊字符
    :param text: 原始微博文本
    :return: 清理后的文本
    """
    import re
    
    # 移除HTML标签
    clean_text = re.sub(r'<[^>]+>', '', text)
    
    # 移除多余的空白字符
    clean_text = re.sub(r'\s+', ' ', clean_text).strip()
    
    return clean_text

def save_weibo_posts_to_csv(posts, filename='weibo_posts.csv'):
    """
    保存微博数据到CSV文件
    :param posts: 微博数据列表
    :param filename: 文件名
    """
    if not posts:
        print("没有微博数据可保存")
        return
    
    # 确保目录存在
    os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
    
    file_exists = os.path.isfile(filename)
    
    with open(filename, 'a', newline='', encoding='utf-8-sig') as csvfile:
        fieldnames = [
            'platform', 'id', 'text', 'user_name', 'user_id',
            'created_at', 'reposts_count', 'comments_count', 
            'attitudes_count', 'keyword', 'scraped_at'
        ]
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        
        if not file_exists:
            writer.writeheader()
        
        for post in posts:
            # 清理文本内容
            post['text'] = clean_weibo_text(post['text'])
            writer.writerow(post)
    
    print(f"已保存 {len(posts)} 条微博数据到 {filename}")

def get_user_public_timeline(user_id, access_token, count=20):
    """
    获取指定用户的公开时间线(需要用户授权或公开账号)
    :param user_id: 用户ID
    :param access_token: 访问令牌
    :param count: 获取的微博数量
    :return: 用户微博列表
    """
    # 注意:这需要用户的授权,或者用户账号是完全公开的
    api_url = "https://api.weibo.com/2/statuses/user_timeline.json"
    
    params = {
        'access_token': access_token,
        'uid': user_id,
        'count': count,
        'trim_user': 0
    }
    
    try:
        response = requests.get(api_url, headers=headers, params=params, timeout=10)
        
        if response.status_code == 200:
            data = response.json()
            if 'error_code' in data:
                print(f"获取用户时间线失败: {data.get('error')}")
                return []
            
            statuses = data.get('statuses', [])
            posts = []
            
            for status in statuses:
                post_info = {
                    'platform': '微博',
                    'id': status.get('idstr', ''),
                    'text': status.get('text', ''),
                    'user_name': status.get('user', {}).get('screen_name', ''),
                    'user_id': status.get('user', {}).get('idstr', ''),
                    'created_at': status.get('created_at', ''),
                    'reposts_count': status.get('reposts_count', 0),
                    'comments_count': status.get('comments_count', 0),
                    'attitudes_count': status.get('attitudes_count', 0),
                    'scraped_at': datetime.now().isoformat()
                }
                posts.append(post_info)
            
            return posts
        
    except Exception as e:
        print(f"获取用户时间线失败: {e}")
        return []

def main():
    """
    主函数:执行微博数据采集
    """
    # 获取访问令牌
    access_token = get_weibo_access_token()
    
    if not access_token:
        print("无法获取访问令牌,请检查开发者配置")
        return
    
    # 搜索关键词
    keyword = "人工智能"
    pages = 2
    
    print(f"开始搜索微博关键词: {keyword}")
    
    # 搜索微博帖子
    posts = search_weibo_posts(keyword, access_token, pages)
    
    if posts:
        # 保存数据
        save_weibo_posts_to_csv(posts)
        
        # 显示统计信息
        total_posts = len(posts)
        total_reposts = sum(post['reposts_count'] for post in posts)
        total_comments = sum(post['comments_count'] for post in posts)
        
        print(f"\n=== 采集统计 ===")
        print(f"总帖子数: {total_posts}")
        print(f"总转发数: {total_reposts}")
        print(f"总评论数: {total_comments}")
        
        # 显示前3条微博预览
        print(f"\n=== 前3条微博预览 ===")
        for i, post in enumerate(posts[:3]):
            print(f"{i+1}. @{post['user_name']}: {post['text'][:100]}...")
    else:
        print("未采集到任何微博数据")

if __name__ == "__main__":
    main()

注意事项:

  1. API授权:微博等社交媒体平台需要开发者账号和API授权,不能直接爬取
  2. 调用限制:严格遵守API的调用频率限制,避免被封禁
  3. 数据隐私:仅采集公开数据,不得采集用户私密信息
  4. 内容合规:采集的内容不得用于违法或违反平台条款的用途
  5. 错误处理:API可能返回各种错误码,需要完善的错误处理机制

这个社交媒体数据采集示例强调了合规性和API使用的正确方式,展示了如何在遵守平台规则的前提下获取公开数据。