162 lines
6.4 KiB
Python
162 lines
6.4 KiB
Python
import logging
|
||
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from typing import Optional
|
||
|
||
from pymumble_py3.users import User, Users
|
||
|
||
from src.model import MumbleBot
|
||
from src.service.ChannelService import ChannelService
|
||
|
||
_logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MumbleBotService:
|
||
bot_data: MumbleBot
|
||
channel_service: ChannelService
|
||
|
||
def __init__(self, bot_data: MumbleBot):
|
||
self.bot_data = bot_data
|
||
self.channel_service = ChannelService(bot_data)
|
||
|
||
def message_received(self, message_info):
|
||
actor_id = message_info.actor
|
||
if actor_id == 0:
|
||
return
|
||
session = self.get_session_by_actor_id(actor_id)
|
||
username = self.get_username_by_session(session)
|
||
is_admin = self.is_admin(username)
|
||
message = message_info.message.strip()
|
||
|
||
_logger.info(f"{username} {('', '(админ бота)')[is_admin]} прислал сообщение: {message}")
|
||
|
||
match message:
|
||
case '!help':
|
||
message = ' '
|
||
for x in self.bot_data.config.config['Command user']:
|
||
message = message + '<p>' + self.bot_data.config.config['Command user'][x] + '</p>'
|
||
if is_admin:
|
||
for x in self.bot_data.config.config['Command admin']:
|
||
message = message + '<p>' + self.bot_data.config.config['Command admin'][x] + '</p>'
|
||
self.get_user_by_session(session).send_text_message(message)
|
||
case '!users':
|
||
if is_admin:
|
||
message = '<p>Список пользователей:</p>'
|
||
users = self.bot_data.get_userlist()
|
||
for user in users:
|
||
message = message + '<p>' + user['name'] + '</p>'
|
||
self.get_user_by_session(session).send_text_message(message)
|
||
case '!initchannels':
|
||
if is_admin:
|
||
self.channel_service.init_channels()
|
||
message = 'Каналы обновлены'
|
||
self.get_user_by_session(session).send_text_message(message)
|
||
case 'test':
|
||
self.test()
|
||
case _:
|
||
self.get_user_by_session(session).send_text_message(message)
|
||
|
||
# TODO: одинаковые каналы (коллизия сообщений)
|
||
def user_change_channel(self, user: User, action):
|
||
_logger.info(f"{user['name']} перешёл в канал: {self.bot_data.bot.channels[action['channel_id']]['name']}")
|
||
message = self.channel_service.get_message_by_channel_id(action['channel_id'])
|
||
if len(message) > 0:
|
||
self.get_user_by_session(user['session']).send_text_message(message)
|
||
|
||
def user_connect_server(self, action):
|
||
user = self.get_user_by_session(action['session'])
|
||
|
||
if user['name'] == self.bot_data.bot.user:
|
||
return
|
||
|
||
_logger.info(f"{user['name']} зашёл на сервер")
|
||
|
||
message = self.channel_service.get_welcome_message()
|
||
if len(message) > 0:
|
||
self.get_user_by_session(action['session']).send_text_message(message)
|
||
_logger.debug(f"{user['name']} ")
|
||
|
||
def get_session_by_actor_id(self, actor_id: int) -> Optional[int]:
|
||
user = self.get_user_by_actor_id(actor_id)
|
||
if user is not None:
|
||
return user['session']
|
||
else:
|
||
return None
|
||
|
||
def get_user_by_actor_id(self, actor_id: int) -> Optional[User]:
|
||
try:
|
||
users = self.bot_data.bot.users
|
||
for user in users.values():
|
||
if user['session'] == actor_id:
|
||
return user
|
||
except KeyError:
|
||
_logger.warning(f"Не удалось найти пользователя с actorId: {actor_id}")
|
||
return None
|
||
|
||
def get_username_by_session(self, session: int) -> Optional[str]:
|
||
try:
|
||
return self.get_user_by_session(session)['name']
|
||
except KeyError:
|
||
_logger.warning(f"Не удалось найти пользователя с session: {session}")
|
||
return None
|
||
|
||
def get_user_by_id(self, user_id: int) -> Optional[User]:
|
||
users = self.bot_data.bot.users
|
||
for user in users.values():
|
||
try:
|
||
if user['user_id'] == user_id:
|
||
return user
|
||
except KeyError:
|
||
continue
|
||
_logger.warning(f"Не удалось найти пользователя с id: {user_id}")
|
||
return None
|
||
|
||
def get_user_by_session(self, session: int) -> Optional[User]:
|
||
users = self.bot_data.bot.users
|
||
for user in users.values():
|
||
try:
|
||
if user['session'] == session:
|
||
return user
|
||
except KeyError:
|
||
continue
|
||
_logger.warning(f"Не удалось найти пользователя с session: {session}")
|
||
return None
|
||
|
||
def get_user_by_name(self, user_name: str) -> Optional[User]:
|
||
users = self.bot_data.bot.users
|
||
for user in users:
|
||
if user['name'] == user_name:
|
||
return user
|
||
_logger.warning(f"Не удалось найти пользователя с username: {user_name}")
|
||
return None
|
||
|
||
def get_userlist(self) -> Users:
|
||
return self.bot_data.bot.users.values()
|
||
|
||
# # TODO
|
||
# # обработчик колбэка подключения пользователя к серверу
|
||
# @staticmethod
|
||
# def user_connect_server(user):
|
||
# if kostyl1 == 1:
|
||
# # дополняем список всех пользователей сервера
|
||
# if mumble.users[user['session']]['name'] in userlist:
|
||
# welcome_message(user)
|
||
# check_offline_messages(user)
|
||
# else:
|
||
# first_welcome_message(user)
|
||
# userlist.add_section(mumble.users[user['session']]['name'])
|
||
# userlist.set(mumble.users[user['session']]['name'], mumble.users[user['session']]['name'], '')
|
||
# with open(config['Bot']['userlist'], "w") as config_file:
|
||
# userlist.write(config_file)
|
||
|
||
def is_admin(self, user) -> bool:
|
||
list_admin = self.bot_data.config.adminList
|
||
if user in list_admin:
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
def test(self):
|
||
self.channel_service.get_tree()
|
||
|