DevOps и масштабирование

Планирование аварийного восстановления для конвейеров решения CAPTCHA

Конвейер решения CAPTCHA, который теряет текущие задачи во время сбоя, требует затрат данных, времени и денег. Планирование аварийного восстановления (DR) не является универсальным обещанием, что вы сможете восстановиться после сбоев инфраструктуры, сбоев API или ошибок конфигурации с минимальной потерей данных.

Цели аварийного восстановления

Метрика Определение Цель конвейера CAPTCHA
RPO (Цель точки восстановления) Максимально допустимая потеря данных < 5 минут задач в очереди
RTO (целевое время восстановления) Максимальное время восстановления сервиса < 15 минут
MTTR (среднее время восстановления) Среднее время восстановления < 10 минут

Сценарии неудач

Scenario 1: Worker crash         → Restart workers, replay queue
Scenario 2: Queue data loss      → Restore from persistent backup
Scenario 3: Network partition    → Failover to secondary region
Scenario 4: API key compromised  → Rotate key, update workers
Scenario 5: Config corruption    → Rollback to last known good

Уровень сохранения задач

Никогда не решайте CAPTCHA из очереди, хранящейся только в памяти. Выполняйте задачи, чтобы выжить при сбоях.

Python — постоянная очередь задач

import os
import json
import time
import sqlite3
import threading
import requests
from datetime import datetime

API_KEY = os.environ["CAPTCHAAI_API_KEY"]


class PersistentTaskQueue:
    """SQLite-backed task queue that survives crashes."""

    def __init__(self, db_path="captcha_tasks.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self.lock = threading.Lock()
        self._init_db()

    def _init_db(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                payload TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                started_at TEXT,
                completed_at TEXT,
                result TEXT,
                attempts INTEGER DEFAULT 0
            )
        """)
        self.conn.commit()

    def enqueue(self, task_id, payload):
        with self.lock:
            self.conn.execute(
                "INSERT INTO tasks (id, payload) VALUES (?, ?)",
                (task_id, json.dumps(payload))
            )
            self.conn.commit()

    def dequeue(self):
        with self.lock:
            cursor = self.conn.execute(
                "SELECT id, payload FROM tasks "
                "WHERE status = 'pending' ORDER BY created_at LIMIT 1"
            )
            row = cursor.fetchone()
            if not row:
                return None

            task_id, payload = row
            self.conn.execute(
                "UPDATE tasks SET status = 'processing', "
                "started_at = ?, attempts = attempts + 1 WHERE id = ?",
                (datetime.utcnow().isoformat(), task_id)
            )
            self.conn.commit()
            return {"id": task_id, "payload": json.loads(payload)}

    def complete(self, task_id, result):
        with self.lock:
            self.conn.execute(
                "UPDATE tasks SET status = 'completed', "
                "completed_at = ?, result = ? WHERE id = ?",
                (datetime.utcnow().isoformat(), json.dumps(result), task_id)
            )
            self.conn.commit()

    def fail(self, task_id, error):
        with self.lock:
            # Requeue if under retry limit
            cursor = self.conn.execute(
                "SELECT attempts FROM tasks WHERE id = ?", (task_id,)
            )
            row = cursor.fetchone()
            if row and row[0] < 3:
                self.conn.execute(
                    "UPDATE tasks SET status = 'pending' WHERE id = ?",
                    (task_id,)
                )
            else:
                self.conn.execute(
                    "UPDATE tasks SET status = 'failed', "
                    "result = ? WHERE id = ?",
                    (json.dumps({"error": error}), task_id)
                )
            self.conn.commit()

    def recover_stale(self, timeout_seconds=600):
        """Reset tasks stuck in 'processing' after a crash."""
        with self.lock:
            cutoff = datetime.utcnow().timestamp() - timeout_seconds
            self.conn.execute(
                "UPDATE tasks SET status = 'pending' "
                "WHERE status = 'processing' "
                "AND started_at < datetime(?, 'unixepoch')",
                (cutoff,)
            )
            count = self.conn.total_changes
            self.conn.commit()
            return count

    @property
    def stats(self):
        cursor = self.conn.execute(
            "SELECT status, COUNT(*) FROM tasks GROUP BY status"
        )
        return dict(cursor.fetchall())


# On startup: recover tasks that were processing during a crash
queue = PersistentTaskQueue()
recovered = queue.recover_stale(timeout_seconds=600)
print(f"Recovered {recovered} stale tasks after restart")

JavaScript — Менеджер восстановления

const axios = require("axios");
const fs = require("fs");

const API_KEY = process.env.CAPTCHAAI_API_KEY;

class DisasterRecoveryManager {
  constructor(checkpointDir = "./dr-checkpoints") {
    this.checkpointDir = checkpointDir;
    if (!fs.existsSync(checkpointDir)) {
      fs.mkdirSync(checkpointDir, { recursive: true });
    }
  }

  checkpoint(label, data) {
    const filename = `${this.checkpointDir}/${label}-${Date.now()}.json`;
    fs.writeFileSync(filename, JSON.stringify(data, null, 2));
    this.pruneOldCheckpoints(label, 10); // Keep last 10
    return filename;
  }

  restore(label) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort()
      .reverse();

    if (files.length === 0) return null;
    const latest = fs.readFileSync(
      `${this.checkpointDir}/${files[0]}`, "utf8"
    );
    return JSON.parse(latest);
  }

  pruneOldCheckpoints(label, keep) {
    const files = fs.readdirSync(this.checkpointDir)
      .filter((f) => f.startsWith(label) && f.endsWith(".json"))
      .sort();

    while (files.length > keep) {
      const old = files.shift();
      fs.unlinkSync(`${this.checkpointDir}/${old}`);
    }
  }

  async healthCheck() {
    try {
      const resp = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "getbalance", json: 1 },
        timeout: 10000,
      });
      return {
        healthy: resp.data.status === 1,
        balance: parseFloat(resp.data.request || 0),
      };
    } catch (err) {
      return { healthy: false, error: err.message };
    }
  }
}

class ResilientSolver {
  constructor() {
    this.dr = new DisasterRecoveryManager();
    this.pendingTasks = [];
  }

  async solveBatch(tasks) {
    // Checkpoint before starting
    this.dr.checkpoint("batch-pending", {
      tasks,
      startedAt: new Date().toISOString(),
    });

    const results = [];
    for (const task of tasks) {
      try {
        const result = await this.solveSingle(task);
        results.push({ taskId: task.id, ...result });
      } catch (err) {
        results.push({ taskId: task.id, error: err.message });
      }

      // Checkpoint progress periodically
      if (results.length % 10 === 0) {
        this.dr.checkpoint("batch-progress", { results, remaining: tasks.length - results.length });
      }
    }

    // Final checkpoint
    this.dr.checkpoint("batch-complete", { results });
    return results;
  }

  async recover() {
    // Check for incomplete batch
    const progress = this.dr.restore("batch-progress");
    const pending = this.dr.restore("batch-pending");

    if (progress) {
      const completedIds = new Set(progress.results.map((r) => r.taskId));
      const remaining = pending?.tasks.filter((t) => !completedIds.has(t.id));
      console.log(
        `Recovering: ${progress.results.length} done, ${remaining?.length || 0} remaining`
      );
      return remaining || [];
    }

    if (pending) {
      console.log(`Recovering full batch: ${pending.tasks.length} tasks`);
      return pending.tasks;
    }

    return [];
  }

  async solveSingle(task) {
    const resp = await axios.post("https://ocr.captchaai.com/in.php", null, {
      params: {
        key: API_KEY,
        method: "userrecaptcha",
        googlekey: task.sitekey,
        pageurl: task.pageurl,
        json: 1,
      },
    });

    if (resp.data.status !== 1) throw new Error(resp.data.request);

    const captchaId = resp.data.request;
    for (let i = 0; i < 60; i++) {
      await new Promise((r) => setTimeout(r, 5000));
      const poll = await axios.get("https://ocr.captchaai.com/res.php", {
        params: { key: API_KEY, action: "get", id: captchaId, json: 1 },
      });
      if (poll.data.status === 1) return { solution: poll.data.request };
      if (poll.data.request !== "CAPCHA_NOT_READY")
        throw new Error(poll.data.request);
    }
    throw new Error("TIMEOUT");
  }
}

// Start with recovery check
const solver = new ResilientSolver();
solver.recover().then((remaining) => {
  if (remaining.length > 0) {
    console.log(`Resuming ${remaining.length} tasks from checkpoint`);
    solver.solveBatch(remaining);
  }
});

Шаблон Runbook аварийного восстановления

RUNBOOK: CAPTCHA Pipeline Recovery
====================================

1. DETECT
   - Alert fires: [PagerDuty / Slack / Email]
   - Symptom: [Queue growing / Workers offline / Error spike]

2. ASSESS
   - Check worker health: curl http://workers/health
   - Check API status: GET /res.php?action=getbalance
   - Check queue depth: SELECT COUNT(*) FROM tasks WHERE status='pending'

3. RECOVER
   If: Workers crashed
     → Restart worker containers: docker-compose up -d workers
     → Run stale task recovery: recovery.py --recover-stale

   If: Network partition
     → Failover to secondary region
     → Update DNS or load balancer routing

   If: API key compromised
     → Generate new key at captchaai.com
     → Update secret store
     → Rolling restart workers

4. VERIFY
   - Confirm solve rate > 90%
   - Confirm queue draining
   - Confirm no duplicate solves

5. POST-MORTEM
   - Document root cause
   - Update runbook if needed

Поиск неисправностей

| Проблема | Причина | Исправить |


Следующие шаги

  • CaptchaAI Quickstart: ваше первое решение CAPTCHA за 5 минут
  • Как решить reCAPTCHA v2 через API: пошаговое руководство
  • Как решить Cloudflare Turnstile через API
  • Как решить GeeTest v3 с помощью API

Картирование целей восстановления

  • Определите, какие данные можно воспроизвести позже, а какие потоки, связанные с пользователем, требуют немедленного восстановления после сбоя.
  • Сопоставьте каждый компонент конвейера с целевыми значениями RPO и RTO вместо того, чтобы рассматривать всю систему как один блок.
  • Задокументируйте, кто может инициировать аварийное переключение, кто проверяет восстановление и когда возобновлять нормальную маршрутизацию.
Комментарии для этой статьи отключены.