Статические рабочие пулы тратят деньги в периоды затишья и создают узкие места во время пиков нагрузки. Автоматическое масштабирование сопоставляет количество работников с фактическим спросом, оптимизируя как затраты, так и пропускную способность.
Масштабирование сигналов
| Сигнал | Масштабируйте, когда | Уменьшить масштаб, когда |
|---|---|---|
| Глубина очереди | > 20 невыполненных задач | < 5 невыполненных задач |
| Использование рабочих | > 80% занято | <20 % занято |
| Устранение задержки | P95 > 60 секунд | P95 < 20 секунд |
| Доля ошибок | > 5% (нужны свежие работники) | Стабильный < 1% |
| Баланс | PLACEHOLDER_TOKEN | Баланс < 1 доллара США (прекратить масштабирование) |
Поточное автоматическое масштабирование
Масштабируйте рабочие потоки в рамках одного процесса:
import os
import time
import threading
import requests
import json
import redis
class AutoScalingPool:
"""Dynamically scale CaptchaAI worker threads."""
def __init__(self, api_key, redis_url="redis://localhost:6379"):
self.api_key = api_key
self.redis = redis.from_url(redis_url)
self.base = "https://ocr.captchaai.com"
self.queue_key = "captcha:tasks"
self.results_key = "captcha:results"
self.min_workers = 2
self.max_workers = 20
self.workers = []
self.active_count = 0
self.lock = threading.Lock()
self.running = True
def start(self):
"""Start the pool with minimum workers."""
for _ in range(self.min_workers):
self._add_worker()
# Start scaler in background
scaler = threading.Thread(target=self._scaling_loop, daemon=True)
scaler.start()
print(f"Pool started with {self.min_workers} workers")
def _add_worker(self):
"""Add a worker thread."""
if len(self.workers) >= self.max_workers:
return
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self.workers.append(t)
def _remove_worker(self):
"""Signal one worker to stop (lazy removal)."""
if len(self.workers) <= self.min_workers:
return
self.workers.pop() # Thread will exit on next idle cycle
def _worker_loop(self):
"""Worker loop: fetch and process tasks."""
while self.running and threading.current_thread() in self.workers:
result = self.redis.blpop(self.queue_key, timeout=10)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task["id"]
with self.lock:
self.active_count += 1
try:
token = self._solve(task["method"], task["params"])
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "success", "token": token,
}))
except Exception as e:
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "error", "error": str(e),
}))
finally:
with self.lock:
self.active_count -= 1
def _scaling_loop(self):
"""Periodically adjust worker count."""
while self.running:
time.sleep(10)
queue_depth = self.redis.llen(self.queue_key)
current = len(self.workers)
utilization = (
self.active_count / current * 100 if current > 0 else 0
)
# Scale up: queue growing and workers busy
if queue_depth > 20 and utilization > 70:
new_count = min(current + 2, self.max_workers)
while len(self.workers) < new_count:
self._add_worker()
print(f"Scaled up: {current} → {len(self.workers)} workers")
# Scale down: queue empty and workers idle
elif queue_depth < 5 and utilization < 20:
target = max(current - 1, self.min_workers)
while len(self.workers) > target:
self._remove_worker()
if len(self.workers) < current:
print(f"Scaled down: {current} → {len(self.workers)} workers")
def _solve(self, method, params, timeout=120):
data = {"key": self.api_key, "method": method, "json": 1}
data.update(params)
resp = requests.post(
f"{self.base}/in.php", data=data, timeout=30,
)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
def stats(self):
return {
"workers": len(self.workers),
"active": self.active_count,
"queue": self.redis.llen(self.queue_key),
}
# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()
# Monitor
while True:
print(pool.stats())
time.sleep(30)
Автоматическое масштабирование на основе процессов
Масштабируйте рабочие процессы для изоляции ЦП:
import multiprocessing
import time
import redis
import os
class ProcessScaler:
"""Scale worker processes based on queue depth."""
def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
self.worker_fn = worker_fn
self.redis = redis.from_url(redis_url)
self.processes = []
self.min_workers = 2
self.max_workers = 16
def run(self, check_interval=15):
"""Run the scaler loop."""
# Start minimum workers
for _ in range(self.min_workers):
self._spawn()
while True:
time.sleep(check_interval)
self._cleanup_dead()
queue_depth = self.redis.llen("captcha:tasks")
current = len(self.processes)
# Scale up
if queue_depth > current * 5 and current < self.max_workers:
to_add = min(
max(1, queue_depth // 10),
self.max_workers - current,
)
for _ in range(to_add):
self._spawn()
print(f"Scaled up to {len(self.processes)} workers")
# Scale down
elif queue_depth < 3 and current > self.min_workers:
to_remove = min(2, current - self.min_workers)
for _ in range(to_remove):
p = self.processes.pop()
p.terminate()
print(f"Scaled down to {len(self.processes)} workers")
def _spawn(self):
p = multiprocessing.Process(target=self.worker_fn)
p.start()
self.processes.append(p)
def _cleanup_dead(self):
self.processes = [p for p in self.processes if p.is_alive()]
# Ensure minimum
while len(self.processes) < self.min_workers:
self._spawn()
Масштабирование с учетом баланса
Прекратите масштабирование, когда средства иссякают:
def check_balance(api_key, min_balance=2.0):
"""Check if balance is sufficient for scaling."""
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": api_key,
"action": "getbalance",
"json": 1,
}, timeout=15)
balance = float(resp.json()["request"])
if balance < min_balance:
print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
return False
return True
Интегрируйте в цикл масштабирования:
# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
if check_balance(self.api_key, min_balance=2.0):
# Scale up
...
else:
print("Scaling paused — low balance")
Сравнение стратегий масштабирования
| Стратегия | подходящий для | Задержка | Сложность |
|---|---|---|---|
| Пул потоков | I/O-bound (вызовы API) | Низкий | Низкий |
| Пул процессов | Предварительная обработка с привязкой к ЦП | Середина | Середина |
| Кубернетес HPA | Облачные развертывания | Выше | Высокий |
| КЕДА | Масштабирование, управляемое событиями | Середина | Середина |
Поиск неисправностей
| Проблема | Причина | Исправить |
|---|---|---|
| Рабочие продолжают расширяться | Очередь никогда не опустошается | Проверьте, действительно ли работники обрабатывают |
| Уменьшение масштаба слишком агрессивное | Низкий порог | Увеличьте задержку уменьшения масштаба до 30 с+. |
| Зомби-процессы | Не очищенные процессы | Регулярно используйте _cleanup_dead() |
| Баланс быстро расходуется | Слишком много работников | Добавить проверку баланса в логику масштабирования |
Часто задаваемые вопросы
Каково правильное соотношение рабочих к очереди?
Стремитесь к одному работнику на 5–10 задач в очереди. Каждый работник обрабатывает ~3-6 CAPTCHA в минуту в зависимости от типа.
Должен ли я использовать потоки или процессы?
Потоки для чистого вызова API (CaptchaAI — I/O-bound). Процессы, когда помимо решения вы также выполняете предварительную обработку изображений или тяжелые вычисления.
Как быстро мне следует масштабироваться?
Быстро увеличивайте масштаб (проверка каждые 10–15 секунд), медленно уменьшайте масштаб (подождите 30–60 секунд при низкой нагрузке). Это предотвращает метания между состояниями.
Связанные руководства
- Очереди заданий Kubernetes
- Прометей/Grafana Мониторинг
Умное масштабирование —получите ключ CaptchaAIсегодня.