import socket
import time
import random
import subprocess
import json
import os
import datetime
import urllib.parse
import hashlib
import re
import requests
import html
connect_time = time.time()
config = None
with open('./config.json', 'r', encoding='utf-8') as f:
config = json.load(f)
SERVER = config['server']
PORT = config['port']
NICKNAME = config['nickname']
REALNAME = config['realname']
CHANNELS = config['channels']
GREETINGS = config['greetings']
LOGPATH = config['logpath']
RELAYBOTS = config['relaybots']
IDENT = NICKNAME
BUFFER_SIZE = 2048
commands = {}
def command(name):
def decorator(func):
commands[name] = func
return func
return decorator
@command("help")
def help_cmd(chan, sender, args):
return """命令列表:
1. 询问大模型:!gemini 问题
2. 具有联网搜索的大模型:!google 问题
3. 丢骰子: !dice [骰数]d[面数]
4. 随机选择:!choice 选项1 选项2 ... 选项n
5. 复读机:!say 复读内容
6. AI词典: !dict 单词
7. 查看聊天记录:!log
8. 离线留言: !memo 收信人 消息
9. 举办电话会议: !meet
"""
def load_memos():
try:
with open("memos", "r") as fp:
return json.load(fp)
except:
return dict()
MEMOS = load_memos()
def save_memos():
with open("memos", "w") as fp:
json.dump(MEMOS, fp)
@command("memo")
def memo_cmd(chan, sender, args):
if chan not in MEMOS:
MEMOS[chan] = dict()
if len(args) < 2:
return '用法: !memo 收信人 消息'
recver = args[0]
msg = ' '.join(args[1:])
if recver not in MEMOS[chan]:
MEMOS[chan][recver] = []
MEMOS[chan][recver].append('(来自' + sender + '的留言) ' + recver + ': ' + msg)
save_memos()
return '已留言'
meetlink = dict()
meettime = dict()
@command("meet")
def meet_cmd(chan, sender, args):
if chan in meetlink and chan in meettime:
if time.time() - meettime[chan] < 120:
return meetlink[chan]
data_to_hash = SERVER + chan + '#' + str(time.time())
data_bytes = data_to_hash.encode('utf-8')
hash_object = hashlib.sha256()
hash_object.update(data_bytes)
hex_digest = hash_object.hexdigest()
link = '加入会议: https://meet.jit.si/' + hex_digest[:24] + '#config.startWithVideoMuted=true'
meetlink[chan] = link
meettime[chan] = time.time()
return link
@command("log")
def log_command(chan, sender, args):
return f"https://raye.mistivia.com/irclog/view/?chan={chan[1:]}"
def load_lastseen():
return dict()
# try:
# with open("lastseen", "r") as fp:
# return json.load(fp)
# except:
# return dict()
LAST_SEEN = load_lastseen()
def save_lastseen():
pass
# with open("lastseen", "w") as fp:
# json.dump(LAST_SEEN, fp)
def get_greeting(chan, sender, args):
if time.time() - connect_time < 300:
return ''
if sender == NICKNAME:
return ''
if not chan in GREETINGS:
return ''
if sender in config['no_greeting_nicks']:
return ''
if chan in LAST_SEEN \
and sender in LAST_SEEN[chan] \
and time.time() - LAST_SEEN[chan][sender] < 300:
LAST_SEEN[chan][sender] = time.time()
save_lastseen()
return ''
if chan not in LAST_SEEN:
LAST_SEEN[chan] = dict()
LAST_SEEN[chan][sender] = time.time()
save_lastseen()
return "Dōmo, " + sender + ' san.'
@command("join")
def join_command(chan, sender, args):
resp = get_greeting(chan, sender, args)
if chan in MEMOS and sender in MEMOS[chan]:
for msg in MEMOS[chan][sender]:
if resp != '':
resp += '\n'
resp += msg
MEMOS[chan][sender] = []
save_memos()
return resp
@command("dict")
def dict_command(chan, sender, args):
query = " ".join(args)
prompt = """你是一个多语言到汉语的词典,你将针对用户输入的单词,给出中文的释义和原始语言的例句。回复要简单清晰简短。格式为:“释义:xxx;例句:xxx”。如果单词有多个意思,每个意思一行。
用户: """ + query + '\n\n词典: '
command = ['/usr/local/bin/gemini']
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate(input=prompt)
return stdout
@command("google")
def gamini_command(chan, sender, args):
query = " ".join(args)
prompt = """你是一个人工智能助手,你将针对用户的问题或者指令给出明确、简短、简洁、优秀的回答,必要时使用Google搜索。你的回答对用户非常重要。
用户: """ + query + '\n\n助手: '
command = ['/usr/local/bin/google']
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate(input=prompt)
return stdout
@command("gemini")
def gamini_command(chan, sender, args):
query = " ".join(args)
prompt = """你是一个人工智能助手,你将针对用户的问题或者指令给出明确、简短、简洁、优秀的回答。你的回答对用户非常重要。
用户: """ + query + '\n\n助手: '
command = ['/usr/local/bin/gemini']
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
stdout, stderr = process.communicate(input=prompt)
return stdout
@command("hello")
def hello_command(chan, sender, args):
return f"Hello {sender}!"
@command("say")
def say_command(chan, sender, args):
if args:
return " ".join(args)
else:
return "What do you want me to say?"
@command("choice")
def choice_command(chan, sender, args):
if not args:
return "请提供至少一个选项供我选择。"
selected_item = random.choice(args)
return f"{selected_item}"
@command("dice")
def dice_command(chan, sender, args):
if not args:
return "请指定要掷的骰子。用法: dice [数量]d[面数]"
try:
if args[0][0] == 'd':
num_dice = 1
num_sides = int(args[0][1:])
else:
num_dice_str, num_sides_str = args[0].split('d')
num_dice = int(num_dice_str)
num_sides = int(num_sides_str)
except ValueError:
return "无效的骰子格式。请使用 [数量]d[面数] 的格式。"
if num_dice <= 0 or num_sides <= 1:
return "骰子数量必须大于0,面数必须大于1。"
results = []
for _ in range(num_dice):
results.append(random.randint(1, num_sides))
total = sum(results)
return f"{total}"
@command("roll")
def roll_command(chan, sender, args):
return dice_command(sender, args)
# ========================================================================
url_pattern = re.compile(
r'(\bhttps?:\/\/[-A-Z0-9+&@#/%?=~_|!:,.;]*[-A-Z0-9+&@#/%?=~_|])',
re.IGNORECASE
)
title_regex = re.compile(
r'
]*>([^>]*?)',
re.IGNORECASE | re.DOTALL
)
def url_titles(text):
found_urls = url_pattern.findall(text)
unique_urls = []
if len(found_urls) > 0:
url = found_urls[0]
unique_urls = [url]
results = []
MAX_CONTENT_SIZE = 5 * 1024 * 1024
for url in unique_urls:
try:
with requests.get(
url,
timeout=10,
allow_redirects=True,
stream=True
) as response:
if response.status_code != requests.codes.ok:
continue
content_type = response.headers.get('Content-Type', '').lower()
if 'text/html' not in content_type:
continue
content = b''
for chunk in response.iter_content(chunk_size=8192):
content += chunk
if len(content) >= MAX_CONTENT_SIZE:
continue
encoding = 'utf-8'
html_doc = content.decode(encoding)
match = title_regex.search(html_doc)
if match:
title_content = match.group(1).strip()
if title_content:
results.append(html.unescape(title_content))
except Exception as e:
print(e)
return results
def cut_string(text, chunk_size=420):
chunks = []
current_chunk = []
current_length = 0
for char in text:
char_bytes = char.encode('utf-8')
char_length = len(char_bytes)
if current_length + char_length > chunk_size:
if len(current_chunk) > 0:
chunks.append("".join(current_chunk))
current_chunk = [char]
current_length = char_length
else:
current_chunk.append(char)
current_length += char_length
if len(current_chunk) > 0:
chunks.append("".join(current_chunk))
return chunks
def write_log(channel, nick, msg):
if msg.startswith('!log'):
return
if msg.startswith('!log'):
return
if msg.startswith('https://raye.mistivia.com/irclog/') and nick == NICKNAME:
return
now = datetime.datetime.now()
base_dir = LOGPATH
year = now.strftime("%Y")
filename = now.strftime("%m-%d.txt")
log_dir = os.path.join(base_dir, channel, year)
file_path = os.path.join(log_dir, filename)
try:
os.makedirs(log_dir, exist_ok=True)
except OSError:
return
time_str = now.strftime("%H:%M:%S")
log_line = f"[{time_str}] <{nick}>: {msg}\n"
try:
with open(file_path, 'a', encoding='utf-8') as f:
f.write(log_line)
except IOError:
pass
def save_topics():
with open("topics", "w") as fp:
json.dump(TOPICS, fp)
def load_topics():
try:
with open("topics", "r") as fp:
return json.load(fp)
except:
return dict()
TOPICS = load_topics()
class IRCBot:
def __init__(self, server, port, nickname, ident, realname):
self.server = server
self.port = port
self.nickname = nickname
self.ident = ident
self.realname = realname
self.socket = None
self.running = True
def send_raw(self, msg):
try:
self.socket.send(f"{msg}\r\n".encode("utf-8"))
print(f">>> {msg}")
except Exception as e:
print(f"Error sending message: {e}")
self.running = False
def join_channel(self, channel):
self.send_raw(f"JOIN {channel}")
self.send_raw(f"TOPIC {channel}")
def send_message(self, target, msg):
if msg == '':
return
msg = msg.replace('\r', '')
lines = msg.split('\n')
for line in lines:
self.send_line(target, line)
def send_line(self, target, msg):
chunks = cut_string(msg)
max_length = 100
message_length = len(msg)
for chunk in chunks:
self.send_raw(f"PRIVMSG {target} :{chunk}")
write_log(target, NICKNAME, chunk)
def connect(self):
print(f"Connecting to {self.server}:{self.port}...")
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(300)
self.socket.connect((self.server, self.port))
print("Connection successful.")
connect_time = time.time()
self.send_raw(f"NICK {self.nickname}")
self.send_raw(f"USER {self.ident} 0 * :{self.realname}")
except socket.error as e:
print(f"Connection error: {e}")
self.running = False
return False
return True
def handle_command(self, sender, target, command, args):
if command in commands:
response = ""
try:
response = commands[command](target, sender, args)
response = response.strip()
except Exception as e:
response = f"Error: {e}"
response = response.strip()
self.send_message(target, response)
def parse_message(self, data):
for line in data.split('\r\n'):
if not line:
continue
print(f"<<< {line}")
parts = line.split(' ', 2)
if parts[0] == "PING":
self.send_raw(f"PONG {parts[1]}")
continue
prefix = ""
if parts[0].startswith(':'):
prefix = parts.pop(0)[1:]
if not parts:
continue
command = parts[0]
params = []
trailing = ""
if len(parts) > 1:
rest = parts[1]
if ':' in rest:
index = rest.find(':')
params.extend(rest[:index].split())
trailing = rest[index+1:]
else:
params.extend(rest.split())
self.handle_irc_event(prefix, command, params, trailing)
def handle_irc_event(self, prefix, command, params, trailing):
sender_nick = prefix.split('!', 1)[0] if '!' in prefix else prefix
if command == "376" or command == "422":
print("Registered successfully. Joining channel...")
for channel in CHANNELS:
self.join_channel(channel)
elif command == "331":
channel = params[1]
if channel in TOPICS:
print("No topic, setting...")
self.send_raw(f"TOPIC {channel} :{TOPICS[channel]}")
elif command == "TOPIC" or command == "332":
if command == "332":
channel = params[1]
else:
channel = params[0]
TOPICS[channel] = trailing
save_topics()
elif command == "QUIT":
for chan in LAST_SEEN:
if sender_nick in LAST_SEEN[chan]:
LAST_SEEN[chan][sender_nick] = time.time()
save_lastseen()
elif command == 'PART':
args = params
if len(params) >= 1:
chan = params[0]
else:
chan = trailing
if chan in LAST_SEEN:
LAST_SEEN[chan][sender_nick] = time.time()
save_lastseen()
elif command == "PRIVMSG":
if sender_nick == NICKNAME:
return
target = params[0]
message = trailing
print(f"[{target}] <{sender_nick}>: {message}")
write_log(target, sender_nick, message)
reply_target = target if target.startswith('#') else sender_nick
is_cmd = False
if message.startswith("!") or message.startswith("!"):
is_cmd = True
try:
cmd_parts = message[1:].split()
cmd = cmd_parts[0].lower()
args = cmd_parts[1:]
self.handle_command(sender_nick, reply_target, cmd, args)
except IndexError:
pass
elif sender_nick in RELAYBOTS:
end_of_nick_index = message.find('>')
nick = message[1:end_of_nick_index]
msg_start_index = end_of_nick_index + 2
msg = message[msg_start_index:].strip()
if msg.startswith("!") or msg.startswith("!"):
is_cmd = True
try:
cmd_parts = msg[1:].split()
cmd = cmd_parts[0].lower()
args = cmd_parts[1:]
if target.startswith('#'):
self.handle_command(nick, reply_target, cmd, args)
except IndexError:
pass
if not is_cmd and reply_target not in config['nolinks']:
titles = url_titles(message)
if len(titles) > 0:
for t in titles:
self.send_message(reply_target, '⤷ ' + t)
elif command == "JOIN":
args = params
if len(params) >= 1:
reply_target = params[0]
else:
reply_target = trailing
self.handle_command(sender_nick, reply_target, 'join', [])
def run(self):
if not self.connect():
return
buffer = ""
while self.running:
try:
data = self.socket.recv(BUFFER_SIZE).decode("utf-8", errors="ignore")
if not data:
print("Connection lost (server closed socket).")
self.running = False
break
buffer += data
if '\r\n' in buffer:
messages, buffer = buffer.rsplit('\r\n', 1)
self.parse_message(messages)
except socket.timeout:
continue
except socket.error as e:
print(f"Socket error occurred: {e}")
self.running = False
break
except KeyboardInterrupt:
raise
except Exception as e:
print(f"An unexpected error occurred: {e}")
time.sleep(1)
self.cleanup()
def cleanup(self):
if self.socket:
print("Closing socket connection.")
try:
self.send_raw("QUIT :Bot shutting down")
except Exception:
pass
finally:
self.socket.close()
if __name__ == "__main__":
while True:
try:
bot = IRCBot(SERVER, PORT, NICKNAME, IDENT, REALNAME)
bot.run()
except KeyboardInterrupt:
print("Bot stopped by user.")
break
except Exception as e:
print(f"An unexpected error occurred: {e}")
time.sleep(1)