Skip to content

Instantly share code, notes, and snippets.

@yasuabe
Created September 2, 2018 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yasuabe/8f90bb3264113327b9131e4c0b7fc316 to your computer and use it in GitHub Desktop.
Save yasuabe/8f90bb3264113327b9131e4c0b7fc316 to your computer and use it in GitHub Desktop.
slack-ops-test2-func: post text
import os
import json
import logging
from urllib import request, parse
token = os.environ['TOKEN']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info(str(event))
if ('challenge' in event):
return {
"statusCode": 200,
"body": json.dumps({'challenge': event['challenge']})
}
else:
process_event(event['event'])
return { "statusCode": 200 }
def process_event(event):
item = event['item']
channel = item['channel']
ts = item['ts']
reaction = event['reaction']
logger.info('{}, {}, {}, {}'.format(item, channel, ts, reaction))
text = retrieve_item_text(channel, ts)
post_result(channel, ts, text)
def retrieve_item_text(channel, ts):
url = 'https://slack.com/api/channels.history'
params = {
'token': token,
'channel': channel,
'count': 1,
'inclusive': True,
'latest': ts
}
requestUrl = '{}?{}'.format(url, parse.urlencode(params))
req = request.Request(requestUrl)
with request.urlopen(req) as res:
body = res.read().decode('utf-8')
return json.loads(body)['messages'][0]['text']
def post_result(channel, thread_ts, text):
url = 'https://slack.com/api/chat.postMessage'
params = {
'token': token,
'channel': channel,
'text': text,
'thread_ts': thread_ts
}
requestUrl = '{}?{}'.format(url, parse.urlencode(params))
req = request.Request(requestUrl)
with request.urlopen(req) as response:
response_body = response.read().decode("utf-8")
logger.info(str(response_body))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment