Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
topic for PubSub-KMS
#!/usr/bin/env python
import googleapiclient.discovery
import base64
from gcloud import pubsub
project_id="<>"
location="global"
keyring="<>"
cryptokey="<>"
data="{test:\"test\"}"
testtopic="<>"
def main():
encrypted_data = encrypt(data)
sendtotopic(encrypted_data)
def encrypt(data):
# Creates an API client for the KMS API.
kms_client = googleapiclient.discovery.build('cloudkms', 'v1beta1')
# The resource name of the CryptoKey.
name = 'projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}'.format(
project_id, location, keyring, cryptokey)
encoded_text = base64.b64encode(data)
# Use the KMS API to encrypt the text.
cryptokeys = kms_client.projects().locations().keyRings().cryptoKeys()
request = cryptokeys.encrypt(
name=name, body={'plaintext': encoded_text.decode('utf-8')})
response = request.execute()
print ('Encrypted data {}.'.format(response['ciphertext'].encode('utf-8')))
return response['ciphertext'].encode('utf-8')
def sendtotopic(encrypted_data):
tps = pubsub.Client()
testtopic = tps.topic(testtopic)
assert testtopic.exists()
print "JSON data to be published is: " + encrypted_data + "\n"
response = testtopic.publish(encrypted_data)
print "Message ID: " + response + "\n"
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.