Skip to content

Instantly share code, notes, and snippets.

@smitchell
Last active July 29, 2020 12:08
Show Gist options
  • Save smitchell/35702d90240ee01ffe14735980d9cdd9 to your computer and use it in GitHub Desktop.
Save smitchell/35702d90240ee01ffe14735980d9cdd9 to your computer and use it in GitHub Desktop.
The trouble with tuples
#!/usr/bin/env python3
from pprint import pprint
from marshmallow import Schema
from marshmallow import fields
from marshmallow import post_load
class ModifyEntryRequest:
    """Plain container for the pieces of a modify-entry request.

    The values are stored exactly as passed in; no validation or copying
    happens here.

    Args:
        dn: Identifier of the entry to modify (a distinguished-name string
            in the sample payloads of this script).
        changes: Mapping of attribute name to the list of change operations
            for that attribute.
        controls: Optional extra request controls; defaults to ``None``.
    """

    def __init__(self, dn, changes, controls=None):
        self.dn = dn
        self.changes = changes
        self.controls = controls

    def __repr__(self):
        # A readable repr makes print()/pprint()/log output useful instead of
        # the default "<ModifyEntryRequest object at 0x...>".
        return (
            f'{type(self).__name__}(dn={self.dn!r}, '
            f'changes={self.changes!r}, controls={self.controls!r})'
        )
class ModifyEntryRequestSchema(Schema):
    """Marshmallow schema that loads a payload into a ``ModifyEntryRequest``.

    Expected payload shape::

        {
            'dn': '<entry identifier>',
            'changes': {'<attribute>': [{'<operation>': ['<value>', ...]}, ...]},
            'controls': {'<name>': '<value>'} or None,   # optional
        }

    After validation, each attribute's list of single-operation dicts is
    collapsed into a list of ``(operation, value, ...)`` tuples.
    """

    # Required: ModifyEntryRequest.__init__ has no default for ``dn``, so a
    # payload without it must fail validation instead of raising TypeError
    # from inside the post_load hook.
    dn = fields.Str(required=True)
    changes = fields.Dict(
        keys=fields.Str(),
        values=fields.List(
            fields.Dict(keys=fields.Str(), values=fields.List(fields.Str()))
        ),
        required=True,
    )
    controls = fields.Dict(keys=fields.Str(), values=fields.Str(), allow_none=True)

    def convert_dict_to_tuple(self, changes):
        """Rewrite ``changes`` in place from operation dicts to tuples.

        For every attribute, the values of all dicts sharing an operation key
        are concatenated (in first-seen key order) and emitted as one
        ``(operation_key, value1, value2, ...)`` tuple.

        Args:
            changes: Mapping of attribute name to a list of
                ``{operation_key: [values]}`` dicts. Its values are replaced
                with lists of tuples; the inner value lists are left untouched.

        Returns:
            None. The mapping is modified in place.
        """
        # Only values of existing keys are reassigned, so the dict's size never
        # changes while it is being iterated.
        for attribute_key, operations in changes.items():
            merged = {}
            for operation in operations:
                for operation_key, values in operation.items():
                    # Accumulate into a fresh list so the caller's value lists
                    # are never extended as a side effect.
                    merged.setdefault(operation_key, []).extend(values)
            changes[attribute_key] = [
                (operation_key, *values) for operation_key, values in merged.items()
            ]

    @post_load
    def create_add_entry_request(self, data, **kwargs):
        """Build the ``ModifyEntryRequest`` from the validated payload."""
        self.convert_dict_to_tuple(data['changes'])
        return ModifyEntryRequest(**data)
def test_schema():
    """Load a sample payload through the schema and sanity-check the result.

    Raises:
        AssertionError: If any loaded change operation is not a tuple, or if
            the loaded ``dn`` differs from the one in the payload.
    """
    print('test_schema()')
    schema = ModifyEntryRequestSchema()
    request_payload = {
        'dn': 'cn=mwatkins,cn=users,cn=employees,ou=test,o=lab',
        'changes': {'mobile': [{'MODIFY_ADD': ['913-222-3344']}]},
    }
    modify_entry_request = schema.load(request_payload)

    # Every operation must have been converted from a dict to a tuple.
    for operations in modify_entry_request.changes.values():
        for operation in operations:
            if not isinstance(operation, tuple):
                raise AssertionError(
                    f'Expected class tuple but found {type(operation)}'
                )

    if modify_entry_request.dn != request_payload['dn']:
        raise AssertionError(
            f'Expected {request_payload["dn"]} but found {modify_entry_request.dn}'
        )
# Script entry point: run the schema check only when executed directly.
if __name__ == "__main__":
    test_schema()
#!/usr/bin/env python3
from pprint import pprint
def convert_dict_to_tuple(data):
    """Rewrite ``data`` in place from operation dicts to operation tuples.

    For every attribute, the values of all dicts sharing an operation key are
    concatenated (in first-seen key order) and emitted as one
    ``(operation_key, value1, value2, ...)`` tuple.

    Example::

        {'mobile': [{'MODIFY_ADD': ['913-222-3344']}]}
        becomes
        {'mobile': [('MODIFY_ADD', '913-222-3344')]}

    Args:
        data: Mapping of attribute name to a list of
            ``{operation_key: [values]}`` dicts. Its values are replaced with
            lists of tuples; the inner value lists are left untouched.

    Returns:
        None. The mapping is modified in place.
    """
    # Only values of existing keys are reassigned, so the dict's size never
    # changes while it is being iterated.
    for attribute_key, operations in data.items():
        merged = {}
        for operation in operations:
            for operation_key, values in operation.items():
                # Accumulate into a fresh list so the caller's value lists are
                # never extended as a side effect when a key repeats.
                merged.setdefault(operation_key, []).extend(values)
        data[attribute_key] = [
            (operation_key, *values) for operation_key, values in merged.items()
        ]
def test_convert_dict_to_tuple():
    """Convert a sample changes mapping and verify every operation is a tuple.

    Prints the mapping before and after the in-place conversion.

    Raises:
        AssertionError: If any converted operation is not a tuple.
    """
    data = {'mobile': [{'MODIFY_ADD': ['913-222-3344']}]}
    print('BEFORE:')
    pprint(data)

    convert_dict_to_tuple(data)

    for operations in data.values():
        for operation in operations:
            if not isinstance(operation, tuple):
                raise AssertionError(
                    f'Expected class tuple but found {type(operation)}'
                )

    print('AFTER:')
    pprint(data)
# Script entry point: run the conversion check only when executed directly.
if __name__ == "__main__":
    test_convert_dict_to_tuple()
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import logging
from nats.aio.client import Client as NATS, ErrNoServers
from nats.aio.errors import ErrSlowConsumer
# NOTE(review): neither constant is referenced anywhere in the visible code;
# presumably left over from a companion publisher/benchmark script — confirm
# before removing.
DEFAULT_ITERATIONS = 10000
HASH_MODULO = 250
async def run(loop):
    """Connect to the local NATS server and subscribe to the test subject.

    On success the coroutine returns with the subscription still active, so
    the caller must keep the event loop running for messages to be delivered.
    If no server can be reached, the error is printed and the coroutine
    returns without subscribing.

    Args:
        loop: The asyncio event loop handed to the NATS client via its
            ``loop`` connect option.
    """
    nc = NATS()
    nats_host = 'nats://127.0.0.1:4222'
    # NOTE(review): NATS subject names are documented as not allowing
    # whitespace — confirm this subject matches what the publisher uses.
    nats_subject = 'Test Subject'

    async def disconnected_cb():
        print('Got disconnected!')

    async def reconnected_cb():
        print(f'Got reconnected to {nc.connected_url.netloc}')

    async def error_cb(e):
        # isinstance also covers subclasses of ErrSlowConsumer.
        if isinstance(e, ErrSlowConsumer):
            print('Slow consumer error, unsubscribing from handling further messages...')
            await nc.unsubscribe(e.sid)

    async def closed_cb():
        print('Connection is closed')

    async def subscribe_handler(mesg):
        print('Got message: ', mesg.subject, mesg.reply, mesg.data)

    options = {
        'servers': [nats_host],
        'name': 'NATS test',
        'loop': loop,
        'disconnected_cb': disconnected_cb,
        'reconnected_cb': reconnected_cb,
        'error_cb': error_cb,
        'closed_cb': closed_cb,
    }

    try:
        print(f'Connecting to {nats_host}')
        await nc.connect(**options)
    except ErrNoServers as e:
        print(f'Connection error {e}')
        return

    if nc.is_connected:
        print(f'Subscribing to {nats_subject}')
        await nc.subscribe(nats_subject, cb=subscribe_handler)

    # Deliberately no ``await nc.drain()`` here: draining was what caused the
    # subscriber to disconnect. Instead the coroutine simply returns and the
    # caller's run_forever() keeps the subscription alive.
if __name__ == '__main__':
    # Create and install the loop explicitly: calling asyncio.get_event_loop()
    # with no running loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Set up the connection and subscription, then keep the loop alive so
        # the subscription callbacks continue to fire until interrupted.
        loop.run_until_complete(run(loop))
        loop.run_forever()
    except KeyboardInterrupt:
        logging.warning('Keyboard interrupt')
    finally:
        # Finalize any pending async generators before closing the loop.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()
@chaseWilliams
Copy link

chaseWilliams commented May 15, 2020

I think you're just passing a badly formatted dict to the schema, you've defined it correctly (although I would look at possibly using nested schemas rather than a nested and inflexible fields.Dict - Nested) I changed changes to the following:

changes = {"changes": {"mobile": [{"MODIFY_ADD": ["913-222-3344"]}]}, "dn": "blah"}

And the test case passed for me.

@ephoning
Copy link

you just beat me to it!:

payload = {'dn': 'som-DN', 'changes': {'mobile': [{'MODIFY_ADD': ['913-222-3344']}]}}
modify_entry_request = schema.load(payload)

@smitchell
Copy link
Author

smitchell commented May 15, 2020

Sweet! Thanks, @chaseWilliams and @ephoning. It has been a long week. Once I switched to loading the entire request object, and not just changes, it worked, and I was able to include the code to transform the list of dictionaries into a list of tuples successfully.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment