@GarryOne · Created October 25, 2023 23:51
Migrating Users from One Auth0 Tenant to Another in Python: Handling Large Payloads
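The script below creates a user-export job on the source tenant, polls the job until it finishes, downloads the gzipped NDJSON result, and converts it to a plain JSON file. It needs a Management API token for the source tenant. As a minimal sketch (assuming a machine-to-machine application authorized for the Management API; get_management_token is an illustrative helper, not part of the script below), such a token can be obtained with the client-credentials grant:

import requests

def get_management_token(domain, client_id, client_secret):
    # Client-credentials grant against the tenant's token endpoint;
    # the application must be authorized for the Management API audience.
    token_response = requests.post(
        f'https://{domain}/oauth/token',
        json={
            'grant_type': 'client_credentials',
            'client_id': client_id,
            'client_secret': client_secret,
            'audience': f'https://{domain}/api/v2/'
        }
    )
    token_response.raise_for_status()
    return token_response.json()['access_token']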
import gzip
import json
import time
import requests
export_management_token = "YOUR_MANAGEMENT_TOKEN" # Replace with your token
domain = "YOUR_SOURCE_TENANT_DOMAIN" # Replace with your source tenant's domain
connection_id = "YOUR_CONNECTION_ID" # Replace with your connection ID
export_url = f'https://{domain}/api/v2/jobs/users-exports'
export_payload = {
    'connection_id': connection_id,  # Replace with your connection ID
    'format': 'json',
    'fields': [
        {"name": "user_id"},
        {"name": "given_name"},
        {"name": "family_name"},
        {"name": "nickname"},
        {"name": "name"},
        {"name": "email"},
        {"name": "email_verified"},
        {"name": "picture"},
        {"name": "identities[0].connection"},
        {"name": "created_at"},
        {"name": "updated_at"},
        {"name": "password_hash"},
        {"name": "username"},
        {"name": "custom_password_hash"}
    ]
}
headers = {
    'Authorization': f'Bearer {export_management_token}',
    'Content-Type': 'application/json'
}
export_response = requests.post(export_url, data=json.dumps(export_payload), headers=headers)
export_data = export_response.json()
# Print the initial export job response (optional)
print(export_data)
job_id = export_data.get('id')
connection_name = export_data.get('connection')
# Check the job status until completion or error
while True:
    time.sleep(1)
    # Check job status
    url = f"https://{domain}/api/v2/jobs/{job_id}"
    response = requests.get(url, headers=headers)
    status_data = response.json()
    print(status_data)  # Print the status
    # If the "location" field is in the response or if the job has failed, stop checking
    if "location" in status_data or status_data.get('status') in ['completed', 'failed']:
        break
# If the "location" field exists, download the file
if "location" in status_data:
    download_url = status_data['location']
    response = requests.get(download_url, stream=True)
    # Get the filename from the Content-Disposition header or use a default name
    compressed_filename = response.headers.get('content-disposition', '').split('filename=')[-1].strip('"')
    if not compressed_filename:
        compressed_filename = f'./scripts/users-{connection_name}.gz'  # Default name if none found
    with open(compressed_filename, 'wb') as compressed_file:
        for chunk in response.iter_content(chunk_size=8192):
            compressed_file.write(chunk)
    print(f"File downloaded and saved as {compressed_filename}")
    # Decompress the file
    decompressed_ndjson_filename = compressed_filename.replace('.gz', '.ndjson')
    with gzip.open(compressed_filename, 'rt') as gz_file:
        with open(decompressed_ndjson_filename, 'w') as out_file:
            out_file.write(gz_file.read())
    print(f"File decompressed and saved as {decompressed_ndjson_filename}")
    # Convert NDJSON to JSON
    with open(decompressed_ndjson_filename, 'r') as ndjson_file:
        ndjson_data = ndjson_file.readlines()
    json_data = [json.loads(item) for item in ndjson_data]
    json_filename = decompressed_ndjson_filename.replace('.ndjson', '.json')
    with open(json_filename, 'w') as json_file:
        json.dump(json_data, json_file, indent=4)
    print(f"Converted NDJSON to JSON and saved as {json_filename}")