Skip to content

Instantly share code, notes, and snippets.

@YuriFontella
Created February 1, 2024 11:42
Show Gist options
  • Save YuriFontella/ebe771b7ade392bf3af37e3546c18c0d to your computer and use it in GitHub Desktop.
Save YuriFontella/ebe771b7ade392bf3af37e3546c18c0d to your computer and use it in GitHub Desktop.
import httpx
import asyncio
from fastapi import FastAPI, Request
from tortoise import Tortoise, Model, fields, connections
from WPP_Whatsapp import Create
from urllib.parse import parse_qs
from contextlib import asynccontextmanager
URL = 'https://slack.com/api/chat.postMessage'
TOKEN = ''
CHANNEL = ''
creator = Create(session='iphone', browser='firefox')
client = creator.start()
if creator.state != 'CONNECTED':
raise Exception(creator.state)
else:
print(creator.state)
class Contacts(Model):
name = fields.CharField(max_length=255)
number = fields.IntField(unique=True)
async def slack(text, name, image):
async with httpx.AsyncClient() as client:
response = await client.post(
url=URL,
data={
'token': TOKEN,
'channel': CHANNEL,
'text': text,
'icon_url': image,
'username': name
}
)
if response.status_code == 200:
data = response.json()
if data.get('ok', False):
print(True)
else:
print(response.json())
async def whatsapp():
async def on_message(message):
_sender = message.get('sender')
_from = message.get('from')
_body = message.get('body')
if _from == 'status@broadcast':
pass
elif not message.get("isGroupMsg") and not _sender.get('isBusiness'):
print(_sender)
print(_body)
if _body:
pushname = _sender['pushname']
number = _sender['id']['user']
image = _sender['profilePicThumbObj']['eurl']
await Contacts.update_or_create(defaults={'name': pushname}, number=number)
await slack(
text=_body,
name=pushname,
image=image
)
creator.client.onMessage(on_message)
@asynccontextmanager
async def lifespan(app):
app.state.client = client
await Tortoise.init(db_url='sqlite://:memory:', modules={'models': ["main"]})
await Tortoise.generate_schemas()
asyncio.create_task(whatsapp())
yield
await connections.close_all()
app = FastAPI(lifespan=lifespan)
@app.post('/whatsapp')
async def start(request: Request):
body = await request.body()
decoded_url = body.decode('utf-8')
query_params = parse_qs(decoded_url)
if 'text' not in query_params:
return 'Envie uma opção'
text = query_params['text'][0]
result = [item.strip() for item in text.split('|')]
option = result[0]
if option == 'contacts':
contacts = await Contacts.all().values()
return {
"text": "Lista de contatos:",
"attachments": [
{"text": f"ID {contact['id']} - {str(contact['name']).capitalize()}"} for contact in contacts
]
}
elif option == 'message':
id = result[1]
text= result[2]
contact = await Contacts.get_or_none(id=id)
if contact:
client = request.app.state.client
client.sendText(str(contact.number) + '@c.us', result[2])
return True
@app.post('/contacts')
async def on_post(request: Request):
data = await request.json()
await Contacts.update_or_create(defaults={'name': data['name']}, number=data['number'])
return True
@app.get('/contacts')
async def on_get():
return await Contacts.all()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment