Skip to content

Instantly share code, notes, and snippets.

@JFoxUK
Created October 5, 2023 08:45
Show Gist options
  • Save JFoxUK/e36390072560876f4e5cd550b4ce7aec to your computer and use it in GitHub Desktop.
Save JFoxUK/e36390072560876f4e5cd550b4ce7aec to your computer and use it in GitHub Desktop.
Salesforce PubSub Example
from __future__ import print_function
import grpc
import requests
import threading
import io
import pubsub_api_pb2 as pb2
import pubsub_api_pb2_grpc as pb2_grpc
import avro.schema
import avro.io
import time
import certifi
import json
semaphore = threading.Semaphore(1)
latest_replay_id = None
with open(certifi.where(), 'rb') as f:
creds = grpc.ssl_channel_credentials(f.read())
with grpc.secure_channel('api.pubsub.salesforce.com:7443', creds) as channel:
# All of the code in the rest of the tutorial will go inside
# this block
username = 'jonathan.fox@brave.com'
password = '*****************'
url = 'https://login.salesforce.com/services/Soap/u/55.0/'
headers = {'content-type': 'text/xml', 'SOAPAction': 'login'}
xml = "<soapenv:Envelope xmlns:soapenv='http://schemas.xmlsoap.org/soap/envelope/' " + \
"xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' " + \
"xmlns:urn='urn:partner.soap.sforce.com'><soapenv:Body>" + \
"<urn:login><urn:username><![CDATA[" + username + \
"]]></urn:username><urn:password><![CDATA[" + password + \
"]]></urn:password></urn:login></soapenv:Body></soapenv:Envelope>"
res = requests.post(url, data=xml, headers=headers, verify=False)
#Optionally, print the content field returned
print(res.content)
sessionid = '00D7R0000042qRv!ARUAQO2Tx2hkNBv6J4i9KNQVwyL9eEtHKZ_pAGDoLAyWlzDslzF_RAcwsrc9bFlCG1s.fT8eqlzRYcURQDbtA10Y'
instanceurl = 'https://brave-wolf-6odc-dev-ed.my.salesforce.com'
tenantid = '00D7R0000042qRv'
authmetadata = (('accesstoken', sessionid),
('instanceurl', instanceurl),
('tenantid', tenantid))
stub = pb2_grpc.PubSubStub(channel)
def fetchReqStream(topic):
while True:
semaphore.acquire()
yield pb2.FetchRequest(
topic_name = topic,
replay_preset = pb2.ReplayPreset.LATEST,
num_requested = 1)
def decode(schema, payload):
schema = avro.schema.parse(schema)
buf = io.BytesIO(payload)
decoder = avro.io.BinaryDecoder(buf)
reader = avro.io.DatumReader(schema)
ret = reader.read(decoder)
return ret
mysubtopic = "/data/AccountChangeEvent"
print('Subscribing to ' + mysubtopic)
substream = stub.Subscribe(fetchReqStream(mysubtopic),
metadata=authmetadata)
for event in substream:
semaphore.release()
if event.events:
payloadbytes = event.events[0].event.payload
schemaid = event.events[0].event.schema_id
schema = stub.GetSchema(
pb2.SchemaRequest(schema_id=schemaid),
metadata=authmetadata).schema_json
decoded = decode(schema, payloadbytes)
print("Got an event!", json.dumps(decoded))
else:
print("[", time.strftime('%b %d, %Y %l:%M%p %Z'),
"] The subscription is active.")
latest_replay_id = event.latest_replay_id
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment