Когда несколько работников или повторных попыток отправляют одну и ту же CAPTCHA для решения, вы платите за каждый дубликат. Уровень дедупликации перехватывает идентичные запросы и возвращает тот же результат — экономя кредиты API и уменьшая задержку.
Как возникают дубликаты
| Сценарий | Причина | Напрасно тратить |
|---|---|---|
| Повторить попытку до получения результата | Агрессивная логика повтора | 2–5-кратная стоимость CAPTCHA |
| Несколько работников, одна цель | Нет координации между работниками | Параллельные траты решают |
| Обновление страницы повторяется | Повторная попытка внешнего интерфейса по таймауту | Дополнительное решение за каждое обновление |
| Сообщение очереди воспроизведено повторно | Гарантия доставки хотя бы один раз | Дублирование решения для каждого повтора |
Ключевой дизайн дедупликации
Сгенерируйте уникальный ключ из параметров запроса:
import hashlib
def dedup_key(method, sitekey, pageurl):
"""Generate a deduplication key for a CAPTCHA solve request."""
raw = f"{method}:{sitekey}:{pageurl}"
return f"captcha:dedup:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"
Ключевой состав:
| Тип капчи | Ключевые компоненты |
|---|---|
| reCAPTCHA v2 | method + sitekey + pageurl |
| reCAPTCHA v3 | method + sitekey + pageurl + action |
| hCaptcha | method + sitekey + pageurl |
| Cloudflare Turnstile | method + sitekey + pageurl |
| Капча изображения | method + хеш body (содержимое изображения) |
Дедупликация на основе Redis
Реализация Python
import os
import time
import json
import hashlib
import redis
import requests
r = redis.Redis(
host=os.environ.get("REDIS_HOST", "localhost"),
port=int(os.environ.get("REDIS_PORT", 6379)),
decode_responses=True
)
API_KEY = os.environ["CAPTCHAAI_API_KEY"]
# Dedup window: how long to consider a request "in progress"
DEDUP_TTL = 180 # seconds
def dedup_key(method, sitekey, pageurl, extra=""):
raw = f"{method}:{sitekey}:{pageurl}:{extra}"
return f"captcha:dedup:{hashlib.sha256(raw.encode()).hexdigest()[:16]}"
def solve_with_dedup(sitekey, pageurl, method="userrecaptcha"):
key = dedup_key(method, sitekey, pageurl)
# Check if this request is already being solved
existing = r.get(key)
if existing:
state = json.loads(existing)
if state["status"] == "solving":
# Wait for the result
return wait_for_result(key)
elif state["status"] == "solved":
return {"solution": state["solution"], "source": "dedup_cache"}
elif state["status"] == "error":
pass # Allow retry on error
# Mark as solving
r.set(key, json.dumps({"status": "solving", "started": time.time()}), ex=DEDUP_TTL)
# Submit to CaptchaAI
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": API_KEY,
"method": method,
"googlekey": sitekey,
"pageurl": pageurl,
"json": 1
})
data = resp.json()
if data.get("status") != 1:
r.set(key, json.dumps({"status": "error", "error": data.get("request")}), ex=30)
return {"error": data.get("request")}
captcha_id = data["request"]
# Poll for result
for _ in range(60):
time.sleep(5)
result = requests.get("https://ocr.captchaai.com/res.php", params={
"key": API_KEY, "action": "get",
"id": captcha_id, "json": 1
}).json()
if result.get("status") == 1:
solution = result["request"]
# Cache the result for other workers (short TTL since tokens expire)
r.set(key, json.dumps({
"status": "solved",
"solution": solution,
"solved_at": time.time()
}), ex=60) # Cache result for 60 seconds
return {"solution": solution, "source": "api"}
if result.get("request") != "CAPCHA_NOT_READY":
r.set(key, json.dumps({
"status": "error", "error": result.get("request")
}), ex=30)
return {"error": result.get("request")}
r.set(key, json.dumps({"status": "error", "error": "TIMEOUT"}), ex=30)
return {"error": "TIMEOUT"}
def wait_for_result(key, timeout=120):
"""Wait for another worker to finish solving."""
start = time.time()
while time.time() - start < timeout:
data = r.get(key)
if data:
state = json.loads(data)
if state["status"] == "solved":
return {"solution": state["solution"], "source": "dedup_wait"}
if state["status"] == "error":
return {"error": state.get("error", "UNKNOWN")}
time.sleep(2)
return {"error": "DEDUP_WAIT_TIMEOUT"}
Реализация JavaScript
const Redis = require("ioredis");
const axios = require("axios");
const crypto = require("crypto");
const redis = new Redis(process.env.REDIS_URL || "redis://localhost:6379");
const API_KEY = process.env.CAPTCHAAI_API_KEY;
const DEDUP_TTL = 180;
function dedupKey(method, sitekey, pageurl) {
const raw = `${method}:${sitekey}:${pageurl}`;
const hash = crypto.createHash("sha256").update(raw).digest("hex").slice(0, 16);
return `captcha:dedup:${hash}`;
}
async function solveWithDedup(sitekey, pageurl, method = "userrecaptcha") {
const key = dedupKey(method, sitekey, pageurl);
// Check existing
const existing = await redis.get(key);
if (existing) {
const state = JSON.parse(existing);
if (state.status === "solving") return await waitForResult(key);
if (state.status === "solved") return { solution: state.solution, source: "dedup_cache" };
}
// Mark as solving
await redis.set(key, JSON.stringify({ status: "solving", started: Date.now() }), "EX", DEDUP_TTL);
// Submit
const submit = await axios.post("https://ocr.captchaai.com/in.php", null, {
params: { key: API_KEY, method, googlekey: sitekey, pageurl, json: 1 },
});
if (submit.data.status !== 1) {
await redis.set(key, JSON.stringify({ status: "error", error: submit.data.request }), "EX", 30);
return { error: submit.data.request };
}
const captchaId = submit.data.request;
for (let i = 0; i < 60; i++) {
await new Promise((r) => setTimeout(r, 5000));
const poll = await axios.get("https://ocr.captchaai.com/res.php", {
params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
});
if (poll.data.status === 1) {
await redis.set(key, JSON.stringify({ status: "solved", solution: poll.data.request }), "EX", 60);
return { solution: poll.data.request, source: "api" };
}
if (poll.data.request !== "CAPCHA_NOT_READY") {
await redis.set(key, JSON.stringify({ status: "error", error: poll.data.request }), "EX", 30);
return { error: poll.data.request };
}
}
await redis.set(key, JSON.stringify({ status: "error", error: "TIMEOUT" }), "EX", 30);
return { error: "TIMEOUT" };
}
async function waitForResult(key, timeout = 120000) {
const start = Date.now();
while (Date.now() - start < timeout) {
const data = await redis.get(key);
if (data) {
const state = JSON.parse(data);
if (state.status === "solved") return { solution: state.solution, source: "dedup_wait" };
if (state.status === "error") return { error: state.error };
}
await new Promise((r) => setTimeout(r, 2000));
}
return { error: "DEDUP_WAIT_TIMEOUT" };
}
Альтернатива блокировки базы данных
Для дедупликации на основе PostgreSQL без Redis:
import psycopg2
def solve_with_pg_dedup(conn, sitekey, pageurl):
"""Use PostgreSQL advisory locks for deduplication."""
# Generate a numeric lock key from the dedup key
lock_id = hash(f"{sitekey}:{pageurl}") & 0x7FFFFFFF
cursor = conn.cursor()
# Try to acquire advisory lock (non-blocking)
cursor.execute("SELECT pg_try_advisory_lock(%s)", (lock_id,))
acquired = cursor.fetchone()[0]
if not acquired:
# Another worker is solving — wait for result
cursor.execute("SELECT pg_advisory_lock(%s)", (lock_id,))
# Lock acquired means other worker finished — check cache
cursor.execute(
"SELECT solution FROM captcha_cache "
"WHERE sitekey = %s AND pageurl = %s "
"AND created_at > NOW() - INTERVAL '60 seconds'",
(sitekey, pageurl)
)
row = cursor.fetchone()
cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
if row:
return {"solution": row[0], "source": "pg_cache"}
return {"error": "NO_CACHED_RESULT"}
try:
# Solve the CAPTCHA
solution = solve_via_api(sitekey, pageurl)
if solution:
cursor.execute(
"INSERT INTO captcha_cache (sitekey, pageurl, solution) "
"VALUES (%s, %s, %s)",
(sitekey, pageurl, solution)
)
conn.commit()
return {"solution": solution} if solution else {"error": "SOLVE_FAILED"}
finally:
cursor.execute("SELECT pg_advisory_unlock(%s)", (lock_id,))
Показатели эффективности дедупликации
Отслеживайте экономию при дедупликации:
def track_dedup_stats(source):
"""Increment counters for dedup tracking."""
today = time.strftime("%Y-%m-%d")
r.hincrby(f"dedup:stats:{today}", source, 1)
r.expire(f"dedup:stats:{today}", 7 * 86400)
def get_dedup_report():
today = time.strftime("%Y-%m-%d")
stats = r.hgetall(f"dedup:stats:{today}")
total = sum(int(v) for v in stats.values())
saved = int(stats.get("dedup_cache", 0)) + int(stats.get("dedup_wait", 0))
return {
"total_requests": total,
"deduplicated": saved,
"savings_pct": f"{saved / total * 100:.1f}%" if total else "0%",
"breakdown": stats
}
Поиск неисправностей
| Проблема | Причина | Исправить |
Следующие шаги
- CaptchaAI Quickstart: ваше первое решение CAPTCHA за 5 минут
- Как решить reCAPTCHA v2 через API: пошаговое руководство
- Как решить Cloudflare Turnstile через API
- Как решить GeeTest v3 с помощью API