Created
November 16, 2017 23:49
-
-
Save 10se1ucgo/3b120d992b0f8eb9f2ec9fcf68856ec1 to your computer and use it in GitHub Desktop.
meme
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import textwrap | |
import string | |
from typing import * | |
import discord | |
import webhook | |
from discord.ext import commands | |
class Config(object):
    """JSON-backed bot configuration.

    Attributes:
        file: Path of the JSON file the settings are read from / written to.
        login_token: Value of the ``"login_token"`` key in the file.
        channels: Value of the ``"channels"`` key; the rest of the program
            uses it as a mapping of channel id -> ``[webhook id, webhook token]``.
    """

    def __init__(self, file="config.json"):
        self.file = file
        self.login_token = None
        # ``List`` accepts a single type argument, so the pair is typed List[str].
        self.channels: Dict[str, List[str]] = {}
        self.read()

    def read(self):
        """Load ``login_token`` and ``channels`` from ``self.file``.

        Raises:
            OSError: If the file cannot be opened.
            json.JSONDecodeError: If the file is not valid JSON.
            KeyError: If either expected key is missing.
        """
        with open(self.file, "r", encoding="utf-8") as f:
            cfg = json.load(f)
        self.login_token = cfg["login_token"]
        self.channels = cfg["channels"]

    def write(self):
        """Persist the current ``login_token`` and ``channels`` to ``self.file``."""
        cfg = {"login_token": self.login_token, "channels": self.channels}
        # Serialize before opening the file: opening with "w" truncates it, so a
        # serialization error afterwards would wipe the stored configuration.
        serialized = json.dumps(cfg, indent=4)
        with open(self.file, "w", encoding="utf-8") as f:
            f.write(serialized)
# Module-wide settings object; constructing it reads the JSON file immediately.
config = Config(file="config.json")
class WH:
    # Cog that mirrors messages between every channel listed in
    # ``config.channels`` by re-posting them through each channel's webhook.
    #
    # NOTE: the class and its commands are documented with ``#`` comments on
    # purpose -- discord.py surfaces docstrings as user-visible help text, and
    # adding them would change what the help command prints.

    # Translation table that deletes ASCII punctuation (used for server initials).
    _STRIP_PUNCTUATION = str.maketrans("", "", string.punctuation)

    def __init__(self, bot: discord.Client):
        self.bot = bot
        self.wh = webhook.WebhookClient(self.bot)

    async def _broadcast(self, content: str, username: str, avatar_url: str):
        """Post ``content`` through the webhook of every linked channel."""
        # Iterate over a snapshot: ``config.channels`` can be mutated by another
        # command while this coroutine is suspended on an ``await``.
        for wh_id, wh_token in list(config.channels.values()):
            await self.wh.execute_webhook(wh_id, wh_token, content=content,
                                          username=username, avatar_url=avatar_url)

    async def on_message(self, message: discord.Message):
        """Relay a non-bot message from a linked channel to all other linked channels."""
        source_id = message.channel.id
        if not config.channels.get(source_id):
            return
        author = message.author
        if author.bot:
            return
        msg: str = message.clean_content
        if not msg:
            # No text to relay (e.g. attachment-only message): the webhook payload
            # would carry no content, which Discord rejects as an empty message.
            return

        # Tag the author with the initials of the originating server's name.
        words = message.server.name.translate(self._STRIP_PUNCTUATION).split()
        initials = ''.join(word[0] for word in words)
        username: str = f"{author.name} [{initials}]"
        if len(username) > 32:
            # Tagged name is too long; fall back to the bare author name.
            username = author.name

        # Snapshot for the same reason as in _broadcast.
        for channel_id, (wh_id, wh_token) in list(config.channels.items()):
            if channel_id == source_id:
                continue
            await self.wh.execute_webhook(wh_id, wh_token, content=msg,
                                          avatar_url=author.avatar_url, username=username)

    # Command: link ``channel`` by creating a "CrossTalk" webhook in it, then
    # announce the link in every linked channel (including the new one).
    @commands.command(pass_context=True, no_pm=True)
    @commands.has_permissions(manage_webhooks=True)
    async def add(self, ctx: commands.Context, channel: discord.Channel):
        if config.channels.get(channel.id):
            return await self.bot.say("That channel is already configured to cross-talk.")
        wh = await self.wh.create_webhook(channel.id, "CrossTalk")
        config.channels[channel.id] = [wh.id, wh.token]
        # Persist right away so a failure while announcing cannot lose the link
        # to the webhook that was just created.
        config.write()
        message = f"Channel #{channel.name} in '{channel.server.name}' has been linked."
        username = textwrap.shorten(f"'{channel.server.name}' Staff", width=32)
        await self._broadcast(message, username, channel.server.icon_url)

    # Command: announce the unlink in every linked channel (the departing one
    # still receives it), then delete that channel's webhook and forget it.
    @commands.command(pass_context=True, no_pm=True)
    @commands.has_permissions(manage_webhooks=True)
    async def remove(self, ctx: commands.Context, channel: discord.Channel):
        if not config.channels.get(channel.id):
            return await self.bot.say("That channel is not configured to cross-talk.")
        message = f"Channel #{channel.name} in '{channel.server.name}' has been unlinked. Goodbye."
        username = textwrap.shorten(f"'{channel.server.name}' Staff", width=32)
        await self._broadcast(message, username, channel.server.icon_url)
        await self.wh.delete_webhook(*config.channels[channel.id])
        del config.channels[channel.id]
        config.write()
# ``when_mentioned_or`` takes each prefix as its own positional argument.
# Passing a list made the list itself a single "prefix", which can never equal
# message text, so only the mention prefix worked.
client = commands.Bot(command_prefix=commands.when_mentioned_or('!ct'), description="AUTISM",
                      pm_help=True)
client.add_cog(WH(client))
client.run(config.login_token)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This was written before discord.py implemented native support for webhooks, so please don't flame. Thanks.
from typing import * | |
import discord | |
from discord.http import Route, HTTPClient | |
class Webhook(object):
    """Plain record of the fields of a webhook payload.

    Every field is taken from the keyword arguments by name and defaults to
    ``None`` when absent; unknown keyword arguments are ignored. Two webhooks
    compare equal (and hash alike) when their ``id`` matches.
    """

    __slots__ = ['id', 'guild_id', 'channel_id', 'user', 'name', 'avatar', 'token']

    def __init__(self, **kwargs):
        # Copy only the declared slots; extra payload keys are dropped.
        for field in self.__slots__:
            setattr(self, field, kwargs.get(field))

    def __repr__(self):
        return f"{type(self).__name__}(id={self.id!r}, name={self.name!r})"

    def __str__(self):
        # ``name`` may be None; __str__ must return a str or str() raises TypeError.
        return self.name or ""

    def __eq__(self, other):
        return isinstance(other, Webhook) and other.id == self.id

    # No explicit __ne__: Python 3 derives it from __eq__.

    def __hash__(self):
        return hash(self.id)
class WebhookClient(object):
    """Webhook endpoints issued through a discord.py client's HTTP session.

    Every call is sent with ``client.http.request`` using ``Route`` objects,
    and JSON responses describing webhooks are wrapped in ``Webhook``.
    """

    def __init__(self, client: discord.Client):
        self.http: HTTPClient = client.http
        self.request = self.http.request

    @staticmethod
    def _webhook_route(method: str, webhook_id: str, webhook_token: Optional[str] = None) -> Route:
        """Build the route for one webhook, adding the token segment when a token is given."""
        path = "/webhooks/{webhook_id}"
        if webhook_token:
            path += "/{webhook_token}"
        return Route(method, path, webhook_id=webhook_id, webhook_token=webhook_token)

    async def create_webhook(self, channel_id: str, name: str, *, avatar: Optional[str] = None) -> Webhook:
        """Create a webhook named ``name`` in ``channel_id`` and return it."""
        payload = {
            'name': name,
            'avatar': avatar,
        }
        data = await self.request(
            Route("POST", "/channels/{channel_id}/webhooks", channel_id=channel_id), json=payload
        )
        return Webhook(**data)

    async def get_webhooks(self, obj: Union[discord.Channel, discord.Server]) -> List[Webhook]:
        """Return the webhooks of a channel or of a server.

        Raises:
            discord.InvalidArgument: If ``obj`` is neither a Channel nor a Server.
        """
        if isinstance(obj, discord.Channel):
            return await self.get_channel_webhooks(obj.id)
        if isinstance(obj, discord.Server):
            return await self.get_server_webhooks(obj.id)
        raise discord.InvalidArgument(f'Object must be Channel or Server. Received {obj.__class__.__name__}')

    async def get_channel_webhooks(self, channel_id: str) -> List[Webhook]:
        """Return the webhooks listed for ``channel_id``."""
        data = await self.request(Route("GET", "/channels/{channel_id}/webhooks", channel_id=channel_id))
        return [Webhook(**hook) for hook in data]

    async def get_server_webhooks(self, guild_id: str) -> List[Webhook]:
        """Return the webhooks listed for ``guild_id``."""
        data = await self.request(Route("GET", "/guilds/{guild_id}/webhooks", guild_id=guild_id))
        return [Webhook(**hook) for hook in data]

    async def get_webhook(self, webhook_id: str, webhook_token: Optional[str] = None) -> Webhook:
        """Fetch a single webhook, via the token-qualified URL when a token is given."""
        data = await self.request(self._webhook_route("GET", webhook_id, webhook_token))
        return Webhook(**data)

    async def edit_webhook(self, webhook_id: str, webhook_token: Optional[str] = None, *,
                           name: Optional[str] = None, avatar: Optional[str] = None) -> Webhook:
        """Change a webhook's name and/or avatar and return the updated webhook.

        Only the fields actually supplied are sent, so editing one of them no
        longer submits an explicit null for the other.
        """
        payload = {}
        if name is not None:
            payload['name'] = name
        if avatar is not None:
            payload['avatar'] = avatar
        data = await self.request(self._webhook_route("PATCH", webhook_id, webhook_token), json=payload)
        return Webhook(**data)

    async def delete_webhook(self, webhook_id: str, webhook_token: Optional[str] = None):
        """Delete a webhook; returns whatever the underlying request returns."""
        return await self.request(self._webhook_route("DELETE", webhook_id, webhook_token))

    async def execute_webhook(self, webhook_id: str, webhook_token: str, *, content: str,
                              username: Optional[str] = None, avatar_url: Optional[str] = None,
                              tts: bool = False, embeds: Optional[List[discord.Embed]] = None):
        """Post a message through a webhook.

        Falsy arguments are left out of the payload. Returns whatever the
        underlying request returns.
        """
        payload = {}
        if content:
            payload['content'] = content
        if username:
            payload['username'] = username
        if avatar_url:
            payload['avatar_url'] = avatar_url
        if tts:
            payload['tts'] = True
        if embeds:
            # The Execute Webhook endpoint reads the plural ``embeds`` array;
            # the singular key used before was not the documented field.
            payload['embeds'] = [embed.to_dict() for embed in embeds]
        route = Route("POST", "/webhooks/{webhook_id}/{webhook_token}",
                      webhook_id=webhook_id, webhook_token=webhook_token)
        return await self.request(route, json=payload)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment