mikrotik-bot/bot.py

317 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import asyncio
from aiogram import Bot, Dispatcher, types
from aiogram.filters import Command
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from dotenv import load_dotenv
from librouteros import connect
from librouteros.exceptions import TrapError
from fastapi import FastAPI, Request
import uvicorn
from db import init_db, upsert_client, start_session, update_session, close_session, get_history, get_clients, get_stats
load_dotenv()
TG_BOT_TOKEN = os.getenv("TG_BOT_TOKEN")
MT_API_HOST = os.getenv("MT_API_HOST")
MT_API_USER = os.getenv("MT_API_USER")
MT_API_PASS = os.getenv("MT_API_PASS")
ALLOWED_USER_IDS = set(map(int, os.getenv("ALLOWED_USER_IDS", "").split(",")))
bot = Bot(token=TG_BOT_TOKEN)
dp = Dispatcher()
# Авторизация по user_id
async def check_user(message: types.Message) -> bool:
if message.from_user.id not in ALLOWED_USER_IDS:
await message.answer("⛔️ Нет доступа")
return False
return True
# Подключение к MikroTik API
def get_mt_api():
return connect(username=MT_API_USER, password=MT_API_PASS, host=MT_API_HOST)
@dp.message(Command("start"))
async def start_cmd(msg: types.Message):
if not await check_user(msg):
return
kb = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="Клиенты Wi-Fi", callback_data="clients")]
])
await msg.answer("Добро пожаловать!", reply_markup=kb)
@dp.callback_query(lambda c: c.data == "clients")
async def show_clients(callback: types.CallbackQuery):
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return
try:
print("[DEBUG] Подключаюсь к MikroTik API...")
api = get_mt_api()
print("[DEBUG] Получаю список клиентов через /interface/wifi/registration-table...")
# Правильный путь для RouterOS 7.x
clients = list(api.path('interface', 'wifi', 'registration-table'))
print(f"[DEBUG] Найдено клиентов: {len(clients)}")
if not clients:
await callback.message.answer("Нет подключённых клиентов Wi-Fi.")
return
text = "<b>Wi-Fi клиенты:</b>\n"
for i, c in enumerate(clients):
print(f"[DEBUG] Обрабатываю клиента {i}: type={type(c)}")
# Конвертируем Path объект в словарь
client_data = dict(c)
print(f"[DEBUG] Данные клиента: {client_data}")
# Используем правильные поля из RouterOS 7.x
mac = client_data.get('mac-address', '-')
interface = client_data.get('interface', '-')
ssid = client_data.get('ssid', '-')
uptime = client_data.get('uptime', '-')
signal = client_data.get('signal', '-')
band = client_data.get('band', '-')
text += f"\n<b>MAC:</b> {mac}\n"
text += f"<b>Интерфейс:</b> {interface}\n"
text += f"<b>SSID:</b> {ssid}\n"
text += f"<b>Uptime:</b> {uptime}\n"
text += f"<b>Сигнал:</b> {signal} | <b>Диапазон:</b> {band}\n"
print("[DEBUG] Отправляю сообщение...")
await callback.message.answer(text, parse_mode="HTML")
except TrapError as e:
print(f"[ERROR] TrapError: {e}")
await callback.message.answer(f"Ошибка API MikroTik: {e}")
except Exception as e:
print(f"[ERROR] Exception: {e}")
print(f"[ERROR] Exception type: {type(e)}")
import traceback
traceback.print_exc()
await callback.message.answer(f"Ошибка: {e}")
# --- FastAPI для fetch-уведомлений ---
app = FastAPI()
@app.get("/health")
async def health():
return {"status": "ok", "service": "mikrotik-telegram-bot"}
@app.get("/event")
async def event(request: Request):
mac = request.query_params.get('mac')
ip = request.query_params.get('ip')
event_type = request.query_params.get('type')
# Можно добавить секрет для безопасности:
# if request.query_params.get('secret') != os.getenv('MT_SECRET'): return 'forbidden'
text = f"MikroTik: событие {event_type}\nMAC: {mac}\nIP: {ip}"
# Сохраняем клиента и сессию (fetch)
await upsert_client(mac, name=None)
await start_session(mac, ip, source='fetch')
for uid in ALLOWED_USER_IDS:
await bot.send_message(uid, text)
return {"status": "ok"}
# --- Периодический polling MikroTik API для сбора статистики ---
async def poll_mikrotik():
known_clients = {}
while True:
try:
api = get_mt_api()
# Используем правильный путь для RouterOS 7.x
clients = list(api.path('interface', 'wifi', 'registration-table'))
current_macs = set()
for c in clients:
# Конвертируем Path объект в словарь
client_data = dict(c)
mac = client_data.get('mac-address')
# В RouterOS 7.x WiFi нет поля last-ip, используем другие поля
ip = client_data.get('last-ip', 'N/A') # возможно нет этого поля
name = client_data.get('host-name') or client_data.get('comment')
# В WiFi registration-table нет rx-bytes/tx-bytes, используем 0
rx = 0
tx = 0
await upsert_client(mac, name=name)
await start_session(mac, ip, rx, tx, source='api')
await update_session(mac, rx, tx)
current_macs.add(mac)
# Закрываем сессии для клиентов, которых больше нет
for mac in list(known_clients):
if mac not in current_macs:
await close_session(mac)
known_clients.pop(mac)
known_clients = {mac: True for mac in current_macs}
except Exception as e:
print(f"[poll_mikrotik] Ошибка: {e}")
await asyncio.sleep(30) # интервал опроса
# --- Одновременный запуск бота, FastAPI и polling ---
async def main():
await init_db()
bot_task = asyncio.create_task(dp.start_polling(bot))
config = uvicorn.Config(app, host="0.0.0.0", port=8000, log_level="info")
server = uvicorn.Server(config)
api_task = asyncio.create_task(server.serve())
poll_task = asyncio.create_task(poll_mikrotik())
await asyncio.gather(bot_task, api_task, poll_task)
# --- Команда /history с постраничностью ---
@dp.message(Command("history"))
async def history_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_history_page(msg, period_days=30, page=1)
@dp.callback_query(lambda c: c.data.startswith("history_"))
async def history_period(callback: types.CallbackQuery):
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return
parts = callback.data.split("_")
days = int(parts[1])
page = int(parts[2]) if len(parts) > 2 else 1
await send_history_page(callback.message, period_days=days, page=page, edit=True, callback=callback)
async def send_history_page(message, period_days, page, edit=False, callback=None):
page_size = 10
rows, total = await get_history(period_days=period_days, page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"<b>История подключений за {period_days} дн. (стр. {page}/{max_page}):</b>\n"
if not rows:
text += "Нет подключений за выбранный период."
else:
for row in rows:
mac = row[1]
ip = row[2]
start = row[3]
end = row[4] or "..."
rx = row[5] or 0
tx = row[6] or 0
name = row[-2] or row[-1] or mac
text += f"\n<b>{name}</b> ({mac})\nIP: {ip}\nВремя: {start}{end}\nRX: {rx//1024} KB, TX: {tx//1024} KB\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"history_{period_days}_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"history_{period_days}_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"history_{period_days}_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# --- Команда /clients с постраничностью ---
@dp.message(Command("clients"))
async def clients_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_clients_page(msg, page=1)
@dp.callback_query(lambda c: c.data.startswith("clients_"))
async def clients_page_cb(callback: types.CallbackQuery):
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return
parts = callback.data.split("_")
page = int(parts[1]) if len(parts) > 1 else 1
await send_clients_page(callback.message, page=page, edit=True, callback=callback)
async def send_clients_page(message, page, edit=False, callback=None):
page_size = 10
rows, total = await get_clients(page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"<b>Устройства (стр. {page}/{max_page}):</b>\n"
if not rows:
text += "Нет известных клиентов."
else:
for row in rows:
mac = row[0]
name = row[1] or mac
custom = row[2]
last_seen = row[3]
text += f"\n<b>{custom or name}</b> ({mac})\nПоследняя активность: {last_seen}\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"clients_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"clients_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"clients_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
# --- Обработчик для неактивных кнопок ---
@dp.callback_query(lambda c: c.data == "noop")
async def noop_callback(callback: types.CallbackQuery):
await callback.answer()
# --- Команда /stats с постраничностью ---
@dp.message(Command("stats"))
async def stats_cmd(msg: types.Message):
if not await check_user(msg):
return
await send_stats_page(msg, period_days=30, page=1)
@dp.callback_query(lambda c: c.data.startswith("stats_"))
async def stats_period(callback: types.CallbackQuery):
if callback.from_user.id not in ALLOWED_USER_IDS:
await callback.answer("⛔️ Нет доступа", show_alert=True)
return
parts = callback.data.split("_")
days = int(parts[1])
page = int(parts[2]) if len(parts) > 2 else 1
await send_stats_page(callback.message, period_days=days, page=page, edit=True, callback=callback)
async def send_stats_page(message, period_days, page, edit=False, callback=None):
page_size = 10
rows, total = await get_stats(period_days=period_days, page=page, page_size=page_size)
max_page = max(1, (total + page_size - 1) // page_size)
text = f"<b>Топ по трафику за {period_days} дн. (стр. {page}/{max_page}):</b>\n"
if not rows:
text += "Нет данных за выбранный период."
else:
for row in rows:
mac = row[0]
name = row[1] or row[2] or mac
rx = row[3] or 0
tx = row[4] or 0
count = row[5]
text += f"\n<b>{name}</b> ({mac})\nСессий: {count}\nRX: {rx//1024} KB, TX: {tx//1024} KB, Всего: {(rx+tx)//1024} KB\n"
# Кнопки: Назад, Обновить, Вперёд (одна строка)
buttons = []
if page > 1:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data=f"stats_{period_days}_{page-1}"))
else:
buttons.append(InlineKeyboardButton("⏪ Назад", callback_data="noop", disabled=True))
buttons.append(InlineKeyboardButton("Обновить", callback_data=f"stats_{period_days}_{page}"))
if page < max_page:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data=f"stats_{period_days}_{page+1}"))
else:
buttons.append(InlineKeyboardButton("Вперёд ⏩", callback_data="noop", disabled=True))
kb = InlineKeyboardMarkup(inline_keyboard=[buttons])
if edit and callback:
await callback.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
await callback.answer()
else:
await message.answer(text, parse_mode="HTML", reply_markup=kb)
if __name__ == "__main__":
asyncio.run(main())