Туториалы по API

CaptchaAI Pingback и шаблоны уведомлений о задачах

Параметр CaptchaAI pingback позволяет получать решенные токены через обратный вызов HTTP вместо опроса. В этом руководстве рассматриваются расширенные шаблоны уведомлений для производственных систем.


Как работает пингбэк


1. Submit task with pingback=YOUR_CALLBACK_URL
2. CaptchaAI solves the CAPTCHA
3. CaptchaAI sends GET request to your callback:
   YOUR_CALLBACK_URL?id=TASK_ID&code=TOKEN

4. Your server processes the result

Схема 1: принцип «выстрелил и забыл» с хранилищем результатов

Отправьте задачи и позвольте обратному вызову сохранить результаты в потокобезопасном словаре:

import requests
import threading
import time
from flask import Flask, request


class PingbackStore:
    """Store for results received via pingback."""

    def __init__(self):
        self.results = {}
        self.events = {}
        self.lock = threading.Lock()

    def register(self, task_id):
        """Register a task ID we expect results for."""
        with self.lock:
            self.events[task_id] = threading.Event()

    def store(self, task_id, token):
        """Store result from pingback callback."""
        with self.lock:
            self.results[task_id] = token
            if task_id in self.events:
                self.events[task_id].set()

    def wait(self, task_id, timeout=120):
        """Wait for a specific result."""
        event = self.events.get(task_id)
        if not event:
            return None
        event.wait(timeout=timeout)
        return self.results.get(task_id)

    def get(self, task_id):
        """Get result without waiting (non-blocking)."""
        return self.results.get(task_id)


# Global store
store = PingbackStore()

# Flask app for receiving callbacks
app = Flask(__name__)


@app.route("/pingback")
def receive_pingback():
    """Handle CaptchaAI pingback callback."""
    task_id = request.args.get("id")
    code = request.args.get("code")

    if not task_id or not code:
        return "Bad request", 400

    store.store(task_id, code)
    return "OK", 200


def submit_with_pingback(api_key, method, callback_url, **params):
    """Submit a task with pingback enabled."""
    data = {
        "key": api_key,
        "method": method,
        "pingback": callback_url,
        "json": 1,
    }
    data.update(params)

    resp = requests.post(
        "https://ocr.captchaai.com/in.php",
        data=data,
        timeout=30,
    )
    result = resp.json()

    if result.get("status") != 1:
        raise RuntimeError(f"Submit error: {result.get('request')}")

    task_id = result["request"]
    store.register(task_id)
    return task_id


# Usage
# Start Flask server in background thread
server = threading.Thread(
    target=lambda: app.run(port=8080, debug=False),
    daemon=True,
)
server.start()

# Submit task
task_id = submit_with_pingback(
    "YOUR_API_KEY",
    "userrecaptcha",
    "https://yourserver.com/pingback",
    googlekey="SITE_KEY",
    pageurl="https://example.com",
)

# Wait for result via pingback
token = store.wait(task_id, timeout=120)
print(f"Token: {token[:50]}...")

Схема 2: Многозадачное разветвление

Отправляйте несколько задач и собирайте результаты по мере их поступления:

import requests
import threading
import time


class FanOutSolver:
    """Submit many tasks, collect results via pingback."""

    def __init__(self, api_key, callback_url):
        self.api_key = api_key
        self.callback_url = callback_url
        self.store = PingbackStore()
        self.pending = []

    def submit(self, method, **params):
        """Submit a task and track it."""
        data = {
            "key": self.api_key,
            "method": method,
            "pingback": self.callback_url,
            "json": 1,
        }
        data.update(params)

        resp = requests.post(
            "https://ocr.captchaai.com/in.php",
            data=data,
            timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(f"Submit error: {result.get('request')}")

        task_id = result["request"]
        self.store.register(task_id)
        self.pending.append(task_id)
        return task_id

    def submit_batch(self, tasks):
        """Submit multiple tasks.

        tasks: list of dicts with 'method' and params
        """
        task_ids = []
        for task in tasks:
            method = task.pop("method")
            task_id = self.submit(method, **task)
            task_ids.append(task_id)
            time.sleep(0.1)  # Avoid rate limits
        return task_ids

    def collect_all(self, timeout=180):
        """Wait for all pending results."""
        results = {}
        deadline = time.time() + timeout

        for task_id in self.pending:
            remaining = max(1, deadline - time.time())
            token = self.store.wait(task_id, timeout=remaining)
            results[task_id] = token

        self.pending.clear()
        return results


# Usage
solver = FanOutSolver("YOUR_API_KEY", "https://yourserver.com/pingback")

# Submit 5 tasks
tasks = [
    {
        "method": "userrecaptcha",
        "googlekey": "SITE_KEY",
        "pageurl": f"https://example.com/page{i}",
    }
    for i in range(5)
]

task_ids = solver.submit_batch(tasks)
print(f"Submitted {len(task_ids)} tasks")

# Wait for all results
results = solver.collect_all(timeout=180)
for tid, token in results.items():
    status = "solved" if token else "failed"
    print(f"  {tid}: {status}")

Схема 3: Маршрутизатор уведомлений

Направляйте результаты различным обработчикам на основе метаданных задачи:

import threading
from collections import defaultdict


class NotificationRouter:
    """Route pingback results to registered handlers."""

    def __init__(self):
        self.handlers = {}
        self.default_handler = None
        self.task_routes = {}
        self.lock = threading.Lock()

    def register_handler(self, name, handler_fn):
        """Register a named handler function."""
        self.handlers[name] = handler_fn

    def set_default(self, handler_fn):
        """Set a default handler for unrouted tasks."""
        self.default_handler = handler_fn

    def route(self, task_id, handler_name):
        """Route a task ID to a specific handler."""
        with self.lock:
            self.task_routes[task_id] = handler_name

    def dispatch(self, task_id, token):
        """Dispatch a result to the correct handler."""
        handler_name = self.task_routes.get(task_id)

        if handler_name and handler_name in self.handlers:
            self.handlers[handler_name](task_id, token)
        elif self.default_handler:
            self.default_handler(task_id, token)


# Usage
router = NotificationRouter()

# Register handlers
def login_handler(task_id, token):
    print(f"Login flow got token from {task_id}")
    # Submit token to login form

def scraping_handler(task_id, token):
    print(f"Scraping pipeline got token from {task_id}")
    # Continue scraping with token

router.register_handler("login", login_handler)
router.register_handler("scraping", scraping_handler)

# When submitting
task_id = submit_with_pingback(
    "YOUR_API_KEY", "userrecaptcha",
    "https://yourserver.com/pingback",
    googlekey="KEY", pageurl="https://example.com",
)
router.route(task_id, "login")

# In pingback handler
# router.dispatch(task_id, token)

Защита конечной точки Pingback

import hmac
import hashlib
from flask import Flask, request, abort

app = Flask(__name__)
API_KEY = "YOUR_API_KEY"


@app.route("/pingback")
def secure_pingback():
    """Validate pingback requests."""
    task_id = request.args.get("id")
    code = request.args.get("code")
    ip = request.remote_addr

    # Validate required parameters
    if not task_id or not code:
        abort(400)

    # Validate IP (CaptchaAI server IPs)
    # Add actual CaptchaAI IPs to allowlist
    ALLOWED_IPS = {"0.0.0.0/0"}  # Replace with real IPs

    # Validate task ID format (numeric)
    if not task_id.isdigit():
        abort(400)

    # Store result
    store.store(task_id, code)
    return "OK", 200

Когда использовать Pingback или опрос

Фактор Пингбэк Опрос
Инфраструктура Требуется общедоступная конечная точка Сервер не нужен
Задержка Мгновенное уведомление Задержка интервала опроса 5 с
Шкала подходящий для 100+ одновременно Штраф за <50 одновременных
Надежность Требуется повторная обработка Простой цикл повтора
Брандмауэр Требуется входящий порт Только исходящие
Сложность Высшая настройка Нижняя установка

Поиск неисправностей

Проблема Причина Исправить
Обратный звонок не получен Конечная точка недоступна Убедитесь, что сервер является общедоступным; проверить брандмауэр
Дублирующиеся обратные вызовы CaptchaAI повторить попытку Сделать обработчик идемпотентным
Неверный идентификатор задачи в обратном вызове Устаревшее состояние сервера Проверить время регистрации задачи
Таймаут несмотря на решение URL обратного вызова недоступен Сначала протестируйте конечную точку с помощью Curl

Часто задаваемые вопросы

Могу ли я использовать pingback со всеми типами CAPTCHA?

Да. Параметр pingback работает с reCAPTCHA, Turnstile, GeeTest, Image, BLS и всеми другими поддерживаемыми методами.

Что произойдет, если мой сервер выйдет из строя в момент поступления обратного вызова?

CaptchaAI может повторить обратный вызов. Вам также следует реализовать резервный механизм опроса для задач, которые не получают обратные вызовы в течение таймаута.

Могу ли я использовать localhost для тестирования?

Нет. URL обратного вызова должен быть общедоступным. Используйте ngrok или аналогичный туннель для локального тестирования.


Связанные руководства

  • URL обратного вызова и руководство по вебхуку
  • Пакетное решение CAPTCHA

Создавайте рабочие процессы, управляемые событиями —получите ключ CaptchaAIсейчас.

Комментарии для этой статьи отключены.