AI Skill Report Card

Batch Data Scraping

B+ · 78 · Mar 4, 2026 · Source: Web
YAML
---
name: batch-data-scraping
description: Implements multi-batch data collection systems with concurrent processing, error handling, and resume capabilities. Use when scraping large datasets, multiple sources, or requiring robust data pipeline automation.
---

Multi-Batch Data Collection

Score: 14 / 15
Python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, List

import aiohttp

@dataclass
class BatchJob:
    batch_id: str
    urls: List[str]
    params: Dict[str, Any] = None
    headers: Dict[str, str] = None

async def collect_batch(session, job: BatchJob, semaphore):
    async with semaphore:
        results = []
        for url in job.urls:
            try:
                async with session.get(url, params=job.params, headers=job.headers) as response:
                    data = await response.json()
                    results.append({"url": url, "data": data, "status": "success"})
            except Exception as e:
                results.append({"url": url, "error": str(e), "status": "failed"})
        return {"batch_id": job.batch_id, "results": results}

async def run_batch_collection(jobs: List[BatchJob], max_concurrent=10):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [collect_batch(session, job, semaphore) for job in jobs]
        return await asyncio.gather(*tasks)
Recommendation
Add concrete input/output pairs showing actual scraped data structure and response times
Score: 12 / 15

Phase 1: Job Preparation

Progress:

  • Define data sources and URL patterns
  • Create batch configuration (size, concurrency)
  • Set up error handling and retry logic
  • Configure output storage (database/files)
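The Phase 1 decisions above can be bundled into a single configuration object. A minimal sketch — the names (`BatchConfig`, `url_template`, `build_urls`) are illustrative assumptions, not part of the skill's API:

```python
from dataclasses import dataclass

@dataclass
class BatchConfig:
    # Hypothetical config gathering the Phase 1 decisions in one place.
    batch_size: int = 50            # URLs per batch
    max_concurrent: int = 10        # simultaneous requests
    max_retries: int = 3
    retry_delay: float = 1.0        # base delay (seconds) before a retry
    output_dir: str = "./scraped"
    url_template: str = "https://api.example.com/items?page={page}"

    def build_urls(self, pages: int) -> list:
        # Expand the URL pattern into the concrete list one source will fetch.
        return [self.url_template.format(page=p) for p in range(1, pages + 1)]

config = BatchConfig()
urls = config.build_urls(3)
```

Centralizing these knobs makes it easy to tune batch size and concurrency per source instead of hard-coding them in the collector.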

Phase 2: Batch Execution

Progress:

  • Initialize connection pools
  • Start batch processing with progress tracking
  • Monitor rate limits and adjust delays
  • Handle failures and queue retries

Phase 3: Data Processing

Progress:

  • Validate collected data structure
  • Clean and transform data
  • Store results with batch metadata
  • Generate collection reports
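The validate → transform → store-with-metadata flow in Phase 3 might look like the following sketch; the required field names and metadata keys are assumptions for illustration:

```python
import time

REQUIRED_FIELDS = ("url", "status")

def validate_record(record: dict) -> bool:
    # Keep only records carrying the fields downstream steps expect.
    return all(field in record for field in REQUIRED_FIELDS)

def attach_metadata(records: list, batch_id: str) -> list:
    # Stamp each valid record with batch metadata for storage and reporting;
    # invalid records are silently dropped here (a real pipeline would log them).
    stamped = []
    for record in records:
        if validate_record(record):
            stamped.append({**record, "batch_id": batch_id, "collected_at": time.time()})
    return stamped

clean = attach_metadata(
    [{"url": "https://api.store.com/p/1", "status": "success"}, {"malformed": True}],
    "electronics",
)
```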
Recommendation
Include a complete working example that can be copy-pasted and run immediately
Score: 15 / 20

Example 1: E-commerce Product Scraping

Input:

Python
jobs = [
    BatchJob("electronics", [f"https://api.store.com/products?category=electronics&page={i}" for i in range(1, 51)]),
    BatchJob("clothing", [f"https://api.store.com/products?category=clothing&page={i}" for i in range(1, 31)]),
]

Output: Structured product data with batch tracking, ~8000 products in 45 seconds

Example 2: Social Media Data Collection

Input:

Python
BatchJob(
    "trending",
    [f"https://api.social.com/posts/trending?date={date}" for date in date_range],
    headers={"Authorization": "Bearer token"},
)

Output: Time-series social media posts with engagement metrics

Recommendation
Provide more specific guidance on choosing batch sizes and concurrency limits for different scenarios

Concurrency Management:

  • Use semaphores to limit concurrent requests (10-50 typical)
  • Implement exponential backoff for rate limiting
  • Monitor memory usage with large datasets
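The exponential backoff mentioned above can be sketched in a few lines — capped, with jitter so concurrent workers don't retry in lockstep (the cap and jitter range are illustrative choices):

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    # Exponential backoff: base * 2^attempt, capped, plus up to 1s of jitter.
    delay = min(cap, base * (2 ** attempt))
    return delay + random.uniform(0.0, 1.0)
```

Sleeping for `backoff_delay(attempt)` between retries keeps the first retry near one second while never exceeding the cap, however many attempts pile up.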

Error Resilience:

Python
class BatchCollector:
    def __init__(self, max_retries=3, retry_delay=1):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.failed_jobs = []

    async def retry_failed_jobs(self):
        if self.failed_jobs:
            # process_batches is assumed to be defined elsewhere on the collector
            return await self.process_batches(self.failed_jobs)

Progress Tracking:

Python
import logging

from tqdm import tqdm

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def track_progress(jobs):
    with tqdm(total=len(jobs), desc="Processing batches") as pbar:
        for job in jobs:
            result = await process_job(job)
            pbar.update(1)
            logger.info(f"Completed batch {job.batch_id}: {len(result)} items")

Data Storage:

  • Use chunked writes for large datasets
  • Implement checkpointing for resume capability
  • Store metadata (timestamps, source, batch_id)
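The three storage points above might combine as follows; the chunk-file naming and checkpoint format are assumptions made for the sketch:

```python
import json
import os
import tempfile

def write_chunked(records: list, out_dir: str, batch_id: str, chunk_size: int = 1000) -> list:
    # Write records in fixed-size chunks, updating a checkpoint after each one
    # so an interrupted run can resume from the last completed chunk.
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for start in range(0, len(records), chunk_size):
        path = os.path.join(out_dir, f"{batch_id}_{start // chunk_size:05d}.json")
        with open(path, "w") as f:
            json.dump(records[start:start + chunk_size], f)
        paths.append(path)
        with open(os.path.join(out_dir, "checkpoint.json"), "w") as f:
            json.dump({"batch_id": batch_id, "chunks_done": len(paths)}, f)
    return paths

out_dir = tempfile.mkdtemp()
chunks = write_chunked([{"n": i} for i in range(25)], out_dir, "demo", chunk_size=10)
```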

Memory Issues:

  • Don't load entire datasets into memory
  • Process and store data in streaming fashion
  • Use generators for large URL lists
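For example, generators keep even a million-page URL list lazy; the template and helper names below are illustrative:

```python
def url_pages(template: str, last_page: int):
    # Lazily yield page URLs; nothing is materialised until a batch asks for it.
    for page in range(1, last_page + 1):
        yield template.format(page=page)

def batched(iterable, size: int):
    # Group any iterable into lists of `size` without reading it all at once.
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch

first_batch = next(batched(url_pages("https://api.example.com/items?page={page}", 1_000_000), 50))
```

Only the 50 URLs of the batch being processed ever exist in memory at once.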

Rate Limiting Violations:

  • Always implement delays between requests
  • Monitor response headers for rate limit info
  • Use rotating proxies/IPs for high-volume collection
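Reading rate-limit headers might look like the sketch below. Note that header names vary by API: `Retry-After` is standard (and may also carry an HTTP date, which this sketch ignores), while the `X-RateLimit-*` family is a common but unofficial convention:

```python
import time

def rate_limit_wait(headers: dict) -> float:
    # Return how long to sleep before the next request, based on response headers.
    if "Retry-After" in headers:
        return float(headers["Retry-After"])  # server-mandated delay in seconds
    if int(headers.get("X-RateLimit-Remaining", 1)) == 0:
        # Quota exhausted: wait until the advertised reset timestamp.
        reset_at = float(headers.get("X-RateLimit-Reset", time.time() + 1.0))
        return max(0.0, reset_at - time.time())
    return 0.0  # quota remaining: no extra delay needed
```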

Incomplete Data Recovery:

  • Save batch state before processing
  • Implement resume-from-checkpoint functionality
  • Log all failed URLs for manual review
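A minimal checkpoint load/save pair covering the points above, assuming a JSON state file (the field names are placeholders):

```python
import json
import os
import tempfile

def load_checkpoint(path: str) -> dict:
    # Resume support: read the last saved state, or start fresh if none exists.
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {"completed_batches": [], "failed_urls": []}

def save_checkpoint(path: str, state: dict) -> None:
    # Write to a temp file then rename, so a crash mid-write never corrupts state.
    tmp_path = path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)

ckpt = os.path.join(tempfile.mkdtemp(), "state.json")
save_checkpoint(ckpt, {"completed_batches": ["electronics"], "failed_urls": []})
state = load_checkpoint(ckpt)
```

Persisting `failed_urls` alongside completed batches gives both resume-from-checkpoint and the manual-review log in one file.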

Poor Error Handling:

Python
# BAD - Silent failures
try:
    data = await response.json()
except:
    pass  # Lost data, no tracking

# GOOD - Comprehensive error tracking
try:
    data = await response.json()
except aiohttp.ClientError as e:
    self.log_error(url, f"Network error: {e}")
    self.queue_retry(url, batch_id)
except json.JSONDecodeError as e:
    self.log_error(url, f"Invalid JSON: {e}")
    self.save_raw_response(url, await response.text())
Grade: B+

AI Skill Framework Scorecard

Criteria Breakdown

  • Quick Start: 14/15
  • Workflow: 12/15
  • Examples: 15/20
  • Completeness: 10/20
  • Format: 15/15
  • Conciseness: 12/15