AI Skill Report Card
Batch Data Scraping
```yaml
---
name: batch-data-scraping
description: Implements multi-batch data collection systems with concurrent processing, error handling, and resume capabilities. Use when scraping large datasets, multiple sources, or requiring robust data pipeline automation.
---
```
Multi-Batch Data Collection
Quick Start: 14 / 15
```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import aiohttp

@dataclass
class BatchJob:
    batch_id: str
    urls: List[str]
    params: Optional[Dict[str, Any]] = None
    headers: Optional[Dict[str, str]] = None

async def collect_batch(session: aiohttp.ClientSession, job: BatchJob,
                        semaphore: asyncio.Semaphore) -> Dict[str, Any]:
    async with semaphore:
        results = []
        for url in job.urls:
            try:
                async with session.get(url, params=job.params, headers=job.headers) as response:
                    data = await response.json()
                    results.append({"url": url, "data": data, "status": "success"})
            except Exception as e:
                results.append({"url": url, "error": str(e), "status": "failed"})
        return {"batch_id": job.batch_id, "results": results}

async def run_batch_collection(jobs: List[BatchJob], max_concurrent: int = 10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [collect_batch(session, job, semaphore) for job in jobs]
        return await asyncio.gather(*tasks)
```
Recommendation:
Add concrete input/output pairs showing actual scraped data structure and response times
Workflow: 12 / 15
Phase 1: Job Preparation
Progress:
- Define data sources and URL patterns
- Create batch configuration (size, concurrency)
- Set up error handling and retry logic
- Configure output storage (database/files)
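The Phase 1 steps above can be sketched as a small configuration object plus a URL chunker. This is a minimal illustration; all names (`BatchConfig`, `chunk_urls`, the field defaults) are assumptions, not part of the skill's API.

```python
from dataclasses import dataclass

@dataclass
class BatchConfig:
    batch_size: int = 100        # URLs per batch
    max_concurrent: int = 10     # simultaneous requests
    max_retries: int = 3         # per-URL retry budget
    retry_delay: float = 1.0     # base delay (seconds) before retrying
    output_path: str = "results.jsonl"

def chunk_urls(urls, batch_size):
    """Split a flat URL list into fixed-size batches."""
    return [urls[i:i + batch_size] for i in range(0, len(urls), batch_size)]

config = BatchConfig(batch_size=3)
batches = chunk_urls([f"https://example.com/page/{i}" for i in range(7)],
                     config.batch_size)
# 7 URLs with batch_size=3 -> batches of sizes 3, 3, 1
```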
Phase 2: Batch Execution
Progress:
- Initialize connection pools
- Start batch processing with progress tracking
- Monitor rate limits and adjust delays
- Handle failures and queue retries
Phase 3: Data Processing
Progress:
- Validate collected data structure
- Clean and transform data
- Store results with batch metadata
- Generate collection reports
Recommendation:
Include a complete working example that can be copy-pasted and run immediately
Examples: 15 / 20
Example 1: E-commerce Product Scraping
Input:
```python
jobs = [
    BatchJob("electronics", [f"https://api.store.com/products?category=electronics&page={i}" for i in range(1, 51)]),
    BatchJob("clothing", [f"https://api.store.com/products?category=clothing&page={i}" for i in range(1, 31)]),
]
```
Output: Structured product data with batch tracking, ~8000 products in 45 seconds
Example 2: Social Media Data Collection
Input:
```python
BatchJob(
    "trending",
    [f"https://api.social.com/posts/trending?date={date}" for date in date_range],
    headers={"Authorization": "Bearer token"},
)
```
Output: Time-series social media posts with engagement metrics
Recommendation:
Provide more specific guidance on choosing batch sizes and concurrency limits for different scenarios
Best Practices
Concurrency Management:
- Use semaphores to limit concurrent requests (10-50 typical)
- Implement exponential backoff for rate limiting
- Monitor memory usage with large datasets
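The exponential-backoff bullet above can be sketched in a few lines. Adding full jitter (a random wait up to the exponential cap) avoids retry stampedes when many requests fail at once; the base and cap values here are illustrative defaults.

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: random wait in [0, min(cap, base * 2**attempt)]."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

delays = [backoff_delay(a) for a in range(5)]  # upper bounds: 1, 2, 4, 8, 16
```

Before retry attempt `n`, sleep for `backoff_delay(n)` seconds.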
Error Resilience:
```python
class BatchCollector:
    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.failed_jobs = []

    async def retry_failed_jobs(self):
        if self.failed_jobs:
            # process_batches is the collector's main entry point (not shown here)
            return await self.process_batches(self.failed_jobs)
```
Progress Tracking:
```python
import logging

from tqdm import tqdm

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def track_progress(jobs):
    with tqdm(total=len(jobs), desc="Processing batches") as pbar:
        for job in jobs:
            result = await process_job(job)  # process_job: your per-batch handler
            pbar.update(1)
            logger.info(f"Completed batch {job.batch_id}: {len(result)} items")
```
Data Storage:
- Use chunked writes for large datasets
- Implement checkpointing for resume capability
- Store metadata (timestamps, source, batch_id)
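The storage bullets above could be implemented as appends to a JSONL results file plus a checkpoint file of finished batch IDs. The file layout and names are assumptions; adapt the paths to your pipeline.

```python
import json
import os
import tempfile

def append_results(path, records):
    """Chunked write: append one JSON line per record instead of one big dump."""
    with open(path, "a", encoding="utf-8") as f:
        for rec in records:
            f.write(json.dumps(rec) + "\n")

def mark_done(checkpoint_path, batch_id):
    """Record a finished batch so a rerun can skip it."""
    with open(checkpoint_path, "a") as f:
        f.write(batch_id + "\n")

def completed_batches(checkpoint_path):
    """Load the set of already-finished batch IDs (empty on first run)."""
    if not os.path.exists(checkpoint_path):
        return set()
    with open(checkpoint_path) as f:
        return {line.strip() for line in f if line.strip()}

workdir = tempfile.mkdtemp()
checkpoint = os.path.join(workdir, "done.txt")
mark_done(checkpoint, "electronics")
done = completed_batches(checkpoint)
```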
Common Pitfalls
Memory Issues:
- Don't load entire datasets into memory
- Process and store data in streaming fashion
- Use generators for large URL lists
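The generator advice above might look like this: yield URLs lazily and batch them on the fly, so a million-page crawl never materializes the full list. `url_stream` and `batched` are illustrative names, not part of the skill.

```python
from itertools import islice

def url_stream(base_url, total_pages):
    """Lazily yield page URLs instead of building the full list in memory."""
    for page in range(1, total_pages + 1):
        yield f"{base_url}?page={page}"

def batched(iterable, size):
    """Yield lists of up to `size` items from any iterable, lazily."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

first_chunk = next(batched(url_stream("https://api.store.com/products", 1_000_000), 3))
# only three URLs exist in memory at this point
```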
Rate Limiting Violations:
- Always implement delays between requests
- Monitor response headers for rate limit info
- Use rotating proxies/IPs for high-volume collection
Incomplete Data Recovery:
- Save batch state before processing
- Implement resume-from-checkpoint functionality
- Log all failed URLs for manual review
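Resume-from-checkpoint, at its simplest, is a filter over the job list against the saved batch state. A minimal sketch, assuming jobs carry a `batch_id` key and completed IDs were persisted earlier:

```python
def pending_jobs(jobs, done_ids):
    """Drop batches a previous run already completed, so a rerun resumes cleanly."""
    return [job for job in jobs if job["batch_id"] not in done_ids]

jobs = [{"batch_id": "electronics"}, {"batch_id": "clothing"}, {"batch_id": "books"}]
todo = pending_jobs(jobs, done_ids={"electronics"})
# only "clothing" and "books" remain to process
```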
Poor Error Handling:
```python
# BAD - silent failures
try:
    data = await response.json()
except:
    pass  # data lost, nothing tracked

# GOOD - comprehensive error tracking
try:
    data = await response.json()
except aiohttp.ClientError as e:
    self.log_error(url, f"Network error: {e}")
    self.queue_retry(url, batch_id)
except json.JSONDecodeError as e:
    self.log_error(url, f"Invalid JSON: {e}")
    self.save_raw_response(url, await response.text())
```