import os
import asyncio
from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from dotenv import load_dotenv
from librouteros import connect
from librouteros.exceptions import TrapError
from fastapi import FastAPI, Request
import uvicorn
from db import init_db, upsert_client, start_session, update_session, close_session, get_history, get_clients, get_stats
from vault_client import VaultClient
load_dotenv()
# Получаем конфигурацию из Vault или environment variables
vault = VaultClient()
config = vault.get_config()
# Используем правильные имена переменных
BOT_TOKEN = config.get('BOT_TOKEN') or os.getenv("BOT_TOKEN")
ROUTER_HOST = config.get('ROUTER_HOST') or os.getenv("ROUTER_HOST")
ROUTER_USER = config.get('ROUTER_USER') or os.getenv("ROUTER_USER")
ROUTER_PASSWORD = config.get('ROUTER_PASSWORD') or os.getenv("ROUTER_PASSWORD")
# Безопасная обработка ALLOWED_USER_IDS
allowed_ids_str = os.getenv("ALLOWED_USER_IDS", "")
if allowed_ids_str.strip():
ALLOWED_USER_IDS = set(map(int, allowed_ids_str.split(",")))
else:
# Если не установлено, используем пустое множество или значение по умолчанию
ALLOWED_USER_IDS = set()
print("⚠️ ALLOWED_USER_IDS не установлено - доступ для всех пользователей!")
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
# Авторизация по user_id
async def check_user(message: types.Message) -> bool:
# Если список пуст, разрешаем всем (для тестирования)
if not ALLOWED_USER_IDS:
return True
if message.from_user.id not in ALLOWED_USER_IDS:
await message.answer("⛔️ Нет доступа")
return False
return True
# Подключение к MikroTik API
def get_mt_api():
return connect(username=ROUTER_USER, password=ROUTER_PASSWORD, host=ROUTER_HOST)
@dp.message(Command("start"))
async def start_cmd(msg: types.Message):
if not await check_user(msg):
return
kb = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Клиенты Wi-Fi", callback_data="clients")]
])
await msg.answer("Добро пожаловать!", reply_markup=kb)
@dp.callback_query(lambda c: c.data == "clients")
async def show_clients(callback: types.CallbackQuery):
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return
try:
print("[DEBUG] Подключаюсь к MikroTik API...")
api = get_mt_api()
print("[DEBUG] Получаю список клиентов через /interface/wifi/registration-table...")
# Правильный путь для RouterOS 7.x
clients = list(api.path('interface', 'wifi', 'registration-table'))
print(f"[DEBUG] Найдено клиентов: {len(clients)}")
if not clients:
await callback.message.answer("Нет подключённых клиентов Wi-Fi.")
return
text = "Wi-Fi клиенты:\n"
for i, c in enumerate(clients):
print(f"[DEBUG] Обрабатываю клиента {i}: type={type(c)}")
# Конвертируем Path объект в словарь
client_data = dict(c)
print(f"[DEBUG] Данные клиента: {client_data}")
# Используем правильные поля из RouterOS 7.x
mac = client_data.get('mac-address', '-')
interface = client_data.get('interface', '-')
ssid = client_data.get('ssid', '-')
uptime = client_data.get('uptime', '-')
signal = client_data.get('signal', '-')
band = client_data.get('band', '-')
text += f"\nMAC: {mac}\n"
text += f"Интерфейс: {interface}\n"
text += f"SSID: {ssid}\n"
text += f"Uptime: {uptime}\n"
text += f"Сигнал: {signal} | Диапазон: {band}\n"
print("[DEBUG] Отправляю сообщение...")
await callback.message.answer(text, parse_mode="HTML")
except TrapError as e:
print(f"[ERROR] TrapError: {e}")
await callback.message.answer(f"Ошибка API MikroTik: {e}")
except Exception as e:
print(f"[ERROR] Exception: {e}")
print(f"[ERROR] Exception type: {type(e)}")
import traceback
traceback.print_exc()
await callback.message.answer(f"Ошибка: {e}")
# --- FastAPI для fetch-уведомлений ---
app = FastAPI()
@app.get("/health")
async def health():
return {"status": "ok", "service": "mikrotik-telegram-bot"}
@app.get("/event")
async def event(request: Request):
mac = request.query_params.get('mac')
ip = request.query_params.get('ip')
event_type = request.query_params.get('type')
# Можно добавить секрет для безопасности:
# if request.query_params.get('secret') != os.getenv('MT_SECRET'): return 'forbidden'
text = f"MikroTik: событие {event_type}\nMAC: {mac}\nIP: {ip}"
# Сохраняем клиента и сессию (fetch)
await upsert_client(mac, name=None)
await start_session(mac, ip, source='fetch')
# Отправляем уведомления только если есть разрешённые пользователи
if ALLOWED_USER_IDS:
for uid in ALLOWED_USER_IDS:
await bot.send_message(uid, text)
return {"status": "ok"}
# --- Периодический polling MikroTik API для сбора статистики ---
async def poll_mikrotik():
known_clients = {}
while True:
try:
api = get_mt_api()
# Используем правильный путь для RouterOS 7.x
clients = list(api.path('interface', 'wifi', 'registration-table'))
current_macs = set()
for c in clients:
# Конвертируем Path объект в словарь
client_data = dict(c)
mac = client_data.get('mac-address')
# В RouterOS 7.x WiFi нет поля last-ip, используем другие поля
ip = client_data.get('last-ip', 'N/A') # возможно нет этого поля
name = client_data.get('host-name') or client_data.get('comment')
# В WiFi registration-table нет rx-bytes/tx-bytes, используем 0
rx = 0
tx = 0
await upsert_client(mac, name=name)
await start_session(mac, ip, rx, tx, source='api')
await update_session(mac, rx, tx)
current_macs.add(mac)
# Закрываем сессии для клиентов, которых больше нет
for mac in list(known_clients):
if mac not in current_macs:
await close_session(mac)
known_clients.pop(mac)
known_clients = {mac: True for mac in current_macs}
except Exception as e:
print(f"[poll_mikrotik] Ошибка: {e}")
await asyncio.sleep(30) # интервал опроса
# --- Одновременный запуск бота, FastAPI и polling ---
async def main():
await init_db()
bot_task = asyncio.create_task(dp.start_polling(bot))
config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
server = uvicorn.Server(config)
api_task = asyncio.create_task(server.serve())
poll_task = asyncio.create_task(poll_mikrotik())
await asyncio.gather(bot_task, api_task, poll_task)
# --- Команда /history с постраничностью ---
@dp.message(Command("history"))
async def history_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_history_page(msg, period_days=30, page=1)
@dp.callback_query(lambda c: c.data.startswith("history_"))
async def history_period(callback: types.CallbackQuery):
if not await check_user_callback(callback):
return
parts = callback.data.split("_")
days = int(parts[1])
page = int(parts[2]) if len(parts) > 2 else 1
await send_history_page(callback.message, period_days=days, page=page, edit=True, callback=callback)
async def send_history_page(message, period_days, page, edit=False, callback=None):
page_size = 10
rows, total = await get_history(period_days=period_days, page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"История подключений за {period_days} дн. (стр. {page}/{max_page}):\n"
if not rows:
text += "Нет подключений за выбранный период."
else:
for row in rows:
mac = row[1]
ip = row[2]
start = row[3]
end = row[4] or "..."
rx = row[5] or 0
tx = row[6] or 0
name = row[-2] or row[-1] or mac
text += f"\n{name} ({mac})\nIP: {ip}\nВремя: {start} — {end}\nRX: {rx//1024} KB, TX: {tx//1024} KB\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"history_{period_days}_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"history_{period_days}_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"history_{period_days}_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# --- Команда /clients с постраничностью ---
@dp.message(Command("clients"))
async def clients_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_clients_page(msg, page=1)
@dp.callback_query(lambda c: c.data.startswith("clients_"))
async def clients_page_cb(callback: types.CallbackQuery):
if not await check_user_callback(callback):
return
parts = callback.data.split("_")
page = int(parts[1]) if len(parts) > 1 else 1
await send_clients_page(callback.message, page=page, edit=True, callback=callback)
async def send_clients_page(message, page, edit=False, callback=None):
page_size = 10
rows, total = await get_clients(page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"Устройства (стр. {page}/{max_page}):\n"
if not rows:
text += "Нет известных клиентов."
else:
for row in rows:
mac = row[0]
name = row[1] or mac
custom = row[2]
last_seen = row[3]
text += f"\n{custom or name} ({mac})\nПоследняя активность: {last_seen}\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"clients_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"clients_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"clients_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# --- Обработчик для неактивных кнопок ---
@dp.callback_query(lambda c: c.data == "noop")
async def noop_callback(callback: types.CallbackQuery):
await callback.answer()
# --- Команда /stats с постраничностью ---
@dp.message(Command("stats"))
async def stats_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_stats_page(msg, period_days=30, page=1)
@dp.callback_query(lambda c: c.data.startswith("stats_"))
async def stats_period(callback: types.CallbackQuery):
if not await check_user_callback(callback):
return
parts = callback.data.split("_")
days = int(parts[1])
page = int(parts[2]) if len(parts) > 2 else 1
await send_stats_page(callback.message, period_days=days, page=page, edit=True, callback=callback)
async def send_stats_page(message, period_days, page, edit=False, callback=None):
page_size = 10
rows, total = await get_stats(period_days=period_days, page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"Топ по трафику за {period_days} дн. (стр. {page}/{max_page}):\n"
if not rows:
text += "Нет данных за выбранный период."
else:
for row in rows:
mac = row[0]
name = row[1] or row[2] or mac
rx = row[3] or 0
tx = row[4] or 0
count = row[5]
text += f"\n{name} ({mac})\nСессий: {count}\nRX: {rx//1024} KB, TX: {tx//1024} KB, Всего: {(rx+tx)//1024} KB\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"stats_{period_days}_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"stats_{period_days}_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"stats_{period_days}_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# Добавляем вспомогательную функцию для проверки callback'ов
async def check_user_callback(callback: types.CallbackQuery) -> bool:
# Если список пуст, разрешаем всем
if not ALLOWED_USER_IDS:
return True
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return False
return True
if __name__ == "__main__":
asyncio.run(main())