import socket
import time
import random
import subprocess
import json
import os
import datetime
import urllib.parse

SERVER = "raye.mistivia.com"
PORT = 6667
NICKNAME = "android"
IDENT = NICKNAME
REALNAME = "bot"
CHANNELS = ["#main", "#ezl9fd7fa13c4bad4f4"]
LOGPATH = '/volume/webroot/irclog'
BUFFER_SIZE = 2048

# Registry of bot commands: command name -> handler(chan, sender, args).
commands = {}


def command(name):
    """Return a decorator that registers the function under *name*.

    The decorated function is stored in the module-level ``commands`` dict
    and returned unchanged, so it can still be called directly.
    """
    def decorator(func):
        commands[name] = func
        return func
    return decorator

# ================================================


@command("help")
def help_cmd(chan, sender, args):
    """Return the list of available commands, one entry per line.

    Each entry sits on its own line so the reply is sent as separate IRC
    messages instead of one long line that gets cut at an arbitrary
    position by the sender's length limit.
    """
    return (
        "命令列表:\n"
        "1. 询问大模型:!gemini 问题\n"
        "2. 丢骰子: !dice [骰数]d[面数]\n"
        "3. 随机选择:!choice 选项1 选项2 ... 选项n\n"
        "4. 复读机:!say 复读内容\n"
        "5. AI词典:!dict 单词\n"
        "6. 查看聊天记录:!log\n"
    )


def _log_url(chan, when):
    """Build the log-viewer URL for channel *chan* on the date of *when*."""
    return (f"https://raye.mistivia.com/irclog/view.html#{chan}/"
            f"{when:%Y}/{when:%m-%d}")


def today_log(chan, now=None):
    """Return the log-viewer URL for *chan* for the current day.

    *now* optionally supplies the reference datetime; it defaults to the
    current local time.
    """
    if now is None:
        now = datetime.datetime.now()
    return _log_url(chan, now)


def yesterday_log(chan, now=None):
    """Return the log-viewer URL for *chan* for the day before *now*.

    *now* optionally supplies the reference datetime; it defaults to the
    current local time.
    """
    if now is None:
        now = datetime.datetime.now()
    return _log_url(chan, now - datetime.timedelta(days=1))


@command("log")
def log_command(chan, sender, args):
    """Return links to today's and yesterday's log pages for *chan*."""
    # Read the clock once so both links are derived from the same instant;
    # two separate reads could straddle midnight and yield the same date.
    now = datetime.datetime.now()
    log1 = today_log(chan, now)
    log0 = yesterday_log(chan, now)
    return "今天: " + log1 + '\n' + "昨天: " + log0


@command("join")
def join_command(chan, sender, args):
    """Return a greeting for *sender*, or an empty string for the bot itself."""
    if sender == NICKNAME:
        return ''
    return "Dōmo, " + sender + ' san.'
GEMINI_BIN = '/usr/local/bin/gemini'
# Upper bound on one model call. The bot is single-threaded, so while the
# subprocess runs no IRC traffic (including server PINGs) is processed.
GEMINI_TIMEOUT_SECONDS = 120


def _ask_gemini(prompt):
    """Feed *prompt* to the gemini CLI on stdin and return its stdout.

    Raises ``subprocess.TimeoutExpired`` if the process does not finish
    within ``GEMINI_TIMEOUT_SECONDS`` (the child is killed first), and
    ``OSError`` if the binary cannot be started.
    """
    result = subprocess.run(
        [GEMINI_BIN],
        input=prompt,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=GEMINI_TIMEOUT_SECONDS,
    )
    if result.returncode != 0 and result.stderr:
        # Surface the failure on the console instead of discarding it.
        print(f"gemini exited with {result.returncode}: {result.stderr.strip()}")
    return result.stdout


@command("dict")
def dict_command(chan, sender, args):
    """Look up a word: ask the model for a Chinese definition and an example."""
    query = " ".join(args)
    if not query:
        return "请提供要查询的单词。用法: !dict 单词"
    prompt = """你是一个多语言到汉语的词典,你将针对用户输入的单词,给出中文的释义和原始语言的例句。回复要简单清晰简短。格式为:“意思:xxx;例句:xxx”。如果单词有多个意思,每个意思一行。 用户: """ + query + '\n\n词典: '
    return _ask_gemini(prompt)


@command("gemini")
def gamini_command(chan, sender, args):
    """Forward the user's question to the model and return its answer."""
    query = " ".join(args)
    if not query:
        return "请提供问题。用法: !gemini 问题"
    prompt = """你是一个人工智能助手,你将针对用户的问题或者指令给出明确、简短、简洁、并且尽可能好的回答。你的回答对用户非常重要。 用户: """ + query + '\n\n助手: '
    return _ask_gemini(prompt)


@command("hello")
def hello_command(chan, sender, args):
    """Greet the sender by nickname."""
    return f"Hello {sender}!"


@command("say")
def say_command(chan, sender, args):
    """Echo the arguments back, or ask for input when none were given."""
    if args:
        return " ".join(args)
    return "What do you want me to say?"
# Upper bound on dice per roll; the count comes straight from chat input and
# an unbounded value would stall the single-threaded bot.
MAX_DICE = 1000
# Seconds to wait before reconnecting after the connection ends.
RECONNECT_DELAY = 5


@command("choice")
def choice_command(chan, sender, args):
    """Return one of the given options, picked at random."""
    if not args:
        return "请提供至少一个选项供我选择。"
    return f"{random.choice(args)}"


@command("dice")
def dice_command(chan, sender, args):
    """Roll dice described as ``[count]d[sides]`` and return the total.

    The count defaults to 1 when omitted (``d6``). Returns a usage or error
    message when the argument is missing, malformed or out of range.
    """
    if not args:
        return "请指定要掷的骰子。用法: dice [数量]d[面数]"
    try:
        # Exactly one 'd' separator is required; anything else fails to
        # unpack and is reported as a format error.
        count_text, sides_text = args[0].lower().split('d')
        num_dice = int(count_text) if count_text else 1
        num_sides = int(sides_text)
    except ValueError:
        return "无效的骰子格式。请使用 [数量]d[面数] 的格式。"
    if num_dice <= 0 or num_sides <= 1:
        return "骰子数量必须大于0,面数必须大于1。"
    if num_dice > MAX_DICE:
        return f"骰子数量不能超过{MAX_DICE}。"
    total = sum(random.randint(1, num_sides) for _ in range(num_dice))
    return f"{total}"


@command("roll")
def roll_command(chan, sender, args):
    """Alias for the ``dice`` command."""
    return dice_command(chan, sender, args)

# ========================================================================


# Bot replies produced by the log command; they are not written to the log.
_LOG_REPLY_PREFIXES = (
    '今天: https://raye.mistivia.com/irclog/',
    '昨天: https://raye.mistivia.com/irclog/',
)


def write_log(channel, nick, msg):
    """Append one chat line to ``LOGPATH/<channel>/<year>/<mm-dd>.txt``.

    Log-command requests and the bot's own log-link replies are skipped.
    Logging is best-effort: filesystem errors are reported on the console
    and otherwise ignored.
    """
    # NOTE(review): this check appeared twice, identically; the second copy
    # was presumably meant for another spelling of the prefix - confirm.
    if msg.startswith('!log'):
        return
    if nick == NICKNAME and msg.startswith(_LOG_REPLY_PREFIXES):
        return
    # The channel name becomes a directory name; refuse values that would
    # escape LOGPATH.
    if not channel or '/' in channel or '\\' in channel or channel in ('.', '..'):
        return

    now = datetime.datetime.now()
    log_dir = os.path.join(LOGPATH, channel, now.strftime("%Y"))
    file_path = os.path.join(log_dir, now.strftime("%m-%d.txt"))
    time_str = now.strftime("%H:%M:%S")
    log_line = f"[{time_str}] <{nick}>: {msg}\n"
    try:
        os.makedirs(log_dir, exist_ok=True)
        with open(file_path, 'a', encoding='utf-8') as f:
            f.write(log_line)
    except OSError as e:
        print(f"Could not write log: {e}")


def save_topics():
    """Write the ``TOPICS`` mapping to the ``topics`` file as JSON."""
    with open("topics", "w") as fp:
        json.dump(TOPICS, fp)


def load_topics():
    """Load the saved topics mapping, or return an empty dict.

    An empty dict is returned when the ``topics`` file is missing,
    unreadable or not valid JSON.
    """
    try:
        with open("topics", "r") as fp:
            return json.load(fp)
    except (OSError, ValueError):
        return dict()


TOPICS = load_topics()


class IRCBot:
    """Minimal single-connection IRC client that dispatches ``!`` commands."""

    def __init__(self, server, port, nickname, ident, realname):
        self.server = server
        self.port = port
        self.nickname = nickname
        self.ident = ident
        self.realname = realname
        self.socket = None
        self.running = True

    def send_raw(self, msg):
        """Send one protocol line; on any failure stop the main loop."""
        try:
            # sendall, not send: send() may transmit only part of the data.
            self.socket.sendall(f"{msg}\r\n".encode("utf-8"))
            print(f">>> {msg}")
        except Exception as e:
            print(f"Error sending message: {e}")
            self.running = False

    def join_channel(self, channel):
        """Join *channel* and request its current topic."""
        self.send_raw(f"JOIN {channel}")
        self.send_raw(f"TOPIC {channel}")

    def send_message(self, target, msg):
        """Send a possibly multi-line message to *target*, one line at a time."""
        if msg == '':
            return
        for line in msg.replace('\r', '').split('\n'):
            # A PRIVMSG without text is rejected by the server, so blank
            # lines are dropped.
            if line:
                self.send_line(target, line)

    def send_line(self, target, msg):
        """Send one line to *target*, split into chunks of at most 100 characters.

        Every chunk sent is also written to the chat log under the bot's nick.
        """
        max_length = 100
        for start in range(0, len(msg), max_length):
            chunk = msg[start:start + max_length]
            self.send_raw(f"PRIVMSG {target} :{chunk}")
            write_log(target, NICKNAME, chunk)

    def connect(self):
        """Open the TCP connection and register; return True on success."""
        print(f"Connecting to {self.server}:{self.port}...")
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.settimeout(300)
            self.socket.connect((self.server, self.port))
            print("Connection successful.")
            self.send_raw(f"NICK {self.nickname}")
            self.send_raw(f"USER {self.ident} 0 * :{self.realname}")
        except socket.error as e:
            print(f"Connection error: {e}")
            self.running = False
            # Do not leak the half-open socket when the connection fails.
            if self.socket is not None:
                self.socket.close()
                self.socket = None
            return False
        return True

    def handle_command(self, sender, target, command, args):
        """Run the registered handler for *command* and send its reply to *target*.

        Unknown commands are ignored. An exception raised by the handler is
        reported to the target as ``Error: <message>``.
        """
        handler = commands.get(command)
        if handler is None:
            return
        try:
            response = handler(target, sender, args).strip()
        except Exception as e:
            response = f"Error: {e}".strip()
        self.send_message(target, response)

    def parse_message(self, data):
        """Split *data* into protocol lines and dispatch each one.

        Each line is broken into optional prefix, command, middle parameters
        and trailing parameter. PING is answered directly; everything else
        goes to ``handle_irc_event``.
        """
        for line in data.split('\r\n'):
            if not line:
                continue
            print(f"<<< {line}")

            prefix = ""
            if line.startswith(':'):
                prefix, _, line = line[1:].partition(' ')
                line = line.lstrip(' ')
                if not line:
                    continue

            # The trailing parameter starts at the first " :"; a colon
            # inside a middle parameter does not start it.
            head, _, trailing = line.partition(' :')
            tokens = head.split()
            if not tokens:
                continue
            verb, params = tokens[0], tokens[1:]

            if verb == "PING":
                # Echo back everything after the command word.
                token = line.partition(' ')[2]
                self.send_raw(f"PONG {token}" if token else "PONG")
                continue

            self.handle_irc_event(prefix, verb, params, trailing)

    def handle_irc_event(self, prefix, command, params, trailing):
        """React to one parsed protocol message.

        * 376 / 422: join every channel in ``CHANNELS``.
        * 331: re-set the saved topic for the channel, if one is stored.
        * 332 / TOPIC: remember the channel's topic and save it.
        * PRIVMSG: log the message and run it as a command if it starts with ``!``.
        * JOIN: run the ``join`` command for the joining nick.
        """
        sender_nick = prefix.split('!', 1)[0]

        if command in ("376", "422"):
            print("Registered successfully. Joining channel...")
            for channel in CHANNELS:
                self.join_channel(channel)

        elif command == "331":
            if len(params) < 2:
                return
            channel = params[1]
            if channel in TOPICS:
                print("No topic, setting...")
                self.send_raw(f"TOPIC {channel} :{TOPICS[channel]}")

        elif command in ("TOPIC", "332"):
            # The numeric reply carries the bot's nick before the channel.
            index = 1 if command == "332" else 0
            if len(params) <= index:
                return
            TOPICS[params[index]] = trailing
            save_topics()

        elif command == "PRIVMSG":
            if sender_nick == NICKNAME or not params:
                return
            target = params[0]
            message = trailing
            print(f"[{target}] <{sender_nick}>: {message}")
            write_log(target, sender_nick, message)
            # NOTE(review): this prefix test appeared twice, identically;
            # the second copy was presumably meant for another spelling of
            # the command prefix - confirm.
            if message.startswith("!"):
                cmd_parts = message[1:].split()
                if not cmd_parts:
                    return
                cmd = cmd_parts[0].lower()
                args = cmd_parts[1:]
                # Reply in the channel, or directly to the sender when the
                # message was not addressed to a channel.
                reply_target = target if target.startswith('#') else sender_nick
                self.handle_command(sender_nick, reply_target, cmd, args)

        elif command == "JOIN":
            # The channel may arrive as a middle or as the trailing parameter.
            reply_target = params[0] if params else trailing
            self.handle_command(sender_nick, reply_target, 'join', [])

    def run(self):
        """Connect, then receive and dispatch messages until the connection ends.

        The socket is always closed on the way out, including when the loop
        is interrupted from the keyboard.
        """
        if not self.connect():
            return
        # Raw bytes are buffered and only complete lines are decoded, so a
        # multi-byte character split across two reads is not corrupted.
        pending = b""
        try:
            while self.running:
                try:
                    chunk = self.socket.recv(BUFFER_SIZE)
                except socket.timeout:
                    continue
                except socket.error as e:
                    print(f"Socket error occurred: {e}")
                    self.running = False
                    break
                if not chunk:
                    print("Connection lost (server closed socket).")
                    self.running = False
                    break
                pending += chunk
                if b'\r\n' not in pending:
                    continue
                complete, pending = pending.rsplit(b'\r\n', 1)
                try:
                    self.parse_message(complete.decode("utf-8", errors="ignore"))
                except Exception as e:
                    print(f"An unexpected error occurred: {e}")
                    time.sleep(1)
        finally:
            self.cleanup()

    def cleanup(self):
        """Send QUIT (best-effort) and close the socket."""
        if self.socket:
            print("Closing socket connection.")
            try:
                self.send_raw("QUIT :Bot shutting down")
            except Exception:
                pass
            finally:
                self.socket.close()
                self.socket = None


if __name__ == "__main__":
    while True:
        try:
            bot = IRCBot(SERVER, PORT, NICKNAME, IDENT, REALNAME)
            bot.run()
            # Pause before reconnecting so an unreachable server does not
            # turn this loop into a busy loop.
            time.sleep(RECONNECT_DELAY)
        except KeyboardInterrupt:
            print("Bot stopped by user.")
            break
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            time.sleep(1)