summaryrefslogtreecommitdiff
path: root/airc.py
diff options
context:
space:
mode:
Diffstat (limited to 'airc.py')
-rw-r--r--airc.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/airc.py b/airc.py
new file mode 100644
index 0000000..55ea907
--- /dev/null
+++ b/airc.py
@@ -0,0 +1,189 @@
+# airc.py
+import asyncio
+import logging
+import ssl
+from collections import defaultdict
+from typing import (Awaitable, Callable, Dict, List, NamedTuple, Optional,
+ Union)
+
+# --- Basic Configuration ---
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(levelname)s - %(message)s'
+)
+
+# --- Message Parsing ---
+
+class Prefix(NamedTuple):
+ """Represents the prefix of an IRC message (nick!user@host)."""
+ nick: str
+ user: Optional[str] = None
+ host: Optional[str] = None
+
+class Message(NamedTuple):
+ """Represents a parsed IRC message."""
+ prefix: Optional[Prefix]
+ command: str
+ params: List[str]
+
+ @classmethod
+ def parse(cls, line: str) -> 'Message':
+ """Parses a raw IRC line into a Message object."""
+ line = line.strip()
+ prefix, command, params = None, '', []
+
+ if line.startswith(':'):
+ prefix_str, line = line.split(' ', 1)
+ nick, _, user_host = prefix_str[1:].partition('!')
+ user, _, host = user_host.partition('@')
+ prefix = Prefix(nick, user or None, host or None)
+
+ if ' :' in line:
+ parts, trailing = line.split(' :', 1)
+ params = parts.split()
+ params.append(trailing)
+ else:
+ params = line.split()
+
+ command = params.pop(0)
+ return cls(prefix, command.upper(), params)
+
+class Client:
+ def __init__(
+ self,
+ host: str,
+ port: int,
+ nickname: str,
+ username: str = None,
+ realname: str = None,
+ password: Optional[str] = None,
+ use_ssl: bool = True,
+ ):
+ self.host = host
+ self.port = port
+ self.use_ssl = use_ssl
+
+ self.nickname = nickname
+ self.username = username or nickname
+ self.realname = realname or nickname
+ self.password = password
+
+ self._reader: Optional[asyncio.StreamReader] = None
+ self._writer: Optional[asyncio.StreamWriter] = None
+ self._is_connected = False
+ self._handlers: Dict[str, List[Callable[..., Awaitable[None]]]] = defaultdict(list)
+
+ # Register essential internal handlers
+ self.on('PING')(self._handle_ping)
+
+ async def connect(self, reconnect_delay: int = 10):
+ """
+ Connects to the IRC server and enters the main processing loop.
+ Will attempt to reconnect if the connection is lost.
+ """
+ while True:
+ try:
+ logging.info(f"Connecting to {self.host}:{self.port}...")
+
+ ssl_context = ssl.create_default_context() if self.use_ssl else None
+
+ self._reader, self._writer = await asyncio.open_connection(
+ self.host, self.port, ssl=ssl_context
+ )
+ self._is_connected = True
+ logging.info("Connection successful.")
+
+ await self._register()
+ await self._read_loop()
+
+ except (ConnectionRefusedError, OSError) as e:
+ logging.error(f"Connection failed: {e}")
+ except Exception as e:
+ logging.error(f"An unexpected error occurred: {e}", exc_info=True)
+ finally:
+ self._is_connected = False
+ if self._writer:
+ self._writer.close()
+ await self._writer.wait_closed()
+ logging.info(f"Disconnected. Reconnecting in {reconnect_delay} seconds...")
+ await asyncio.sleep(reconnect_delay)
+
+ async def _register(self):
+ """Sends initial NICK/USER/PASS commands to register with the server."""
+ if self.password:
+ await self.send_raw(f"PASS {self.password}")
+ await self.send_raw(f"NICK {self.nickname}")
+ await self.send_raw(f"USER {self.username} 0 * :{self.realname}")
+
+ async def _read_loop(self):
+ """Continuously reads from the server, parses messages, and dispatches them."""
+ while self._is_connected and self._reader:
+ raw_line = await self._reader.readline()
+ if not raw_line:
+ logging.warning("Received empty data, connection likely closed.")
+ break
+
+ line = raw_line.decode('utf-8', errors='replace').strip()
+ logging.debug(f"<- {line}")
+
+ try:
+ message = Message.parse(line)
+ await self._dispatch(message)
+ except Exception as e:
+ logging.error(f"Failed to parse or dispatch line '{line}': {e}")
+
+ async def _dispatch(self, message: Message):
+ """Calls registered handlers for a given message command."""
+ # Handlers for specific commands (e.g., 'PRIVMSG', '001')
+ for handler in self._handlers.get(message.command, []):
+ asyncio.create_task(handler(message))
+
+ # Wildcard handlers that receive all messages
+ for handler in self._handlers.get('*', []):
+ asyncio.create_task(handler(message))
+
+ async def _handle_ping(self, message: Message):
+ """Internal PING handler to keep the connection alive."""
+ pong_data = message.params[0]
+ await self.send_raw(f"PONG :{pong_data}")
+ logging.info(f"Responded to PING with PONG {pong_data}")
+
+ def on(self, command: str) -> Callable:
+ """
+ A decorator to register a handler for a specific IRC command.
+
+ Example:
+ @client.on('PRIVMSG')
+ async def on_message(message: Message):
+ print(f"Received: {message}")
+ """
+ def decorator(func: Callable[[Message], Awaitable[None]]):
+ self._handlers[command.upper()].append(func)
+ return func
+ return decorator
+
+ # --- Public API Methods ---
+
+ async def send_raw(self, data: str):
+ if self._writer and self._is_connected:
+ encoded_data = data.encode('utf-8') + b'\r\n'
+ self._writer.write(encoded_data)
+ await self._writer.drain()
+ logging.debug(f"-> {data}")
+ else:
+ logging.error("Cannot send data: not connected.")
+
+ async def send_privmsg(self, target: str, text: str):
+ await self.send_raw(f"PRIVMSG {target} :{text}")
+
+ async def join(self, channel: str):
+ await self.send_raw(f"JOIN {channel}")
+
+ async def part(self, channel: str, reason: str = "Leaving"):
+ await self.send_raw(f"PART {channel} :{reason}")
+
+ async def quit(self, reason: str = "Client shutting down"):
+ await self.send_raw(f"QUIT :{reason}")
+ if self._writer:
+ self._writer.close()
+ await self._writer.wait_closed() \ No newline at end of file