Skip to content

Instantly share code, notes, and snippets.

@Vexs
Created July 12, 2017 03:42
Show Gist options
  • Save Vexs/c62e6c8c77f76562dae60b52f39ae90b to your computer and use it in GitHub Desktop.
Save Vexs/c62e6c8c77f76562dae60b52f39ae90b to your computer and use it in GitHub Desktop.
Simple discord.py caching avatar logger.
import discord
from discord.ext import commands
import aiohttp
from PIL import Image
import io
import asyncio
from itertools import zip_longest
from pathlib import Path
import os
async def build_avatar_cache(self):
with aiohttp.ClientSession() as session:
members = list(self.bot.get_all_members())
# splits up members into chunks of 10 members at once for building up asyncio.groups
for member_list in list(zip_longest(*[iter(members)] * 10, fillvalue=None)):
async def task(session, member):
if Path(r'avatars/{}.jpeg'.format(member.avatar)).is_file():
with open(r'avatars/{}.jpeg'.format(member.avatar), 'rb') as f:
return member.id, f.read()
else:
url = 'https://cdn.discordapp.com/avatars/{0.id}/{0.avatar}.jpeg?size=64'.format(member)
async with session.get(url) as resp:
response = await resp.read()
with open(r'avatars/{}.jpeg'.format(member.avatar), 'wb') as f:
f.write(response)
return member.id, response
task_list = []
for member in member_list:
if member and member.avatar_url:
task_list.append(task(session, member))
responses = await asyncio.gather(*task_list, loop=self.bot.loop)
for member_id, response in responses:
self.avatars[member_id] = response
print('done loading avatars! Loaded {} avatars!'.format(len(self.avatars.keys())))
self.cache_ready = True
def format_avatar_change(before, after):
with Image.open(io.BytesIO(before)).convert('RGBA') as before, Image.open(io.BytesIO(after)).convert(
'RGBA') as after:
output_image = Image.new('RGBA', (132, 64))
output_image.paste(before, (0, 0))
output_image.paste(after, (69, 0))
image_file_object = io.BytesIO()
output_image.save(image_file_object, format='PNG')
image_file_object.seek(0)
return image_file_object
class AvatarLogger:
""""""
def __init__(self, bot):
self.bot = bot
self.avatars = {}
self.bot.loop.create_task(build_avatar_cache(self))
self.session = aiohttp.ClientSession()
with open('default_avatar.png', 'rb') as f:
self.default_avatar = f.read()
self.cache_ready = False
async def update_member_avatar(self, member):
url = 'https://cdn.discordapp.com/avatars/{0.id}/{0.avatar}.jpeg?size=64'.format(member)
async with self.session.get(url) as resp:
response = await resp.read()
self.avatars[member.id] = response
with open(r'avatars/{}.jpeg'.format(member.avatar), 'wb') as f:
f.write(response)
def remove_avatar_file(self, member):
os.remove(r'avatars/{}.jpeg'.format(member.avatar))
#@commands.command(pass_context=True)
#async def test_add(self, ctx, member: discord.Member):
# with aiohttp.ClientSession() as session:
# url = 'https://cdn.discordapp.com/avatars/{0.id}/{0.avatar}.jpeg?size=64'.format(member)
# async with session.get(url) as resp:
# self.avatars[member.id] = await resp.read()
# await self.bot.say('added {} to the avatar list'.format(member.display_name))
#
#@commands.command(pass_context=True)
#async def test_image(self, ctx):
# with Image.open(io.BytesIO(self.avatars[ctx.message.author.id])).convert('RGBA') as avatar:
# imagefileobject = io.BytesIO()
# avatar.save('avatar.png', format='PNG')
# avatar.save(imagefileobject, format='PNG')
# imagefileobject.seek(0)
# await self.bot.send_file(ctx.message.channel, fp=imagefileobject, filename='avatar.png')
#
#@commands.command(pass_context=True)
#async def test_update(self, ctx, member: discord.Member):
# url = 'https://cdn.discordapp.com/avatars/{0.id}/{0.avatar}.jpeg?size=64'.format(member)
# async with self.session.get(url) as resp:
# after_avatar = await resp.read()
# image_file_object = await self.bot.loop.run_in_executor(None, format_avatar_change, self.avatars[member.id],
# after_avatar)
# await self.bot.send_file(ctx.message.channel, fp=image_file_object, filename='avatar.png')
async def on_member_update(self, before, after):
if not self.cache_ready:
return
if before.avatar == after.avatar:
return
if before.bot:
return
if not before.avatar_url or before.id not in self.avatars.keys():
before_avatar = self.default_avatar
else:
before_avatar = self.avatars[before.id]
if not after.avatar_url:
after_avatar = self.default_avatar
else:
url = 'https://cdn.discordapp.com/avatars/{0.id}/{0.avatar}.jpeg?size=64'.format(after)
async with self.session.get(url) as resp:
after_avatar = await resp.read()
image_file_object = await self.bot.loop.run_in_executor(None, format_avatar_change, before_avatar, after_avatar)
await self.bot.send_file(discord.Object(id='273199762872991744'), fp=image_file_object,
filename='avatar_change.png',
content='{} ID: {} updated their avatar!'.format(before.display_name, before.id))
await self.update_member_avatar(after)
async def on_member_join(self, member):
if member.avatar_url:
await self.update_member_avatar(member)
async def on_member_remove(self, member):
if member.avatar_url:
self.avatars.pop(member.id)
self.remove_avatar_file(member)
def __unload(self):
self.session.close()
def setup(bot):
bot.add_cog(AvatarLogger(bot))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment