Skip to content

Instantly share code, notes, and snippets.

@rogerdahl
Created November 2, 2016 18:11
Show Gist options
  • Save rogerdahl/217611b82f69464471d34b4786a191e3 to your computer and use it in GitHub Desktop.
Save rogerdahl/217611b82f69464471d34b4786a191e3 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import logging

import requests
import requests_toolbelt
import requests_toolbelt.streaming_iterator

import d1_client.d1baseclient
import d1_common.url
import d1_common.util
# Defaults
# Default timeout for the upload, in seconds (30 minutes). The timeout timer
# runs while the POST is in progress, so it must be set to a value that covers
# the entire transfer time of the largest object, plus a safety margin. The
# timer resets whenever data arrives, which works well for a GET but not for a
# POST, where all data is outgoing. None disables the timeout.
TIMEOUT_SEC = 30 * 60
class StreamClient(d1_client.d1baseclient.DataONEBaseClient):
    """DataONE client that creates an object with a streamed multipart POST.

    The request body is produced by ``requests_toolbelt.MultipartEncoder`` and
    handed to ``requests`` as a stream, together with an explicit
    Content-Length header.

    Unless the caller overrides them, the base client is initialized with
    ``api_major=1`` and ``api_minor=0``.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the base client, defaulting to API version 1.0.

        All positional and keyword arguments are forwarded unchanged to
        ``DataONEBaseClient.__init__``.
        """
        kwargs.setdefault('api_major', 1)
        kwargs.setdefault('api_minor', 0)
        d1_client.d1baseclient.DataONEBaseClient.__init__(self, *args, **kwargs)

    def createResponse(
        self,
        pid,
        sciobj_stream,
        sysmeta_pyxb,
        cert_pub_path=None,
        cert_key_path=None,
        timeout_sec=TIMEOUT_SEC,
    ):
        """POST a new object to ``<base_url>/v1/object`` and return the response.

        Args:
            pid: Identifier of the object; sent UTF-8 encoded as the ``pid``
                field.
            sciobj_stream: Object content, passed as-is to MultipartEncoder as
                the ``object`` file part (presumably a file-like object whose
                length the encoder can determine -- confirm against callers).
            sysmeta_pyxb: System metadata object; its ``toxml()`` output is
                sent UTF-8 encoded as the ``sysmeta`` file part.
            cert_pub_path: Path to the client certificate, or None to connect
                without a client certificate.
            cert_key_path: Path to the private key that belongs to the
                certificate, or None.
            timeout_sec: Timeout passed to ``requests``; None disables it.

        Returns:
            The ``requests`` response, opened with ``stream=True`` so the body
            has not been read yet.
        """
        url = d1_common.url.joinPathElements(self.base_url, 'v1', 'object')
        # Log instead of printing: a print statement pollutes the caller's
        # stdout and is a syntax error on Python 3.
        logging.getLogger(__name__).debug('POST %s', url)
        mmp_stream = requests_toolbelt.MultipartEncoder(
            fields={
                'pid': pid.encode('utf-8'),
                'object': ('content.bin', sciobj_stream),
                'sysmeta': (
                    'sysmeta.xml', sysmeta_pyxb.toxml().encode('utf-8')
                ),
            }
        )
        headers = {
            'Content-Type': mmp_stream.content_type,
            'Content-Length': str(mmp_stream.len),
        }
        # Only hand requests a certificate when one was actually supplied; a
        # (None, None) tuple is not a documented value for ``cert``.
        client_cert = (cert_pub_path, cert_key_path) if cert_pub_path else None
        # Close the session (and its connection pool) when done instead of
        # leaking one per call. This mirrors what the module-level
        # requests.post() helper does; the streamed response remains readable.
        with requests.Session() as session:
            # NOTE(review): verify=False disables TLS server certificate
            # verification. Kept for backward compatibility -- confirm that
            # this is intended outside of test environments.
            return session.post(
                url, data=mmp_stream, headers=headers, cert=client_cert,
                verify=False, stream=True, timeout=timeout_sec
            )

    @d1_common.util.utf8_to_unicode
    def createStream(
        self,
        pid,
        sciobj_stream,
        sysmeta_pyxb,
        cert_pub_path=None,
        cert_key_path=None,
        timeout_sec=TIMEOUT_SEC,
    ):
        """Create the object and return the parsed ``Identifier`` response.

        Takes the same arguments as ``createResponse()``, which performs the
        POST. The response is then passed to
        ``_read_dataone_type_response(response, 'Identifier')`` and that
        call's result is returned.
        """
        response = self.createResponse(
            pid, sciobj_stream, sysmeta_pyxb, cert_pub_path, cert_key_path,
            timeout_sec
        )
        # Expose the status code under the name ``status`` as well
        # (presumably what the base client's response reader expects --
        # confirm against DataONEBaseClient).
        response.status = response.status_code
        return self._read_dataone_type_response(response, 'Identifier')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment