mikrotik-bot/bot.py
stakost 25d910cef7
All checks were successful
Build and Deploy MikroTik Bot / build-and-deploy (push) Successful in 26s
Fix Vault integration and ALLOWED_USER_IDS handling
2025-06-01 14:03:16 +03:00

342 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import asyncio
from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from dotenv import load_dotenv
from librouteros import connect
from librouteros.exceptions import TrapError
from fastapi import FastAPI, Request
import uvicorn
from db import init_db, upsert_client, start_session, update_session, close_session, get_history, get_clients, get_stats
from vault_client import VaultClient
load_dotenv()
# Получаем конфигурацию из Vault или environment variables
vault = VaultClient()
config = vault.get_config()
# Используем правильные имена переменных
BOT_TOKEN = config.get('BOT_TOKEN') or os.getenv("BOT_TOKEN")
ROUTER_HOST = config.get('ROUTER_HOST') or os.getenv("ROUTER_HOST")
ROUTER_USER = config.get('ROUTER_USER') or os.getenv("ROUTER_USER")
ROUTER_PASSWORD = config.get('ROUTER_PASSWORD') or os.getenv("ROUTER_PASSWORD")
# Безопасная обработка ALLOWED_USER_IDS
allowed_ids_str = os.getenv("ALLOWED_USER_IDS", "")
if allowed_ids_str.strip():
ALLOWED_USER_IDS = set(map(int, allowed_ids_str.split(",")))
else:
# Если не установлено, используем пустое множество или значение по умолчанию
ALLOWED_USER_IDS = set()
print("⚠️ ALLOWED_USER_IDS не установлено - доступ для всех пользователей!")
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
# Авторизация по user_id
async def check_user(message: types.Message) -> bool:
# Если список пуст, разрешаем всем (для тестирования)
if not ALLOWED_USER_IDS:
return True
if message.from_user.id not in ALLOWED_USER_IDS:
await message.answer("⛔️ Нет доступа")
return False
return True
# Подключение к MikroTik API
def get_mt_api():
return connect(username=ROUTER_USER, password=ROUTER_PASSWORD, host=ROUTER_HOST)
@dp.message(Command("start"))
async def start_cmd(msg: types.Message):
if not await check_user(msg):
return
kb = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Клиенты Wi-Fi", callback_data="clients")]
])
await msg.answer("Добро пожаловать!", reply_markup=kb)
@dp.callback_query(lambda c: c.data == "clients")
async def show_clients(callback: types.CallbackQuery):
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return
try:
print("[DEBUG] Подключаюсь к MikroTik API...")
api = get_mt_api()
print("[DEBUG] Получаю список клиентов через /interface/wifi/registration-table...")
# Правильный путь для RouterOS 7.x
clients = list(api.path('interface', 'wifi', 'registration-table'))
print(f"[DEBUG] Найдено клиентов: {len(clients)}")
if not clients:
await callback.message.answer("Нет подключённых клиентов Wi-Fi.")
return
text = "<b>Wi-Fi клиенты:</b>\n"
for i, c in enumerate(clients):
print(f"[DEBUG] Обрабатываю клиента {i}: type={type(c)}")
# Конвертируем Path объект в словарь
client_data = dict(c)
print(f"[DEBUG] Данные клиента: {client_data}")
# Используем правильные поля из RouterOS 7.x
mac = client_data.get('mac-address', '-')
interface = client_data.get('interface', '-')
ssid = client_data.get('ssid', '-')
uptime = client_data.get('uptime', '-')
signal = client_data.get('signal', '-')
band = client_data.get('band', '-')
text += f"\n<b>MAC:</b> {mac}\n"
text += f"<b>Интерфейс:</b> {interface}\n"
text += f"<b>SSID:</b> {ssid}\n"
text += f"<b>Uptime:</b> {uptime}\n"
text += f"<b>Сигнал:</b> {signal} | <b>Диапазон:</b> {band}\n"
print("[DEBUG] Отправляю сообщение...")
await callback.message.answer(text, parse_mode="HTML")
except TrapError as e:
print(f"[ERROR] TrapError: {e}")
await callback.message.answer(f"Ошибка API MikroTik: {e}")
except Exception as e:
print(f"[ERROR] Exception: {e}")
print(f"[ERROR] Exception type: {type(e)}")
import traceback
traceback.print_exc()
await callback.message.answer(f"Ошибка: {e}")
# --- FastAPI для fetch-уведомлений ---
app = FastAPI()
@app.get("/health")
async def health():
return {"status": "ok", "service": "mikrotik-telegram-bot"}
@app.get("/event")
async def event(request: Request):
mac = request.query_params.get('mac')
ip = request.query_params.get('ip')
event_type = request.query_params.get('type')
# Можно добавить секрет для безопасности:
# if request.query_params.get('secret') != os.getenv('MT_SECRET'): return 'forbidden'
text = f"MikroTik: событие {event_type}\nMAC: {mac}\nIP: {ip}"
# Сохраняем клиента и сессию (fetch)
await upsert_client(mac, name=None)
await start_session(mac, ip, source='fetch')
# Отправляем уведомления только если есть разрешённые пользователи
if ALLOWED_USER_IDS:
for uid in ALLOWED_USER_IDS:
await bot.send_message(uid, text)
return {"status": "ok"}
# --- Периодический polling MikroTik API для сбора статистики ---
async def poll_mikrotik():
known_clients = {}
while True:
try:
api = get_mt_api()
# Используем правильный путь для RouterOS 7.x
clients = list(api.path('interface', 'wifi', 'registration-table'))
current_macs = set()
for c in clients:
# Конвертируем Path объект в словарь
client_data = dict(c)
mac = client_data.get('mac-address')
# В RouterOS 7.x WiFi нет поля last-ip, используем другие поля
ip = client_data.get('last-ip', 'N/A') # возможно нет этого поля
name = client_data.get('host-name') or client_data.get('comment')
# В WiFi registration-table нет rx-bytes/tx-bytes, используем 0
rx = 0
tx = 0
await upsert_client(mac, name=name)
await start_session(mac, ip, rx, tx, source='api')
await update_session(mac, rx, tx)
current_macs.add(mac)
# Закрываем сессии для клиентов, которых больше нет
for mac in list(known_clients):
if mac not in current_macs:
await close_session(mac)
known_clients.pop(mac)
known_clients = {mac: True for mac in current_macs}
except Exception as e:
print(f"[poll_mikrotik] Ошибка: {e}")
await asyncio.sleep(30) # интервал опроса
# --- Одновременный запуск бота, FastAPI и polling ---
async def main():
await init_db()
bot_task = asyncio.create_task(dp.start_polling(bot))
config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
server = uvicorn.Server(config)
api_task = asyncio.create_task(server.serve())
poll_task = asyncio.create_task(poll_mikrotik())
await asyncio.gather(bot_task, api_task, poll_task)
# --- Команда /history с постраничностью ---
@dp.message(Command("history"))
async def history_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_history_page(msg, period_days=30, page=1)
@dp.callback_query(lambda c: c.data.startswith("history_"))
async def history_period(callback: types.CallbackQuery):
if not await check_user_callback(callback):
return
parts = callback.data.split("_")
days = int(parts[1])
page = int(parts[2]) if len(parts) > 2 else 1
await send_history_page(callback.message, period_days=days, page=page, edit=True, callback=callback)
async def send_history_page(message, period_days, page, edit=False, callback=None):
page_size = 10
rows, total = await get_history(period_days=period_days, page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"<b>История подключений за {period_days} дн. (стр. {page}/{max_page}):</b>\n"
if not rows:
text += "Нет подключений за выбранный период."
else:
for row in rows:
mac = row[1]
ip = row[2]
start = row[3]
end = row[4] or "..."
rx = row[5] or 0
tx = row[6] or 0
name = row[-2] or row[-1] or mac
text += f"\n<b>{name}</b> ({mac})\nIP: {ip}\nВремя: {start}{end}\nRX: {rx//1024} KB, TX: {tx//1024} KB\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"history_{period_days}_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"history_{period_days}_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"history_{period_days}_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# --- Команда /clients с постраничностью ---
@dp.message(Command("clients"))
async def clients_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_clients_page(msg, page=1)
@dp.callback_query(lambda c: c.data.startswith("clients_"))
async def clients_page_cb(callback: types.CallbackQuery):
if not await check_user_callback(callback):
return
parts = callback.data.split("_")
page = int(parts[1]) if len(parts) > 1 else 1
await send_clients_page(callback.message, page=page, edit=True, callback=callback)
async def send_clients_page(message, page, edit=False, callback=None):
page_size = 10
rows, total = await get_clients(page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"<b>Устройства (стр. {page}/{max_page}):</b>\n"
if not rows:
text += "Нет известных клиентов."
else:
for row in rows:
mac = row[0]
name = row[1] or mac
custom = row[2]
last_seen = row[3]
text += f"\n<b>{custom or name}</b> ({mac})\nПоследняя активность: {last_seen}\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"clients_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"clients_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"clients_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# --- Обработчик для неактивных кнопок ---
@dp.callback_query(lambda c: c.data == "noop")
async def noop_callback(callback: types.CallbackQuery):
await callback.answer()
# --- Команда /stats с постраничностью ---
@dp.message(Command("stats"))
async def stats_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_stats_page(msg, period_days=30, page=1)
@dp.callback_query(lambda c: c.data.startswith("stats_"))
async def stats_period(callback: types.CallbackQuery):
if not await check_user_callback(callback):
return
parts = callback.data.split("_")
days = int(parts[1])
page = int(parts[2]) if len(parts) > 2 else 1
await send_stats_page(callback.message, period_days=days, page=page, edit=True, callback=callback)
async def send_stats_page(message, period_days, page, edit=False, callback=None):
page_size = 10
rows, total = await get_stats(period_days=period_days, page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"<b>Топ по трафику за {period_days} дн. (стр. {page}/{max_page}):</b>\n"
if not rows:
text += "Нет данных за выбранный период."
else:
for row in rows:
mac = row[0]
name = row[1] or row[2] or mac
rx = row[3] or 0
tx = row[4] or 0
count = row[5]
text += f"\n<b>{name}</b> ({mac})\nСессий: {count}\nRX: {rx//1024} KB, TX: {tx//1024} KB, Всего: {(rx+tx)//1024} KB\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"stats_{period_days}_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"stats_{period_days}_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"stats_{period_days}_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# Добавляем вспомогательную функцию для проверки callback'ов
async def check_user_callback(callback: types.CallbackQuery) -> bool:
# Если список пуст, разрешаем всем
if not ALLOWED_USER_IDS:
return True
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return False
return True
if __name__ == "__main__":
asyncio.run(main())