Error handler aiogram

Is a pretty simple and fully asynchronous framework for Telegram Bot API written in Python 3.7 with asyncio and aiohttp. - aiogram/dispatcher.py at dev-2.x · aiogram/aiogram
import asyncio import functools import itertools import logging import time import typing import aiohttp from aiohttp.helpers import sentinel from aiogram.utils.deprecated import renamed_argument from .filters import Command, ContentTypeFilter, ExceptionsFilter, FiltersFactory, HashTag, Regexp, RegexpCommandsFilter, StateFilter, Text, IDFilter, AdminFilter, IsReplyFilter, ForwardedMessageFilter, IsSenderContact, ChatTypeFilter, MediaGroupFilter, AbstractFilter from .handler import Handler from .middlewares import MiddlewareManager from .storage import BaseStorage, DELTA, DisabledStorage, EXCEEDED_COUNT, FSMContext, LAST_CALL, RATE_LIMIT, RESULT from .webhook import BaseResponse from .. import types from ..bot import Bot from ..utils.exceptions import TelegramAPIError, Throttled from ..utils.mixins import ContextInstanceMixin, DataMixin log = logging.getLogger(__name__) DEFAULT_RATE_LIMIT = .1 def _ensure_loop(x: «asyncio.AbstractEventLoop»): assert isinstance( x, asyncio.AbstractEventLoop ), f»Loop must be the implementation of {asyncio.AbstractEventLoop!r}, « f»not {type(x)!r}« class Dispatcher(DataMixin, ContextInstanceMixin): «»» Simple Updates dispatcher It will process incoming updates: messages, edited messages, channel posts, edited channel posts, inline queries, chosen inline results, callback queries, shipping queries, pre-checkout queries. «»» def __init__(self, bot, loop=None, storage: typing.Optional[BaseStorage] = None, run_tasks_by_default: bool = False, throttling_rate_limit=DEFAULT_RATE_LIMIT, no_throttle_error=False, filters_factory=None): if not isinstance(bot, Bot): raise TypeError(f»Argument ‘bot’ must be an instance of Bot, not ‘{type(bot).__name__}‘») if storage is None: storage = DisabledStorage() if filters_factory is None: filters_factory = FiltersFactory(self) self.bot: Bot = bot self._main_loop = loop self.storage = storage self.run_tasks_by_default = run_tasks_by_default self.throttling_rate_limit = throttling_rate_limit self.no_throttle_error = no_throttle_error self.filters_factory: FiltersFactory = filters_factory self.updates_handler = Handler(self, middleware_key=‘update’) self.message_handlers = Handler(self, middleware_key=‘message’) self.edited_message_handlers = Handler(self, middleware_key=‘edited_message’) self.channel_post_handlers = Handler(self, middleware_key=‘channel_post’) self.edited_channel_post_handlers = Handler(self, middleware_key=‘edited_channel_post’) self.inline_query_handlers = Handler(self, middleware_key=‘inline_query’) self.chosen_inline_result_handlers = Handler(self, middleware_key=‘chosen_inline_result’) self.callback_query_handlers = Handler(self, middleware_key=‘callback_query’) self.shipping_query_handlers = Handler(self, middleware_key=‘shipping_query’) self.pre_checkout_query_handlers = Handler(self, middleware_key=‘pre_checkout_query’) self.poll_handlers = Handler(self, middleware_key=‘poll’) self.poll_answer_handlers = Handler(self, middleware_key=‘poll_answer’) self.my_chat_member_handlers = Handler(self, middleware_key=‘my_chat_member’) self.chat_member_handlers = Handler(self, middleware_key=‘chat_member’) self.chat_join_request_handlers = Handler(self, middleware_key=‘chat_join_request’) self.errors_handlers = Handler(self, once=False, middleware_key=‘error’) self.middleware = MiddlewareManager(self) self.updates_handler.register(self.process_update) self._polling = False self._closed = True self._dispatcher_close_waiter = None self._setup_filters() @property def loop(self) -> typing.Optional[asyncio.AbstractEventLoop]: # for the sake of backward compatibility # lib internally must delegate tasks with respect to _main_loop attribute # however should never be used by the library itself # use more generic approaches from asyncio’s namespace return self._main_loop @property def _close_waiter(self) -> «asyncio.Future»: if self._dispatcher_close_waiter is None: self._dispatcher_close_waiter = asyncio.get_event_loop().create_future() return self._dispatcher_close_waiter def _setup_filters(self): filters_factory = self.filters_factory filters_factory.bind(StateFilter, exclude_event_handlers=[ self.errors_handlers, self.poll_handlers, self.poll_answer_handlers, ]) filters_factory.bind(ContentTypeFilter, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, ]), filters_factory.bind(Command, event_handlers=[ self.message_handlers, self.edited_message_handlers ]) filters_factory.bind(Text, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, self.callback_query_handlers, self.poll_handlers, self.inline_query_handlers, ]) filters_factory.bind(HashTag, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, ]) filters_factory.bind(Regexp, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, self.callback_query_handlers, self.poll_handlers, self.inline_query_handlers, ]) filters_factory.bind(RegexpCommandsFilter, event_handlers=[ self.message_handlers, self.edited_message_handlers, ]) filters_factory.bind(ExceptionsFilter, event_handlers=[ self.errors_handlers, ]) filters_factory.bind(AdminFilter, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, self.callback_query_handlers, self.inline_query_handlers, self.chat_member_handlers, self.chat_join_request_handlers, ]) filters_factory.bind(IDFilter, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, self.callback_query_handlers, self.inline_query_handlers, self.chat_member_handlers, self.my_chat_member_handlers, self.chat_join_request_handlers, ]) filters_factory.bind(IsReplyFilter, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, ]) filters_factory.bind(IsSenderContact, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, ]) filters_factory.bind(ForwardedMessageFilter, event_handlers=[ self.message_handlers, self.edited_channel_post_handlers, self.channel_post_handlers, self.edited_channel_post_handlers ]) filters_factory.bind(ChatTypeFilter, event_handlers=[ self.message_handlers, self.edited_message_handlers, self.channel_post_handlers, self.edited_channel_post_handlers, self.callback_query_handlers, self.my_chat_member_handlers, self.chat_member_handlers, self.chat_join_request_handlers, ]) filters_factory.bind(MediaGroupFilter, event_handlers=[ self.message_handlers, self.edited_channel_post_handlers, self.channel_post_handlers, self.edited_channel_post_handlers ]) def __del__(self): self.stop_polling() async def skip_updates(self): «»» You can skip old incoming updates from queue. This method is not recommended for using in production. Note that the webhook will be deleted! «»» await self.bot.delete_webhook(drop_pending_updates=True) async def process_updates(self, updates, fast: bool = True): «»» Process list of updates :param updates: :param fast: :return: «»» if fast: tasks = [self.updates_handler.notify(update) for update in updates] return await asyncio.gather(*tasks) results = [] for update in updates: results.append(await self.updates_handler.notify(update)) return results async def process_update(self, update: types.Update): «»» Process single update object :param update: :return: «»» types.Update.set_current(update) try: if update.message: types.Message.set_current(update.message) types.User.set_current(update.message.from_user) types.Chat.set_current(update.message.chat) return await self.message_handlers.notify(update.message) if update.edited_message: types.Message.set_current(update.edited_message) types.User.set_current(update.edited_message.from_user) types.Chat.set_current(update.edited_message.chat) return await self.edited_message_handlers.notify(update.edited_message) if update.channel_post: types.Message.set_current(update.channel_post) types.Chat.set_current(update.channel_post.chat) return await self.channel_post_handlers.notify(update.channel_post) if update.edited_channel_post: types.Message.set_current(update.edited_channel_post) types.Chat.set_current(update.edited_channel_post.chat) return await self.edited_channel_post_handlers.notify(update.edited_channel_post) if update.inline_query: types.InlineQuery.set_current(update.inline_query) types.User.set_current(update.inline_query.from_user) return await self.inline_query_handlers.notify(update.inline_query) if update.chosen_inline_result: types.ChosenInlineResult.set_current(update.chosen_inline_result) types.User.set_current(update.chosen_inline_result.from_user) return await self.chosen_inline_result_handlers.notify(update.chosen_inline_result) if update.callback_query: types.CallbackQuery.set_current(update.callback_query) if update.callback_query.message: types.Chat.set_current(update.callback_query.message.chat) types.User.set_current(update.callback_query.from_user) return await self.callback_query_handlers.notify(update.callback_query) if update.shipping_query: types.ShippingQuery.set_current(update.shipping_query) types.User.set_current(update.shipping_query.from_user) return await self.shipping_query_handlers.notify(update.shipping_query) if update.pre_checkout_query: types.PreCheckoutQuery.set_current(update.pre_checkout_query) types.User.set_current(update.pre_checkout_query.from_user) return await self.pre_checkout_query_handlers.notify(update.pre_checkout_query) if update.poll: types.Poll.set_current(update.poll) return await self.poll_handlers.notify(update.poll) if update.poll_answer: types.PollAnswer.set_current(update.poll_answer) types.User.set_current(update.poll_answer.user) return await self.poll_answer_handlers.notify(update.poll_answer) if update.my_chat_member: types.ChatMemberUpdated.set_current(update.my_chat_member) types.User.set_current(update.my_chat_member.from_user) return await self.my_chat_member_handlers.notify(update.my_chat_member) if update.chat_member: types.ChatMemberUpdated.set_current(update.chat_member) types.User.set_current(update.chat_member.from_user) return await self.chat_member_handlers.notify(update.chat_member) if update.chat_join_request: types.ChatJoinRequest.set_current(update.chat_join_request) types.Chat.set_current(update.chat_join_request.chat) types.User.set_current(update.chat_join_request.from_user) return await self.chat_join_request_handlers.notify(update.chat_join_request) except Exception as e: err = await self.errors_handlers.notify(update, e) if err: return err raise async def reset_webhook(self, check=True) -> bool: «»» Reset webhook :param check: check before deleting :return: «»» if check: wh = await self.bot.get_webhook_info() if not wh.url: return False return await self.bot.delete_webhook() def _loop_create_task(self, coro): return asyncio.create_task(coro) async def start_polling(self, timeout=20, relax=0.1, limit=None, reset_webhook=None, fast: bool = True, error_sleep: int = 5, allowed_updates: typing.Optional[typing.List[str]] = None): «»» Start long-polling :param timeout: :param relax: :param limit: :param reset_webhook: :param fast: :param error_sleep: :param allowed_updates: :return: «»» if self._polling: raise RuntimeError(‘Polling already started’) log.info(‘Start polling.’) # context.set_value(MODE, LONG_POLLING) Dispatcher.set_current(self) Bot.set_current(self.bot) if reset_webhook is None: await self.reset_webhook(check=False) if reset_webhook: await self.reset_webhook(check=True) self._polling = True offset = None try: current_request_timeout = self.bot.timeout if current_request_timeout is not sentinel and timeout is not None: request_timeout = aiohttp.ClientTimeout(total=current_request_timeout.total + timeout or 1) else: request_timeout = None while self._polling: try: with self.bot.request_timeout(request_timeout): updates = await self.bot.get_updates( limit=limit, offset=offset, timeout=timeout, allowed_updates=allowed_updates ) except asyncio.CancelledError: break except Exception as e: log.exception(‘Cause exception while getting updates.’) await asyncio.sleep(error_sleep) continue if updates: log.debug(f»Received {len(updates)} updates.») offset = updates[1].update_id + 1 asyncio.create_task(self._process_polling_updates(updates, fast)) if relax: await asyncio.sleep(relax) finally: self._close_waiter.set_result(None) log.warning(‘Polling is stopped.’) async def _process_polling_updates(self, updates, fast: bool = True): «»» Process updates received from long-polling. :param updates: list of updates. :param fast: «»» need_to_call = [] for responses in itertools.chain.from_iterable(await self.process_updates(updates, fast)): for response in responses: if not isinstance(response, BaseResponse): continue need_to_call.append(response.execute_response(self.bot)) if need_to_call: try: asyncio.gather(*need_to_call) except TelegramAPIError: log.exception(‘Cause exception while processing updates.’) def stop_polling(self): «»» Break long-polling process. :return: «»» if hasattr(self, ‘_polling’) and self._polling: log.info(‘Stop polling…’) self._polling = False async def wait_closed(self): «»» Wait for the long-polling to close :return: «»» await asyncio.shield(self._close_waiter) def is_polling(self): «»» Check if polling is enabled :return: «»» return self._polling def register_message_handler(self, callback, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Register handler for message .. code-block:: python3 # This handler works only if state is None (by default). dp.register_message_handler(cmd_start, commands=[‘start’, ‘about’]) dp.register_message_handler(entry_point, commands=[‘setup’]) # This handler works only if current state is «first_step» dp.register_message_handler(step_handler_1, state=»first_step») # If you want to handle all states by one handler, use `state=»*»`. dp.register_message_handler(cancel_handler, commands=[‘cancel’], state=»*») dp.register_message_handler(cancel_handler, lambda msg: msg.text.lower() == ‘cancel’, state=»*») :param callback: :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param custom_filters: list of custom filters :param kwargs: :param state: :return: decorated function «»» filters_set = self.filters_factory.resolve(self.message_handlers, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, **kwargs) self.message_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def message_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Decorator for message handler Examples: Simple commands handler: .. code-block:: python3 @dp.message_handler(commands=[‘start’, ‘welcome’, ‘about’]) async def cmd_handler(message: types.Message): Filter messages by regular expression: .. code-block:: python3 @dp.message_handler(regexp=’^[a-z]+-[0-9]+’) async def msg_handler(message: types.Message): Filter messages by command regular expression: .. code-block:: python3 @dp.message_handler(filters.RegexpCommandsFilter(regexp_commands=[‘item_([0-9]*)’])) async def send_welcome(message: types.Message): Filter by content type: .. code-block:: python3 @dp.message_handler(content_types=ContentType.PHOTO | ContentType.DOCUMENT) async def audio_handler(message: types.Message): Filter by custom function: .. code-block:: python3 @dp.message_handler(lambda message: message.text and ‘hello’ in message.text.lower()) async def text_handler(message: types.Message): Use multiple filters: .. code-block:: python3 @dp.message_handler(commands=[‘command’], content_types=ContentType.TEXT) async def text_handler(message: types.Message): Register multiple filters set for one handler: .. code-block:: python3 @dp.message_handler(commands=[‘command’]) @dp.message_handler(lambda message: demojize(message.text) == ‘:new_moon_with_face:’) async def text_handler(message: types.Message): This handler will be called if the message starts with ‘/command’ OR is some emoji By default content_type is :class:`ContentType.TEXT` :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param custom_filters: list of custom filters :param kwargs: :param state: :param run_task: run callback in task (no wait results) :return: decorated function «»» def decorator(callback): self.register_message_handler(callback, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, run_task=run_task, **kwargs) return callback return decorator def register_edited_message_handler(self, callback, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Register handler for edited message :param callback: :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param state: :param custom_filters: list of custom filters :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» filters_set = self.filters_factory.resolve(self.edited_message_handlers, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, **kwargs) self.edited_message_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def edited_message_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Decorator for edited message handler You can use combination of different handlers .. code-block:: python3 @dp.message_handler() @dp.edited_message_handler() async def msg_handler(message: types.Message): :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param state: :param custom_filters: list of custom filters :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» def decorator(callback): self.register_edited_message_handler(callback, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, run_task=run_task, **kwargs) return callback return decorator def register_channel_post_handler(self, callback, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Register handler for channel post :param callback: :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param state: :param custom_filters: list of custom filters :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» filters_set = self.filters_factory.resolve(self.channel_post_handlers, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, **kwargs) self.channel_post_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def channel_post_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Decorator for channel post handler :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param state: :param custom_filters: list of custom filters :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» def decorator(callback): self.register_channel_post_handler(callback, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, run_task=run_task, **kwargs) return callback return decorator def register_edited_channel_post_handler(self, callback, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Register handler for edited channel post :param callback: :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param state: :param custom_filters: list of custom filters :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» filters_set = self.filters_factory.resolve(self.edited_message_handlers, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, **kwargs) self.edited_channel_post_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def edited_channel_post_handler(self, *custom_filters, commands=None, regexp=None, content_types=None, state=None, run_task=None, **kwargs): «»» Decorator for edited channel post handler :param commands: list of commands :param regexp: REGEXP :param content_types: List of content types. :param custom_filters: list of custom filters :param state: :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» def decorator(callback): self.register_edited_channel_post_handler(callback, *custom_filters, commands=commands, regexp=regexp, content_types=content_types, state=state, run_task=run_task, **kwargs) return callback return decorator def register_inline_handler(self, callback, *custom_filters, state=None, run_task=None, **kwargs): «»» Register handler for inline query Example: .. code-block:: python3 dp.register_inline_handler(some_inline_handler, lambda inline_query: True) :param callback: :param custom_filters: list of custom filters :param state: :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» if custom_filters is None: custom_filters = [] filters_set = self.filters_factory.resolve(self.inline_query_handlers, *custom_filters, state=state, **kwargs) self.inline_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def inline_handler(self, *custom_filters, state=None, run_task=None, **kwargs): «»» Decorator for inline query handler Example: .. code-block:: python3 @dp.inline_handler(lambda inline_query: True) async def some_inline_handler(inline_query: types.InlineQuery) :param state: :param custom_filters: list of custom filters :param run_task: run callback in task (no wait results) :param kwargs: :return: decorated function «»» def decorator(callback): self.register_inline_handler(callback, *custom_filters, state=state, run_task=run_task, **kwargs) return callback return decorator def register_chosen_inline_handler(self, callback, *custom_filters, state=None, run_task=None, **kwargs): «»» Register handler for chosen inline query Example: .. code-block:: python3 dp.register_chosen_inline_handler(some_chosen_inline_handler, lambda chosen_inline_result: True) :param callback: :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: :return: «»» if custom_filters is None: custom_filters = [] filters_set = self.filters_factory.resolve(self.chosen_inline_result_handlers, *custom_filters, state=state, **kwargs) self.chosen_inline_result_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def chosen_inline_handler(self, *custom_filters, state=None, run_task=None, **kwargs): «»» Decorator for chosen inline query handler Example: .. code-block:: python3 @dp.chosen_inline_handler(lambda chosen_inline_result: True) async def some_chosen_inline_handler(chosen_inline_result: types.ChosenInlineResult) :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: :return: «»» def decorator(callback): self.register_chosen_inline_handler(callback, *custom_filters, state=state, run_task=run_task, **kwargs) return callback return decorator def register_callback_query_handler(self, callback, *custom_filters, state=None, run_task=None, **kwargs): «»» Register handler for callback query Example: .. code-block:: python3 dp.register_callback_query_handler(some_callback_handler, lambda callback_query: True) :param callback: :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve(self.callback_query_handlers, *custom_filters, state=state, **kwargs) self.callback_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def callback_query_handler(self, *custom_filters, state=None, run_task=None, **kwargs): «»» Decorator for callback query handler Example: .. code-block:: python3 @dp.callback_query_handler(lambda callback_query: True) async def some_callback_handler(callback_query: types.CallbackQuery) :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_callback_query_handler(callback, *custom_filters, state=state, run_task=run_task, **kwargs) return callback return decorator def register_shipping_query_handler(self, callback, *custom_filters, state=None, run_task=None, **kwargs): «»» Register handler for shipping query Example: .. code-block:: python3 dp.register_shipping_query_handler(some_shipping_query_handler, lambda shipping_query: True) :param callback: :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve(self.shipping_query_handlers, *custom_filters, state=state, **kwargs) self.shipping_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def shipping_query_handler(self, *custom_filters, state=None, run_task=None, **kwargs): «»» Decorator for shipping query handler Example: .. code-block:: python3 @dp.shipping_query_handler(lambda shipping_query: True) async def some_shipping_query_handler(shipping_query: types.ShippingQuery) :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_shipping_query_handler(callback, *custom_filters, state=state, run_task=run_task, **kwargs) return callback return decorator def register_pre_checkout_query_handler(self, callback, *custom_filters, state=None, run_task=None, **kwargs): «»» Register handler for pre-checkout query Example: .. code-block:: python3 dp.register_pre_checkout_query_handler(some_pre_checkout_query_handler, lambda shipping_query: True) :param callback: :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve(self.pre_checkout_query_handlers, *custom_filters, state=state, **kwargs) self.pre_checkout_query_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def pre_checkout_query_handler(self, *custom_filters, state=None, run_task=None, **kwargs): «»» Decorator for pre-checkout query handler Example: .. code-block:: python3 @dp.pre_checkout_query_handler(lambda shipping_query: True) async def some_pre_checkout_query_handler(shipping_query: types.ShippingQuery) :param state: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_pre_checkout_query_handler(callback, *custom_filters, state=state, run_task=run_task, **kwargs) return callback return decorator def register_poll_handler(self, callback, *custom_filters, run_task=None, **kwargs): «»» Register handler for poll Example: .. code-block:: python3 dp.register_poll_handler(some_poll_handler) :param callback: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve(self.poll_handlers, *custom_filters, **kwargs) self.poll_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def poll_handler(self, *custom_filters, run_task=None, **kwargs): «»» Decorator for poll handler Example: .. code-block:: python3 @dp.poll_handler() async def some_poll_handler(poll: types.Poll) :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_poll_handler(callback, *custom_filters, run_task=run_task, **kwargs) return callback return decorator def register_poll_answer_handler(self, callback, *custom_filters, run_task=None, **kwargs): «»» Register handler for poll_answer Example: .. code-block:: python3 dp.register_poll_answer_handler(some_poll_answer_handler) :param callback: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve(self.poll_answer_handlers, *custom_filters, **kwargs) self.poll_answer_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def poll_answer_handler(self, *custom_filters, run_task=None, **kwargs): «»» Decorator for poll_answer handler Example: .. code-block:: python3 @dp.poll_answer_handler() async def some_poll_answer_handler(poll_answer: types.PollAnswer) :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_poll_answer_handler(callback, *custom_filters, run_task=run_task, **kwargs) return callback return decorator def register_my_chat_member_handler(self, callback: typing.Callable, *custom_filters, run_task: typing.Optional[bool] = None, **kwargs) -> None: «»» Register handler for my_chat_member Example: .. code-block:: python3 dp.register_my_chat_member_handler(some_my_chat_member_handler) :param callback: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve( self.my_chat_member_handlers, *custom_filters, **kwargs, ) self.my_chat_member_handlers.register( handler=self._wrap_async_task(callback, run_task), filters=filters_set, ) def my_chat_member_handler(self, *custom_filters, run_task=None, **kwargs): «»» Decorator for my_chat_member handler Example: .. code-block:: python3 @dp.my_chat_member_handler() async def some_handler(my_chat_member: types.ChatMemberUpdated) :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_my_chat_member_handler( callback, *custom_filters, run_task=run_task, **kwargs, ) return callback return decorator def register_chat_member_handler(self, callback: typing.Callable, *custom_filters, run_task: typing.Optional[bool] = None, **kwargs) -> None: «»» Register handler for chat_member Example: .. code-block:: python3 dp.register_chat_member_handler(some_chat_member_handler) :param callback: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve( self.chat_member_handlers, *custom_filters, **kwargs, ) self.chat_member_handlers.register( handler=self._wrap_async_task(callback, run_task), filters=filters_set, ) def chat_member_handler(self, *custom_filters, run_task=None, **kwargs): «»» Decorator for chat_member handler Example: .. code-block:: python3 @dp.chat_member_handler() async def some_handler(chat_member: types.ChatMemberUpdated) :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_chat_member_handler( callback, *custom_filters, run_task=run_task, **kwargs, ) return callback return decorator def register_chat_join_request_handler(self, callback: typing.Callable, *custom_filters, run_task: typing.Optional[bool] = None, **kwargs) -> None: «»» Register handler for chat_join_request Example: .. code-block:: python3 dp.register_chat_join_request(some_chat_join_request) :param callback: :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» filters_set = self.filters_factory.resolve( self.chat_join_request_handlers, *custom_filters, **kwargs, ) self.chat_join_request_handlers.register( handler=self._wrap_async_task(callback, run_task), filters=filters_set, ) def chat_join_request_handler(self, *custom_filters, run_task=None, **kwargs): «»» Decorator for chat_join_request handler Example: .. code-block:: python3 @dp.chat_join_request() async def some_handler(chat_member: types.ChatJoinRequest) :param custom_filters: :param run_task: run callback in task (no wait results) :param kwargs: «»» def decorator(callback): self.register_chat_join_request_handler( callback, *custom_filters, run_task=run_task, **kwargs, ) return callback return decorator def register_errors_handler(self, callback, *custom_filters, exception=None, run_task=None, **kwargs): «»» Register handler for errors :param callback: :param exception: you can make handler for specific errors type :param run_task: run callback in task (no wait results) «»» filters_set = self.filters_factory.resolve(self.errors_handlers, *custom_filters, exception=exception, **kwargs) self.errors_handlers.register(self._wrap_async_task(callback, run_task), filters_set) def errors_handler(self, *custom_filters, exception=None, run_task=None, **kwargs): «»» Decorator for errors handler :param exception: you can make handler for specific errors type :param run_task: run callback in task (no wait results) :return: «»» def decorator(callback): self.register_errors_handler(self._wrap_async_task(callback, run_task), *custom_filters, exception=exception, **kwargs) return callback return decorator def current_state(self, *, chat: typing.Union[str, int, None] = None, user: typing.Union[str, int, None] = None) -> FSMContext: «»» Get current state for user in chat as context .. code-block:: python3 with dp.current_state(chat=message.chat.id, user=message.user.id) as state: pass state = dp.current_state() state.set_state(‘my_state’) :param chat: :param user: :return: «»» if chat is None: chat_obj = types.Chat.get_current() chat = chat_obj.id if chat_obj else None if user is None: user_obj = types.User.get_current() user = user_obj.id if user_obj else None return FSMContext(storage=self.storage, chat=chat, user=user) @renamed_argument(old_name=‘user’, new_name=‘user_id’, until_version=‘3.0’, stacklevel=3) @renamed_argument(old_name=‘chat’, new_name=‘chat_id’, until_version=‘3.0’, stacklevel=4) async def throttle(self, key, *, rate=None, user_id=None, chat_id=None, no_error=None) -> bool: «»» Execute throttling manager. Returns True if limit has not exceeded otherwise raises ThrottleError or returns False :param key: key in storage :param rate: limit (by default is equal to default rate limit) :param user_id: user id :param chat_id: chat id :param no_error: return boolean value instead of raising error :return: bool «»» if not self.storage.has_bucket(): raise RuntimeError(‘This storage does not provide Leaky Bucket’) if no_error is None: no_error = self.no_throttle_error if rate is None: rate = self.throttling_rate_limit if user_id is None and chat_id is None: chat_obj = types.Chat.get_current() chat_id = chat_obj.id if chat_obj else None user_obj = types.User.get_current() user_id = user_obj.id if user_obj else None # Detect current time now = time.time() bucket = await self.storage.get_bucket(chat=chat_id, user=user_id) # Fix bucket if bucket is None: bucket = {key: {}} if key not in bucket: bucket[key] = {} data = bucket[key] # Calculate called = data.get(LAST_CALL, now) delta = now called result = delta >= rate or delta <= 0 # Save results data[RESULT] = result data[RATE_LIMIT] = rate data[LAST_CALL] = now data[DELTA] = delta if not result: data[EXCEEDED_COUNT] += 1 else: data[EXCEEDED_COUNT] = 1 bucket[key].update(data) await self.storage.set_bucket(chat=chat_id, user=user_id, bucket=bucket) if not result and not no_error: # Raise if it is allowed raise Throttled(key=key, chat=chat_id, user=user_id, **data) return result @renamed_argument(old_name=‘user’, new_name=‘user_id’, until_version=‘3.0’, stacklevel=3) @renamed_argument(old_name=‘chat’, new_name=‘chat_id’, until_version=‘3.0’, stacklevel=4) async def check_key(self, key, chat_id=None, user_id=None): «»» Get information about key in bucket :param key: :param chat_id: :param user_id: :return: «»» if not self.storage.has_bucket(): raise RuntimeError(‘This storage does not provide Leaky Bucket’) if user_id is None and chat_id is None: chat_obj = types.Chat.get_current() chat_id = chat_obj.id if chat_obj else None user_obj = types.User.get_current() user_id = user_obj.id if user_obj else None bucket = await self.storage.get_bucket(chat=chat_id, user=user_id) data = bucket.get(key, {}) return Throttled(key=key, chat=chat_id, user=user_id, **data) @renamed_argument(old_name=‘user’, new_name=‘user_id’, until_version=‘3.0’, stacklevel=3) @renamed_argument(old_name=‘chat’, new_name=‘chat_id’, until_version=‘3.0’, stacklevel=4) async def release_key(self, key, chat_id=None, user_id=None): «»» Release blocked key :param key: :param chat_id: :param user_id: :return: «»» if not self.storage.has_bucket(): raise RuntimeError(‘This storage does not provide Leaky Bucket’) if user_id is None and chat_id is None: chat_obj = types.Chat.get_current() chat_id = chat_obj.id if chat_obj else None user_obj = types.User.get_current() user_id = user_obj.id if user_obj else None bucket = await self.storage.get_bucket(chat=chat_id, user=user_id) if bucket and key in bucket: del bucket[key] await self.storage.set_bucket(chat=chat_id, user=user_id, bucket=bucket) return True return False def async_task(self, func): «»» Execute handler as task and return None. Use this decorator for slow handlers (with timeouts) .. code-block:: python3 @dp.message_handler(commands=[‘command’]) @dp.async_task async def cmd_with_timeout(message: types.Message): await asyncio.sleep(120) return SendMessage(message.chat.id, ‘KABOOM’).reply(message) :param func: :return: «»» def process_response(task): try: response = task.result() except Exception as e: asyncio.create_task( self.errors_handlers.notify(types.Update.get_current(), e)) else: if isinstance(response, BaseResponse): asyncio.create_task(response.execute_response(self.bot)) @functools.wraps(func) async def wrapper(*args, **kwargs): task = asyncio.create_task(func(*args, **kwargs)) task.add_done_callback(process_response) return wrapper def _wrap_async_task(self, callback, run_task=None) -> callable: if run_task is None: run_task = self.run_tasks_by_default if run_task: return self.async_task(callback) return callback def throttled(self, on_throttled: typing.Optional[typing.Callable] = None, key=None, rate=None, user_id=None, chat_id=None): «»» Meta-decorator for throttling. Invokes on_throttled if the handler was throttled. Example: .. code-block:: python3 async def handler_throttled(message: types.Message, **kwargs): await message.answer(«Throttled!») @dp.throttled(handler_throttled) async def some_handler(message: types.Message): await message.answer(«Didn’t throttled!») :param on_throttled: the callable object that should be either a function or return a coroutine :param key: key in storage :param rate: limit (by default is equal to default rate limit) :param user_id: user id :param chat_id: chat id :return: decorator «»» def decorator(func): @functools.wraps(func) async def wrapped(*args, **kwargs): is_not_throttled = await self.throttle(key if key is not None else func.__name__, rate=rate, user_id=user_id, chat_id=chat_id, no_error=True) if is_not_throttled: return await func(*args, **kwargs) kwargs.update( { ‘rate’: rate, ‘key’: key, ‘user_id’: user_id, ‘chat_id’: chat_id, } ) # update kwargs with parameters which were given to throttled if on_throttled: if asyncio.iscoroutinefunction(on_throttled): await on_throttled(*args, **kwargs) else: kwargs.update({‘loop’: asyncio.get_running_loop()}) partial_func = functools.partial( on_throttled, *args, **kwargs ) asyncio.get_running_loop().run_in_executor( None, partial_func ) return wrapped return decorator def bind_filter(self, callback: typing.Union[typing.Callable, AbstractFilter], validator: typing.Optional[typing.Callable] = None, event_handlers: typing.Optional[typing.List[Handler]] = None, exclude_event_handlers: typing.Optional[typing.Iterable[Handler]] = None): «»» Register filter :param callback: callable or subclass of :obj:`AbstractFilter` :param validator: custom validator. :param event_handlers: list of instances of :obj:`Handler` :param exclude_event_handlers: list of excluded event handlers (:obj:`Handler`) «»» self.filters_factory.bind(callback=callback, validator=validator, event_handlers=event_handlers, exclude_event_handlers=exclude_event_handlers) def unbind_filter(self, callback: typing.Union[typing.Callable, AbstractFilter]): «»» Unregister filter :param callback: callable of subclass of :obj:`AbstractFilter` «»» self.filters_factory.unbind(callback=callback) def setup_middleware(self, middleware): «»» Setup middleware :param middleware: :return: «»» self.middleware.setup(middleware)

Есть ошибка:

WARNING:aiogram:Goodbye!
Traceback (most recent call last):
File "C:/Users/zahar/Downloads/bot/bot.py", line 21, in
executor.start_polling(dp, skip_updates=True)
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogramutilsexecutor.py", line 41, in start_polling
executor.start_polling(reset_webhook=reset_webhook, timeout=timeout, relax=relax, fast=fast)
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogramutilsexecutor.py", line 305, in start_polling
loop.run_until_complete(self._startup_polling())
File "C:UserszaharAppDataLocalProgramsPythonPython38-32libasynciobase_events.py", line 616, in run_until_complete
return future.result()
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogramutilsexecutor.py", line 356, in _startup_polling
await self._welcome()
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogramutilsexecutor.py", line 346, in _welcome
user = await self.dispatcher.bot.me
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogrambotbot.py", line 27, in me
setattr(self, '_me', await self.get_me())
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogrambotbot.py", line 176, in get_me
result = await self.request(api.Methods.GET_ME, payload)
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogrambotbase.py", line 201, in request
return await api.make_request(self.session, self.__token, method, data, files,
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogrambotapi.py", line 104, in make_request
return check_result(method, response.content_type, response.status, await response.text())
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogrambotapi.py", line 84, in check_result
exceptions.Unauthorized.detect(description)
File "C:UserszaharDownloadsbotvenvlibsite-packagesaiogramutilsexceptions.py", line 137, in detect
raise cls(description)
aiogram.utils.exceptions.Unauthorized: Unauthorized

Я хотел бы сделать так, чтобы когда возникала эта ошибка, код автоматически полностью останавливался при помощи os._exit(0), но я не знаю как отловить это исключение или ошибку.

Если написать код, который внизу, то ничего не произойдёт и бот будет дальше работать с этой ошибкой:

if __name__ == "__main__":
    try:
        executor.start_polling(dp)
    except Exception:
        os._exit(0)

Установка¶

Для начала давайте создадим каталог для бота, организуем там virtual environment (далее venv) и
установим библиотеку aiogram.
Проверим, что установлен Python версии 3.7 (если вы знаете, что установлен 3.8 и выше, можете пропустить этот кусок):

[groosha@main lesson_01]$ python3.7
Python 3.7.6 (default, Apr 27 2020, 00:17:38) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> exit()
[groosha@main lesson_01]$ 

Теперь создадим файл requirements.txt, в котором укажем используемую нами версию aiogram.

О версиях aiogram

В этой главе используется aiogram версии 2.9.2, но перед началом работы рекомендую заглянуть в
канал релизов библиотеки и проверить наличие более новой версии. Подойдёт любая
более новая, начинающаяся с цифры 2, поскольку в будущем ожидается релиз aiogram 3.0 с заметными изменениями
и без обратной совместимости.
Чтобы избежать неприятностей, зафиксируемся на 2.9.2 и далее будем обновляться вручную.

[groosha@main lesson_01]$ python3.7 -m venv venv
[groosha@main lesson_01]$ echo "aiogram==2.9.2" > requirements.txt 
[groosha@main lesson_01]$ source venv/bin/activate
(venv) [groosha@main lesson_01]$ pip install -r requirements.txt
# ...здесь куча строк про установку...
Successfully installed Babel-2.8.0 aiogram-2.9.2 aiohttp-3.6.2 async-timeout-3.0.1 attrs-19.3.0 certifi-2020.6.20 chardet-3.0.4 idna-2.10 multidict-4.7.6 pytz-2020.1 typing-extensions-3.7.4.2 yarl-1.5.1
WARNING: You are using pip version 19.2.3, however version 20.2.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
(venv) [groosha@main lesson_01]$ 

Обратите внимание на префикс «venv» в терминале. Он указывает, что мы находимся в виртуальном окружении с именем «venv».
Проверим, что внутри venv вызов команды python указывает на всё тот же Python 3.7:

(venv) [groosha@main lesson_01]$ python
Python 3.7.6 (default, Apr 27 2020, 00:17:38) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> exit()
(venv) [groosha@main lesson_01]$ deactivate 
[groosha@main lesson_01]$ 

Последней командой deactivate мы вышли из venv, чтобы он нам не мешал.

Первый бот¶

Давайте создадим файл bot.py с базовым шаблоном бота на aiogram:

#!venv/bin/python
import logging
from aiogram import Bot, Dispatcher, executor, types

# Объект бота
bot = Bot(token="12345678:AaBbCcDdEeFfGgHh")
# Диспетчер для бота
dp = Dispatcher(bot)
# Включаем логирование, чтобы не пропустить важные сообщения
logging.basicConfig(level=logging.INFO)


# Хэндлер на команду /test1
@dp.message_handler(commands="test1")
async def cmd_test1(message: types.Message):
    await message.reply("Test 1")


if __name__ == "__main__":
    # Запуск бота
    executor.start_polling(dp, skip_updates=True)

Первое, на что нужно обратить внимание: aiogram — асинхронная библиотека, поэтому ваши функции тоже должны быть асинхронными,
а перед вызовами методов API нужно ставить ключевое слово await, т.к. эти вызовы возвращают корутины.

Асинхронное программирование в Python

Не стоит пренебрегать официальной документацией!
Прекрасный туториал по asyncio доступен на сайте Python.

Если вы в прошлом работали с какой-то другой библиотекой для Telegram, например, pyTelegramBotAPI, то концепция
хэндлеров (обработчиков событий) вам сразу станет понятна, разница лишь в том, что в aiogram хэндлерами управляет диспетчер.
Диспетчер регистрирует функции-обработчики, дополнительно ограничивая перечень вызывающих их событий через фильтры.
После получения очередного апдейта (события от Telegram), диспетчер выберет нужную функцию обработки, подходящую по всем
фильтрам, например, «обработка сообщений, являющихся изображениями, в чате с ID икс и с длиной подписи игрек». Если две
функции имеют одинаковые по логике фильтры, то будет вызвана та, что зарегистрирована раньше.

Чтобы зарегистрировать функцию как обработчик сообщений, нужно сделать одно из двух действий:
1. Навесить на неё декоратор, как в примере выше.
С различными типами декораторов мы познакомимся позднее.
2. Напрямую вызвать метод регистрации у диспетчера.

Рассмотрим следующий код:

# Хэндлер на команду /test1
@dp.message_handler(commands="test1")
async def cmd_test1(message: types.Message):
    await message.reply("Test 1")

# Хэндлер на команду /test2
async def cmd_test2(message: types.Message):
    await message.reply("Test 2")

Давайте запустим с ним бота:
Команда /test2 не работает

Функция cmd_test2 не работает, т.к. диспетчер о ней не знает. Исправим эту ошибку
и отдельно зарегистрируем функцию:

# Хэндлер на команду /test2
async def cmd_test2(message: types.Message):
    await message.reply("Test 2")

# Где-то в другом месте...
dp.register_message_handler(cmd_test2, commands="test2")

Снова запустим бота:
Обе команды работают

Обработка ошибок¶

При работе бота неизбежно возникновение различных ошибок, связанных не с кодом, а с внешними событиями. Простейший пример:
попытка отправить сообщение пользователю, заблокировавшему бота. Чтобы не оборачивать каждый вызов в try..except,
в aiogram существует специальный хэндлер для исключений, связанных с Bot API.
Рассмотрим следующий пример кода, имитирующий задержку перед ответом пользователю:

@dp.message_handler(commands="block")
async def cmd_block(message: types.Message):
    await asyncio.sleep(10.0)  # Здоровый сон на 10 секунд
    await message.reply("Вы заблокированы")

За эти 10 секунд пользователь может успеть заблокировать бота со своей стороны и попытка вызвать метод reply приведёт
к появлению исключения BotBlocked. Напишем специальный хэндлер для этого исключения:

from aiogram.utils.exceptions import BotBlocked

@dp.errors_handler(exception=BotBlocked)
async def error_bot_blocked(update: types.Update, exception: BotBlocked):
    # Update: объект события от Telegram. Exception: объект исключения
    # Здесь можно как-то обработать блокировку, например, удалить пользователя из БД
    print(f"Меня заблокировал пользователь!nСообщение: {update}nОшибка: {exception}")

    # Такой хэндлер должен всегда возвращать True,
    # если дальнейшая обработка не требуется.
    return True

Аналогично пишутся обработчики и на другие исключения. Таким образом, если одна и та же непредвиденная ситуация может
возникнуть в различных хэндлерах, то можно вынести её обработку в отдельный хэндлер ошибок. Кода будет меньше, а оставшийся
станет читабельнее.

Важно

У errors_handler есть одна особенность, из-за которой его использование может быть нежелательно. Дело в том, что
после срабатывания и завершения хэндлера, управление в исходную функцию не возвращается. Проще говоря, если,
например, 57-я итерация цикла из 100 привела к срабатыванию errors_handler, остальные итерации выполнены не будут,
как и весь остальной код исходной функции. В этом случае ничего не остаётся, кроме как использовать try..except.

Синтаксический сахар¶

Для того, чтобы сделать код чище и читабельнее, aiogram расширяет возможности стандартных объектов Telegram.
Например, вместо bot.send_message(...) можно написать message.answer(...) или message.reply(...). В последних
двух случаях не нужно подставлять chat_id, подразумевается, что он такой же, как и в исходном сообщении.
Разница между answer и reply простая: первый метод просто отправляет сообщение в тот же чат, второй делает «ответ» на
сообщение из message:

@dp.message_handler(commands="answer")
async def cmd_answer(message: types.Message):
    await message.answer("Это простой ответ")


@dp.message_handler(commands="reply")
async def cmd_reply(message: types.Message):
    await message.reply('Это ответ с "ответом"')

Разница между message.answer() и message.reply()

Более того, для большинства типов сообщений есть вспомогательные методы вида
«answer_{type}» или «reply_{type}», например:

@dp.message_handler(commands="dice")
async def cmd_dice(message: types.Message):
    await message.answer_dice(emoji="🎲")

что значит ‘message: types.Message’ ?

Python является интерпретируемым языком с сильной, но динамической типизацией,
поэтому встроенная проверка типов, как, например, в C++ или Java, отсутствует. Однако начиная с версии 3.5
в языке появилась поддержка подсказок типов, благодаря которой
различные чекеры и IDE вроде PyCharm анализируют типы используемых значений и подсказывают
программисту, если он передаёт что-то не то. В данном случае подсказка types.Message соообщает
PyCharm-у, что переменная message имеет тип Message, описанный в модуле types библиотеки
aiogram (см. импорты в начале кода). Благодаря этому IDE может на лету подсказывать атрибуты и функции.

При вызове команды /dice бот отправит в тот же чат игральный кубик. Разумеется, если его надо отправить в какой-то
другой чат, то придётся по-старинке вызывать await bot.send_dice(...). Но объект bot (экземпляр класса Bot) может быть
недоступен в области видимости конкретной функции. К счастью, объект бота доступен во всех типах апдейтов: Message,
CallbackQuery, InlineQuery и т.д. Предположим, вы хотите по команде /dice отправлять кубик не в тот же чат, а в канал
с ID -100123456789. Перепишем предыдущую функцию:

@dp.message_handler(commands="dice")
async def cmd_dice(message: types.Message):
    await message.bot.send_dice(-100123456789, emoji="🎲")

Всё хорошо, но если вдруг вы захотите поделиться с кем-то кодом, то придётся каждый раз помнить об удалении
из исходников токена бота, иначе придётся его перевыпускать у @BotFather. Чтобы обезопасить себя,
давайте перестанем указывать токен прямо в коде, а вынесем его как переменную окружения.
Замените следующие строчки из начала файла:

import logging
from aiogram import Bot, Dispatcher, executor, types

bot = Bot(token="12345678:AaBbCcDdEeFfGgHh")

на эти:

import logging
from aiogram import Bot, Dispatcher, executor, types
from os import getenv
from sys import exit

bot_token = getenv("BOT_TOKEN")
if not bot_token:
    exit("Error: no token provided")

bot = Bot(token=bot_token)

Но теперь ваш бот не запустится, т.к. будет сразу завершаться с ошибкой Error: no token provided.
Чтобы передать переменную окружения в PyCharm, откройте сверху раздел Run -> Edit Configurations
и добавьте в окне Environment Variables переменную с именем BOT_TOKEN и значением токена.

Разница между message.answer() и message.reply()

Запустите снова бота и убедитесь, что он работает. Получившийся код можно смело сохранять в
PyCharm в File Templates.

На этом мы закончим знакомство с библиотекой, а в следующих главах рассмотрим другие «фишки» aiogram и Telegram Bot API.

Ошибки в ботах и как их читать

Latand

Хочешь научиться писать ботов? Проходи и смотри бесплатные уроки на площадке Botfather.Dev

В первую очередь, давайте разделим 3 типа ошибок:

  1. Ошибки в Python — например, когда вы делите на ноль
  2. Ошибки при работе с Telegram Bot API — например, когда вы пытаетесь отправить сообщение ботом несуществующему пользователю
  3. Прочие ошибки, при работе с другими библиотеками.

Когда возникает ошибка в коде, вы ее узнаете (обычно) по красному цвету текста и кучи строк, которые вы не планировали выводить. Мы рассмотрим наиболее часто повторяющиеся ошибки. Но, возможно вашей ошибки тут не будет.

Сначала мы научимся читать ошибки, чтобы понимать, что вообще с ними можно делать, а потом разберем типичные ошибки с Python и в Telegram bot API и как их решать.

Содержание статьи

├── Как читать код ошибки?
├── Что писать в чат, если возникла ошибка?
├── Типичные ошибки в Python
├── Типичные ошибки при работе с Telegram Bot API
├── Прочие ошибки

Как читать код ошибки?

Лучше всего это показать на примере. Эту ошибку прислал один из студентов, мы её и разберем. Как обычно, много красного текста, но! Это не должно вас пугать.

Одна из ошибок, которые мне присылают ученики

Самое важное — надо определить, что это таки именно ошибка, а не что-то другое, а это можно сделать по слову Traceback где-то вначале красного текста. Если там такое есть — это ошибка.

Ищем слово Traceback

Тут и начинается код ошибки. Traceback — («trace», «back») с английского «отследить» и «назад«. Т.е. прослеживаем откуда эту ошибка пришла.

Смотрим сверху вниз, сначала в файле (слово File) по пути D:Pythonaiogram-bot-template…. и потом смотрим в конец строки

начало строки
конец строки

Видим в конце

dispatcher.py, line 388, in _process_polling_updates

Это то, откуда эта ошибка появилась позже всего и на чём всё сломалось. Но это не то место, где вы допустили ошибку! Это файл dispatcher.py, который лежит по определенному пути, и в нем на 388 строке, в функции _process_polling_updates (обработка обновлений поллинга) возникла ошибка ошибка в каком-то таком месте:

Все еще сложно понять, что это такое. Вы это не писали? Точно?

Странно… Я тоже этого не писал.

А! Там же ниже еще есть код, оказывается тут выполнялся код функции

await self.process_updates(updates, fast)

А внизу есть код ошибки и этой функции!

Смотрим туда. Там пишет, что ошибка в том же dispatcher.py, в котором выполнялся код функции:

await asyncio.gather(*tasks)

Ну это то точно вы писали! Нет? Уверены???

Значит смотрим ниже.

Дальше идет выполнение какой-то функции:

handler_obj.handler(*args, **partial_data)

В строке 117, в функции notify.

Ну, короче вы поняли, смотрим дальше, пока не появится что-то знакомое.

Ищем исток ошибки, где источник находится в файлах, что писали именно вы!

А! нашел!

Папка, где ученик хранит свой проект называется «Проект», в нем есть папка aiogram-bot-template. Похоже на то.

И тут смотрим как обычно на то, где ошибка возникла, где-то в обоих файлах:

menu_handlers

db_commands

Внимательно!

Сначала выполнилась функция navigate на строке 118, потом list_tiker на строке 74, а потом update_userstiker, в которой выполнилась функция:

await user_tiker.create()

Вот оно! Вот это место, где ученик допустил ошибку. Но не похоже на то, что тут что-то сделано не так. Да и внизу есть какой-то код. Но в любом случае, это 100% то место, где ученик допустил ошибку.

Внизу можно прочитать следующее:

Ошибка RuntimeError: cannot reuse already awaited coroutine. Если английский вам не сильно знаком, а с питоном вы еще пытаетесь подружиться, но до асинхронности еще не дошли, то для вас это будет полной белибердой.

Пытаемся решить ошибку…

Для начала, пробуем прочитать ошибку и перевести ее. Очень часто в самой ошибке содержится уже её решение.

Если это не помогло — пробуем копировать ошибку и просто-напросто ГУГЛИТЬ! ДА!

https://stackoverflow.com/questions/51116849/asyncio-await-coroutine-more-than-once-periodic-tasks
Вот например, один из примеров ответа на эту ошибку. Чаще всего ответы вы найдете на stackoverflow практически на все ошибки!

Если там на английском — не страшно, переводите страницу, благо Google Chrome позволяет это делать максимально просто. А еще лучше, сначала попробуйте саму ошибку перевести!

Вопрос

На том посте видим ответ с галочкой:

Ответ

Тут говорится, что возможно человек путает асинхронные функции с корутинами (сопрограммами). В данном примере sample — функция (или ссылка) на асинхронную функцию, а sample() — уже корутина. И, чтобы забрать данные из корутины, и ее выполнить, необходимо сделать сначала await перед ней.

Почитав немного других постов я понял, что загвоздка в том, что корутину можно await-ить только один раз! Сделать await sample можно всего один раз.

Значит можно сделать предположение, что user_tiker, перед которым стоит await уже был ранее await-нут. Но, на этом мои догадки заканчиваются…

Ах да, тут ученику, по этой ошибке подсказали:

А вот тут похоже, что сначала происходит await id_tiker и потом он передается в User_Tiker. Не уверен в чем тут дело, но возможно gino пытается внутри что-то await-ить, а может быть ученик сделал что-то, но не показал, и поэтому мы можем только гадать в чем была проблема, потому что позже он написал:

Говорит, что перезагрузка бот помогла…

Ну, главное, что решил!

Ах да, мы ж тут не конкретную ошибку разбирали, а возможный путь её решения.

Подытожим:

  1. Ищем Traceback
  2. Читаем его и пытаемся найти то последнее место в коде, где ВЫ могли допустить ошибку. То есть в том, что писали именно вы.
  3. В крайнем случае — обращаемся к концу кода и читаем именно Exception или Error.
  4. Гуглим этот Exception/Error и ищем пути решения.
  5. Отслеживаем в других функциях выше, где вы могли допустить ошибку.
  6. Если ничего не находите дельного — пишите в чаты.

Я не нашел решения! Что писать в чат, когда возникла ошибка?

Очень просто. Есть несколько важных моментов:

  1. Пишите вежливо, никто не обязан за вас решать ваши ошибки, но кто-то найдется, чтобы помочь.
  2. Опишите проблему, если можно, в одном сообщении.
  3. Опишите то, что вы пытались сделать, и какие у вас предположения! Никто не любит помогать тем, кто сразу пишет в чат и не гуглит.
  4. Залейте ВЕСЬ Traceback на pastebin.com или dpaste.org
  5. Можете добавить часть вашего кода, где у вас возникает ошибка, берите функции и строки из трейсбека + 10-20 строк по вниз и вверх, чтобы был понятен контекст проблемы.

С этого скриншота нихрена не понятно, что у вас за ошибка, если что

Ошибки в Python

Ошибок в Python может быть очень много и зависит от контекста. Для того, чтобы они не возникали — надо учить питон. Ага, все таки придется…

Почитать о них всех можно тут: https://docs.python.org/3/library/exceptions.html#concrete-exceptions

Те, с которыми часто сталкиваются:

SyntaxError — ошибка при работе с синтаксисом Python

Пожалуй, самая распространенная ошибка… Если бы все начинали не с ботов, а с изучения языка программирования Python, то было бы лучше!

Она возникает, когда вы просто неправильно ввели код. Когда возникает?

  • Опечатались при создании функции/класса. Ввели не

def dunc():, а

defc func():

Или

clas MyClass():, вместо

class MyClass():

  • Не поставили скобочки/двоеточие/знак равно, где нужно. Например, при создании функции:
def a:
    return 1 
  • Не закрыли скобки

Найдите пропущенную скобку.
А вот и ошибка
  • Не поставили кавычки, когда записываете текст в переменную:

BOT_TOKEN = 1223423543:IOFNGVOIWENOGIBWENIOGEWRNGVEW

  • Используете зарезервированные имена функций в питоне

async уже забронировали.

ImportError — ошибка при работе с импортами.

  • Ошибка импорта библиотеки, очень часто при цикличном импорте. То есть, вы в файле a, импортируете функцию из модуля b. При этом, в модуле b у вас что-то импортируется из модуля a. И питон начинать ходить по кругу и импортировать одно из другого (нет).

Сделайте так, чтобы не пришлось из модуля a импортировать что-то в b. Переместите эту функцию в модуль c.

  • А еще бывает такое:

Если вы запускаете код бота, но не из того модуля, где идет запуск поллинга, а где просто обозначены хендлеры. Этот файл не надо запускать.

ModuleNotFoundError — не установлена библиотека.

Да. Не установлена. Установите ее с помощью pip install <имя библиотеки>, но перед этим убедитесь, что вы ввели правильное название ее из pypi. Названия можно глянуть тут: https://pypi.org/project/pip/

  • А еще, может быть просто вы пытаетесь импортировать файл из своего проекта, но он лежит в другой папке

TypeError — ошибка при работе с типами данных.

Это может произойти, когда вы используете неправильные методы для работы с объектами. Например, вы попытались сложить текст с числом.

TypeError: unsupported operand type(s) for +: 'int' and 'str'

Но еще может возникнуть, когда вы работаете с вызовом функций:

  • TypeError: func() takes 1 positional argument but 2 were given

В функцию передаете больше аргументов, чем она ждет.

  • TypeError: func() got multiple values for argument 'a'

Когда вы передаете в функцию несколько значений для одной переменной.

  • TypeError: func() missing 1 required positional argument: 'a'

Когда вы в функцию не передали нужные аргументы, например positional — те, что идут по порядку

  • TypeError: my_func() missing 1 required positional argument "self"

Это довольно частая ошибка, когда вы работаете с классом, который не инициализирован.

Например, есть класс Bot, у него есть метод get_me, вы пытаетесь запустить функцию Bot.get_me(). И, несмотря на то, что класс имеет этот метод, пока вы не выполните функцию __init__, в которой создастся self, у вас будет эта ошибка.

Функция инит выполняется когда вы создаете экземпляр класса. Как-то так: bot = Bot(). Только после этого можно сделать bot.get_me()

  • TypeError: object function can't be used in `await` expression

Похоже, что кто-то забыл, что await-ить можно только асинхронные функции, а не обычные. Либо уберите await, либо сделайте функцию асинхронной, с помощью приставки async: async def my_func(): ...

  • TypeError: «coroutine» object is not subscriptable

Тут ошибка говорит, что мы не можем достать элемент из корутины. Типа если сделать await bot.get_me()["username"]. Вообще, есть порядок выполнения кода, и тут сначала питон пытается достать элемент по ключу username из корутины bot.get_me(), а потом только он сделает await.

В данном случае, нам надо сначала сделать await, потому мы заключим этот фрагмент в скобки:

(await bot.get_me())["username"]

И тогда все выполнится.

Но ошибка заключается в том, что данный тип объекта не позволяет брать элементы таким способом ["element"], или по индексу [2].

RuntimeWarning: coroutine ‘my_func’ was never awaited

… RuntimeWarning: Enable tracemalloc to get the object allocation traceback

Не совсем ошибка, но все же частая проблема. У вас была корутина my_func (или другое название), к которой вы не прописали await. Скорее всего вы это должны сделать, поэтому вернитесь и await-ните вашу корутину! Сделайте await my_func()

ValueError — ошибка при работе со переданными значениями.

Возникает, когда вы передаете в функцию значение правильного типа, но неправильного значения. Примеры:

  • Переданный символ ":" (двоеточие) используется функцией для разделения значений в CallbackData, поэтому его нельзя использовать в самих значениях. Это может произойти, когда вы работаете с CallbackData Factory

  • Длина callback_data в инлайн кнопках ограничена 64-ю байтами. Бывают такие ошибки:

  • Пытаетесь из текста сделать int

int(«paid») не сработает, проверьте, что вы туда суете.

AttributeError — ошибка при работе с атрибутами классов.

Такая ошибка возникает, когда вы пытаетесь вызвать атрибут (переменную класса), которую этот класс не имеет.

Примеры:

  • AttributeError "NoneType" object has no attribute "run_until_complete"

Довольно частая ошибка, когда пытаются сделать dp.loop.run_until_complete, а dp.loop оказывается равен None. Ну и когда этот объект «ничто», у него и не может быть никаких атрибутов. P.S. Никогда не берите loop из объекта dp.

Ошибки при работе с Telegram Bot API

Вообще, частенько, при работе с Бот АПИ, в ошибке указано решение… Но его почему-то не читают.

aiogram.utils.exceptions.TerminatedByOtherGetUpdates

У вас запущено более одного процесса, которые делают запрос getUpdates к телеграму с одного и того же токена бота Т.е. когда в боте запускается функция start_polling несколько раз в разных процессах.

Вы либо не остановили один процесс и запустили код заново, либо закрыли неправильно Pycharm, не останавливая процесс, и потом запустили бота заново. Либо у вас работает бот на сервере, а вы еще и запускаете на локальном компьютере.

Пожалуйста, найдите этот второй процесс и убейте его. Можно просто перезагрузить компьютер. А если не можете найти — смените токен бота и все решится.

aiogram.utils.exceptions.ChatNotFound

Ошибка возникает, если:

  • Вы ввели неверный идентификатор чата (просто неправильное значение передано в аргумент chat_id)
  • Бот не контактировал с пользователем ранее, поэтому он не может ему написать.

aiogram.utils.exceptions.CantParseEntities

В тексте у вас закрался запрещенный символ.

Обычно, если вы используете parse_mode для форматирования текста (HTML или MARKDOWN), то для того, чтобы понять где у вас в тексте жирный шрифт или курсив — используются специальные символы/теги. Примеры:

  • Для HTML жирный шрифт будет <b>жирный</b>. Поэтому, если в тексте будет что-то типа <coroutine>, то телеграм его не распознает и выдаст эту ошибку
  • Для HTML используемые теги надо закрывать, т.е. если вы начали с тегом <b>, то где-то его надо закрыть: </b>
  • Для MARKDOWN жирный шрифт будет *жирный*. Поэтому, если где-то в тексте будет нечётное количество звездочек, т.е. в тексте где-то появилась звездочка, которой вы не хотели текст сделать жирным — будет ошибка.

Ошибку можно решить с помощью функций quote_html или escape_md:

from aiogram.utils.markdown import quote_html
text = quote_html("<booo>")

aiogram.utils.exceptions.BadRequest

Название ошибки ни о чем не говорит. Просто то, что запрос не состоялся. А вот решение написано в описании ошибки. Например:

  • Not enough rights to export chat invite link — У бота недостаточно прав для того, чтобы создать ссылку-приглашение в чат. Скорее всего он не админ, или просто не имеет на это право.

aiogram.utils.exceptions.ValidationError: Token is invalid!

Все просто, как описано в ошибке. Вы ввели неверный токен. Можете проверить это с помощью функции print. Попробуйте print(BOT_TOKEN) если у вас эта переменная так называется, и убедитесь сами.

Как Частенько такое вижу, когда мои студенты не переименовывают файл .env.dist в .env.

АЛЛО! Расширение dist используется для примеров файлов, а не для того, чтобы туда писались переменные! Переименуйте и тогда подтянется уже ваш токен из файла .env.

aiogram.utils.exceptions.RetryAfter

В телеграме есть лимиты. Например, сообщения советуют слать не чаще 1 секунды в один и тот же чат. Делайте иногда задержки в своем коде между отправкой нескольких сообщений:

await asyncio.sleep(1)

Прочие ошибки

Тут довольно специфичные ошибки, на случай, если вы не нашли выше описание нужной.

ERROR: Command errored out with exit status 1:…

Если у вас именно такая ошибка, или похожая как на скрине — у вас не установилась библиотека на Python3.9. Эта версия на Windows еще не отлажена окончательно, поэтому решение тут — удалить Python3.9 и поставить Python3.8

asyncpg.exceptions.InvalidCatalogNameError: database «…» does not exist

Тут библиотека asyncpg сообщает вам, что в вашей базе данных отсутствует база с именем, указанным в кавычках.

sqlite3.OperationalError: near «)»: syntax error

Синтаксическая ошибка в вашем SQL-запросе. Проследите строчку, где этот запрос делается и проверьте где вы написали что-то несусветное рядом со скобкой. Либо не рядом со скобкой (в ошибке будет написано в кавычках).

Home>2021-08-30 15:12

python : How to stop Telegram Bota (Aiogram) with any error?

Telegram Bot written in python using the aiogram library . There is a need to stop the bot and close program when there are errors that relate directly Aiogram library.

as an example of an error you can consider (
Or any other which occurs within the Aiogram library)
:
Aiogram.utils.Exceptions.TerminatedByothergetUpdates: Terminated by Other Getupdates Request; Make Sure That Only One Bot Instance IS Running

If you briefly here:

From Aiogram Import Bot, Types
From aiogram.dispatcher Import Dispatcher
from aiogram.utils import executor
From Config Import Token
if __name__== '__main__':
    Bot= Bot (Token= Token)
    DP= DISPATCHER (BOT)
    Executor.Start_Polling (DP)

tried to wrap

Executor.Start_Polling (DP)

In the design

Try, Except

. However it did not help because the error occurred inside

Executor.Start_Polling (DP)

And thereby did not have any influence on

Try, Except

main function.

What needs to be done to stop the bot in case of errors and the closure of the program itself?

Понравилась статья? Поделить с друзьями:
  • Error handle does not name a type
  • Error h06 webasto
  • Error h is not a valid key name line 007 hotkey как исправить
  • Error guru meditation 78eac4 88 39fdba failed to obtain anisette 500 internal server error
  • Error guf на тонометре