Skip to content

Instantly share code, notes, and snippets.

@k4ml
Last active March 22, 2020 22:36
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save k4ml/75e1e636bc5e5a03f1b0449212085ea6 to your computer and use it in GitHub Desktop.
Save k4ml/75e1e636bc5e5a03f1b0449212085ea6 to your computer and use it in GitHub Desktop.
Two-way bridge between Slack and Telegram.
"""
Copyright 2017 k4ml@twitter.com.
Permission to use, copy, modify, and/or distribute this software
for any purpose with or without fee is hereby granted, provided
that the above copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE
FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION,
ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
Run this script as:-
SLACK_API_TOKEN=xxx TG_BOT_TOKEN=xxx TG_CHAT_ID=123 ./venv/bin/python slacktg_bridge.py
These packages are needed:-
python-telegram-bot
slackclient
"""
import os
import time
import logging
import threading
import telegram
from slackclient import SlackClient
# Enable logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Slack client, authenticated with the token taken from the environment.
# A missing variable raises KeyError at import time.
slack_token = os.environ["SLACK_API_TOKEN"]
sc = SlackClient(slack_token)

# Telegram bot plus the ID of the chat that Slack messages are relayed to.
tg_token = os.environ['TG_BOT_TOKEN']
bot = telegram.Bot(token=tg_token)
chat_id = os.environ['TG_CHAT_ID']
def get_channel_info(channel_id):
    """Look up a Slack channel with the ``channels.info`` Web API method.

    Returns the ``channel`` entry of the API response when the response is
    flagged ``ok``; otherwise returns ``None``.
    """
    response = sc.api_call('channels.info', channel=channel_id)
    return response['channel'] if response['ok'] else None
def listen_slack():
    """Relay user-written Slack messages to the Telegram chat.

    Opens the RTM connection on ``sc`` and then polls it forever, sleeping
    3 seconds between reads.  Each event of type ``message`` that is not a
    ``bot_message`` is sent to ``chat_id`` through ``bot`` formatted as
    ``#<channel>:<user>> <text>``.  ``<channel>`` is ``N/A`` when the
    channel lookup fails.

    Events lacking a ``user`` or ``text`` field, and events whose author
    cannot be resolved via ``users.info``, are skipped instead of raising.

    If the RTM connection cannot be established, prints
    ``Connection Failed`` and returns.
    """
    if not sc.rtm_connect():
        print("Connection Failed")
        return

    while True:
        for msg in sc.rtm_read():
            # .get() rather than indexing: an event without a 'type' key
            # would otherwise raise KeyError and kill this thread.
            if msg.get('type') != 'message':
                continue
            if msg.get('subtype') == 'bot_message':
                continue
            print(msg)

            user_id = msg.get('user')
            text = msg.get('text')
            if user_id is None or text is None:
                # Nothing meaningful to relay without an author and a body.
                continue

            resp = sc.api_call('users.info', user=user_id)
            if not resp.get('ok'):
                continue
            user_name = resp['user']['name']

            # get_channel_info() returns None on failure; fall back to a
            # plain label instead of indexing into a placeholder string.
            channel_id = msg.get('channel')
            channel = get_channel_info(channel_id) if channel_id else None
            channel_name = channel['name'] if channel else 'N/A'

            print(user_name, text)
            bot.send_message(
                chat_id=chat_id,
                text='#%s:%s> %s' % (channel_name, user_name, text),
            )
        time.sleep(3)
def listen_telegram(default_channel='#integrationsandbox'):
    """Relay Telegram text messages to Slack.

    Polls ``bot.get_updates`` forever, sleeping 2 seconds between polls.
    Each text message is posted to Slack with ``chat.postMessage`` as
    ``From telegram: <text>``.  Unless the text already contains
    ``posted to slack``, the Telegram message is then answered with
    ``posted to slack``.

    The target Slack channel is taken from the message being replied to:
    if that message's text starts with a ``#<channel>:`` prefix, that
    channel is used.  In every other case the message goes to
    ``default_channel``.

    Args:
        default_channel: Slack channel used when no ``#<channel>:`` prefix
            can be read from the replied-to message.
    """
    offset = -1
    update = None
    while True:
        for update in bot.get_updates(offset):
            message = update.message
            # Skip updates that carry no message or no text (assumed to be
            # things like edits or media); they cannot be relayed as text.
            if message is None or message.text is None:
                continue

            # Always start from the default so `channel` is never unbound
            # or left over from a previous update.
            channel = default_channel
            reply = message.reply_to_message
            if reply is not None and reply.text:
                prefix = reply.text.split(':')[0].strip()
                if prefix.startswith('#'):
                    channel = prefix

            sc.api_call(
                'chat.postMessage',
                channel=channel,
                text='From telegram: %s' % message.text,
            )
            print(message.text)
            print(reply)
            if 'posted to slack' not in message.text:
                message.reply_text('posted to slack')

        # Acknowledge everything up to the last update seen so far, including
        # skipped ones, so the next poll does not return them again.
        if update is not None:
            offset = update.update_id + 1
        time.sleep(2)
if __name__ == '__main__':
    # One thread per direction of the bridge; the Telegram poller is
    # started first, then the Slack listener.
    tg_thread = threading.Thread(target=listen_telegram)
    slack_thread = threading.Thread(target=listen_slack)
    for worker in (tg_thread, slack_thread):
        worker.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment