При парсинге сотен или тысяч страниц решение CAPTCHA по одной происходит медленно. Отправляйте несколько задач параллельно, опрашивайте их одновременно и обрабатывайте результаты по мере их поступления.
Последовательный или параллельный
Sequential (slow):
Submit #1 → Poll → Result (15s)
Submit #2 → Poll → Result (15s)
Submit #3 → Poll → Result (15s)
Total: ~45s for 3 solves
Parallel (fast):
Submit #1 ─┐
Submit #2 ─┤→ Poll all → Results arrive
Submit #3 ─┘
Total: ~15s for 3 solves
Базовое параллельное решение
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"
def submit_task(method, **params):
"""Submit a single CAPTCHA task."""
data = {"key": API_KEY, "method": method, "json": 1}
data.update(params)
resp = requests.post(f"{BASE_URL}/in.php", data=data, timeout=30)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(f"Submit error: {result.get('request')}")
return result["request"]
def poll_result(task_id, timeout=120):
"""Poll until result is ready."""
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{BASE_URL}/res.php", params={
"key": API_KEY, "action": "get",
"id": task_id, "json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
return data["request"]
raise TimeoutError(f"Task {task_id} timeout")
def solve_one(sitekey, pageurl):
"""Submit and poll a single task."""
task_id = submit_task("userrecaptcha", googlekey=sitekey, pageurl=pageurl)
token = poll_result(task_id)
return {"url": pageurl, "token": token}
def batch_solve(tasks, max_workers=10):
"""Solve multiple CAPTCHAs in parallel."""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(solve_one, t["sitekey"], t["url"]): t
for t in tasks
}
for future in as_completed(futures):
task = futures[future]
try:
result = future.result()
results.append(result)
print(f"Solved: {result['url']}")
except Exception as e:
print(f"Failed: {task['url']} - {e}")
results.append({"url": task["url"], "token": None, "error": str(e)})
return results
# Usage
tasks = [
{"sitekey": "SITE_KEY_1", "url": "https://example.com/page1"},
{"sitekey": "SITE_KEY_2", "url": "https://example.com/page2"},
{"sitekey": "SITE_KEY_3", "url": "https://example.com/page3"},
]
results = batch_solve(tasks, max_workers=5)
print(f"Solved {sum(1 for r in results if r.get('token'))}/{len(tasks)}")
Асинхронный пакетный решатель
Для более высокого параллелизма используйте asyncio:
import asyncio
import aiohttp
import time
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"
async def submit_task_async(session, method, **params):
data = {"key": API_KEY, "method": method, "json": 1}
data.update(params)
async with session.post(f"{BASE_URL}/in.php", data=data) as resp:
result = await resp.json()
if result.get("status") != 1:
raise RuntimeError(f"Submit error: {result.get('request')}")
return result["request"]
async def poll_result_async(session, task_id, timeout=120):
start = time.time()
while time.time() - start < timeout:
await asyncio.sleep(5)
params = {
"key": API_KEY, "action": "get",
"id": task_id, "json": 1,
}
async with session.get(f"{BASE_URL}/res.php", params=params) as resp:
data = await resp.json()
if data["request"] != "CAPCHA_NOT_READY":
return data["request"]
raise TimeoutError(f"Task {task_id} timeout")
async def solve_one_async(session, sitekey, pageurl):
task_id = await submit_task_async(
session, "userrecaptcha",
googlekey=sitekey, pageurl=pageurl,
)
token = await poll_result_async(session, task_id)
return {"url": pageurl, "token": token}
async def batch_solve_async(tasks, max_concurrent=20):
"""Solve many CAPTCHAs concurrently with asyncio."""
semaphore = asyncio.Semaphore(max_concurrent)
results = []
async def solve_with_limit(task):
async with semaphore:
try:
result = await solve_one_async(
session, task["sitekey"], task["url"],
)
return result
except Exception as e:
return {"url": task["url"], "token": None, "error": str(e)}
async with aiohttp.ClientSession() as session:
coros = [solve_with_limit(t) for t in tasks]
results = await asyncio.gather(*coros)
return results
# Usage
tasks = [
{"sitekey": "KEY", "url": f"https://example.com/page{i}"}
for i in range(50)
]
results = asyncio.run(batch_solve_async(tasks, max_concurrent=20))
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")
Шаблон «Отправить, затем опрос»
Для максимальной пропускной способности отделите отправку от опроса:
import requests
import time
API_KEY = "YOUR_API_KEY"
BASE_URL = "https://ocr.captchaai.com"
def batch_submit(tasks):
"""Submit all tasks first, return task IDs."""
submitted = []
for task in tasks:
try:
data = {
"key": API_KEY,
"method": "userrecaptcha",
"googlekey": task["sitekey"],
"pageurl": task["url"],
"json": 1,
}
resp = requests.post(f"{BASE_URL}/in.php", data=data, timeout=30)
result = resp.json()
if result.get("status") == 1:
submitted.append({
"task_id": result["request"],
"url": task["url"],
})
time.sleep(0.1) # Brief delay between submits
except Exception as e:
print(f"Submit failed for {task['url']}: {e}")
return submitted
def batch_poll(submitted, timeout=120):
"""Poll all submitted tasks until complete."""
pending = {s["task_id"]: s for s in submitted}
results = []
start = time.time()
while pending and time.time() - start < timeout:
time.sleep(5)
for task_id in list(pending.keys()):
try:
resp = requests.get(f"{BASE_URL}/res.php", params={
"key": API_KEY, "action": "get",
"id": task_id, "json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
info = pending.pop(task_id)
results.append({
"url": info["url"],
"token": data["request"],
})
except Exception:
pass
# Mark remaining as failed
for task_id, info in pending.items():
results.append({"url": info["url"], "token": None, "error": "timeout"})
return results
# Usage
tasks = [
{"sitekey": "KEY", "url": f"https://example.com/page{i}"}
for i in range(20)
]
submitted = batch_submit(tasks)
print(f"Submitted {len(submitted)} tasks")
results = batch_poll(submitted)
solved = sum(1 for r in results if r.get("token"))
print(f"Solved: {solved}/{len(tasks)}")
Руководство по пропускной способности
| Параллельные задачи | Приблизительная скорость | подходящий для |
|---|---|---|
| 1-5 | 3-5 решений/min | Тестирование, легкое соскабливание |
| 5-20 | 15-60 решений/min | Производственная очистка |
| 20-50 | 60-150 решений/min | Трубопроводы большого объёма |
| 50-100 | 150-300 решений/min | Масштаб предприятия |
Поиск неисправностей
| Проблема | Причина | Исправить |
|---|---|---|
| Ограничение скорости (429) | Слишком много отправок/second | Добавить задержку 100 мс между отправками |
| Много таймаутов | Тайм-аут опроса слишком мал | Увеличение до 120-180с |
| Снижение доходности выше 50 одновременных | Узкое место в сети | Используйте async (aiohttp) вместо потоков |
| Результаты перепутаны | Проблема с отслеживанием идентификатора задачи | Используйте dict с ключом Task_id |
Часто задаваемые вопросы
Сколько задач я могу отправить одновременно?
Жестких ограничений на количество одновременных задач нет. Начните с 10–20 и увеличивайте их в зависимости от вашего успеха и потребностей в скорости.
Должен ли я использовать потоки или асинхронность?
Потоки (ThreadPoolExecutor) проще использовать для небольших и средних пакетов (до 50). Для более чем 100 одновременных задач асинхронный режим (aiohttp) более эффективен.
Пакетное решение стоит дороже?
Нет. Каждая задача стоит одинаково, независимо от того, выполняется ли она индивидуально или в пакетном режиме. Пакетная обработка просто экономит время.
Связанные руководства
- Ограничение скорости и 429 ответов
- Повторить реализацию логики
- Масштабируйте решение CAPTCHA —попробуй CaptchaAIдля высокопроизводительной пакетной обработки.*