summaryrefslogtreecommitdiff
path: root/ircbot/main.py
diff options
context:
space:
mode:
Diffstat (limited to 'ircbot/main.py')
-rw-r--r--ircbot/main.py409
1 files changed, 409 insertions, 0 deletions
diff --git a/ircbot/main.py b/ircbot/main.py
new file mode 100644
index 0000000..05aac2b
--- /dev/null
+++ b/ircbot/main.py
@@ -0,0 +1,409 @@
+import socket
+import time
+import random
+import subprocess
+import json
+import os
+import datetime
+import urllib.parse
+
+SERVER = "raye.mistivia.com"
+PORT = 6667
+NICKNAME = "android"
+IDENT = NICKNAME
+REALNAME = "bot"
+CHANNELS = ["#main", "#ezl9fd7fa13c4bad4f4"]
+LOGPATH = '/volume/webroot/irclog'
+
+BUFFER_SIZE = 2048
+
+commands = {}
+def command(name):
+ def decorator(func):
+ commands[name] = func
+ return func
+ return decorator
+
+# ================================================
+
+@command("help")
+def help_cmd(chan, sender, args):
+ return """命令列表:
+ 1. 询问大模型:!gemini 问题
+ 2. 丢骰子: !dice [骰数]d[面数]
+ 3. 随机选择:!choice 选项1 选项2 ... 选项n
+ 4. 复读机:!say 复读内容
+ 5. AI词典:!dict 单词
+ 6. 查看聊天记录:!log
+ """
+def today_log(chan):
+ now = datetime.datetime.now()
+ year = now.strftime("%Y")
+ month = now.strftime("%m")
+ day = now.strftime("%d")
+ url = (f"https://raye.mistivia.com/irclog/view.html#{chan}/"
+ f"{year}/{month}-{day}")
+ return url
+
+def yesterday_log(chan):
+ now = datetime.datetime.now() - datetime.timedelta(days=1)
+ year = now.strftime("%Y")
+ month = now.strftime("%m")
+ day = now.strftime("%d")
+ url = (f"https://raye.mistivia.com/irclog/view.html#{chan}/"
+ f"{year}/{month}-{day}")
+ return url
+
+@command("log")
+def log_command(chan, sender, args):
+ log1 = today_log(chan)
+ log0 = yesterday_log(chan)
+ return "今天: " + log1 + '\n' + "昨天: " + log0
+
+@command("join")
+def join_command(chan, sender, args):
+ if sender == NICKNAME:
+ return ''
+ return "Dōmo, " + sender + ' san.'
+
+@command("dict")
+def dict_command(chan, sender, args):
+ query = " ".join(args)
+ prompt = """你是一个多语言到汉语的词典,你将针对用户输入的单词,给出中文的释义和原始语言的例句。回复要简单清晰简短。格式为:“意思:xxx;例句:xxx”。如果单词有多个意思,每个意思一行。
+
+用户: """ + query + '\n\n词典: '
+ command = ['/usr/local/bin/gemini']
+ process = subprocess.Popen(
+ command,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True
+ )
+ stdout, stderr = process.communicate(input=prompt)
+ return stdout
+
+@command("gemini")
+def gamini_command(chan, sender, args):
+ query = " ".join(args)
+ prompt = """你是一个人工智能助手,你将针对用户的问题或者指令给出明确、简短、简洁、并且尽可能好的回答。你的回答对用户非常重要。
+
+用户: """ + query + '\n\n助手: '
+ command = ['/usr/local/bin/gemini']
+ process = subprocess.Popen(
+ command,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True
+ )
+ stdout, stderr = process.communicate(input=prompt)
+ return stdout
+
+@command("hello")
+def hello_command(chan, sender, args):
+ return f"Hello {sender}!"
+
+@command("say")
+def say_command(chan, sender, args):
+ if args:
+ return " ".join(args)
+ else:
+ return "What do you want me to say?"
+
+@command("choice")
+def choice_command(chan, sender, args):
+ if not args:
+ return "请提供至少一个选项供我选择。"
+ selected_item = random.choice(args)
+
+ return f"{selected_item}"
+
+@command("dice")
+def dice_command(chan, sender, args):
+ if not args:
+ return "请指定要掷的骰子。用法: dice [数量]d[面数]"
+
+ try:
+ if args[0][0] == 'd':
+ num_dice = 1
+ num_sides = int(args[0][1:])
+ else:
+ num_dice_str, num_sides_str = args[0].split('d')
+ num_dice = int(num_dice_str)
+ num_sides = int(num_sides_str)
+ except ValueError:
+ return "无效的骰子格式。请使用 [数量]d[面数] 的格式。"
+
+ if num_dice <= 0 or num_sides <= 1:
+ return "骰子数量必须大于0,面数必须大于1。"
+
+ results = []
+ for _ in range(num_dice):
+ results.append(random.randint(1, num_sides))
+
+ total = sum(results)
+ return f"{total}"
+
+@command("roll")
+def roll_command(chan, sender, args):
+ return dice_command(sender, args)
+
+# ========================================================================
+
+def write_log(channel, nick, msg):
+ if msg.startswith('!log'):
+ return
+ if msg.startswith('!log'):
+ return
+ if msg.startswith('今天: https://raye.mistivia.com/irclog/') and nick == NICKNAME:
+ return
+ if msg.startswith('昨天: https://raye.mistivia.com/irclog/') and nick == NICKNAME:
+ return
+ now = datetime.datetime.now()
+ base_dir = LOGPATH
+ year = now.strftime("%Y")
+ filename = now.strftime("%m-%d.txt")
+
+ log_dir = os.path.join(base_dir, channel, year)
+ file_path = os.path.join(log_dir, filename)
+
+ try:
+ os.makedirs(log_dir, exist_ok=True)
+ except OSError:
+ return
+
+ time_str = now.strftime("%H:%M:%S")
+ log_line = f"[{time_str}] <{nick}>: {msg}\n"
+
+ try:
+ with open(file_path, 'a', encoding='utf-8') as f:
+ f.write(log_line)
+ except IOError:
+ pass
+
+def save_topics():
+ with open("topics", "w") as fp:
+ json.dump(TOPICS, fp)
+
+def load_topics():
+ try:
+ with open("topics", "r") as fp:
+ return json.load(fp)
+ except:
+ return dict()
+
+TOPICS = load_topics()
+
+class IRCBot:
+ def __init__(self, server, port, nickname, ident, realname):
+ self.server = server
+ self.port = port
+ self.nickname = nickname
+ self.ident = ident
+ self.realname = realname
+ self.socket = None
+ self.running = True
+
+ def send_raw(self, msg):
+ try:
+ self.socket.send(f"{msg}\r\n".encode("utf-8"))
+ print(f">>> {msg}")
+ except Exception as e:
+ print(f"Error sending message: {e}")
+ self.running = False
+
+ def join_channel(self, channel):
+ self.send_raw(f"JOIN {channel}")
+ self.send_raw(f"TOPIC {channel}")
+
+ def send_message(self, target, msg):
+ if msg == '':
+ return
+ msg = msg.replace('\r', '')
+ lines = msg.split('\n')
+ for line in lines:
+ self.send_line(target, line)
+
+ def send_line(self, target, msg):
+ max_length = 100
+ message_length = len(msg)
+ if message_length > max_length:
+ num_batches = (message_length + max_length - 1) // max_length
+
+ for i in range(num_batches):
+ start_index = i * max_length
+ end_index = min((i + 1) * max_length, message_length)
+ chunk = msg[start_index:end_index]
+ self.send_raw(f"PRIVMSG {target} :{chunk}")
+ write_log(target, NICKNAME, chunk)
+ else:
+ self.send_raw(f"PRIVMSG {target} :{msg}")
+ write_log(target, NICKNAME, msg)
+
+ def connect(self):
+ print(f"Connecting to {self.server}:{self.port}...")
+ try:
+ self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.socket.settimeout(300)
+ self.socket.connect((self.server, self.port))
+ print("Connection successful.")
+
+ self.send_raw(f"NICK {self.nickname}")
+ self.send_raw(f"USER {self.ident} 0 * :{self.realname}")
+
+ except socket.error as e:
+ print(f"Connection error: {e}")
+ self.running = False
+ return False
+ return True
+
+ def handle_command(self, sender, target, command, args):
+ if command in commands:
+ response = ""
+ try:
+ response = commands[command](target, sender, args)
+ response = response.strip()
+ except Exception as e:
+ response = f"Error: {e}"
+ response = response.strip()
+
+ self.send_message(target, response)
+
+ def parse_message(self, data):
+ for line in data.split('\r\n'):
+ if not line:
+ continue
+
+ print(f"<<< {line}")
+
+ parts = line.split(' ', 2)
+
+ if parts[0] == "PING":
+ self.send_raw(f"PONG {parts[1]}")
+ continue
+
+ prefix = ""
+ if parts[0].startswith(':'):
+ prefix = parts.pop(0)[1:]
+
+ if not parts:
+ continue
+
+ command = parts[0]
+
+ params = []
+ trailing = ""
+ if len(parts) > 1:
+ rest = parts[1]
+ if ':' in rest:
+ index = rest.find(':')
+ params.extend(rest[:index].split())
+ trailing = rest[index+1:]
+ else:
+ params.extend(rest.split())
+
+ self.handle_irc_event(prefix, command, params, trailing)
+
+ def handle_irc_event(self, prefix, command, params, trailing):
+ sender_nick = prefix.split('!', 1)[0] if '!' in prefix else prefix
+
+ if command == "376" or command == "422":
+ print("Registered successfully. Joining channel...")
+ for channel in CHANNELS:
+ self.join_channel(channel)
+ elif command == "331":
+ channel = params[1]
+ if channel in TOPICS:
+ print("No topic, setting...")
+ self.send_raw(f"TOPIC {channel} :{TOPICS[channel]}")
+ elif command == "TOPIC" or command == "332":
+ if command == "332":
+ channel = params[1]
+ else:
+ channel = params[0]
+ TOPICS[channel] = trailing
+ save_topics()
+ elif command == "PRIVMSG":
+ if sender_nick == NICKNAME:
+ return
+ target = params[0]
+ message = trailing
+
+ print(f"[{target}] <{sender_nick}>: {message}")
+ write_log(target, sender_nick, message)
+
+ if message.startswith("!") or message.startswith("!"):
+ try:
+ cmd_parts = message[1:].split()
+ cmd = cmd_parts[0].lower()
+ args = cmd_parts[1:]
+
+ reply_target = target if target.startswith('#') else sender_nick
+
+ self.handle_command(sender_nick, reply_target, cmd, args)
+ except IndexError:
+ pass
+
+ elif command == "JOIN":
+ args = params
+ if len(params) >= 1:
+ reply_target = params[0]
+ else:
+ reply_target = trailing
+ self.handle_command(sender_nick, reply_target, 'join', [])
+
+ def run(self):
+ if not self.connect():
+ return
+
+ buffer = ""
+
+ while self.running:
+ try:
+ data = self.socket.recv(BUFFER_SIZE).decode("utf-8", errors="ignore")
+
+ if not data:
+ print("Connection lost (server closed socket).")
+ self.running = False
+ break
+ buffer += data
+ if '\r\n' in buffer:
+ messages, buffer = buffer.rsplit('\r\n', 1)
+ self.parse_message(messages)
+
+ except socket.timeout:
+ continue
+ except socket.error as e:
+ print(f"Socket error occurred: {e}")
+ self.running = False
+ break
+ except KeyboardInterrupt:
+ raise
+ except Exception as e:
+ print(f"An unexpected error occurred: {e}")
+ time.sleep(1)
+
+ self.cleanup()
+
+ def cleanup(self):
+ if self.socket:
+ print("Closing socket connection.")
+ try:
+ self.send_raw("QUIT :Bot shutting down")
+ except Exception:
+ pass
+ finally:
+ self.socket.close()
+
+if __name__ == "__main__":
+ while True:
+ try:
+ bot = IRCBot(SERVER, PORT, NICKNAME, IDENT, REALNAME)
+ bot.run()
+ except KeyboardInterrupt:
+ print("Bot stopped by user.")
+ break
+ except Exception as e:
+ print(f"An unexpected error occurred: {e}")
+ time.sleep(1)