Curso-lenguaje-python/catch-all/06_bots_telegram/08_chatgpt_bot/bot/bot.py
2024-08-17 21:08:24 +02:00

876 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
import logging
import asyncio
import traceback
import html
import json
from datetime import datetime
import openai
import telegram
from telegram import (
Update,
User,
InlineKeyboardButton,
InlineKeyboardMarkup,
BotCommand
)
from telegram.ext import (
Application,
ApplicationBuilder,
CallbackContext,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
AIORateLimiter,
filters
)
from telegram.constants import ParseMode, ChatAction
import config
import database
import openai_utils
import base64
# setup
db = database.Database()
logger = logging.getLogger(__name__)
user_semaphores = {}
user_tasks = {}
HELP_MESSAGE = """Commands:
⚪ /retry Regenerate last bot answer
⚪ /new Start new dialog
⚪ /mode Select chat mode
⚪ /settings Show settings
⚪ /balance Show balance
⚪ /help Show help
🎨 Generate images from text prompts in <b>👩‍🎨 Artist</b> /mode
👥 Add bot to <b>group chat</b>: /help_group_chat
🎤 You can send <b>Voice Messages</b> instead of text
"""
HELP_GROUP_CHAT_MESSAGE = """You can add bot to any <b>group chat</b> to help and entertain its participants!
Instructions (see <b>video</b> below):
1. Add the bot to the group chat
2. Make it an <b>admin</b>, so that it can see messages (all other rights can be restricted)
3. You're awesome!
To get a reply from the bot in the chat @ <b>tag</b> it or <b>reply</b> to its message.
For example: "{bot_username} write a poem about Telegram"
"""
def split_text_into_chunks(text, chunk_size):
for i in range(0, len(text), chunk_size):
yield text[i:i + chunk_size]
async def register_user_if_not_exists(update: Update, context: CallbackContext, user: User):
if not db.check_if_user_exists(user.id):
db.add_new_user(
user.id,
update.message.chat_id,
username=user.username,
first_name=user.first_name,
last_name= user.last_name
)
db.start_new_dialog(user.id)
if db.get_user_attribute(user.id, "current_dialog_id") is None:
db.start_new_dialog(user.id)
if user.id not in user_semaphores:
user_semaphores[user.id] = asyncio.Semaphore(1)
if db.get_user_attribute(user.id, "current_model") is None:
db.set_user_attribute(user.id, "current_model", config.models["available_text_models"][0])
# back compatibility for n_used_tokens field
n_used_tokens = db.get_user_attribute(user.id, "n_used_tokens")
if isinstance(n_used_tokens, int) or isinstance(n_used_tokens, float): # old format
new_n_used_tokens = {
"gpt-3.5-turbo": {
"n_input_tokens": 0,
"n_output_tokens": n_used_tokens
}
}
db.set_user_attribute(user.id, "n_used_tokens", new_n_used_tokens)
# voice message transcription
if db.get_user_attribute(user.id, "n_transcribed_seconds") is None:
db.set_user_attribute(user.id, "n_transcribed_seconds", 0.0)
# image generation
if db.get_user_attribute(user.id, "n_generated_images") is None:
db.set_user_attribute(user.id, "n_generated_images", 0)
async def is_bot_mentioned(update: Update, context: CallbackContext):
try:
message = update.message
if message.chat.type == "private":
return True
if message.text is not None and ("@" + context.bot.username) in message.text:
return True
if message.reply_to_message is not None:
if message.reply_to_message.from_user.id == context.bot.id:
return True
except:
return True
else:
return False
async def start_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
db.start_new_dialog(user_id)
reply_text = "Hi! I'm <b>ChatGPT</b> bot implemented with OpenAI API 🤖\n\n"
reply_text += HELP_MESSAGE
await update.message.reply_text(reply_text, parse_mode=ParseMode.HTML)
await show_chat_modes_handle(update, context)
async def help_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
await update.message.reply_text(HELP_MESSAGE, parse_mode=ParseMode.HTML)
async def help_group_chat_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
text = HELP_GROUP_CHAT_MESSAGE.format(bot_username="@" + context.bot.username)
await update.message.reply_text(text, parse_mode=ParseMode.HTML)
await update.message.reply_video(config.help_group_chat_video_path)
async def retry_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
dialog_messages = db.get_dialog_messages(user_id, dialog_id=None)
if len(dialog_messages) == 0:
await update.message.reply_text("No message to retry 🤷‍♂️")
return
last_dialog_message = dialog_messages.pop()
db.set_dialog_messages(user_id, dialog_messages, dialog_id=None) # last message was removed from the context
await message_handle(update, context, message=last_dialog_message["user"], use_new_dialog_timeout=False)
async def _vision_message_handle_fn(
update: Update, context: CallbackContext, use_new_dialog_timeout: bool = True
):
logger.info('_vision_message_handle_fn')
user_id = update.message.from_user.id
current_model = db.get_user_attribute(user_id, "current_model")
if current_model != "gpt-4-vision-preview" and current_model != "gpt-4o":
await update.message.reply_text(
"🥲 Images processing is only available for <b>gpt-4-vision-preview</b> and <b>gpt-4o</b> model. Please change your settings in /settings",
parse_mode=ParseMode.HTML,
)
return
chat_mode = db.get_user_attribute(user_id, "current_chat_mode")
# new dialog timeout
if use_new_dialog_timeout:
if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0:
db.start_new_dialog(user_id)
await update.message.reply_text(f"Starting new dialog due to timeout (<b>{config.chat_modes[chat_mode]['name']}</b> mode) ✅", parse_mode=ParseMode.HTML)
db.set_user_attribute(user_id, "last_interaction", datetime.now())
buf = None
if update.message.effective_attachment:
photo = update.message.effective_attachment[-1]
photo_file = await context.bot.get_file(photo.file_id)
# store file in memory, not on disk
buf = io.BytesIO()
await photo_file.download_to_memory(buf)
buf.name = "image.jpg" # file extension is required
buf.seek(0) # move cursor to the beginning of the buffer
# in case of CancelledError
n_input_tokens, n_output_tokens = 0, 0
try:
# send placeholder message to user
placeholder_message = await update.message.reply_text("...")
message = update.message.caption or update.message.text or ''
# send typing action
await update.message.chat.send_action(action="typing")
dialog_messages = db.get_dialog_messages(user_id, dialog_id=None)
parse_mode = {"html": ParseMode.HTML, "markdown": ParseMode.MARKDOWN}[
config.chat_modes[chat_mode]["parse_mode"]
]
chatgpt_instance = openai_utils.ChatGPT(model=current_model)
if config.enable_message_streaming:
gen = chatgpt_instance.send_vision_message_stream(
message,
dialog_messages=dialog_messages,
image_buffer=buf,
chat_mode=chat_mode,
)
else:
(
answer,
(n_input_tokens, n_output_tokens),
n_first_dialog_messages_removed,
) = await chatgpt_instance.send_vision_message(
message,
dialog_messages=dialog_messages,
image_buffer=buf,
chat_mode=chat_mode,
)
async def fake_gen():
yield "finished", answer, (
n_input_tokens,
n_output_tokens,
), n_first_dialog_messages_removed
gen = fake_gen()
prev_answer = ""
async for gen_item in gen:
(
status,
answer,
(n_input_tokens, n_output_tokens),
n_first_dialog_messages_removed,
) = gen_item
answer = answer[:4096] # telegram message limit
# update only when 100 new symbols are ready
if abs(len(answer) - len(prev_answer)) < 100 and status != "finished":
continue
try:
await context.bot.edit_message_text(
answer,
chat_id=placeholder_message.chat_id,
message_id=placeholder_message.message_id,
parse_mode=parse_mode,
)
except telegram.error.BadRequest as e:
if str(e).startswith("Message is not modified"):
continue
else:
await context.bot.edit_message_text(
answer,
chat_id=placeholder_message.chat_id,
message_id=placeholder_message.message_id,
)
await asyncio.sleep(0.01) # wait a bit to avoid flooding
prev_answer = answer
# update user data
if buf is not None:
base_image = base64.b64encode(buf.getvalue()).decode("utf-8")
new_dialog_message = {"user": [
{
"type": "text",
"text": message,
},
{
"type": "image",
"image": base_image,
}
]
, "bot": answer, "date": datetime.now()}
else:
new_dialog_message = {"user": [{"type": "text", "text": message}], "bot": answer, "date": datetime.now()}
db.set_dialog_messages(
user_id,
db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message],
dialog_id=None
)
db.update_n_used_tokens(user_id, current_model, n_input_tokens, n_output_tokens)
except asyncio.CancelledError:
# note: intermediate token updates only work when enable_message_streaming=True (config.yml)
db.update_n_used_tokens(user_id, current_model, n_input_tokens, n_output_tokens)
raise
except Exception as e:
error_text = f"Something went wrong during completion. Reason: {e}"
logger.error(error_text)
await update.message.reply_text(error_text)
return
async def unsupport_message_handle(update: Update, context: CallbackContext, message=None):
error_text = f"I don't know how to read files or videos. Send the picture in normal mode (Quick Mode)."
logger.error(error_text)
await update.message.reply_text(error_text)
return
async def message_handle(update: Update, context: CallbackContext, message=None, use_new_dialog_timeout=True):
# check if bot was mentioned (for group chats)
if not await is_bot_mentioned(update, context):
return
# check if message is edited
if update.edited_message is not None:
await edited_message_handle(update, context)
return
_message = message or update.message.text
# remove bot mention (in group chats)
if update.message.chat.type != "private":
_message = _message.replace("@" + context.bot.username, "").strip()
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
chat_mode = db.get_user_attribute(user_id, "current_chat_mode")
if chat_mode == "artist":
await generate_image_handle(update, context, message=message)
return
current_model = db.get_user_attribute(user_id, "current_model")
async def message_handle_fn():
# new dialog timeout
if use_new_dialog_timeout:
if (datetime.now() - db.get_user_attribute(user_id, "last_interaction")).seconds > config.new_dialog_timeout and len(db.get_dialog_messages(user_id)) > 0:
db.start_new_dialog(user_id)
await update.message.reply_text(f"Starting new dialog due to timeout (<b>{config.chat_modes[chat_mode]['name']}</b> mode) ✅", parse_mode=ParseMode.HTML)
db.set_user_attribute(user_id, "last_interaction", datetime.now())
# in case of CancelledError
n_input_tokens, n_output_tokens = 0, 0
try:
# send placeholder message to user
placeholder_message = await update.message.reply_text("...")
# send typing action
await update.message.chat.send_action(action="typing")
if _message is None or len(_message) == 0:
await update.message.reply_text("🥲 You sent <b>empty message</b>. Please, try again!", parse_mode=ParseMode.HTML)
return
dialog_messages = db.get_dialog_messages(user_id, dialog_id=None)
parse_mode = {
"html": ParseMode.HTML,
"markdown": ParseMode.MARKDOWN
}[config.chat_modes[chat_mode]["parse_mode"]]
chatgpt_instance = openai_utils.ChatGPT(model=current_model)
if config.enable_message_streaming:
gen = chatgpt_instance.send_message_stream(_message, dialog_messages=dialog_messages, chat_mode=chat_mode)
else:
answer, (n_input_tokens, n_output_tokens), n_first_dialog_messages_removed = await chatgpt_instance.send_message(
_message,
dialog_messages=dialog_messages,
chat_mode=chat_mode
)
async def fake_gen():
yield "finished", answer, (n_input_tokens, n_output_tokens), n_first_dialog_messages_removed
gen = fake_gen()
prev_answer = ""
async for gen_item in gen:
status, answer, (n_input_tokens, n_output_tokens), n_first_dialog_messages_removed = gen_item
answer = answer[:4096] # telegram message limit
# update only when 100 new symbols are ready
if abs(len(answer) - len(prev_answer)) < 100 and status != "finished":
continue
try:
await context.bot.edit_message_text(answer, chat_id=placeholder_message.chat_id, message_id=placeholder_message.message_id, parse_mode=parse_mode)
except telegram.error.BadRequest as e:
if str(e).startswith("Message is not modified"):
continue
else:
await context.bot.edit_message_text(answer, chat_id=placeholder_message.chat_id, message_id=placeholder_message.message_id)
await asyncio.sleep(0.01) # wait a bit to avoid flooding
prev_answer = answer
# update user data
new_dialog_message = {"user": [{"type": "text", "text": _message}], "bot": answer, "date": datetime.now()}
db.set_dialog_messages(
user_id,
db.get_dialog_messages(user_id, dialog_id=None) + [new_dialog_message],
dialog_id=None
)
db.update_n_used_tokens(user_id, current_model, n_input_tokens, n_output_tokens)
except asyncio.CancelledError:
# note: intermediate token updates only work when enable_message_streaming=True (config.yml)
db.update_n_used_tokens(user_id, current_model, n_input_tokens, n_output_tokens)
raise
except Exception as e:
error_text = f"Something went wrong during completion. Reason: {e}"
logger.error(error_text)
await update.message.reply_text(error_text)
return
# send message if some messages were removed from the context
if n_first_dialog_messages_removed > 0:
if n_first_dialog_messages_removed == 1:
text = "✍️ <i>Note:</i> Your current dialog is too long, so your <b>first message</b> was removed from the context.\n Send /new command to start new dialog"
else:
text = f"✍️ <i>Note:</i> Your current dialog is too long, so <b>{n_first_dialog_messages_removed} first messages</b> were removed from the context.\n Send /new command to start new dialog"
await update.message.reply_text(text, parse_mode=ParseMode.HTML)
async with user_semaphores[user_id]:
if current_model == "gpt-4-vision-preview" or current_model == "gpt-4o" or update.message.photo is not None and len(update.message.photo) > 0:
logger.error(current_model)
# What is this? ^^^
if current_model != "gpt-4o" and current_model != "gpt-4-vision-preview":
current_model = "gpt-4o"
db.set_user_attribute(user_id, "current_model", "gpt-4o")
task = asyncio.create_task(
_vision_message_handle_fn(update, context, use_new_dialog_timeout=use_new_dialog_timeout)
)
else:
task = asyncio.create_task(
message_handle_fn()
)
user_tasks[user_id] = task
try:
await task
except asyncio.CancelledError:
await update.message.reply_text("✅ Canceled", parse_mode=ParseMode.HTML)
else:
pass
finally:
if user_id in user_tasks:
del user_tasks[user_id]
async def is_previous_message_not_answered_yet(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
user_id = update.message.from_user.id
if user_semaphores[user_id].locked():
text = "⏳ Please <b>wait</b> for a reply to the previous message\n"
text += "Or you can /cancel it"
await update.message.reply_text(text, reply_to_message_id=update.message.id, parse_mode=ParseMode.HTML)
return True
else:
return False
async def voice_message_handle(update: Update, context: CallbackContext):
# check if bot was mentioned (for group chats)
if not await is_bot_mentioned(update, context):
return
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
voice = update.message.voice
voice_file = await context.bot.get_file(voice.file_id)
# store file in memory, not on disk
buf = io.BytesIO()
await voice_file.download_to_memory(buf)
buf.name = "voice.oga" # file extension is required
buf.seek(0) # move cursor to the beginning of the buffer
transcribed_text = await openai_utils.transcribe_audio(buf)
text = f"🎤: <i>{transcribed_text}</i>"
await update.message.reply_text(text, parse_mode=ParseMode.HTML)
# update n_transcribed_seconds
db.set_user_attribute(user_id, "n_transcribed_seconds", voice.duration + db.get_user_attribute(user_id, "n_transcribed_seconds"))
await message_handle(update, context, message=transcribed_text)
async def generate_image_handle(update: Update, context: CallbackContext, message=None):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
await update.message.chat.send_action(action="upload_photo")
message = message or update.message.text
try:
image_urls = await openai_utils.generate_images(message, n_images=config.return_n_generated_images, size=config.image_size)
except openai.error.InvalidRequestError as e:
if str(e).startswith("Your request was rejected as a result of our safety system"):
text = "🥲 Your request <b>doesn't comply</b> with OpenAI's usage policies.\nWhat did you write there, huh?"
await update.message.reply_text(text, parse_mode=ParseMode.HTML)
return
else:
raise
# token usage
db.set_user_attribute(user_id, "n_generated_images", config.return_n_generated_images + db.get_user_attribute(user_id, "n_generated_images"))
for i, image_url in enumerate(image_urls):
await update.message.chat.send_action(action="upload_photo")
await update.message.reply_photo(image_url, parse_mode=ParseMode.HTML)
async def new_dialog_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
db.set_user_attribute(user_id, "current_model", "gpt-3.5-turbo")
db.start_new_dialog(user_id)
await update.message.reply_text("Starting new dialog ✅")
chat_mode = db.get_user_attribute(user_id, "current_chat_mode")
await update.message.reply_text(f"{config.chat_modes[chat_mode]['welcome_message']}", parse_mode=ParseMode.HTML)
async def cancel_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
if user_id in user_tasks:
task = user_tasks[user_id]
task.cancel()
else:
await update.message.reply_text("<i>Nothing to cancel...</i>", parse_mode=ParseMode.HTML)
def get_chat_mode_menu(page_index: int):
n_chat_modes_per_page = config.n_chat_modes_per_page
text = f"Select <b>chat mode</b> ({len(config.chat_modes)} modes available):"
# buttons
chat_mode_keys = list(config.chat_modes.keys())
page_chat_mode_keys = chat_mode_keys[page_index * n_chat_modes_per_page:(page_index + 1) * n_chat_modes_per_page]
keyboard = []
for chat_mode_key in page_chat_mode_keys:
name = config.chat_modes[chat_mode_key]["name"]
keyboard.append([InlineKeyboardButton(name, callback_data=f"set_chat_mode|{chat_mode_key}")])
# pagination
if len(chat_mode_keys) > n_chat_modes_per_page:
is_first_page = (page_index == 0)
is_last_page = ((page_index + 1) * n_chat_modes_per_page >= len(chat_mode_keys))
if is_first_page:
keyboard.append([
InlineKeyboardButton("»", callback_data=f"show_chat_modes|{page_index + 1}")
])
elif is_last_page:
keyboard.append([
InlineKeyboardButton("«", callback_data=f"show_chat_modes|{page_index - 1}"),
])
else:
keyboard.append([
InlineKeyboardButton("«", callback_data=f"show_chat_modes|{page_index - 1}"),
InlineKeyboardButton("»", callback_data=f"show_chat_modes|{page_index + 1}")
])
reply_markup = InlineKeyboardMarkup(keyboard)
return text, reply_markup
async def show_chat_modes_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
text, reply_markup = get_chat_mode_menu(0)
await update.message.reply_text(text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
async def show_chat_modes_callback_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update.callback_query, context, update.callback_query.from_user)
if await is_previous_message_not_answered_yet(update.callback_query, context): return
user_id = update.callback_query.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
query = update.callback_query
await query.answer()
page_index = int(query.data.split("|")[1])
if page_index < 0:
return
text, reply_markup = get_chat_mode_menu(page_index)
try:
await query.edit_message_text(text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
except telegram.error.BadRequest as e:
if str(e).startswith("Message is not modified"):
pass
async def set_chat_mode_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update.callback_query, context, update.callback_query.from_user)
user_id = update.callback_query.from_user.id
query = update.callback_query
await query.answer()
chat_mode = query.data.split("|")[1]
db.set_user_attribute(user_id, "current_chat_mode", chat_mode)
db.start_new_dialog(user_id)
await context.bot.send_message(
update.callback_query.message.chat.id,
f"{config.chat_modes[chat_mode]['welcome_message']}",
parse_mode=ParseMode.HTML
)
def get_settings_menu(user_id: int):
current_model = db.get_user_attribute(user_id, "current_model")
text = config.models["info"][current_model]["description"]
text += "\n\n"
score_dict = config.models["info"][current_model]["scores"]
for score_key, score_value in score_dict.items():
text += "🟢" * score_value + "⚪️" * (5 - score_value) + f" {score_key}\n\n"
text += "\nSelect <b>model</b>:"
# buttons to choose models
buttons = []
for model_key in config.models["available_text_models"]:
title = config.models["info"][model_key]["name"]
if model_key == current_model:
title = "" + title
buttons.append(
InlineKeyboardButton(title, callback_data=f"set_settings|{model_key}")
)
reply_markup = InlineKeyboardMarkup([buttons])
return text, reply_markup
async def settings_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
if await is_previous_message_not_answered_yet(update, context): return
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
text, reply_markup = get_settings_menu(user_id)
await update.message.reply_text(text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
async def set_settings_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update.callback_query, context, update.callback_query.from_user)
user_id = update.callback_query.from_user.id
query = update.callback_query
await query.answer()
_, model_key = query.data.split("|")
db.set_user_attribute(user_id, "current_model", model_key)
db.start_new_dialog(user_id)
text, reply_markup = get_settings_menu(user_id)
try:
await query.edit_message_text(text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
except telegram.error.BadRequest as e:
if str(e).startswith("Message is not modified"):
pass
async def show_balance_handle(update: Update, context: CallbackContext):
await register_user_if_not_exists(update, context, update.message.from_user)
user_id = update.message.from_user.id
db.set_user_attribute(user_id, "last_interaction", datetime.now())
# count total usage statistics
total_n_spent_dollars = 0
total_n_used_tokens = 0
n_used_tokens_dict = db.get_user_attribute(user_id, "n_used_tokens")
n_generated_images = db.get_user_attribute(user_id, "n_generated_images")
n_transcribed_seconds = db.get_user_attribute(user_id, "n_transcribed_seconds")
details_text = "🏷️ Details:\n"
for model_key in sorted(n_used_tokens_dict.keys()):
n_input_tokens, n_output_tokens = n_used_tokens_dict[model_key]["n_input_tokens"], n_used_tokens_dict[model_key]["n_output_tokens"]
total_n_used_tokens += n_input_tokens + n_output_tokens
n_input_spent_dollars = config.models["info"][model_key]["price_per_1000_input_tokens"] * (n_input_tokens / 1000)
n_output_spent_dollars = config.models["info"][model_key]["price_per_1000_output_tokens"] * (n_output_tokens / 1000)
total_n_spent_dollars += n_input_spent_dollars + n_output_spent_dollars
details_text += f"- {model_key}: <b>{n_input_spent_dollars + n_output_spent_dollars:.03f}$</b> / <b>{n_input_tokens + n_output_tokens} tokens</b>\n"
# image generation
image_generation_n_spent_dollars = config.models["info"]["dalle-2"]["price_per_1_image"] * n_generated_images
if n_generated_images != 0:
details_text += f"- DALL·E 2 (image generation): <b>{image_generation_n_spent_dollars:.03f}$</b> / <b>{n_generated_images} generated images</b>\n"
total_n_spent_dollars += image_generation_n_spent_dollars
# voice recognition
voice_recognition_n_spent_dollars = config.models["info"]["whisper"]["price_per_1_min"] * (n_transcribed_seconds / 60)
if n_transcribed_seconds != 0:
details_text += f"- Whisper (voice recognition): <b>{voice_recognition_n_spent_dollars:.03f}$</b> / <b>{n_transcribed_seconds:.01f} seconds</b>\n"
total_n_spent_dollars += voice_recognition_n_spent_dollars
text = f"You spent <b>{total_n_spent_dollars:.03f}$</b>\n"
text += f"You used <b>{total_n_used_tokens}</b> tokens\n\n"
text += details_text
await update.message.reply_text(text, parse_mode=ParseMode.HTML)
async def edited_message_handle(update: Update, context: CallbackContext):
if update.edited_message.chat.type == "private":
text = "🥲 Unfortunately, message <b>editing</b> is not supported"
await update.edited_message.reply_text(text, parse_mode=ParseMode.HTML)
async def error_handle(update: Update, context: CallbackContext) -> None:
logger.error(msg="Exception while handling an update:", exc_info=context.error)
try:
# collect error message
tb_list = traceback.format_exception(None, context.error, context.error.__traceback__)
tb_string = "".join(tb_list)
update_str = update.to_dict() if isinstance(update, Update) else str(update)
message = (
f"An exception was raised while handling an update\n"
f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
"</pre>\n\n"
f"<pre>{html.escape(tb_string)}</pre>"
)
# split text into multiple messages due to 4096 character limit
for message_chunk in split_text_into_chunks(message, 4096):
try:
await context.bot.send_message(update.effective_chat.id, message_chunk, parse_mode=ParseMode.HTML)
except telegram.error.BadRequest:
# answer has invalid characters, so we send it without parse_mode
await context.bot.send_message(update.effective_chat.id, message_chunk)
except:
await context.bot.send_message(update.effective_chat.id, "Some error in error handler")
async def post_init(application: Application):
await application.bot.set_my_commands([
BotCommand("/new", "Start new dialog"),
BotCommand("/mode", "Select chat mode"),
BotCommand("/retry", "Re-generate response for previous query"),
BotCommand("/balance", "Show balance"),
BotCommand("/settings", "Show settings"),
BotCommand("/help", "Show help message"),
])
def run_bot() -> None:
application = (
ApplicationBuilder()
.token(config.telegram_token)
.concurrent_updates(True)
.rate_limiter(AIORateLimiter(max_retries=5))
.http_version("1.1")
.get_updates_http_version("1.1")
.post_init(post_init)
.build()
)
# add handlers
user_filter = filters.ALL
if len(config.allowed_telegram_usernames) > 0:
usernames = [x for x in config.allowed_telegram_usernames if isinstance(x, str)]
any_ids = [x for x in config.allowed_telegram_usernames if isinstance(x, int)]
user_ids = [x for x in any_ids if x > 0]
group_ids = [x for x in any_ids if x < 0]
user_filter = filters.User(username=usernames) | filters.User(user_id=user_ids) | filters.Chat(chat_id=group_ids)
application.add_handler(CommandHandler("start", start_handle, filters=user_filter))
application.add_handler(CommandHandler("help", help_handle, filters=user_filter))
application.add_handler(CommandHandler("help_group_chat", help_group_chat_handle, filters=user_filter))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND & user_filter, message_handle))
application.add_handler(MessageHandler(filters.PHOTO & ~filters.COMMAND & user_filter, message_handle))
application.add_handler(MessageHandler(filters.VIDEO & ~filters.COMMAND & user_filter, unsupport_message_handle))
application.add_handler(MessageHandler(filters.Document.ALL & ~filters.COMMAND & user_filter, unsupport_message_handle))
application.add_handler(CommandHandler("retry", retry_handle, filters=user_filter))
application.add_handler(CommandHandler("new", new_dialog_handle, filters=user_filter))
application.add_handler(CommandHandler("cancel", cancel_handle, filters=user_filter))
application.add_handler(MessageHandler(filters.VOICE & user_filter, voice_message_handle))
application.add_handler(CommandHandler("mode", show_chat_modes_handle, filters=user_filter))
application.add_handler(CallbackQueryHandler(show_chat_modes_callback_handle, pattern="^show_chat_modes"))
application.add_handler(CallbackQueryHandler(set_chat_mode_handle, pattern="^set_chat_mode"))
application.add_handler(CommandHandler("settings", settings_handle, filters=user_filter))
application.add_handler(CallbackQueryHandler(set_settings_handle, pattern="^set_settings"))
application.add_handler(CommandHandler("balance", show_balance_handle, filters=user_filter))
application.add_error_handler(error_handle)
# start the bot
application.run_polling()
if __name__ == "__main__":
run_bot()