Tutorials

Создание клиентских конвейеров CAPTCHA с помощью CaptchaAI

Когда вы управляете парсингом или автоматизацией для нескольких клиентов, каждый проект в конечном итоге попадает в CAPTCHA. Вместо написания разового кода решения для каждого проекта создайте конвейер многократного использования. В этом руководстве рассказывается об архитектуре.


Трубопроводная архитектура

┌──────────────┐    ┌───────────────┐    ┌──────────────┐
│  Client A    │──▶ │               │    │              │
│  Client B    │──▶ │  Task Queue   │──▶ │  CaptchaAI   │
│  Client C    │──▶ │               │    │  API         │
└──────────────┘    └───────────────┘    └──────────────┘
                           │                    │
                           ▼                    ▼
                    ┌───────────────┐    ┌──────────────┐
                    │  Result Store │◀── │  Polling      │
                    │  (Redis/DB)   │    │  Workers      │
                    └───────────────┘    └──────────────┘

Компоненты:

  1. Прием задач – получает запросы на решение от клиентских парсеров.
  2. Очередь — буферизует задачи, устанавливает ограничения параллелизма для каждого клиента.
  3. Работники решателя — отправьте заявку на CaptchaAI и опросите результаты.
  4. Хранилище результатов — содержит решенные токены для последующего поиска потребителем.

Конвейер Python

Основной класс решателя

import requests
import time
from dataclasses import dataclass
from typing import Optional
from collections import deque
from threading import Lock

SUBMIT_URL = "https://ocr.captchaai.com/in.php"
RESULT_URL = "https://ocr.captchaai.com/res.php"

@dataclass
class SolveRequest:
    client_id: str
    method: str
    params: dict
    callback: Optional[callable] = None

@dataclass
class SolveResult:
    client_id: str
    task_id: str
    token: Optional[str] = None
    error: Optional[str] = None


class CaptchaPipeline:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.max_concurrent = max_concurrent
        self.queue = deque()
        self.active = {}
        self.lock = Lock()

    def enqueue(self, request: SolveRequest):
        with self.lock:
            self.queue.append(request)

    def submit_task(self, request: SolveRequest) -> Optional[str]:
        data = {
            "key": self.api_key,
            "method": request.method,
            "json": 1,
            **request.params
        }

        try:
            resp = requests.post(SUBMIT_URL, data=data, timeout=15)
            result = resp.json()

            if result.get("status") == 1:
                return result["request"]
            else:
                print(f"[{request.client_id}] Submit error: {result.get('error_text', result.get('request'))}")
                return None
        except requests.RequestException as e:
            print(f"[{request.client_id}] Network error: {e}")
            return None

    def poll_result(self, task_id: str, max_wait: int = 120) -> Optional[str]:
        elapsed = 0
        interval = 5
        while elapsed < max_wait:
            time.sleep(interval)
            elapsed += interval

            try:
                resp = requests.get(RESULT_URL, params={
                    "key": self.api_key,
                    "action": "get",
                    "id": task_id,
                    "json": 1
                }, timeout=10)
                result = resp.json()

                if result.get("status") == 1:
                    return result["request"]
                elif result.get("request") == "CAPCHA_NOT_READY":
                    continue
                else:
                    print(f"Poll error for {task_id}: {result.get('error_text', result.get('request'))}")
                    return None
            except requests.RequestException:
                continue

        return None

    def process_queue(self):
        while self.queue or self.active:
            # Fill active slots
            with self.lock:
                while self.queue and len(self.active) < self.max_concurrent:
                    request = self.queue.popleft()
                    task_id = self.submit_task(request)
                    if task_id:
                        self.active[task_id] = request

            # Poll active tasks
            completed = []
            for task_id, request in list(self.active.items()):
                token = self.poll_result(task_id, max_wait=10)
                if token:
                    result = SolveResult(
                        client_id=request.client_id,
                        task_id=task_id,
                        token=token
                    )
                    if request.callback:
                        request.callback(result)
                    completed.append(task_id)

            with self.lock:
                for task_id in completed:
                    del self.active[task_id]

Мультиклиентское использование

pipeline = CaptchaPipeline(api_key="YOUR_API_KEY", max_concurrent=15)

# Client A — reCAPTCHA v2
pipeline.enqueue(SolveRequest(
    client_id="client_a",
    method="userrecaptcha",
    params={
        "googlekey": "6Le-SITEKEY-A",
        "pageurl": "https://client-a-https://staging.example.com/qa-form"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

# Client B — Turnstile
pipeline.enqueue(SolveRequest(
    client_id="client_b",
    method="turnstile",
    params={
        "sitekey": "0x4AAAA-SITEKEY-B",
        "pageurl": "https://client-b-target.com/login"
    },
    callback=lambda r: print(f"[{r.client_id}] Solved: {r.token[:40]}...")
))

pipeline.process_queue()

Конвейер Node.js

const axios = require("axios");

const SUBMIT_URL = "https://ocr.captchaai.com/in.php";
const RESULT_URL = "https://ocr.captchaai.com/res.php";

class CaptchaPipeline {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.maxConcurrent = maxConcurrent;
    this.queue = [];
    this.activeCount = 0;
  }

  enqueue(clientId, method, params) {
    return new Promise((resolve, reject) => {
      this.queue.push({ clientId, method, params, resolve, reject });
      this._processNext();
    });
  }

  async _processNext() {
    if (this.activeCount >= this.maxConcurrent || this.queue.length === 0) return;

    this.activeCount++;
    const task = this.queue.shift();

    try {
      const token = await this._solve(task);
      task.resolve({ clientId: task.clientId, token });
    } catch (err) {
      task.reject(err);
    } finally {
      this.activeCount--;
      this._processNext();
    }
  }

  async _solve(task) {
    const submitResp = await axios.post(SUBMIT_URL, null, {
      params: {
        key: this.apiKey,
        method: task.method,
        json: 1,
        ...task.params,
      },
      timeout: 15000,
    });

    if (submitResp.data.status !== 1) {
      throw new Error(submitResp.data.error_text || submitResp.data.request);
    }

    const taskId = submitResp.data.request;
    return this._poll(taskId);
  }

  async _poll(taskId, maxWait = 120000) {
    const interval = 5000;
    let elapsed = 0;

    while (elapsed < maxWait) {
      await new Promise((r) => setTimeout(r, interval));
      elapsed += interval;

      try {
        const resp = await axios.get(RESULT_URL, {
          params: {
            key: this.apiKey,
            action: "get",
            id: taskId,
            json: 1,
          },
          timeout: 10000,
        });

        if (resp.data.status === 1) return resp.data.request;
        if (resp.data.request !== "CAPCHA_NOT_READY") {
          throw new Error(resp.data.error_text || resp.data.request);
        }
      } catch (err) {
        if (err.response) throw err;
      }
    }

    throw new Error(`Timeout waiting for task ${taskId}`);
  }
}

// Usage
(async () => {
  const pipeline = new CaptchaPipeline("YOUR_API_KEY", 15);

  const results = await Promise.allSettled([
    pipeline.enqueue("client_a", "userrecaptcha", {
      googlekey: "6Le-SITEKEY-A",
      pageurl: "https://client-a-https://staging.example.com/qa-form",
    }),
    pipeline.enqueue("client_b", "turnstile", {
      sitekey: "0x4AAAA-SITEKEY-B",
      pageurl: "https://client-b-target.com/login",
    }),
  ]);

  results.forEach((r) => {
    if (r.status === "fulfilled") {
      console.log(`[${r.value.clientId}] Token: ${r.value.token.slice(0, 40)}...`);
    } else {
      console.error(`Failed: ${r.reason.message}`);
    }
  });
})();

Конфигурация для каждого клиента

Отслеживайте настройки каждого клиента, такие как прокси, предпочтения решателя и ограничения скорости:

CLIENT_CONFIG = {
    "client_a": {
        "proxy": "host:port:user:pass",
        "proxytype": "HTTP",
        "max_concurrent": 5,
        "default_method": "userrecaptcha"
    },
    "client_b": {
        "proxy": None,
        "proxytype": None,
        "max_concurrent": 10,
        "default_method": "turnstile"
    }
}

def build_params(client_id, params):
    config = CLIENT_CONFIG.get(client_id, {})
    if config.get("proxy"):
        params["proxy"] = config["proxy"]
        params["proxytype"] = config["proxytype"]
    return params

Стратегия обработки ошибок

Ошибка Ответ
ERROR_ZERO_BALANCE Остановить очередь, предупредить всех клиентов
ERROR_NO_SLOT_AVAILABLE Повторная постановка задачи в очередь с задержкой
ERROR_WRONG_CAPTCHA_ID Отбросить, ошибка журнала
ERROR_CAPTCHA_UNSOLVABLE Повторите попытку, затем потерпите неудачу.
Тайм-аут сети Повторить попытку с отсрочкой (максимум 3 попытки)

Поиск неисправностей

Проблема Причина Исправить
Очередь растет без ограничений Активные слоты заполнены Увеличьте max_concurrent или добавьте рабочих
Обратный вызов не срабатывает Задача не выполнена молча Проверьте возврат ошибки в цикле опроса
Смешанные токены между клиентами Общее хранилище результатов Ключевые результаты client_id + task_id
Ошибки ограничения скорости (429) Слишком много одновременных отправок Уменьшите параллелизм, добавьте задержку отправки

Часто задаваемые вопросы

Сколько одновременных задач мне следует запускать на одного клиента?

Начните с 5–10. Отслеживайте время решения и частоту ошибок, а затем корректируйте. CaptchaAI поддерживает высокий уровень параллелизма, но ваш пул прокси может быть узким местом.

Должен ли я использовать отдельный ключ API для каждого клиента?

Это упрощает выставление счетов. Используйте параметр CaptchaAI soft_id, если вам нужно отслеживание по одному ключу.

Как справиться с ночными очередями?

Сохраните очередь (Redis или база данных). При перезапуске перезагрузите ожидающие задачи и возобновите обработку.


Создайте свой конвейер CAPTCHA с помощью CaptchaAI

Начните создавать клиентские конвейеры наcaptchaai.com.


Связанные руководства

Комментарии для этой статьи отключены.

Похожие сообщения

Tutorials Кеширование CAPTCHA-токенов в собственном backend в пределах TTL
Корректно кешируйте и переиспользуйте CAPTCHA-токены в пределах официального TTL для идемпотентных ретраев в собственном backend.

Корректно кешируйте и переиспользуйте CAPTCHA-токены в пределах официального TTL для идемпотентных ретраев в с...

May 02, 2026
Integrations Изоляция браузерных профилей для QA с CaptchaAI
Разделяйте cookies, storage, тестовые учетные записи и CAPTCHA-конфигурации по браузерным профилям, чтобы QA-тесты в staging были чистыми и воспроизводимыми.

Разделяйте cookies, storage, тестовые учетные записи и CAPTCHA-конфигурации по браузерным профилям, чтобы QA-т...

Apr 28, 2026
Use Cases Тестирование CAPTCHA для e-commerce checkout с высокой нагрузкой
QA-руководство по проверке CAPTCHA в собственных checkout-сценариях e-commerce в staging: фиктивный инвентарь, тестовые платежные токены и Captcha AI.

QA-руководство по проверке CAPTCHA в собственных checkout-сценариях e-commerce в staging: фиктивный инвентарь,...

May 05, 2026