DevOps & Scaling

Рабочие решения CAPTCHA с автоматическим масштабированием

Статические рабочие пулы тратят деньги в периоды затишья и создают узкие места во время пиков нагрузки. Автоматическое масштабирование сопоставляет количество работников с фактическим спросом, оптимизируя как затраты, так и пропускную способность.


Масштабирование сигналов

Сигнал Масштабируйте, когда Уменьшить масштаб, когда
Глубина очереди > 20 невыполненных задач < 5 невыполненных задач
Использование рабочих > 80% занято <20 % занято
Устранение задержки P95 > 60 секунд P95 < 20 секунд
Доля ошибок > 5% (нужны свежие работники) Стабильный < 1%
Баланс PLACEHOLDER_TOKEN Баланс < 1 ​​доллара США (прекратить масштабирование)

Поточное автоматическое масштабирование

Масштабируйте рабочие потоки в рамках одного процесса:

import os
import time
import threading
import requests
import json
import redis


class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task["id"]

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }


# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)

Автоматическое масштабирование на основе процессов

Масштабируйте рабочие процессы для изоляции ЦП:

import multiprocessing
import time
import redis
import os


class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

Масштабирование с учетом баланса

Прекратите масштабирование, когда средства иссякают:

def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

Интегрируйте в цикл масштабирования:

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")

Сравнение стратегий масштабирования

Стратегия подходящий для Задержка Сложность
Пул потоков I/O-bound (вызовы API) Низкий Низкий
Пул процессов Предварительная обработка с привязкой к ЦП Середина Середина
Кубернетес HPA Облачные развертывания Выше Высокий
КЕДА Масштабирование, управляемое событиями Середина Середина

Поиск неисправностей

Проблема Причина Исправить
Рабочие продолжают расширяться Очередь никогда не опустошается Проверьте, действительно ли работники обрабатывают
Уменьшение масштаба слишком агрессивное Низкий порог Увеличьте задержку уменьшения масштаба до 30 с+.
Зомби-процессы Не очищенные процессы Регулярно используйте _cleanup_dead()
Баланс быстро расходуется Слишком много работников Добавить проверку баланса в логику масштабирования

Часто задаваемые вопросы

Каково правильное соотношение рабочих к очереди?

Стремитесь к одному работнику на 5–10 задач в очереди. Каждый работник обрабатывает ~3-6 CAPTCHA в минуту в зависимости от типа.

Должен ли я использовать потоки или процессы?

Потоки для чистого вызова API (CaptchaAI — I/O-bound). Процессы, когда помимо решения вы также выполняете предварительную обработку изображений или тяжелые вычисления.

Как быстро мне следует масштабироваться?

Быстро увеличивайте масштаб (проверка каждые 10–15 секунд), медленно уменьшайте масштаб (подождите 30–60 секунд при низкой нагрузке). Это предотвращает метания между состояниями.


Связанные руководства

  • Очереди заданий Kubernetes
  • Прометей/Grafana Мониторинг

Умное масштабирование —получите ключ CaptchaAIсегодня.

Комментарии для этой статьи отключены.

Похожие сообщения

DevOps & Scaling Создание решения CAPTCHA на основе событий с помощью AWS SNS и CaptchaAI
Руководство Dev Ops по созданию решений CAPTCHA на основе событий с помощью AWS SNS и Captcha AI, с архитектурными решениями, эксплуатационными соображениями и...

Руководство Dev Ops по созданию решений CAPTCHA на основе событий с помощью AWS SNS и Captcha AI, с архитектур...

Apr 24, 2026
DevOps & Scaling Учебники Ansible для развертывания рабочих кадров CaptchaAI
Руководство по Dev Ops для Учебники Ansible для развертывания рабочих кадров Captcha AI, с архитектурными решениями, соображениями по эксплуатации и шаблонами а...

Руководство по Dev Ops для Учебники Ansible для развертывания рабочих кадров Captcha AI, с архитектурными реше...

Apr 22, 2026
DevOps & Scaling AWS Lambda + CaptchaAI: бессерверное решение CAPTCHA
Руководство Dev Ops по AWS Lambda + Captcha AI: бессерверное решение CAPTCHA с архитектурными решениями, соображениями по эксплуатации и шаблонами автоматизации...

Руководство Dev Ops по AWS Lambda + Captcha AI: бессерверное решение CAPTCHA с архитектурными решениями, сообр...

Apr 24, 2026