Skip to content

Instantly share code, notes, and snippets.

@yogananda-muthaiah
Created February 17, 2021 17:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yogananda-muthaiah/4588305ae3ba6ae11df9608b31dfbb4a to your computer and use it in GitHub Desktop.
Save yogananda-muthaiah/4588305ae3ba6ae11df9608b31dfbb4a to your computer and use it in GitHub Desktop.
SAP Cloud Integration API
class SAPCloudIntegration:
    """Small client for the SAP Cloud Integration OData API.

    Every request goes through the session object supplied by the caller and
    carries HTTP Basic credentials plus the CSRF token fetched at construction
    time.

    The module that hosts this class must import ``base64``, ``json`` and
    ``uuid``; the methods below use them.

    Args:
        session: HTTP session object. The class calls ``get``, ``post``,
            ``put`` and ``delete`` on it with ``headers=`` / ``data=`` keyword
            arguments (presumably a ``requests.Session`` - confirm with the
            caller).
        api_endpoint: Base URL of the API, without a trailing slash; resource
            paths such as ``/IntegrationDesigntimeArtifacts`` are appended.
        base64AuthString: Already base64-encoded credentials placed after
            ``Basic `` in the ``Authorization`` header.
    """

    def __init__(self, session, api_endpoint, base64AuthString):
        self.sess = session
        self.api_endpoint = api_endpoint
        self.base64AuthString = base64AuthString
        # Fetched once, eagerly; constructing the client performs a GET.
        self.CSRF_Token = self.get_csrf_token()
        print(f"Token {self.CSRF_Token}")

    def get_csrf_token(self):
        """Request a CSRF token from the API endpoint and return it.

        Sends a GET to ``api_endpoint`` with ``X-CSRF-Token: Fetch`` and reads
        the token from the ``X-CSRF-Token`` response header.

        Raises:
            KeyError: If the response carries no ``X-CSRF-Token`` header.
        """
        headers = {
            'Authorization': f"Basic {self.base64AuthString}",
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-CSRF-Token': 'Fetch'
        }
        response = self.sess.get(self.api_endpoint, headers=headers)
        return response.headers['X-CSRF-Token']

    def get_authentication_headers(self):
        """Return a fresh header dict with credentials and the CSRF token."""
        return {
            'Authorization': f"Basic {self.base64AuthString}",
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'X-CSRF-token': self.CSRF_Token
        }

    @staticmethod
    def _read_base64(path):
        """Return the content of the file at *path* as a base64 text string."""
        with open(path, "rb") as handle:
            return base64.b64encode(handle.read()).decode('utf-8')

    def _designtime_artifact_url(self, iflow_id, version):
        """Return the URL addressing one design-time artifact version."""
        return self.api_endpoint + f"/IntegrationDesigntimeArtifacts(Id='{iflow_id}',Version='{version}')"

    def create_iflow(self, name, iflow_id, package_id, zip_file_path):
        """Create an integration flow from a zip file.

        Args:
            name: Value sent as ``Name``.
            iflow_id: Value sent as ``Id``.
            package_id: Value sent as ``PackageId``.
            zip_file_path: Path of the file whose base64-encoded content is
                sent as ``ArtifactContent``.

        Returns:
            The response object returned by the session's ``post``.
        """
        headers = self.get_authentication_headers()
        payload = {
            "Name": name,
            "Id": iflow_id,
            "PackageId": package_id,
            "ArtifactContent": self._read_base64(zip_file_path)
        }
        url = self.api_endpoint + "/IntegrationDesigntimeArtifacts"
        # Use the injected session, not a module-level global named `sess`.
        response = self.sess.post(url, headers=headers, data=json.dumps(payload))
        print(f"create_iflow -> {response.status_code}")
        return response

    def delete_iflow(self, iflow_id, version):
        """Delete the given version of a design-time integration flow.

        Returns:
            The response object returned by the session's ``delete``.
        """
        headers = self.get_authentication_headers()
        url = self._designtime_artifact_url(iflow_id, version)
        response = self.sess.delete(url, headers=headers)
        print(f"delete_iflow -> {response.status_code}")
        return response

    def get_iflow_configuration(self, iflow_id, version):
        """Fetch the ``Configurations`` of a design-time integration flow.

        Returns:
            The response object returned by the session's ``get``.
        """
        url = self._designtime_artifact_url(iflow_id, version) + "/Configurations"
        response = self.sess.get(url, headers=self.get_authentication_headers())
        print(f"get_iflow_configuration -> {response.status_code}")
        return response

    def update_iflow_configuration(self, iflow_config, new_config, iflow_id, version):
        """Update configuration parameters with a single ``$batch`` request.

        One PUT operation is emitted per entry of *iflow_config*, all inside a
        single change set.

        Args:
            iflow_config: Iterable of mappings, each providing
                ``ParameterKey`` and ``DataType``.
            new_config: Mapping from parameter key to the new value; the value
                is converted to text before being sent.
            iflow_id: Id of the integration flow.
            version: Version of the integration flow.

        Returns:
            The response object returned by the session's ``post``.

        Raises:
            KeyError: If a key listed in *iflow_config* is absent from
                *new_config*.
        """
        batch_id = str(uuid.uuid4())
        change_set_id = str(uuid.uuid4())
        headers = self.get_authentication_headers()
        headers['Content-Type'] = f"multipart/mixed; boundary=batch_{batch_id}"

        # Every line ends with \r\n: with a bare \n the service answers 400.
        parts = [
            f"--batch_{batch_id}\r\n",
            f"Content-Type: multipart/mixed; boundary=changeset_{change_set_id}\r\n\r\n",
        ]
        for param in iflow_config:
            key = param['ParameterKey']
            param_value_json = {
                "ParameterKey": key,
                "ParameterValue": f"{new_config[key]}",
                "DataType": param['DataType']
            }
            # One change-set part per parameter.
            parts.append(f"--changeset_{change_set_id}\r\n")
            parts.append("Content-Type: application/http\r\n")
            parts.append("Content-Transfer-Encoding: binary\r\n\r\n")
            parts.append(
                f"PUT IntegrationDesigntimeArtifacts(Id='{iflow_id}',Version='{version}')/$links/Configurations('{key}') HTTP/1.1\r\n"
            )
            parts.append("Accept: application/json\r\n")
            parts.append("Content-Type: application/json\r\n\r\n")
            parts.append(json.dumps(param_value_json, indent=0) + "\r\n\r\n")
        # Close the change set, then the batch.
        parts.append(f"--changeset_{change_set_id}--\r\n")
        parts.append(f"--batch_{batch_id}--\r\n")
        payload = "".join(parts)

        url = self.api_endpoint + "/$batch"
        response = self.sess.post(url, headers=headers, data=payload)
        print(f"update_iflow_configuration -> {response.status_code}")
        return response

    def deploy_iflow(self, iflow_id, version):
        """Deploy the given version of a design-time integration flow.

        Returns:
            The response object returned by the session's ``post``.
        """
        url = self.api_endpoint + f"/DeployIntegrationDesigntimeArtifact?Id='{iflow_id}'&Version='{version}'"
        response = self.sess.post(url, headers=self.get_authentication_headers())
        print(f"deploy_iflow -> {response.status_code}")
        return response

    def save_version_iflow(self, iflow_id, new_version):
        """Save the integration flow under *new_version*.

        Returns:
            The response object returned by the session's ``post``.
        """
        url = self.api_endpoint + f"/IntegrationDesigntimeArtifactSaveAsVersion?SaveAsVersion='{new_version}'&Id='{iflow_id}'"
        response = self.sess.post(url, headers=self.get_authentication_headers())
        print(f"save_version_iflow -> {response.status_code}")
        return response

    def undeploy_iflow(self, iflow_id):
        """Delete the runtime artifact of the integration flow.

        Returns:
            The response object returned by the session's ``delete``.
        """
        url = self.api_endpoint + f"/IntegrationRuntimeArtifacts('{iflow_id}')"
        response = self.sess.delete(url, headers=self.get_authentication_headers())
        print(f"undeploy_iflow -> {response.status_code}")
        return response

    def update_resource_in_iflow(self, iflow_id, version, resource, resource_type, file_path):
        """Replace the content of one resource of an integration flow.

        Args:
            iflow_id: Id of the integration flow.
            version: Version of the integration flow.
            resource: Value used as the resource ``Name`` in the URL.
            resource_type: Value used as ``ResourceType`` in the URL.
            file_path: Path of the file whose base64-encoded content is sent
                as ``ResourceContent``.

        Returns:
            The response object returned by the session's ``put``.
        """
        payload = {
            "ResourceContent": self._read_base64(file_path)
        }
        url = self._designtime_artifact_url(iflow_id, version) + f"/$links/Resources(Name='{resource}',ResourceType='{resource_type}')"
        response = self.sess.put(url, headers=self.get_authentication_headers(), data=json.dumps(payload))
        print(f"update_resource_in_iflow -> {response.status_code}")
        return response
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment