import asyncio
import io
import json
from collections.abc import AsyncGenerator, Generator
from contextlib import suppress
from itertools import chain, repeat
from socket import gaierror
from textwrap import dedent
from time import sleep
from typing import Any, Union
import httpx
from pydantic import BaseModel, Field, validator
from . import tg_types
from .client import (AsyncTgClient, SyncTgClient, TgRuntimeError,
raise_for_tg_response_status, TgHttpStatusError)
[docs]
class BaseTgRequest(BaseModel, tg_types.ValidableMixin):
"""Base class representing a request to the Telegram Bot API.
Provides utility methods for making both asynchronous and synchronous requests to the API.
Typically used as a parent class for specific request types which include their data fields and
possibly override or extend the base methods to customize behavior.
"""
class Config:
extra = 'forbid'
validate_assignment = True
anystr_strip_whitespace = True
[docs]
async def apost_as_json(self, api_method: str) -> bytes:
"""Send a request to the Telegram Bot API asynchronously using a JSON payload.
:param api_method: The Telegram Bot API method to call.
:return: The response from the Telegram Bot API as a byte string.
"""
client = AsyncTgClient.default_client.get(None)
if not client:
raise TgRuntimeError('Requires AsyncTgClient to be specified before call.')
http_response = await client.session.post(
f'{client.api_root}{api_method}',
headers={
'content-type': 'application/json',
'accept': 'application/json',
},
content=self.json(exclude_none=True).encode('utf-8'),
)
raise_for_tg_response_status(http_response)
return http_response.content
[docs]
def post_as_json(self, api_method: str) -> bytes:
"""Send a request to the Telegram Bot API synchronously using a JSON payload.
:param api_method: The Telegram Bot API method to call.
:return: The response from the Telegram Bot API as a byte string.
"""
client = SyncTgClient.default_client.get(None)
if not client:
raise TgRuntimeError('Requires SyncTgClient to be specified before call.')
http_response = client.session.post(
f'{client.api_root}{api_method}',
headers={
'content-type': 'application/json',
'accept': 'application/json',
},
content=self.json(exclude_none=True).encode('utf-8'),
)
raise_for_tg_response_status(http_response)
return http_response.content
[docs]
async def apost_multipart_form_data(self, api_method: str, content: dict, files: dict) -> bytes:
"""Send a request to the Telegram Bot API asynchronously using the "multipart/form-data" format.
:param api_method: The Telegram Bot API method to call.
:param content: A dictionary containing the content to be sent.
:param files: A dictionary containing files to be sent.
:return: The response from the Telegram Bot API as a byte string.
"""
client = AsyncTgClient.default_client.get(None)
if not client:
raise TgRuntimeError('Requires AsyncTgClient to be specified before call.')
if content.get('caption_entities'):
content['caption_entities'] = json.dumps(content['caption_entities'])
if content.get('entities'):
content['entities'] = json.dumps(content['entities'])
if content.get('reply_markup'):
content['reply_markup'] = json.dumps(content['reply_markup'])
if content.get('media'):
content['media'] = json.dumps(content['media'])
http_response = await client.session.post(
f'{client.api_root}{api_method}',
files=files,
data=content,
)
raise_for_tg_response_status(http_response)
return http_response.content
[docs]
def post_multipart_form_data(self, api_method: str, content: dict, files: dict) -> bytes:
"""Send a request to the Telegram Bot API synchronously using the "multipart/form-data" format.
:param api_method: The Telegram Bot API method name.
:param content: A dictionary containing the content to be sent.
:param files: A dictionary containing files to be sent.
:return: The response from the Telegram Bot API as a byte string.
"""
client = SyncTgClient.default_client.get(None)
if not client:
raise TgRuntimeError('Requires SyncTgClient to be specified before call.')
if content.get('caption_entities'):
content['caption_entities'] = json.dumps(content['caption_entities'])
if content.get('entities'):
content['entities'] = json.dumps(content['entities'])
if content.get('reply_markup'):
content['reply_markup'] = json.dumps(content['reply_markup'])
if content.get('media'):
content['media'] = json.dumps(content['media'])
http_response = client.session.post(
f'{client.api_root}{api_method}',
files=files,
data=content,
)
raise_for_tg_response_status(http_response)
return http_response.content
[docs]
class BaseTgResponse(BaseModel):
"""Represents the base structure of a response from the Telegram Bot API.
Every response from the Telegram Bot API contains certain common attributes, which are captured
in this base model. Specific response types might extend this base structure.
"""
ok: bool = Field(
description="A Boolean value indicating the success of the operation.",
)
error_code: int | None = Field(
default=None,
description=dedent("""\
An integer or `None` value that contains the error code if the operation fails.
If the operation was successful, the value will be `None`.
"""),
)
description: str = Field(
default="",
description=dedent("""\
A string value that can contain additional description of the result of the operation or the cause of
the error. If the operation was successful, the value will be empty.
"""),
)
result: Any = Field(
default=None,
description=dedent("""\
Any value that represents the specific result of an operation. The value type may depend on
the specific Telegram Bot API response type.
"""),
)
parameters: tg_types.ResponseParameters | None = Field(
default=None,
description="An optional field that represents additional parameters associated with the response.",
)
class Config:
extra = 'ignore'
allow_mutation = False
[docs]
class SendMessageResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.Message = Field(
description="Result of sending a message.",
)
[docs]
class SendMessageRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `sendMessage`.
See here https://core.telegram.org/bots/api#sendmessage
"""
chat_id: int = Field(
description=dedent("""\
Unique identifier for the target chat or username of the target channel (in the format @channelusername).
"""),
)
text: str = Field(
min_length=1,
max_length=4096,
description="Text of the message to be sent, 1-4096 characters after entities parsing.",
)
parse_mode: tg_types.ParseMode | None = Field(
default=None,
description="Mode for parsing entities in the message text. See formatting options for more details.",
)
entities: list[tg_types.MessageEntity] | None = Field(
default=None,
description=dedent("""\
A JSON-serialized list of special entities that appear in message text,
which can be specified instead of parse_mode.
"""),
)
disable_web_page_preview: bool | None = Field(
default=None,
description="Disables link previews for links in this message.",
)
disable_notification: bool | None = Field(
default=None,
description="Sends the message silently. Users will receive a notification with no sound.",
)
protect_content: bool | None = Field(
default=None,
description="Protects the contents of the sent message from forwarding and saving.",
)
message_thread_id: bool | None = Field(
default=None,
description=dedent("""\
Unique identifier for the target message thread (topic) of the forum; for forum supergroups only.
"""),
)
allow_sending_without_reply: bool | None = Field(
default=None,
description="Pass True if the message should be sent even if the specified replied-to message is not found.",
)
reply_markup: Union[
tg_types.InlineKeyboardMarkup,
tg_types.ReplyKeyboardMarkup,
tg_types.ReplyKeyboardRemove,
tg_types.ForceReply,
] | None = Field(
default=None,
description=dedent("""\
Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard,
instructions to remove reply keyboard or to force a reply from the user.
"""),
)
class Config:
anystr_strip_whitespace = True
[docs]
async def asend(self) -> SendMessageResponse:
"""Send HTTP request to `sendMessage` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('sendMessage')
response = SendMessageResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> SendMessageResponse:
"""Send HTTP request to `sendMessage` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('sendMessage')
response = SendMessageResponse.parse_raw(json_payload)
return response
[docs]
class SendLocationResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.Message = Field(
description="Result of sending a message.",
)
[docs]
class SendLocationRequest(BaseTgRequest, tg_types.ValidableMixin):
"""Object encapsulates data for calling Telegram Bot API endpoint `sendLocation`.
See here https://core.telegram.org/bots/api#sendlocation
"""
chat_id: int = Field(
description=dedent("""\
Unique identifier for the target chat or username of the target channel (in the format @channelusername).
"""),
)
message_thread_id: int | None = Field(
default=None,
description=dedent("""\
Unique identifier for the target message thread (topic) of the forum; for forum supergroups only.
"""),
)
latitude: float = Field(
description="Latitude of the location.",
)
longitude: float = Field(
description="Longitude of the location.",
)
horizontal_accuracy: float | None = Field(
ge=0,
le=1500,
default=None,
description="The radius of uncertainty for the location, measured in meters; 0-1500.",
)
live_period: int | None = Field(
ge=60,
le=86400,
default=None,
description=dedent("""\
Period in seconds for which the location will be updated (see Live Locations, should be between 60 and
86400).
"""),
)
heading: int | None = Field(
ge=0,
le=360,
default=None,
description=dedent("""\
For live locations, a direction in which the user is moving, in degrees. Must be between 1 and 360 if
specified.
"""),
)
proximity_alert_radius: int | None = Field(
ge=1,
le=100000,
default=None,
description=dedent("""\
For live locations, a maximum distance for proximity alerts about approaching another chat member,
in meters. Must be between 1 and 100000 if specified.
"""),
)
disable_notification: bool | None = Field(
default=None,
description="Sends the message silently. Users will receive a notification with no sound.",
)
protect_content: bool | None = Field(
default=None,
description="Protects the contents of the sent message from forwarding and saving.",
)
reply_markup: Union[
tg_types.InlineKeyboardMarkup,
tg_types.ReplyKeyboardMarkup,
tg_types.ReplyKeyboardRemove,
tg_types.ForceReply,
] | None = Field(
default=None,
description=dedent("""\
Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard,
instructions to remove reply keyboard or to force a reply from the user.
"""),
)
[docs]
async def asend(self) -> SendLocationResponse:
"""Send HTTP request to `sendLocation` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('sendLocation')
response = SendLocationResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> SendLocationResponse:
"""Send HTTP request to `sendLocation` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('sendLocation')
response = SendLocationResponse.parse_raw(json_payload)
return response
[docs]
class SendPhotoResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.Message = Field(
description="Result of sending a photo.",
)
[docs]
class SendBytesPhotoRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `sendPhoto`.
See here https://core.telegram.org/bots/api#sendphoto
"""
chat_id: int = Field(
description=dedent("""\
Unique identifier for the target chat or username of the target channel (in the format @channelusername).
"""),
)
photo: bytes = Field(
description=dedent("""\
Photo to send. Pass a file_id as String to send a photo that exists on the Telegram servers (recommended),
pass an HTTP URL as a String for Telegram to get a photo from the Internet, or upload a new photo using
multipart/form-data. The photo must be at most 10 MB in size. The photo's width and height must not exceed
10000 in total. Width and height ratio must be at most 20.
"""),
)
filename: str | None = Field(
default=None,
description="",
)
message_thread_id: int | None = Field(
default=None,
description=dedent("""\
Unique identifier for the target message thread (topic) of the forum; for forum supergroups only.
"""),
)
caption: str | None = Field(
default=None,
max_length=1024,
description=dedent("""\
Photo caption (may also be used when resending photos by file_id),
0-1024 characters after entities parsing.
"""),
)
parse_mode: str | None = Field(
default=None,
description="Mode for parsing entities in the photo caption. See formatting options for more details.",
)
caption_entities: list[tg_types.MessageEntity] | None = Field(
default=None,
description=dedent("""\
A JSON-serialized list of special entities that appear in the caption,
which can be specified instead of parse_mode.
"""),
)
has_spoiler: bool | None = Field(
default=None,
description="Pass True if the photo needs to be covered with a spoiler animation.",
)
disable_notification: bool | None = Field(
default=None,
description="Sends the message silently. Users will receive a notification with no sound.",
)
protect_content: bool | None = Field(
default=None,
description="Protects the contents of the sent message from forwarding and saving.",
)
reply_to_message_id: int | None = Field(
default=None,
description="If the message is a reply, ID of the original message.",
)
allow_sending_without_reply: bool | None = Field(
default=None,
description="Pass True if the message should be sent even if the specified replied-to message is not found.",
)
reply_markup: Union[
tg_types.InlineKeyboardMarkup,
tg_types.ReplyKeyboardMarkup,
tg_types.ReplyKeyboardRemove,
tg_types.ForceReply,
] | None = Field(
default=None,
description=dedent("""\
Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard,
instructions to remove reply keyboard or to force a reply from the user.
"""),
)
[docs]
async def asend(self) -> SendPhotoResponse:
"""Send HTTP request to `sendPhoto` Telegram Bot API endpoint asynchronously and parse response."""
content = self.dict(exclude_none=True, exclude={'photo'})
photo_bytes_io = io.BytesIO(self.photo)
photo_bytes_io.name = self.filename
files = {'photo': photo_bytes_io}
json_payload = await self.apost_multipart_form_data('sendPhoto', content, files)
response = SendPhotoResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> SendPhotoResponse:
"""Send HTTP request to `sendPhoto` Telegram Bot API endpoint synchronously and parse response."""
content = self.dict(exclude_none=True, exclude={'photo'})
photo_bytes_io = io.BytesIO(self.photo)
photo_bytes_io.name = self.filename
files = {'photo': photo_bytes_io}
json_payload = self.post_multipart_form_data('sendPhoto', content, files)
response = SendPhotoResponse.parse_raw(json_payload)
return response
[docs]
class SendUrlPhotoRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `sendPhoto`.
See here https://core.telegram.org/bots/api#sendphoto
"""
chat_id: int = Field(
description=dedent("""\
Unique identifier for the target chat or username of the target channel (in the format @channelusername).
"""),
)
photo: str = Field(
description=dedent("""\
Photo to send. Pass a file_id as String to send a photo that exists on the Telegram servers (recommended),
pass an HTTP URL as a String for Telegram to get a photo from the Internet, or upload a new photo
using multipart/form-data. The photo must be at most 10 MB in size. The photo's width and height
must not exceed 10000 in total. Width and height ratio must be at most 20.
"""),
)
filename: str | None = Field(
default=None,
description="",
)
message_thread_id: int | None = Field(
default=None,
description=dedent("""\
Unique identifier for the target message thread (topic) of the forum; for forum supergroups only.
"""),
)
caption: str | None = Field(
default=None,
max_length=1024,
description=dedent("""\
Photo caption (may also be used when resending photos by file_id),
0-1024 characters after entities parsing.
"""),
)
parse_mode: str | None = Field(
default=None,
description="Mode for parsing entities in the photo caption. See formatting options for more details.",
)
caption_entities: list[tg_types.MessageEntity] | None = Field(
default=None,
description=dedent("""\
A JSON-serialized list of special entities that appear in the caption,
which can be specified instead of parse_mode.
"""),
)
has_spoiler: bool | None = Field(
default=None,
description="Pass True if the photo needs to be covered with a spoiler animation.",
)
disable_notification: bool | None = Field(
default=None,
description="Sends the message silently. Users will receive a notification with no sound.",
)
protect_content: bool | None = Field(
default=None,
description="Protects the contents of the sent message from forwarding and saving.",
)
reply_to_message_id: int | None = Field(
default=None,
description="If the message is a reply, ID of the original message.",
)
allow_sending_without_reply: bool | None = Field(
default=None,
description="Pass True if the message should be sent even if the specified replied-to message is not found.",
)
reply_markup: Union[
tg_types.InlineKeyboardMarkup,
tg_types.ReplyKeyboardMarkup,
tg_types.ReplyKeyboardRemove,
tg_types.ForceReply,
] | None = Field(
default=None,
description=dedent("""\
Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard,
instructions to remove reply keyboard or to force a reply from the user.
"""),
)
[docs]
async def asend(self) -> SendPhotoResponse:
"""Send HTTP request to `sendPhoto` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('sendPhoto')
response = SendPhotoResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> SendPhotoResponse:
"""Send HTTP request to `sendPhoto` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('sendPhoto')
response = SendPhotoResponse.parse_raw(json_payload)
return response
[docs]
class SendDocumentResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.Message = Field(
description="Result of sending a document.",
)
[docs]
class SendBytesDocumentRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `sendDocument`.
See here https://core.telegram.org/bots/api#senddocument
"""
chat_id: int = Field(
description=dedent("""\
Unique identifier for the target chat or username of the target channel (in the format @channelusername).
"""),
)
document: bytes = Field(
description=dedent("""\
File to send. Pass a file_id as String to send a file that exists on the Telegram servers (recommended),
pass an HTTP URL as a String for Telegram to get a file from the Internet,
or upload a new one using multipart/form-data.
"""),
)
filename: str | None = Field(
default=None,
description="",
)
message_thread_id: int | None = Field(
default=None,
description=dedent("""\
Unique identifier for the target message thread (topic) of the forum; for forum supergroups only.
"""),
)
thumbnail: bytes | str | None = Field(
default=None,
description=dedent("""\
Thumbnail of the file sent; can be ignored if thumbnail generation for the file is supported server-side.
The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height should
not exceed 320. Ignored if the file is not uploaded using multipart/form-data. Thumbnails can't be reused
and can be only uploaded as a new file, so you can pass “attach://<file_attach_name>” if the thumbnail was
uploaded using multipart/form-data under <file_attach_name>.
"""),
)
caption: str | None = Field(
default=None,
max_length=1024,
description=dedent("""\
Document caption (may also be used when resending documents by file_id),
0-1024 characters after entities parsing.
"""),
)
parse_mode: str | None = Field(
default=None,
description="Mode for parsing entities in the document caption. See formatting options for more details.",
)
caption_entities: list[tg_types.MessageEntity] | None = Field(
default=None,
description=dedent("""\
A JSON-serialized list of special entities that appear in the caption,
which can be specified instead of parse_mode.
"""),
)
disable_content_type_detection: bool | None = Field(
default=None,
description=dedent("""\
Disables automatic server-side content type detection for files uploaded using multipart/form-data.
"""),
)
disable_notification: bool | None = Field(
default=None,
description="Sends the message silently. Users will receive a notification with no sound.",
)
protect_content: bool | None = Field(
default=None,
description="Protects the contents of the sent message from forwarding and saving.",
)
reply_to_message_id: int | None = Field(
default=None,
description="If the message is a reply, ID of the original message.",
)
allow_sending_without_reply: bool | None = Field(
default=None,
description="Pass True if the message should be sent even if the specified replied-to message is not found.",
)
reply_markup: Union[
tg_types.InlineKeyboardMarkup,
tg_types.ReplyKeyboardMarkup,
tg_types.ReplyKeyboardRemove,
tg_types.ForceReply,
] | None = Field(
default=None,
description=dedent("""\
Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard,
instructions to remove reply keyboard or to force a reply from the user.
"""),
)
[docs]
async def asend(self) -> SendDocumentResponse:
"""Send HTTP request to `sendDocument` Telegram Bot API endpoint asynchronously and parse response."""
content = self.dict(exclude_none=True, exclude={'document'})
document_bytes = io.BytesIO(self.document)
document_bytes.name = self.filename
files = {'document': document_bytes}
json_payload = await self.apost_multipart_form_data('sendDocument', content, files)
response = SendDocumentResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> SendDocumentResponse:
"""Send HTTP request to `sendDocument` Telegram Bot API endpoint synchronously and parse response."""
content = self.dict(exclude_none=True, exclude={'document'})
document_bytes = io.BytesIO(self.document)
document_bytes.name = self.filename
files = {'document': document_bytes}
json_payload = self.post_multipart_form_data('sendDocument', content, files)
response = SendDocumentResponse.parse_raw(json_payload)
return response
[docs]
class SendUrlDocumentRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `sendDocument`.
See here https://core.telegram.org/bots/api#senddocument
"""
chat_id: int = Field(
description=dedent("""\
Unique identifier for the target chat or username of the target channel (in the format @channelusername).
"""),
)
document: str = Field(
description=dedent("""\
File to send. Pass a file_id as String to send a file that exists on the Telegram servers (recommended),
pass an HTTP URL as a String for Telegram to get a file from the Internet, or upload
a new one using multipart/form-data.
"""),
)
filename: str | None = Field(
default=None,
description="",
)
message_thread_id: int | None = Field(
default=None,
description=dedent("""\
Unique identifier for the target message thread (topic) of the forum; for forum supergroups only.
"""),
)
thumbnail: bytes | str | None = Field(
default=None,
description=dedent("""\
Thumbnail of the file sent; can be ignored if thumbnail generation for the file is supported server-side.
The thumbnail should be in JPEG format and less than 200 kB in size. A thumbnail's width and height
should not exceed 320. Ignored if the file is not uploaded using multipart/form-data. Thumbnails can't
be reused and can be only uploaded as a new file, so you can pass “attach://<file_attach_name>” if the
thumbnail was uploaded using multipart/form-data under <file_attach_name>.
"""),
)
caption: str | None = Field(
default=None,
max_length=1024,
description=dedent("""\
Document caption (may also be used when resending documents by file_id),
0-1024 characters after entities parsing.
"""),
)
parse_mode: str | None = Field(
default=None,
description="Mode for parsing entities in the document caption. See formatting options for more details.",
)
caption_entities: list[tg_types.MessageEntity] | None = Field(
default=None,
description=dedent("""\
A JSON-serialized list of special entities that appear in the caption,
which can be specified instead of parse_mode.
"""),
)
disable_content_type_detection: bool | None = Field(
default=None,
description=dedent("""\
Disables automatic server-side content type detection for files uploaded using multipart/form-data.
"""),
)
disable_notification: bool | None = Field(
default=None,
description="Sends the message silently. Users will receive a notification with no sound.",
)
protect_content: bool | None = Field(
default=None,
description="Protects the contents of the sent message from forwarding and saving.",
)
reply_to_message_id: int | None = Field(
default=None,
description="If the message is a reply, ID of the original message.",
)
allow_sending_without_reply: bool | None = Field(
default=None,
description="Pass True if the message should be sent even if the specified replied-to message is not found.",
)
reply_markup: Union[
tg_types.InlineKeyboardMarkup,
tg_types.ReplyKeyboardMarkup,
tg_types.ReplyKeyboardRemove,
tg_types.ForceReply,
] | None = Field(
default=None,
description=dedent("""\
Additional interface options. A JSON-serialized object for an inline keyboard, custom reply keyboard,
instructions to remove reply keyboard or to force a reply from the user.
"""),
)
[docs]
async def asend(self) -> SendDocumentResponse:
"""Send HTTP request to `sendDocument` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('sendDocument')
response = SendDocumentResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> SendDocumentResponse:
"""Send HTTP request to `sendDocument` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('sendDocument')
response = SendDocumentResponse.parse_raw(json_payload)
return response
[docs]
class DeleteMessageResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: bool = Field(
description="Message deletion result.",
)
[docs]
class DeleteMessageRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `deleteMessage`.
See here https://core.telegram.org/bots/api#deletemessage
"""
chat_id: int = Field(
description=dedent("""\
Unique identifier for the target chat or username of the target channel (in the format @channelusername).
"""),
)
message_id: int = Field(
description="Identifier of the message to delete.",
)
[docs]
async def asend(self) -> DeleteMessageResponse:
"""Send HTTP request to `deleteMessage` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('deleteMessage')
response = DeleteMessageResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> DeleteMessageResponse:
"""Send HTTP request to `deleteMessage` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('deleteMessage')
response = DeleteMessageResponse.parse_raw(json_payload)
return response
[docs]
class EditMessageTextResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.Message | bool = Field(
description="Message editing result.",
)
[docs]
class EditMessageTextRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `editMessageText`.
See here https://core.telegram.org/bots/api#editmessagetext
"""
chat_id: int | None = Field(
default=None,
description=dedent("""\
Required if inline_message_id is not specified. Unique identifier for the target chat or
username of the target channel (in the format @channelusername).
"""),
)
message_id: int | None = Field(
default=None,
description="Required if inline_message_id is not specified. Identifier of the message to edit.",
)
inline_message_id: str | None = Field(
default=None,
description="Required if chat_id and message_id are not specified. Identifier of the inline message.",
)
text: str = Field(
min_length=1,
max_length=4096,
description="New text of the message, 1-4096 characters after entities parsing.",
)
parse_mode: str | None = Field(
default=None,
description="Mode for parsing entities in the message text. See formatting options for more details.",
)
entities: list[tg_types.MessageEntity] | None = Field(
default=None,
description=dedent("""
A JSON-serialized list of special entities that appear in message text,
which can be specified instead of parse_mode.
"""),
)
disable_web_page_preview: bool | None = Field(
default=None,
description="Disables link previews for links in this message.",
)
reply_markup: tg_types.InlineKeyboardMarkup | None = Field(
default=None,
description="A JSON-serialized object for an inline keyboard.",
)
[docs]
async def asend(self) -> EditMessageTextResponse:
"""Send HTTP request to `editmessagetext` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('editmessagetext')
response = EditMessageTextResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> EditMessageTextResponse:
"""Send HTTP request to `editmessagetext` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('editmessagetext')
response = EditMessageTextResponse.parse_raw(json_payload)
return response
[docs]
class EditMessageReplyMarkupResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.Message | bool = Field(
description="The result of editing a ReplyMarkup message.",
)
[docs]
class EditMessageReplyMarkupRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `editMessageReplyMarkup`.
See here https://core.telegram.org/bots/api#editmessagereplymarkup
"""
chat_id: int | None = Field(
default=None,
description=dedent("""\
Required if inline_message_id is not specified. Unique identifier for the target chat or username
of the target channel (in the format @channelusername).
"""),
)
message_id: int | None = Field(
default=None,
description="Required if inline_message_id is not specified. Identifier of the message to edit.",
)
inline_message_id: str | None = Field(
default=None,
description="Required if chat_id and message_id are not specified. Identifier of the inline message.",
)
reply_markup: tg_types.InlineKeyboardMarkup | None = Field(
default=None,
description="A JSON-serialized object for an inline keyboard.",
)
[docs]
async def asend(self) -> EditMessageReplyMarkupResponse:
"""Send HTTP request to `editmessagereplymarkup` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('editmessagereplymarkup')
response = EditMessageReplyMarkupResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> EditMessageReplyMarkupResponse:
"""Send HTTP request to `editmessagereplymarkup` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('editmessagereplymarkup')
response = EditMessageReplyMarkupResponse.parse_raw(json_payload)
return response
[docs]
class EditMessageCaptionResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.Message | bool = Field(
description="The result of editing a caption.",
)
[docs]
class EditMessageCaptionRequest(BaseTgRequest):
"""Object encapsulates data for calling Telegram Bot API endpoint `editMessageCaption`.
See here https://core.telegram.org/bots/api#editmessagecaption
"""
chat_id: int | None = Field(
default=None,
description=dedent("""\
Required if inline_message_id is not specified. Unique identifier for the target chat or username
of the target channel (in the format @channelusername).
"""),
)
message_id: int | None = Field(
default=None,
description="Required if inline_message_id is not specified. Identifier of the message to edit.",
)
inline_message_id: str | None = Field(
default=None,
description="Required if chat_id and message_id are not specified. Identifier of the inline message.",
)
caption: str | None = Field(
default=None,
max_length=1024,
description="New caption of the message, 0-1024 characters after entities parsing.",
)
parse_mode: str | None = Field(
default=None,
description="Mode for parsing entities in the message caption. See formatting options for more details.",
)
caption_entities: list[tg_types.MessageEntity] | None = Field(
default=None,
description=dedent("""\
A JSON-serialized list of special entities that appear in the caption,
which can be specified instead of parse_mode.
"""),
)
reply_markup: tg_types.InlineKeyboardMarkup | None = Field(
default=None,
description="A JSON-serialized object for an inline keyboard.",
)
[docs]
async def asend(self) -> EditMessageCaptionResponse:
"""Send HTTP request to `editmessagecaption` Telegram Bot API endpoint asynchronously and parse response."""
json_payload = await self.apost_as_json('editmessagecaption')
response = EditMessageCaptionResponse.parse_raw(json_payload)
return response
[docs]
def send(self) -> EditMessageCaptionResponse:
"""Send HTTP request to `editmessagecaption` Telegram Bot API endpoint synchronously and parse response."""
json_payload = self.post_as_json('editmessagecaption')
response = EditMessageCaptionResponse.parse_raw(json_payload)
return response
[docs]
class WebhookInfoResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: tg_types.WebhookInfo = Field(
description="Webhook info.",
)
[docs]
class WebhookInfoRequest(BaseTgRequest):
"""Get current webhook status.
See https://core.telegram.org/bots/api#getwebhookinfo
"""
[docs]
async def asend(self) -> WebhookInfoResponse:
"""Get the webhook info (async)."""
response = await self.apost_as_json('getWebhookInfo')
return WebhookInfoResponse.parse_raw(response)
[docs]
def send(self) -> WebhookInfoResponse:
"""Get the webhook info (sync)."""
response = self.post_as_json('getWebhookInfo')
return WebhookInfoResponse.parse_raw(response)
[docs]
class SetWebhookResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: bool = Field(
description="Set webhook result.",
)
[docs]
class SetWebhookRequest(BaseTgRequest):
"""Specify a URL for receiving updates via an outgoing webhook.
See https://core.telegram.org/bots/api#setwebhook
"""
url: str = Field(
description=dedent("""\
HTTPS URL to send updates to.
Use an empty string to remove webhook integration
"""),
)
certificate: bytes | None = Field(
default=None,
description=dedent("""\
Upload your public key certificate so that the root certificate
in use can be checked. See the self-signed guide for details:
https://core.telegram.org/bots/self-signed
"""),
)
ip_address: str | None = Field(
default=None,
description=dedent("""\
The fixed IP address which will be used to send webhook requests
instead of the IP address resolved through DNS
"""),
)
max_connections: int | None = Field(
default=None,
description=dedent("""\
The maximum allowed number of simultaneous HTTPS connections
to the webhook for update delivery, 1-100. Defaults to 40.
Use lower values to limit the load on your bot's server,
and higher values to increase your bot's throughput.
"""),
)
allowed_updates: list[str] | None = Field(
default=None,
description=dedent("""\
A JSON-serialized list of the update types you want your bot
to receive. For example, specify
["message", "edited_channel_post", "callback_query"]
to only receive updates of these types.
"""),
)
drop_pending_updates: bool | None = Field(
default=None,
description="Pass True to drop all pending updates",
)
secret_token: str | None = Field(
default=None,
description=dedent("""\
A secret token to be sent in a header
“X-Telegram-Bot-Api-Secret-Token” in every webhook request,
1-256 characters.
Only characters A-Z, a-z, 0-9, _ and - are allowed. The header is
useful to ensure that the request comes from a webhook set by you.
"""),
)
[docs]
async def asend(self) -> SetWebhookResponse:
"""Set a webhook (async)."""
response = await self.apost_as_json('setWebhook')
return SetWebhookResponse.parse_raw(response)
[docs]
def send(self) -> SetWebhookResponse:
"""Set the webhook (sync)."""
response = self.post_as_json('setWebhook')
return SetWebhookResponse.parse_raw(response)
[docs]
class DeleteWebhookResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: bool = Field(
description="Delete webhook result.",
)
[docs]
class DeleteWebhookRequest(BaseTgRequest):
"""Remove webhook integration.
See https://core.telegram.org/bots/api#deletewebhook
"""
drop_pending_updates: bool | None = Field(
default=None,
description="Pass True to drop all pending updates",
)
[docs]
async def asend(self) -> DeleteWebhookResponse:
"""Delete a webhook (async)."""
response = await self.apost_as_json('deleteWebhook')
return DeleteWebhookResponse.parse_raw(response)
[docs]
def send(self) -> DeleteWebhookResponse:
"""Delete a webhook (sync)."""
response = self.post_as_json('deleteWebhook')
return DeleteWebhookResponse.parse_raw(response)
[docs]
class GetUpdatesResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: list[tg_types.Update] = Field(
description="Get updates result.",
)
[docs]
class GetUpdatesRequest(BaseTgRequest):
"""Get updates.
Object encapsulates data for calling Telegram Bot API endpoint
`getUpdates`.
See here https://core.telegram.org/bots/api#getupdates
"""
offset: int | None = Field(
default=None,
description=dedent("""\
Optional. Identifier of the first update to be returned.
Must be greater by one than the highest among the identifiers
of previously received updates. By default, updates starting with
the earliest unconfirmed update are returned. An update is
considered confirmed as soon as getUpdates is called with an offset
higher than its update_id. The negative offset can be specified to
retrieve updates starting from -offset update from the end of the
updates queue. All previous updates will be forgotten.
"""),
)
limit: int = Field(
default=100,
ge=1,
le=100,
description=dedent("""\
Optional. Limits the number of updates to be retrieved. Values
between 1-100 are accepted. Defaults to 100.
"""),
)
timeout: int | None = Field(
default=0,
ge=0,
description=dedent("""\
Optional. Timeout in seconds for long polling. Defaults to 0,
i.e. usual short polling. Should be positive, short polling should
be used for testing purposes only.
"""),
)
allowed_updates: list[str] | None = Field(
default=None,
description=dedent("""\
Optional. Specify a list of the update types you want your bot
to receive. For example, specify
["message", "edited_channel_post", "callback_query"] to only
receive updates of these types.
See https://core.telegram.org/bots/api#update for a complete list
of available update types. Specify an empty list to receive all
update types except chat_member, message_reaction, and
message_reaction_count (default). If not specified, the previous
setting will be used. Please note that this parameter doesn't
affect updates created before the call to the getUpdates, so
unwanted updates may be received for a short period of time.
"""),
)
[docs]
@validator("allowed_updates")
def allowed_updates_match(cls, v: list[str]) -> None: # noqa N805
input_updates = set(v)
possible_updates = {
"message",
"edited_message",
"channel_post",
"edited_channel_post",
"message_reaction",
"message_reaction_count",
"inline_query",
"chosen_inline_result",
"callback_query",
"shipping_query",
"pre_checkout_query",
"poll",
"poll_answer",
"my_chat_member",
"chat_member",
"chat_join_request",
"chat_boost",
"removed_chat_boost",
}
if not input_updates.issubset(possible_updates):
raise ValueError("Unknown allowed updates")
[docs]
async def asend(self) -> GetUpdatesResponse:
"""Get Telegram updates - async."""
response = await self.apost_as_json('getUpdates')
return GetUpdatesResponse.parse_raw(response)
[docs]
def send(self) -> GetUpdatesResponse:
"""Get Telegram updates - sync."""
response = self.post_as_json('getUpdates')
return GetUpdatesResponse.parse_raw(response)
[docs]
async def alisten_to_updates(
self,
) -> AsyncGenerator[tg_types.Update, None]:
"""
Listen to updates - async.
Yield updates in an endless loop
"""
max_pause = 10
pauses = chain((1, 3), repeat(max_pause))
while True:
with suppress(httpx.ReadTimeout):
try:
response = await self.asend()
for update in response.result:
yield update
self.offset = update.update_id + 1
pauses = chain((1, 3), repeat(max_pause))
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.RemoteProtocolError,
gaierror,
ConnectionError,
):
pause = next(pauses)
await asyncio.sleep(pause)
[docs]
def listen_to_updates(self) -> Generator[tg_types.Update, None, None]:
"""
Listen to updates - sync.
Yield updates in an endless loop
"""
max_pause = 10
pauses = chain((1, 3), repeat(max_pause))
while True:
with suppress(httpx.ReadTimeout):
try:
for update in self.send().result:
yield update
self.offset = update.update_id + 1
pauses = chain((1, 3), repeat(max_pause))
except (
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.RemoteProtocolError,
gaierror,
ConnectionError,
):
pause = next(pauses)
sleep(pause)
[docs]
class AnswerCallbackQueryResponse(BaseTgResponse):
"""Represents an extended response structure from the Telegram Bot API."""
result: bool = Field(
description="Result of answering to CallbackQuery.",
)
[docs]
class AnswerCallbackQueryRequest(BaseTgRequest):
"""Answer to CallbackQuery.
Object encapsulates data for calling Telegram Bot API endpoint
`answercallbackquery`.
See here https://core.telegram.org/bots/api#answercallbackquery
"""
callback_query_id: str = Field(
description='Unique identifier for the query to be answered.',
)
text: str | None = Field(
default=None,
max_length=200,
description='Text of the notification.',
)
show_alert: bool | None = Field(
default=False,
description='Should an alert be shown by the client instead of a notification or not.',
)
url: str | None = Field(
default=None,
description="URL that will be opened by the user's client.",
)
cache_time: int | None = Field(
default=0,
description=dedent("""\
The maximum amount of time in seconds that the result of the callback query
may be cached client-side.
"""),
)
[docs]
async def asend(self, raise_for_invalid_query: bool | None = False) -> AnswerCallbackQueryResponse:
"""
Answer CallbackQuery (async).
:param raise_for_invalid_query: Defines if TgHttpStatusError should be raised on the 'query
is too old or invalid' error. By default this type of Telegram error will be ignored.
:raises TgHttpStatusError: Exception raised if Telegram responds with error. By default the
'query is too old or invalid' error will be ignored and cause no exception.
"""
try:
response = await self.apost_as_json('answerCallbackQuery')
return AnswerCallbackQueryResponse.parse_raw(response)
except TgHttpStatusError as ex:
if raise_for_invalid_query:
raise ex
elif ex.tg_response and 'query is too old' in ex.tg_response.description:
tg_response_dict = ex.tg_response.dict()
tg_response_dict.update({'result': False})
return AnswerCallbackQueryResponse.parse_obj(tg_response_dict)
else:
raise ex
[docs]
def send(self, raise_for_invalid_query: bool | None = False) -> AnswerCallbackQueryResponse:
"""
Answer CallbackQuery (sync).
:param raise_for_invalid_query: Defines if TgHttpStatusError should be raised on the 'query
is too old or invalid' error. By default this type of Telegram error will be ignored.
:raises TgHttpStatusError: Exception raised if Telegram responds with error. By default the
'query is too old or invalid' error will be ignored and cause no exception.
"""
try:
response = self.post_as_json('answerCallbackQuery')
return AnswerCallbackQueryResponse.parse_raw(response)
except TgHttpStatusError as ex:
if raise_for_invalid_query:
raise ex
elif ex.tg_response and 'query is too old' in ex.tg_response.description:
tg_response_dict = ex.tg_response.dict()
tg_response_dict.update({'result': False})
return AnswerCallbackQueryResponse.parse_obj(tg_response_dict)
else:
raise ex