Skip to content

Instantly share code, notes, and snippets.

@Tendrid
Created June 23, 2016 19:12
Show Gist options
  • Save Tendrid/3c1d505d0e5b43f3acfa441f7b93d636 to your computer and use it in GitHub Desktop.
Save Tendrid/3c1d505d0e5b43f3acfa441f7b93d636 to your computer and use it in GitHub Desktop.
######### ZEROMQ ######################
# Create the ZMQ server (e.g. from a package __init__ or similar start-up code).
from tattletale import tattletale
# Imported only so the module gets loaded; the alias is never referenced here.
# Presumably this is what makes the zmq engine available -- TODO confirm.
from tattletale_zmq import socket as we_just_need_to_load_this

server_addr = "127.0.0.1"
server_port = "5556"
# Full endpoint URL; built here for reference, not passed to config() below.
server_host = "tcp://{}:{}".format(server_addr, server_port)

tattletale('test_zmq_server').config(
    engine="zmq_server",
    host=server_addr,
    port=server_port,
)
# in any other file:
from tattletale import tattle


@tattle('test_zmq_server')
def callback(*args, **kwargs):
    """Print the keyword payload of a message received on 'test_zmq_server'.

    This is called any time a message comes through to the zmq server.
    Positional arguments are accepted but ignored.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(kwargs)
# in any other file:
# Send data to the zmq server/client registered under 'test_zmq_server'.
from tattletale import tattletale

tattletale('test_zmq_server').log(
    {"message": "here is my message"},
)
######### GPIO ###################### (also supports i2c, uart, pwm)
from tattletale import tattle, tattletale
# Imported only so the module gets loaded; `gpio` itself is not referenced below.
from tattletale_gpio import gpio

# Register gpio pin
tattletale('gpio').config(engine="gpio", pin="13")

# Set pin 13 high.
# Log through the configured 'gpio' instance, as every other example does;
# calling .log() on the bare factory never names which instance to use.
tattletale('gpio').log(1)


@tattle('gpio', pin="13")
def callback(pin_state):
    """Print the state reported for pin 13.

    Registered through @tattle('gpio', pin="13"); presumably invoked whenever
    the pin state changes -- confirm against the gpio engine.
    """
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print("Pin just set to {}".format(pin_state))
######### Slack ######################
from tattletale import tattle, tattletale

# SLACK_API_KEY is not defined in this snippet; supply your own bot token.
tattletale('test_private').config(engine="slack_bot", token=SLACK_API_KEY)


@tattle("test_private", tt_match_key="message.text", tt_regex="Hello .*")
def callback(*args, **kwargs):
    """Print which user said hello.

    Registered with tt_match_key="message.text" and tt_regex="Hello .*".
    Reads kwargs['message']['user']; positional arguments are ignored.
    """
    message = kwargs['message']
    # The template has a single placeholder, so only the user is passed; the
    # previously supplied message['text'] was silently dropped by format().
    print("User {} just said hello".format(message['user']))


tattletale('test_private').log("I just said something in the #test_private channel!")
######### AMQP ######################
from tattletale import tattle, tattletale

TEST_EXCHANGE = "private"
TEST_ROUTE = "test.route"

# RABBIT_MQ_USER / RABBIT_MQ_PASSWORD are not defined in this snippet; supply
# your own credentials.
# TEST_EXCHANGE is a module-level constant, so it is referenced directly
# (there is no `self` at module scope).
tattletale(TEST_EXCHANGE).config(
    engine="amqp",
    host="domain.com:5672",
    userid=RABBIT_MQ_USER,
    password=RABBIT_MQ_PASSWORD,
)


@tattle(TEST_EXCHANGE, TEST_ROUTE)
def callback(value):
    """Print the 'data1' and 'data2' entries of a received message.

    Registered for TEST_ROUTE on TEST_EXCHANGE. `value` must support .get();
    missing keys print as None.
    """
    # Explicit formatting keeps the "a b" output identical on Python 2 and 3.
    print("{} {}".format(value.get('data1'), value.get('data2')))


tattletale(TEST_EXCHANGE).log(TEST_ROUTE, {"data1": "foo", "data2": "bar"})
"""
You can also bind a tattle callback to a wildcard route using an asterisk or a hash:
  * (asterisk) substitutes for exactly one word.
  # (hash) substitutes for zero or more words.
For example:
@tattle("private", "test.*")
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment