Skip to content

Instantly share code, notes, and snippets.

@MichelDiz
Created March 12, 2023 19:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MichelDiz/86fe552ecfca9afd7aeefe4deaeb32da to your computer and use it in GitHub Desktop.
Save MichelDiz/86fe552ecfca9afd7aeefe4deaeb32da to your computer and use it in GitHub Desktop.

just an example

import pydgraph
import json
def do_query(query=None, mutation=None, cond=None):
client_stub = pydgraph.DgraphClientStub("localhost:9080")
client = pydgraph.DgraphClient(client_stub)
txn = client.txn()
try:
if mutation:
mutation = txn.create_mutation(set_obj=mutation, cond=cond)
request = txn.create_request(query=query, mutations=[mutation], commit_now=True)
txn.do_request(request)
print('OK - Mutation Dgraph done')
else:
res = client.txn(read_only=True).query(query)
reqs = json.loads(res.json)
#print("Response",json.dumps(reqs, indent=4))
txn.commit()
print('OK - Query Dgraph done')
return reqs
except pydgraph.AbortedError:
print("error")
finally:
txn.discard()
client_stub.close()
from dg import do_query
def get_snapshots(data):
query = f"""
{{
var(func: uid(0x1)){{
C as cred
}}
q(func: uid(C)) @filter(regexp(name.addr, /{address}/i)) {{
v as uid
snapshots(first:1) (orderdesc:createdAt) {{
at as createdAt
T as since: math(since(at)/(60))
}}
}}
var(func: uid(at)) @filter(gt(val(T), 15)) {{
test: math(since(at)/(60))
G as uid
}}
}}
"""
# Upsert Condition
cond = "@if(gt(len(G), 0) ) "
# Defining Root/parent Mutation Object
m_obj = {
'uid': "uid(v)",
'snapshots': []
}
# Appending Child Mutation Object
m_obj['snapshots'].append(
{
'somedata': data,
})
response = do_query(query=query, mutation=m_obj, cond=cond)
return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment