Когда вы управляете парсингом или автоматизацией для нескольких клиентов, каждый проект в конечном итоге попадает в CAPTCHA. Вместо написания разового кода решения для каждого проекта создайте конвейер многократного использования. В этом руководстве рассказывается об архитектуре.
Трубопроводная архитектура
┌──────────────┐ ┌───────────────┐ ┌──────────────┐
│ Client A │──▶ │ │ │ │
│ Client B │──▶ │ Task Queue │──▶ │ CaptchaAI │
│ Client C │──▶ │ │ │ API │
└──────────────┘ └───────────────┘ └──────────────┘
│ │
▼ ▼
┌───────────────┐ ┌──────────────┐
│ Result Store │◀── │ Polling │
│ (Redis/DB) │ │ Workers │
└───────────────┘ └──────────────┘
Компоненты:
- Прием задач – получает запросы на решение от клиентских парсеров.
- Очередь — буферизует задачи, устанавливает ограничения параллелизма для каждого клиента.
- Работники решателя — отправьте заявку на CaptchaAI и опросите результаты.
- Хранилище результатов — содержит решенные токены для последующего поиска потребителем.
Конвейер Python
Основной класс решателя
import requests
import time
from dataclasses import dataclass
from typing import Optional
from collections import deque
from threading import Lock
SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"
@dataclass
class SolveRequest:
client_id: str
method: str
params: dict
callback: Optional[callable] = None
@dataclass
class SolveResult:
client_id: str
task_id: str
token: Optional[str] = None
error: Optional[str] = None
class CaptchaPipeline:
def __init__(self, api_key: str, max_concurrent: int = 10):
self.api_key = api_key
self.max_concurrent = max_concurrent
self.queue = deque()
self.active = {}
self.lock = Lock()
def enqueue(self, request: SolveRequest):
with self.lock:
self.queue.append(request)
def submit_task(self, request: SolveRequest) -> Optional[str]:
data = {
"key": self.api_key,
"method": request.method,
"json": 1,
**request.params
}
try:
resp = requests.post(SUBMIT_URL, data=data, timeout=15)
result = resp.json()
if result.get("status") == 1:
return result["request"]
else:
print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
return None
except requests.RequestException as e:
print(f"[{request.client_id}] Network error: {e}")
return None
def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
elapsed = 0
interval = 5
while elapsed < max_wait:
time.sleep(interval)
elapsed += interval
try:
resp = requests.get(RESULT_URL, params={
"key": self.api_key,
"action": "get",
"id": task_id,
"json": 1
}, timeout=10)
result = resp.json()
if result.get("status") == 1:
return result["request"]
elif result.get("request") == "CAPCHA_NOT_READY":
continue
else:
print(f"Poll error for {task_id}: {result.get('error_text', result.get('request'))}")
return None
except requests.RequestException:
continue
return None
def process_queue(self):
while self.queue or self.active:
# Fill active slots
with self.lock:
while self.queue and len(self.active) < self.max_concurrent:
request = self.queue.popleft()
task_id = self.submit_task(request)
if task_id:
self.active[task_id] = request
# Poll active tasks
completed = []
for task_id, request in list(self.active.items()):
token = self.poll_result(task_id, max_wait=10)
if token:
result = SolveResult(
client_id=request.client_id,
task_id=task_id,
token=token
)
if request.callback:
request.callback(result)
completed.append(task_id)
with self.lock:
for task_id in completed:
del self.active[task_id]
Мультиклиентское использование
pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)
# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
client_id="client_a",
method="userrecaptcha",
params={
"googlekey": "6Le-SITEKEY-A",
"pageurl": "https://client-a-https://staging.example.com/qa-form"
},
callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))
# Client B — Turnstile
pipeline.enqueue(SolveRequest(
client_id="client_b",
method="turnstile",
params={
"sitekey": "0x4AAAA-SITEKEY-B",
"pageurl": "https://client-b-target.com/login"
},
callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))
pipeline.process_queue()
Конвейер Node.js
const axios = require("axios");
const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";
class CaptchaPipeline {
constructor(apiKey, maxConcurrent = 10) {
this.apiKey = apiKey;
this.maxConcurrent = maxConcurrent;
this.queue = [];
this.activeCount = 0;
}
enqueue(clientId, method, params) {
return new Promise((resolve, reject) => {
this.queue.push({ clientId, method, params, resolve, reject });
this._processNext();
});
}
async _processNext() {
if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;
this.activeCount++;
const task = this.queue.shift();
try {
const token = await this._solve(task);
task.resolve({ clientId: task.clientId, token });
} catch (err) {
task.reject(err);
} finally {
this.activeCount--;
this._processNext();
}
}
async _solve(task) {
const submitResp = await axios.post(SUBMIT_URL, null, {
params: {
key: this.apiKey,
method: task.method,
json: 1,
...task.params,
},
timeout: 15000,
});
if (submitResp.data.status !== 1) {
throw new Error(submitResp.data.error_text || submitResp.data.request);
}
const taskId = submitResp.data.request;
return this._poll(taskId);
}
async _poll(taskId, maxWait = 120000) {
const interval = 5000;
let elapsed = 0;
while (elapsed < maxWait) {
await new Promise((r) => setTimeout(r, interval));
elapsed += interval;
try {
const resp = await axios.get(RESULT_URL, {
params: {
key: this.apiKey,
action: "get",
id: taskId,
json: 1,
},
timeout: 10000,
});
if (resp.data.status === 1) return resp.data.request;
if (resp.data.request !== "CAPCHA_NOT_READY") {
throw new Error(resp.data.error_text || resp.data.request);
}
} catch (err) {
if (err.response) throw err;
}
}
throw new Error(`Timeout waiting for task ${taskId}`);
}
}
// Usage
(async () => {
const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);
const results = await Promise.allSettled([
pipeline.enqueue("client_a", "userrecaptcha", {
googlekey: "6Le-SITEKEY-A",
pageurl: "https://client-a-https://staging.example.com/qa-form",
}),
pipeline.enqueue("client_b", "turnstile", {
sitekey: "0x4AAAA-SITEKEY-B",
pageurl: "https://client-b-target.com/login",
}),
]);
results.forEach((r) => {
if (r.status === "fulfilled") {
console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
} else {
console.error(`Failed: ${r.reason.message}`);
}
});
})();
Конфигурация для каждого клиента
Отслеживайте настройки каждого клиента, такие как прокси, предпочтения решателя и ограничения скорости:
CLIENT_CONFIG = {
"client_a": {
"proxy": "host:port:user:pass",
"proxytype": "HTTP",
"max_concurrent": 5,
"default_method": "userrecaptcha"
},
"client_b": {
"proxy": None,
"proxytype": None,
"max_concurrent": 10,
"default_method": "turnstile"
}
}
def build_params(client_id, params):
config = CLIENT_CONFIG.get(client_id, {})
if config.get("proxy"):
params["proxy"] = config["proxy"]
params["proxytype"] = config["proxytype"]
return params
Стратегия обработки ошибок
| Ошибка | Ответ |
|---|---|
ERROR_ZERO_BALANCE |
Остановить очередь, предупредить всех клиентов |
ERROR_NO_SLOT_AVAILABLE |
Повторная постановка задачи в очередь с задержкой |
ERROR_WRONG_CAPTCHA_ID |
Отбросить, ошибка журнала |
ERROR_CAPTCHA_UNSOLVABLE |
Повторите попытку, затем потерпите неудачу. |
| Тайм-аут сети | Повторить попытку с отсрочкой (максимум 3 попытки) |
Поиск неисправностей
| Проблема | Причина | Исправить |
|---|---|---|
| Очередь растет без ограничений | Активные слоты заполнены | Увеличьте max_concurrent или добавьте рабочих |
| Обратный вызов не срабатывает | Задача не выполнена молча | Проверьте возврат ошибки в цикле опроса |
| Смешанные токены между клиентами | Общее хранилище результатов | Ключевые результаты client_id + task_id |
| Ошибки ограничения скорости (429) | Слишком много одновременных отправок | Уменьшите параллелизм, добавьте задержку отправки |
Часто задаваемые вопросы
Сколько одновременных задач мне следует запускать на одного клиента?
Начните с 5–10. Отслеживайте время решения и частоту ошибок, а затем корректируйте. CaptchaAI поддерживает высокий уровень параллелизма, но ваш пул прокси может быть узким местом.
Должен ли я использовать отдельный ключ API для каждого клиента?
Это упрощает выставление счетов. Используйте параметр CaptchaAI soft_id, если вам нужно отслеживание по одному ключу.
Как справиться с ночными очередями?
Сохраните очередь (Redis или база данных). При перезапуске перезагрузите ожидающие задачи и возобновите обработку.
Создайте свой конвейер CAPTCHA с помощью CaptchaAI
Начните создавать клиентские конвейеры наcaptchaai.com.
Связанные руководства
- Параллельное решение CAPTCHA
- Реализация логики повторов
- Распределенная обработка очереди Redis
- Скрипт мониторинга проверки работоспособности