Туториалы

Дедупликация запросов на решение CAPTCHA с блокировкой базы данных

Когда несколько работников или повторных попыток отправляют одну и ту же CAPTCHA для решения, вы платите за каждый дубликат. Уровень дедупликации перехватывает идентичные запросы и возвращает тот же результат — экономя кредиты API и уменьшая задержку.

Как возникают дубликаты

Сценарий Причина Напрасно тратить
Повторить попытку до получения результата Агрессивная логика повтора 2–5-кратная стоимость CAPTCHA
Несколько работников, одна цель Нет координации между работниками Параллельные траты решают
Обновление страницы повторяется Повторная попытка внешнего интерфейса по таймауту Дополнительное решение за каждое обновление
Сообщение очереди воспроизведено повторно Гарантия доставки хотя бы один раз Дублирование решения для каждого повтора

Ключевой дизайн дедупликации

Сгенерируйте уникальный ключ из параметров запроса:

import hashlib


def dedup_key(method, sitekey, pageurl):
    """Generate a deduplication key for a CAPTCHA solve request."""
    raw = f"{method}:{sitekey}:{pageurl}"
    return f"captcha:dedup:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"

Ключевой состав:

Тип капчи Ключевые компоненты
reCAPTCHA v2 method + sitekey + pageurl
reCAPTCHA v3 method + sitekey + pageurl + action
hCaptcha method + sitekey + pageurl
Cloudflare Turnstile method + sitekey + pageurl
Капча изображения method + хеш body (содержимое изображения)

Дедупликация на основе Redis

Реализация Python

import os
import time
import json
import hashlib
import redis
import requests

r = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", 6379)),
    decode_responses=True
)

API_KEY = os.environ["CAPTCHAAI_API_KEY"]

# Dedup window: how long to consider a request "in progress"
DEDUP_TTL = 180  # seconds


def dedup_key(method, sitekey, pageurl, extra=""):
    raw = f"{method}:{sitekey}:{pageurl}:{extra}"
    return f"captcha:dedup:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"


def solve_with_dedup(sitekey, pageurl, method="userrecaptcha"):
    key = dedup_key(method, sitekey, pageurl)

    # Check if this request is already being solved
    existing = r.get(key)
    if existing:
        state = json.loads(existing)
        if state["status"] == "solving":
            # Wait for the result
            return wait_for_result(key)
        elif state["status"] == "solved":
            return {"solution": state["solution"], "source": "dedup_cache"}
        elif state["status"] == "error":
            pass  # Allow retry on error

    # Mark as solving
    r.set(key, json.dumps({"status": "solving", "started": time.time()}), ex=DEDUP_TTL)

    # Submit to CaptchaAI
    resp = requests.post("https://ocr.captchaai.com/in.php", data={
        "key": API_KEY,
        "method": method,
        "googlekey": sitekey,
        "pageurl": pageurl,
        "json": 1
    })
    data = resp.json()

    if data.get("status") != 1:
        r.set(key, json.dumps({"status": "error", "error": data.get("request")}), ex=30)
        return {"error": data.get("request")}

    captcha_id = data["request"]

    # Poll for result
    for _ in range(60):
        time.sleep(5)
        result = requests.get("https://ocr.captchaai.com/res.php", params={
            "key": API_KEY, "action": "get",
            "id": captcha_id, "json": 1
        }).json()

        if result.get("status") == 1:
            solution = result["request"]
            # Cache the result for other workers (short TTL since tokens expire)
            r.set(key, json.dumps({
                "status": "solved",
                "solution": solution,
                "solved_at": time.time()
            }), ex=60)  # Cache result for 60 seconds
            return {"solution": solution, "source": "api"}

        if result.get("request") != "CAPCHA_NOT_READY":
            r.set(key, json.dumps({
                "status": "error", "error": result.get("request")
            }), ex=30)
            return {"error": result.get("request")}

    r.set(key, json.dumps({"status": "error", "error": "TIMEOUT"}), ex=30)
    return {"error": "TIMEOUT"}


def wait_for_result(key, timeout=120):
    """Wait for another worker to finish solving."""
    start = time.time()
    while time.time() - start < timeout:
        data = r.get(key)
        if data:
            state = json.loads(data)
            if state["status"] == "solved":
                return {"solution": state["solution"], "source": "dedup_wait"}
            if state["status"] == "error":
                return {"error": state.get("error", "UNKNOWN")}
        time.sleep(2)
    return {"error": "DEDUP_WAIT_TIMEOUT"}

Реализация JavaScript

const Redis = require("ioredis");
const axios = require("axios");
const crypto = require("crypto");

const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
const DEDUP_TTL = 180;

function dedupKey(method, sitekey, pageurl) {
  const raw = `${method}:${sitekey}:${pageurl}`;
  const hash = crypto.createHash("sha256").update(raw).digest("hex").slice(0, 16);
  return `captcha:dedup:${hash}`;
}

async function solveWithDedup(sitekey, pageurl, method = "userrecaptcha") {
  const key = dedupKey(method, sitekey, pageurl);

  // Check existing
  const existing = await redis.get(key);
  if (existing) {
    const state = JSON.parse(existing);
    if (state.status === "solving") return await waitForResult(key);
    if (state.status === "solved") return { solution: state.solution, source: "dedup_cache" };
  }

  // Mark as solving
  await redis.set(key, JSON.stringify({ status: "solving", started: Date.now() }), "EX", DEDUP_TTL);

  // Submit
  const submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
    params: { key: API_KEY, method, googlekey: sitekey, pageurl, json: 1 },
  });

  if (submit.data.status !== 1) {
    await redis.set(key, JSON.stringify({ status: "error", error: submit.data.request }), "EX", 30);
    return { error: submit.data.request };
  }

  const captchaId = submit.data.request;

  for (let i = 0; i < 60; i++) {
    await new Promise((r) => setTimeout(r, 5000));
    const poll = await axios.get("https://ocr.captchaai.com/res.php", {
      params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
    });

    if (poll.data.status === 1) {
      await redis.set(key, JSON.stringify({ status: "solved", solution: poll.data.request }), "EX", 60);
      return { solution: poll.data.request, source: "api" };
    }
    if (poll.data.request !== "CAPCHA_NOT_READY") {
      await redis.set(key, JSON.stringify({ status: "error", error: poll.data.request }), "EX", 30);
      return { error: poll.data.request };
    }
  }

  await redis.set(key, JSON.stringify({ status: "error", error: "TIMEOUT" }), "EX", 30);
  return { error: "TIMEOUT" };
}

async function waitForResult(key, timeout = 120000) {
  const start = Date.now();
  while (Date.now() - start < timeout) {
    const data = await redis.get(key);
    if (data) {
      const state = JSON.parse(data);
      if (state.status === "solved") return { solution: state.solution, source: "dedup_wait" };
      if (state.status === "error") return { error: state.error };
    }
    await new Promise((r) => setTimeout(r, 2000));
  }
  return { error: "DEDUP_WAIT_TIMEOUT" };
}

Альтернатива блокировки базы данных

Для дедупликации на основе PostgreSQL без Redis:

import psycopg2


def solve_with_pg_dedup(conn, sitekey, pageurl):
    """Use PostgreSQL advisory locks for deduplication."""
    # Generate a numeric lock key from the dedup key
    lock_id = hash(f"{sitekey}:{pageurl}") & 0x7FFFFFFF

    cursor = conn.cursor()

    # Try to acquire advisory lock (non-blocking)
    cursor.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,))
    acquired = cursor.fetchone()[0]

    if not acquired:
        # Another worker is solving — wait for result
        cursor.execute("SELECT pg_advisory_lock(%s)", (lock_id,))
        # Lock acquired means other worker finished — check cache
        cursor.execute(
            "SELECT solution FROM captcha_cache "
            "WHERE sitekey = %s AND pageurl = %s "
            "AND created_at > NOW() - INTERVAL '60 seconds'",
            (sitekey, pageurl)
        )
        row = cursor.fetchone()
        cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
        if row:
            return {"solution": row[0], "source": "pg_cache"}
        return {"error": "NO_CACHED_RESULT"}

    try:
        # Solve the CAPTCHA
        solution = solve_via_api(sitekey, pageurl)
        if solution:
            cursor.execute(
                "INSERT INTO captcha_cache (sitekey, pageurl, solution) "
                "VALUES (%s, %s, %s)",
                (sitekey, pageurl, solution)
            )
            conn.commit()
        return {"solution": solution} if solution else {"error": "SOLVE_FAILED"}
    finally:
        cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))

Показатели эффективности дедупликации

Отслеживайте экономию при дедупликации:

def track_dedup_stats(source):
    """Increment counters for dedup tracking."""
    today = time.strftime("%Y-%m-%d")
    r.hincrby(f"dedup:stats:{today}", source, 1)
    r.expire(f"dedup:stats:{today}", 7 * 86400)


def get_dedup_report():
    today = time.strftime("%Y-%m-%d")
    stats = r.hgetall(f"dedup:stats:{today}")
    total = sum(int(v) for v in stats.values())
    saved = int(stats.get("dedup_cache", 0)) + int(stats.get("dedup_wait", 0))
    return {
        "total_requests": total,
        "deduplicated": saved,
        "savings_pct": f"{saved / total * 100:.1f}%" if total else "0%",
        "breakdown": stats
    }

Поиск неисправностей

| Проблема | Причина | Исправить |


Следующие шаги

  • CaptchaAI Quickstart: ваше первое решение CAPTCHA за 5 минут
  • Как решить reCAPTCHA v2 через API: пошаговое руководство
  • Как решить Cloudflare Turnstile через API
  • Как решить GeeTest v3 с помощью API
Комментарии для этой статьи отключены.