DevCore is the best
Приветствую всех форумчан!
В данном гайде я покажу как использовать Middleware в Aiogram 2
Сегодня вы научитесь делать блокировку пользователя через админку
Разберем созданный мною класс Users в db_api/tables/users_table.py
id - обычная нумерация записи в бд
user_id- id пользователя Telegram, по нему мы будем определять заблокирован ли юзер или нет
is_blocked- boolean значение (True, False) при регистрации по умолчанию пользователь не заблокирован
Теперь создайте папку middlewares, в которой
вам следует создать файл ban_middleware.py
Рассмотрим следующий код:
Итак. Мы назвали класс UserBannedMiddleware
Представим, что пользователь уже написал сообщение боту.
И перед тем, как его обработать (у нас это хэндлер команды /start),
мы будем проверять забанен юзер или нет, чекните функцию on_process_message
Проще говоря, сперва мы берем юзера из бд. Если он есть и он забанен, то отменяем хэндлер
То же самые делается и в функциях on_process_callback_query и on_process_inline_query
Эту функцию мы должны импортировать в
from middlewares.ban_middleware import setup_ban_middleware
Там же создайте ф-цию setup_middlewares,
в которую вам необходимо поместить setup_ban_middleware, с параметром dp (Dispatcher)
А также вот, как должна выглядить ф-ция on_startup
Теперь давайте создадим ф-цию блокировки юзера через админку
Его мы импортируем в
from middlewares.ban_middleware import setup_ban_middleware
и добавляем в ф-цию register_handlers
На этом все! Всем спасибо за внимание!
Буду рад, если оцените статью
В данном гайде я покажу как использовать Middleware в Aiogram 2
Сегодня вы научитесь делать блокировку пользователя через админку
Специально для вас я подготовил начальную структуру проекта
с уже созданной схемой таблицы через SQLAlchemy и командой /start:
Вот как она выглядит:
с уже созданной схемой таблицы через SQLAlchemy и командой /start:
You must be registered for see links
Вот как она выглядит:
You must be registered for see images
Разберем созданный мною класс Users в db_api/tables/users_table.py
Python:
from sqlalchemy import Column, Integer, BigInteger, Boolean
from db_api.database import Base
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
user_id = Column(BigInteger, unique=True)
is_blocked = Column(Boolean, default=False)
user_id- id пользователя Telegram, по нему мы будем определять заблокирован ли юзер или нет
is_blocked- boolean значение (True, False) при регистрации по умолчанию пользователь не заблокирован
Теперь создайте папку middlewares, в которой
вам следует создать файл ban_middleware.py
Рассмотрим следующий код:
Python:
from aiogram import Dispatcher
from aiogram.dispatcher.handler import CancelHandler # отменяет вызов хэндлера
from aiogram.dispatcher.middlewares import BaseMiddleware # класс Middleware от Aiogram
from aiogram.types import Message, CallbackQuery, InlineQuery
from db_api.commands.users_commands import select_user # берем пользователя из бд
Python:
class UserBannedMiddleware(BaseMiddleware):
async def on_process_message(self, message: Message, data: dict):
user = await select_user(message.from_user.id)
if user and user.is_blocked:
await message.answer(
'<b> Ваш аккаунт заблокирован!</b>'
)
raise CancelHandler
async def on_process_callback_query(self, call: CallbackQuery, data: dict):
user = await select_user(call.from_user.id)
if user and user.is_blocked:
await call.answer(
' Ваш аккаунт заблокирован!',
show_alert=True
)
raise CancelHandler
async def on_process_inline_query(self, query: InlineQuery, data: dict):
user = await select_user(query.from_user.id)
if user and user.is_blocked:
raise CancelHandler
Итак. Мы назвали класс UserBannedMiddleware
Представим, что пользователь уже написал сообщение боту.
И перед тем, как его обработать (у нас это хэндлер команды /start),
мы будем проверять забанен юзер или нет, чекните функцию on_process_message
Проще говоря, сперва мы берем юзера из бд. Если он есть и он забанен, то отменяем хэндлер
То же самые делается и в функциях on_process_callback_query и on_process_inline_query
Python:
def setup_ban_middleware(dp: Dispatcher):
dp.middleware.setup(UserBannedMiddleware())
You must be registered for see links
.from middlewares.ban_middleware import setup_ban_middleware
Там же создайте ф-цию setup_middlewares,
в которую вам необходимо поместить setup_ban_middleware, с параметром dp (Dispatcher)
Python:
def setup_middlewares(dp: Dispatcher):
setup_ban_middleware(dp)
Python:
async def on_startup(dp: Dispatcher):
setup_middlewares(dp)
register_handlers(dp)
await create_base()
Теперь давайте создадим ф-цию блокировки юзера через админку
Создайте ф-цию update_user_ban_status в db_api/commands/users_commands.py
Python:
async def update_user_ban_status(user_id: int, ban_status: bool):
async with get_session() as session:
sql = update(Users).where(
Users.user_id == user_id
).values(is_blocked=ban_status)
await session.execute(sql)
await session.commit()
Python:
from aiogram.types import Message
from data.config import ADMINS_ID # тут dаtа на латыне
from db_api.commands.users_commands import update_user_ban_status, select_user
from db_api.tables.users_table import Users
async def ban_user_command(message: Message):
args = message.get_args()
if not args:
return await message.reply(
'<b>⚠ Отсутствуют аргументы к команде!</b>'
)
try:
user: Users = await select_user(int(args))
if not user:
return await message.answer(
'<b>⚠ Такого пользователя не существует!</b>'
)
if user.user_id in ADMINS_ID:
return await message.answer(
'<b>⚠ Нельзя заблокировать администратора!</b>'
)
need_ban = message.text.startswith('/ban')
if need_ban and user.is_blocked:
return await message.reply(
'<b>⚠ Пользователь итак заблокирован!</b>'
)
elif not need_ban and not user.is_blocked:
return await message.reply(
'<b>❌ Пользователь итак разблокирован!</b>'
)
await update_user_ban_status(user.user_id, True if not user.is_blocked else False)
await message.bot.send_message(
user.user_id,
'<b> Ваш аккаунт заблокирован!</b>' if not user.is_blocked
else '<b>✅ Ваш аккаунт разблокирован!</b>'
)
await message.reply(
f'<b> @{message.from_user.username} заблокирован!</b>'
if not user.is_blocked
else f'<b>✅ @{message.from_user.username} разблокирован!</b>'
)
except ValueError:
await message.reply(
'<b>⚠ Значение должно быть числом!</b>'
)
Python:
def register_ban_command(dp: Dispatcher):
dp.register_message_handler(
ban_user_command,
Text(startswith=['/ban', '/unban']),
user_id=ADMINS_ID,
state='*'
)
You must be registered for see links
from middlewares.ban_middleware import setup_ban_middleware
и добавляем в ф-цию register_handlers
Python:
def register_handlers(dp: Dispatcher):
resister_command_start(dp)
register_ban_command(dp)
Не забудьте указать [tooltip=152]токен[/tooltip] от бота в .env,
а также свой user_id Telegram в dаtа/
А также установите все модули, которые указаны в requirements.txt
а также свой user_id Telegram в dаtа/
You must be registered for see links
в переменнной ADMINS_IDА также установите все модули, которые указаны в requirements.txt
На этом все! Всем спасибо за внимание!
Буду рад, если оцените статью
You must be registered for see links