Source code for evennia.server.portal.discord

"""
Implements Discord chat channel integration.

The Discord API uses a mix of websockets and REST API endpoints.

In order for this integration to work, you need to have your own
discord bot set up via https://discord.com/developers/applications
with the MESSAGE CONTENT toggle switched on, and your bot token
added to `server/conf/secret_settings.py` as your  DISCORD_BOT_TOKEN
"""

import json
import os
from io import BytesIO
from random import random

from autobahn.twisted.websocket import (
    WebSocketClientFactory,
    WebSocketClientProtocol,
    connectWS,
)
from django.conf import settings
from twisted.internet import protocol, reactor, ssl, task
from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool, readBody
from twisted.web.http_headers import Headers

from evennia.server.session import Session
from evennia.utils import class_from_module, get_evennia_version, logger
from evennia.utils.utils import delay

_BASE_SESSION_CLASS = class_from_module(settings.BASE_SESSION_CLASS)

DISCORD_API_VERSION = 10
# include version number to prevent automatically updating to breaking changes
DISCORD_API_BASE_URL = f"https://discord.com/api/v{DISCORD_API_VERSION}"

DISCORD_USER_AGENT = f"Evennia (https://www.evennia.com, {get_evennia_version(mode='short')})"
DISCORD_BOT_TOKEN = settings.DISCORD_BOT_TOKEN
DISCORD_BOT_INTENTS = settings.DISCORD_BOT_INTENTS

# Discord OP codes, alphabetic
OP_DISPATCH = 0
OP_HEARTBEAT = 1
OP_HEARTBEAT_ACK = 11
OP_HELLO = 10
OP_IDENTIFY = 2
OP_INVALID_SESSION = 9
OP_RECONNECT = 7
OP_RESUME = 6


# create quiet HTTP pool to muffle GET/POST requests
[docs]class QuietConnectionPool(HTTPConnectionPool): """ A quiet version of the HTTPConnectionPool which sets the factory's `noisy` property to False to muffle log output. """
[docs] def __init__(self, reactor, persistent=True): super().__init__(reactor, persistent) self._factory.noisy = False
_AGENT = Agent(reactor, pool=QuietConnectionPool(reactor))
[docs]def should_retry(status_code): """ Helper function to check if the request should be retried later. Args: status_code (int) - The HTTP status code Returns: retry (bool) - True if request should be retried False otherwise """ if status_code >= 500 and status_code <= 504: # these are common server error codes when the server is temporarily malfunctioning # in these cases, we should retry return True else: # handle all other cases; this can be expanded later if needed for special cases return False
[docs]class DiscordWebsocketServerFactory(WebSocketClientFactory, protocol.ReconnectingClientFactory): """ A customized websocket client factory that navigates the Discord gateway process. """ initialDelay = 1 factor = 1.5 maxDelay = 60 noisy = False gateway = None resume_url = None is_connecting = False
[docs] def __init__(self, sessionhandler, *args, **kwargs): self.uid = kwargs.get("uid") self.sessionhandler = sessionhandler self.port = None self.bot = None
[docs] def get_gateway_url(self, *args, **kwargs): # get the websocket gateway URL from Discord d = _AGENT.request( b"GET", f"{DISCORD_API_BASE_URL}/gateway".encode("utf-8"), Headers( { "User-Agent": [DISCORD_USER_AGENT], "Authorization": [f"Bot {DISCORD_BOT_TOKEN}"], "Content-Type": ["application/json"], } ), None, ) def cbResponse(response): if response.code == 200: d = readBody(response) d.addCallback(self.websocket_init, *args, **kwargs) return d else: logger.log_warn("Discord gateway request failed.") d.addCallback(cbResponse)
[docs] def websocket_init(self, payload, *args, **kwargs): """ callback for when the URL is gotten """ data = json.loads(str(payload, "utf-8")) self.is_connecting = False if url := data.get("url"): self.gateway = f"{url}/?v={DISCORD_API_VERSION}&encoding=json".encode("utf-8") useragent = kwargs.pop("useragent", DISCORD_USER_AGENT) headers = kwargs.pop( "headers", { "Authorization": [f"Bot {DISCORD_BOT_TOKEN}"], "Content-Type": ["application/json"], }, ) logger.log_info("Connecting to Discord Gateway...") WebSocketClientFactory.__init__( self, url, *args, headers=headers, useragent=useragent, **kwargs ) self.start() else: logger.log_err("Discord did not return a websocket URL; connection cancelled.")
[docs] def buildProtocol(self, addr): """ Build new instance of protocol Args: addr (str): Not used, using factory/settings data """ if hasattr(settings, "DISCORD_SESSION_CLASS"): protocol_class = class_from_module( settings.DISCORD_SESSION_CLASS, fallback=DiscordClient ) protocol = protocol_class() else: protocol = DiscordClient() protocol.factory = self protocol.sessionhandler = self.sessionhandler return protocol
[docs] def startedConnecting(self, connector): """ Tracks reconnections for debugging. Args: connector (Connector): Represents the connection. """ logger.log_info("Connecting to Discord...")
[docs] def reconnect(self): """ Force a reconnection of the bot protocol. This requires de-registering the session and then reattaching a new one. """ # set up the reconnection if self.resume_url: self.url = self.resume_url elif self.gateway: self.url = self.gateway else: # we don't know where to reconnect to! we'll start from the beginning self.url = None # reset the internal delay, since this is a deliberate disconnect self.delay = self.initialDelay # disconnect to allow the reconnection process to kick in self.bot.sendClose() self.sessionhandler.server_disconnect(self.bot)
[docs] def start(self): "Connect protocol to remote server" if not self.gateway: # we don't know where to connect to # get the gateway URL from Discord self.is_connecting = True self.get_gateway_url() elif not self.is_connecting: # everything is good, connect connectWS(self)
[docs]class DiscordClient(WebSocketClientProtocol, _BASE_SESSION_CLASS): """ Implements the Discord client """ nextHeartbeatCall = None pending_heartbeat = False heartbeat_interval = None last_sequence = 0 session_id = None discord_id = None
[docs] def __init__(self): WebSocketClientProtocol.__init__(self) _BASE_SESSION_CLASS.__init__(self)
[docs] def at_login(self): pass
[docs] def onOpen(self): """ Called when connection is established. """ logger.log_msg("Discord connection established.") self.factory.bot = self self.init_session("discord", "discord.gg", self.factory.sessionhandler) self.uid = int(self.factory.uid) self.logged_in = True self.sessionhandler.connect(self)
[docs] def onMessage(self, payload, isBinary): """ Callback fired when a complete WebSocket message was received. Args: payload (bytes): The WebSocket message received. isBinary (bool): Flag indicating whether payload is binary or UTF-8 encoded text. """ if isBinary: logger.log_info("DISCORD: got a binary payload for some reason") return data = json.loads(str(payload, "utf-8")) if seqid := data.get("s"): self.last_sequence = seqid # not sure if that error json format is for websockets, so # check for it just in case if "errors" in data: self.handle_error(data) return # check for discord gateway API op codes first if data["op"] == OP_HELLO: self.interval = data["d"]["heartbeat_interval"] / 1000 # convert millisec to seconds if self.nextHeartbeatCall: self.nextHeartbeatCall.cancel() self.nextHeartbeatCall = self.factory._batched_timer.call_later( self.interval * random(), self.doHeartbeat, ) if self.session_id: # we already have a session; try to resume instead self.resume() else: self.identify() elif data["op"] == OP_HEARTBEAT_ACK: # our last heartbeat was acknowledged, so reset the "pending" flag self.pending_heartbeat = False elif data["op"] == OP_HEARTBEAT: # Discord wants us to send a heartbeat immediately self.doHeartbeat(force=True) elif data["op"] == OP_INVALID_SESSION: # Discord doesn't like our current session; reconnect for a new one logger.log_msg("Discord: received 'Invalid Session' opcode. Reconnecting.") if data["d"] == False: # can't resume, clear existing resume data self.session_id = None self.factory.resume_url = None self.factory.reconnect() elif data["op"] == OP_RECONNECT: # reconnect as requested; Discord does this regularly for server load balancing logger.log_msg("Discord: received 'Reconnect' opcode. Reconnecting.") self.factory.reconnect() elif data["op"] == OP_DISPATCH: # handle the general dispatch opcode events by type if data["t"] == "READY": # our recent identification is valid; process new session info self.connection_ready(data["d"]) else: # general message, pass on to data_in self.data_in(data=data)
[docs] def onClose(self, wasClean, code=None, reason=None): """ This is executed when the connection is lost for whatever reason. it can also be called directly, from the disconnect method. Args: wasClean (bool): ``True`` if the WebSocket was closed cleanly. code (int or None): Close status as sent by the WebSocket peer. reason (str or None): Close reason as sent by the WebSocket peer. """ self.sessionhandler.disconnect(self) if self.nextHeartbeatCall: self.nextHeartbeatCall.cancel() self.nextHeartbeatCall = None if wasClean: logger.log_info(f"Discord connection closed ({code}) reason: {reason}") else: logger.log_info(f"Discord connection lost.")
def _send_json(self, data): """ Post JSON data to the websocket Args: data (dict): content to send. """ return self.sendMessage(json.dumps(data).encode("utf-8")) def _post_json(self, url, data, **kwargs): """ Post JSON data to a REST API endpoint Args: url (str) - The API path which is being posted to data (dict) - Content to be sent """ url = f"{DISCORD_API_BASE_URL}/{url}" body = FileBodyProducer(BytesIO(json.dumps(data).encode("utf-8"))) request_type = kwargs.pop("type", "POST") d = _AGENT.request( request_type.encode("utf-8"), url.encode("utf-8"), Headers( { "User-Agent": [DISCORD_USER_AGENT], "Authorization": [f"Bot {DISCORD_BOT_TOKEN}"], "Content-Type": ["application/json"], } ), body, ) def cbResponse(response): if response.code == 200 or response.code == 204: d = readBody(response) d.addCallback(self.post_response) return d elif should_retry(response.code): delay(300, self._post_json, url, data, **kwargs) d.addCallback(cbResponse)
[docs] def post_response(self, body, **kwargs): """ Process the response from sending a POST request Args: body (bytes) - The post response body """ data = json.loads(body) if "errors" in data: self.handle_error(data)
[docs] def handle_error(self, data, **kwargs): """ General hook for processing errors. Args: data (dict) - The received error data """ logger.log_err(str(data))
[docs] def resume(self): """ Called after a reconnection to re-identify and replay missed events """ if not self.last_sequence or not self.session_id: # we have no known state to resume from, identify normally self.identify() # build a RESUME request for Discord and send it data = { "op": OP_RESUME, "d": { "token": DISCORD_BOT_TOKEN, "session_id": self.session_id, "s": self.sequence_id, }, } self._send_json(data)
[docs] def disconnect(self, reason=None): """ Generic hook for the engine to call in order to disconnect this protocol. Args: reason (str or None): Motivation for the disconnection. """ self.sendClose(self.CLOSE_STATUS_CODE_NORMAL, reason)
[docs] def identify(self, *args, **kwargs): """ Send Discord authentication. This should be sent once heartbeats begin. """ data = { "op": 2, "d": { "token": DISCORD_BOT_TOKEN, "intents": DISCORD_BOT_INTENTS, "properties": { "os": os.name, "browser": DISCORD_USER_AGENT, "device": DISCORD_USER_AGENT, }, }, } self._send_json(data)
[docs] def connection_ready(self, data): """ Process READY data for relevant bot info. """ self.factory.resume_url = data["resume_gateway_url"] self.session_id = data["session_id"] self.discord_id = data["user"]["id"]
[docs] def doHeartbeat(self, *args, **kwargs): """ Send heartbeat to Discord. """ if not self.pending_heartbeat or kwargs.get("force"): if self.nextHeartbeatCall: self.nextHeartbeatCall.cancel() # send the heartbeat data = {"op": 1, "d": self.last_sequence} self._send_json(data) # track that we sent a heartbeat, in case we don't receive an ACK self.pending_heartbeat = True self.nextHeartbeatCall = self.factory._batched_timer.call_later( self.interval, self.doHeartbeat, ) else: # we didn't get a response since the last heartbeat; reconnect self.factory.reconnect()
[docs] def send_channel(self, text, channel_id, **kwargs): """ Send a message from an Evennia channel to a Discord channel. Use with session.msg(channel=(message, channel, sender)) """ data = {"content": text} data.update(kwargs) self._post_json(f"channels/{channel_id}/messages", data)
[docs] def send_nickname(self, text, guild_id, user_id, **kwargs): """ Changes a user's nickname on a Discord server. Use with session.msg(nickname=(new_nickname, guild_id, user_id)) """ data = {"nick": text} data.update(kwargs) self._post_json(f"guilds/{guild_id}/members/{user_id}", data, type="PATCH")
[docs] def send_role(self, role_id, guild_id, user_id, **kwargs): data = kwargs self._post_json(f"guilds/{guild_id}/members/{user_id}/roles/{role_id}", data, type="PUT")
[docs] def send_default(self, *args, **kwargs): """ Ignore other outputfuncs """ pass
[docs] def data_in(self, data, **kwargs): """ Process incoming data from Discord and sent to the Evennia server Args: data (dict): Converted json data. """ action_type = data.get("t", "UNKNOWN") if action_type == "MESSAGE_CREATE": # someone posted a message on Discord that the bot can see data = data["d"] if data["author"]["id"] == self.discord_id: # it's by the bot itself! disregard return message = data["content"] channel_id = data["channel_id"] keywords = {"channel_id": channel_id} if "guild_id" in data: # message received to a Discord channel keywords["type"] = "channel" author = data["member"]["nick"] or data["author"]["username"] author_id = data["author"]["id"] keywords["sender"] = (author_id, author) keywords["guild_id"] = data["guild_id"] else: # message sent directly to the bot account via DM keywords["type"] = "direct" author = data["author"]["username"] author_id = data["author"]["id"] keywords["sender"] = (author_id, author) # pass the processed data to the server self.sessionhandler.data_in(self, bot_data_in=(message, keywords)) elif action_type in ("GUILD_CREATE", "GUILD_UPDATE"): # we received the current status of a guild the bot is on; process relevant info data = data["d"] keywords = {"type": "guild", "guild_id": data["id"], "guild_name": data["name"]} keywords["channels"] = { chan["id"]: {"name": chan["name"], "guild": data["name"]} for chan in data["channels"] if chan["type"] == 0 } # send the possibly-updated guild and channel data to the server self.sessionhandler.data_in(self, bot_data_in=("", keywords)) elif "DELETE" in action_type: # deletes should possibly be handled separately to check for channel removal # for now, just ignore pass else: # send the data for any other action types on to the bot as-is for optional server-side handling keywords = {"type": action_type} keywords.update(data["d"]) self.sessionhandler.data_in(self, bot_data_in=("", keywords))