Skip to content

Instantly share code, notes, and snippets.

@lemonprogis
Last active August 29, 2015 14:25
Show Gist options
  • Save lemonprogis/dc8feeaa8ed730d34a3e to your computer and use it in GitHub Desktop.
Save lemonprogis/dc8feeaa8ed730d34a3e to your computer and use it in GitHub Desktop.
Use these files to interact with the ArcGIS Server REST API. A lot of functionality is exposed via REST, which makes it good for automating tasks in Python.
# -*- coding: iso-8859-1 -*-
"""
ArcGIS Server Module
"""
__author__='Ed Briggler, Jason Tipton'
__VERSION__='1.0'
# http://resources.arcgis.com/en/help/rest/apiref/
# http://resources.arcgis.com/en/help/server-admin-api/
import requests, sys, os, logging
sys.path.append(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
from conf import settings
class Endpoint(object):
    '''
    Endpoints: ArcGIS for Server

    Holds every admin/REST URL used by the manager classes, all derived
    from a single base URL by string formatting.
    '''

    def __init__(self, base_url):
        ''' init all our endpoints

        base_url -- prefix placed in front of every path below; it is used
                    verbatim, so it should not end with a slash
        '''
        self.BASE_URL = base_url
        self.ADMIN = '{}/arcgis/admin'.format(base_url)
        self.VALIDATE_ITEM_DATA = '{}/data/validateDataItem'.format(self.ADMIN)
        self.REGISTER_DATA_ITEM = '{}/data/registerItem'.format(self.ADMIN)
        self.TOKEN = '{}/arcgis/admin/generateToken'.format(base_url)

        # Private roots the public endpoints below are built from.
        self._USER_URL = '{}/arcgis/admin/security/users'.format(base_url)
        self._ROLE_URL = '{}/arcgis/admin/security/roles'.format(base_url)
        self._SERVICE_URL = '{}/arcgis/admin/services'.format(base_url)
        self._REST_SRVCS_URL = '{}/arcgis/rest/services'.format(base_url)

        # User operations
        self.ADD_USER = '{}/add'.format(self._USER_URL)
        self.GET_USERS = '{}/getUsers'.format(self._USER_URL)
        self.SEARCH_USER = '{}/search'.format(self._USER_URL)
        self.REMOVE_USER = '{}/remove'.format(self._USER_URL)
        self.UPDATE_USER = '{}/update'.format(self._USER_URL)
        self.ADD_ROLE = '{}/assignRoles'.format(self._USER_URL)
        # NOTE(review): getRoles is built on the users root here; confirm
        # against the admin API whether it belongs under the roles root.
        self.GET_ROLE = '{}/getRoles'.format(self._USER_URL)
        self.REMOVE_ROLE = '{}/removeRoles'.format(self._USER_URL)
        self.GET_PRIVILEGE = '{}/getPrivilege'.format(self._USER_URL)

        # Role operations
        self.GET_ROLES_FOR_USER = '{}/getRolesForUser'.format(self._ROLE_URL)
        self.ADD_USER_TO_ROLE = '{}/addUsersToRole'.format(self._ROLE_URL)
        self.ASSIGN_PRIVILEGE = '{}/assignPrivilege'.format(self._ROLE_URL)

        # The original path repeated the "arcgis" segment
        # (".../arcgis/arcgis/admin/..."); build it from ADMIN instead.
        self.COUNT_ERROR = '{}/logs/countErrorReports'.format(self.ADMIN)
        # Plural alias: other code in this file reads COUNT_ERRORS.
        self.COUNT_ERRORS = self.COUNT_ERROR

        self.SYSTEM = '{}/System'.format(self._REST_SRVCS_URL)
class Manager(object):
    '''
    MANAGER CLASS
    inherit from this class to communicate with an arcgis server
    resource docs for ESRI: http://resources.arcgis.com/en/help/server-admin-api/
    '''

    def __init__(self, url=None, proxies=False, username=None, password=None):
        '''
        init method
        disable urllib3 warnings because we know what we are doing
        setup proxies and get our token so we can call the api's

        url      -- base url handed to Endpoint; when None no endpoints are created
        proxies  -- a dict is used as-is, any other truthy value selects
                    settings.config['proxies'], a falsy value disables proxies
        username -- when truthy, a token is requested immediately
        password -- password used for the token request
        '''
        self.username = username
        self.password = password
        self.cookies = None
        self.token = None
        requests.packages.urllib3.disable_warnings()
        if url is not None:
            self._endpoints = Endpoint(url)  # create endpoints
        self.proxies = self._resolve_proxies(proxies)
        if username:
            self._get_token(username, password)

    @staticmethod
    def _resolve_proxies(proxies):
        ''' turn the constructor's proxies argument into a requests proxies dict '''
        # The original sent every truthy value to the settings branch, so a
        # caller-supplied dict was silently ignored.
        if isinstance(proxies, dict) and proxies:
            return proxies
        if proxies:
            return settings.config['proxies']
        return {
            'http': '',
            'https': ''
        }

    def _get_token(self, username, password):
        ''' gets a token from arcgis server and stores it on self.token '''
        # payload
        params = {'username': username, 'password': password, 'client': 'requestip', 'f': 'json'}
        # NOTE: verify=False turns off TLS certificate verification.
        response = requests.post(self._endpoints.TOKEN, data=params, proxies=self.proxies, verify=False)
        self.cookies = response.cookies
        data = self.checkResponse(response)
        self.token = data['token']

    def checkResponse(self, req):
        ''' using this to check for errors, will raise, otherwise, will return json '''
        req.raise_for_status()
        return req.json()

    def _build_payload(self, r_params):
        ''' copy r_params and add the response format (if absent) and the current token '''
        # Work on a copy: the original wrote into a shared mutable default
        # argument and into the caller's own dict.
        payload = dict(r_params or {})
        payload.setdefault('f', 'json')
        payload['token'] = self.token
        return payload

    def _post(self, r_url, r_params=None, files=None, _retry=True):
        ''' do post with payload, always append format and token

        r_url    -- url to post to
        r_params -- optional dict of form fields (not modified)
        files    -- optional dict passed to requests as multipart files
        _retry   -- internal: allows one re-post after a token refresh

        returns the decoded json response; raises on http errors
        '''
        payload = self._build_payload(r_params)
        response = requests.post(r_url, data=payload, proxies=self.proxies,
                                 verify=False, files=files or {})
        response = self.checkResponse(response)
        # Some endpoints answer with non-dict json, so check the type before
        # looking for the invalid-token marker (code 498).
        token_rejected = (isinstance(response, dict)
                          and 'status' in response
                          and response.get('code') == 498)
        if token_rejected and _retry:
            logging.info(response)
            ## Invalid Token, request a new and try again -- once only, the
            ## original recursed without limit while the token kept failing.
            self._get_token(self.username, self.password)
            return self._post(r_url, payload, files, _retry=False)
        return response
class GP(Manager):
    """ GEOPROCESSING CLASS
    Interacts with the Geoprocessors on the server
    """

    # jobStatus values that mean the job has not finished yet.
    _PENDING_STATES = ("esriJobExecuting", "esriJobSubmitted")

    def __init__(self, url=None, proxies=False, username=None, password=None):
        super(GP, self).__init__(url, proxies, username, password)

    def _task_url(self, workspace, service_name, task):
        """ Builds the GPServer url of a task. """
        return '{}/{}/{}/GPServer/{}'.format(
            self._endpoints._REST_SRVCS_URL, workspace, service_name, task)

    def submit_job(self, workspace, service_name, task, params):
        """ Posts params to the task's submitJob operation and returns the response. """
        url = '{}/submitJob'.format(self._task_url(workspace, service_name, task))
        return self._post(url, params)

    def get_result(self, workspace, service_name, task, job_obj, param_name):
        """
        Fetches one result of a job.
        job_obj -- job response holding 'jobId' and 'results'
        param_name -- key into job_obj['results']; the first value of that
                      entry is appended to the job url as the result path
        """
        # next(iter(...)) works on Python 2 and 3; .values()[0] is Python 2 only.
        result_path = next(iter(job_obj['results'][param_name].values()))
        url = '{}/jobs/{}/{}'.format(
            self._task_url(workspace, service_name, task), job_obj['jobId'], result_path)
        return self._post(url)

    def is_job_complete(self, workspace, service_name, task, id):
        """
        Returns False while the job is executing/submitted, True for any other
        jobStatus, and None when the status response contains an 'error' key.
        """
        js = self.job_status(workspace, service_name, task, id)
        if 'error' in js:
            return None
        return js['jobStatus'] not in self._PENDING_STATES

    def _is_pending(self, js):
        """ True when js is a dict whose jobStatus is still executing/submitted. """
        return isinstance(js, dict) and js.get('jobStatus') in self._PENDING_STATES

    def wait_for_job_completion(self, workspace, service_name, task, id, wait_time=1):
        """
        Polls the job every wait_time seconds until it is no longer
        executing/submitted and returns the last status response.
        A response without a usable jobStatus triggers one token refresh and
        one re-query; if it is still unusable, polling stops and that
        response is returned.
        """
        from time import sleep
        js = self.job_status(workspace, service_name, task, id)
        while self._is_pending(js):
            try:
                # Need to find a better way
                latest = self.job_status(workspace, service_name, task, id)
                if not isinstance(latest, dict) or 'jobStatus' not in latest:
                    # Treated as an expired token: refresh and ask once more.
                    self._get_token(self.username, self.password)
                    latest = self.job_status(workspace, service_name, task, id)
                js = latest
            except Exception:
                # Best effort: keep the previous status and poll again, but
                # leave a trace instead of silently swallowing the failure.
                logging.exception('job status poll failed; retrying')
            sleep(wait_time)
        return js

    def job_status(self, workspace, service_name, task, id):
        """ Returns the status response of job `id` for the given task. """
        url = '{}/jobs/{}'.format(self._task_url(workspace, service_name, task), id)
        return self._post(url)

    # https://esri-dev.entergy.com/arcgis/rest/services/
    # System/PublishingTools/GPServer/Get%20Database%20Connection%20String/submitJob
    def submit_report(self, task, svc, folder_name=None, svc_type='MapServer'):
        """
        This was created for getting a report on caching status. Not sure what else it can be used for
        Posts service_url "<folder_name>/<svc>:<svc_type>" (folder part omitted
        when folder_name is None) to the ReportingTools task's execute operation.
        """
        report_url = '{}/ReportingTools/GPServer'.format(self._endpoints.SYSTEM)
        svc_url = ''
        if folder_name is not None:
            svc_url = folder_name + '/'
        svc_url += '{}:{}'.format(svc, svc_type)
        url = '{report}/{task}/execute'.format(report=report_url, task=task)
        params = {'service_url': svc_url}
        return self._post(url, params)
class Admin(Manager):
    """
    ADMIN CLASS
    Site, directory, config store, cluster, machine and data-store operations
    of the server admin api.
    """

    def move_site(self, path_from, path_to):
        """
        Moves site to new location. Moves configstore and directories that match path_from.
        Also copies cache directories
        """
        self.move_configstore(path_from, path_to)
        self.move_directories(path_from, path_to)

    def get_directories(self):
        """ Gets the server directories. Ex: Cache, Jobs, Output... """
        url = '{}/arcgis/admin/system/directories'.format(self._endpoints.BASE_URL)
        return self._post(url)

    def copy_directory_contents(self, path_from, path_to, run_async=True, silent=True, **legacy_kwargs):
        """
        Copies contents from "from" folder to "to" folder with robocopy. Useful for Caches
        Keyword arguments:
        run_async -- True starts robocopy and returns the Popen object without
                     waiting; False waits and returns the exit code
        silent -- add the robocopy switches that suppress its listing output
        The previous keyword name `async` is still honoured when passed; it
        cannot be a parameter name because it is reserved in Python 3.7+.
        """
        import subprocess
        if 'async' in legacy_kwargs:
            run_async = legacy_kwargs.pop('async')
        if legacy_kwargs:
            raise TypeError('copy_directory_contents() got unexpected keyword arguments: {}'.format(
                ', '.join(sorted(legacy_kwargs))))
        # An argument list keeps paths containing spaces intact.
        cmd = ['robocopy', path_from, path_to]
        if silent:
            cmd.extend(['/NFL', '/NDL', '/NJH', '/NJS', '/nc', '/ns', '/np'])
        # /e Copy all subdirectories
        # /XD ".*" Exclude directories that begin with ".". Ex: ".site"
        cmd.extend(['/e', '/XD', '.*'])
        if run_async:
            return subprocess.Popen(cmd)
        return subprocess.call(cmd)

    def move_directories(self, path_from, path_to, copy_cache=True):
        """
        Moves all directories that match the from path. If copy_cache=True, copies the cache data as well
        Returns the list of edit_directory responses.
        """
        responses = []
        for d in self.get_directories()['directories']:
            if path_from not in d['physicalPath']:
                continue
            print(d['name'])
            new_path = d['physicalPath'].replace(path_from, path_to)
            print(new_path)
            responses.append(self.edit_directory(d['name'], new_path))  # Change the directory
            # Copy cache contents if set to copy_cache=True
            if d['directoryType'] == 'CACHE' and copy_cache:
                self.copy_directory_contents(d['physicalPath'], new_path)
        return responses

    def get_directory(self, directory):
        """ Gets a single server directory by name. """
        url = '{}/arcgis/admin/system/directories/{}'.format(self._endpoints.BASE_URL, directory)
        return self._post(url)

    def edit_directory(self, directory, new_path, cleanup_mode=None, max_file_age=None, description=None, name=None, directory_type=None):
        """
        The server directory's edit operation allows you to change the path and clean up properties of the directory.
        This operation updates the GIS service configurations (and points them to the new path) that are using this directory, causing them to restart.
        It is therefore recommended that any edit to the server directories be performed when the server is not under load.
        This operation is mostly used when growing a single machine site to a multiple machine site configuration,
        which requires that the server directories and configuration store be put on a network-accessible file share.
        Keyword arguments:
        cleanup_mode -- how to cleanup files ('TIME_ELAPSED_SINCE_LAST_MODIFIED'/ default 'NONE')
        max_file_age -- How long to keep file before deletion in minutes
        Any argument left as None is filled in from the directory's current settings.
        Returns a warning dict, without calling the server, for the directories that are refused below.
        """
        if directory in ['arcgisindex', 'arcgisinput', 'arcgisjobregistry', 'arcgisuploads', 'kml']:
            return {'warning':"Can't edit this direcotry", 'directory':directory}
        orig_directory = self.get_directory(directory)
        # Web parameter name -> value supplied by the caller (replaces the
        # eval() lookups of local variable names).
        requested = {
            'cleanupMode': cleanup_mode,
            'maxFileAge': max_file_age,
            'description': description,
            'name': name,
            'physicalPath': new_path,
            'directoryType': directory_type,
        }
        params = {}
        for web_name, value in requested.items():
            # Get values from original (You actually have to do this)
            params[web_name] = value if value is not None else orig_directory[web_name]
        url = '{}/arcgis/admin/system/directories/{}/edit'.format(self._endpoints.BASE_URL, directory)
        print(params)
        response = self._post(url, params)
        print(response)
        return response

    def get_configstore(self):
        """
        The configuration store maintains all of server's configurations.
        Typical configurations include all the resources such as clusters, machines, GIS services, and security rules that are required to power a Site.
        In a way, the configuration store is a physical representation of a site.
        """
        url = '{}/arcgis/admin/system/configstore'.format(self._endpoints.BASE_URL)
        return self._post(url)

    def move_configstore(self, folder_in, folder_out):
        """
        Moves the config store
        Only acts when folder_in occurs in the current connection string;
        otherwise returns a message dict and changes nothing.
        """
        current_configstore = self.get_configstore()['connectionString']
        if folder_in not in current_configstore:
            return {"Message": "Config store not at the old location"}
        folder_out = current_configstore.replace(folder_in, folder_out)
        return self.edit_configstore(folder_out, move=True, run_async=True)

    def edit_configstore(self, folder_out, move=True, run_async=True, keep_trying=True):
        """
        You can use this operation to update the configuration store. Typically, this operation is used to change the location of the store.
        When ArcGIS Server is installed, the default configuration store uses local paths. As the site grows (more server machines are added),
        the location of the store must be updated to use a shared file system path. On the other hand,
        if you know at the onset that your site will have two or more server machines, you can start from a shared path while creating a site and skip this step altogether.
        Keyword arguments:
        keep_trying -- when True, re-post every minute for as long as the server
                       answers with a 'Please try again later' error
        Returns the last response received.
        """
        import time
        url = '{}/arcgis/admin/system/configstore/edit'.format(self._endpoints.BASE_URL)
        params = {'type': 'FILESYSTEM', 'connectionString': folder_out,
                  'move': move, 'runAsync': run_async}
        while True:
            response = self._post(url, params)
            if response['status'] != 'error':
                # break if no error
                break
            site_busy = any('Please try again later' in m for m in response.get('messages', []))
            # break if actual error. The original test was inverted: it gave up
            # on the busy message and retried forever on every other error.
            if not keep_trying or not site_busy:
                break
            # If site is busy, keep trying...
            print('sleeping for a minute')
            time.sleep(60)
        return response

    def export_site(self, folder_out):
        """
        Exports the site configuration to the folder path given
        """
        url = '{}/arcgis/admin/exportSite'.format(self._endpoints.BASE_URL)
        params = {'location': folder_out}
        return self._post(url, params)

    def import_site(self, file_in):
        """
        Imports the site configuration from the file given
        """
        url = '{}/arcgis/admin/importSite'.format(self._endpoints.BASE_URL)
        params = {'location': file_in}
        return self._post(url, params)

    def delete_site(self):
        """
        Deletes site and removes all clusters/machines from it
        """
        url = '{}/deleteSite'.format(self._endpoints.ADMIN)
        return self._post(url)

    def create_site(self, username, password, dir_config, dir_output, dir_output_obj = None, cluster='default', logsSettings = '', runAsync='true' ):
        """
        NEEDS WORK
        http://resources.arcgis.com/en/help/server-admin-api/createSite.html
        Posts the site admin credentials, the config store connection and the
        server directories to createNewSite and returns the response.
        dir_output_obj -- complete directories object to send; when None a
                          default set of four directories under dir_output is built
        cluster, logsSettings and runAsync are accepted but not sent yet.
        """
        import os, json
        config_store = {'connectionString': dir_config}
        if dir_output_obj is None:
            # (name, type, cleanup mode, max file age, description)
            defaults = [
                ("arcgiscache", "CACHE", "NONE", 0,
                 "Stores tile caches used by map, globe, and image services for rapid performance."),
                ("arcgisjobs", "JOBS", "TIME_ELAPSED_SINCE_LAST_MODIFIED", 360,
                 "Stores results and other information from geoprocessing services."),
                ("arcgisoutput", "OUTPUT", "TIME_ELAPSED_SINCE_LAST_MODIFIED", 10,
                 "Stores various information generated by services, such as map images."),
                ("arcgissystem", "SYSTEM", "NONE", 0,
                 "Stores directories and files used internally by ArcGIS Server."),
            ]
            directories_obj = {
                "directories": [
                    {
                        "name": dir_name,
                        "physicalPath": os.path.join(dir_output, dir_name),
                        "directoryType": dir_type,
                        "cleanupMode": cleanup,
                        "maxFileAge": max_age,
                        "description": text,
                    }
                    for dir_name, dir_type, cleanup, max_age, text in defaults
                ]
            }
        else:
            # The original left directories_obj undefined on this path.
            directories_obj = dir_output_obj
        params = {
            'username': username,
            'password': password,
            'configStoreConnection': json.dumps(config_store),
            'directories': json.dumps(directories_obj)
        }
        url = '{}/createNewSite'.format(self._endpoints.ADMIN)
        # The original built params but posted without them and returned nothing.
        return self._post(url, params)

    ## cluster stuff
    def get_clusters(self):
        """ Gets all clusters of the site. """
        url = "{}/clusters".format(self._endpoints.ADMIN)
        return self._post(url)

    def get_cluster(self, cluster_name='default'):
        """ Gets a single cluster. """
        url = "{}/clusters/{}".format(self._endpoints.ADMIN, cluster_name)
        return self._post(url)

    def get_machines_in_cluster(self, cluster_name='default'):
        """ Gets the machines of a cluster. """
        url = "{}/clusters/{}/machines".format(self._endpoints.ADMIN, cluster_name)
        return self._post(url)

    def remove_from_cluster(self, machine_names, cluster_name='default'):
        """ Removes machine_names (posted as given) from the cluster. """
        url = "{}/clusters/{}/machines/remove".format(self._endpoints.ADMIN, cluster_name)
        params = {"machineNames": machine_names}
        return self._post(url, params)

    def remove_all_other_machines_from_cluster(self, cluster_name='default'):
        """
        Removes every machine from the cluster except the one whose name equals
        the host part of BASE_URL (compared case-insensitively).
        Returns the remove response, or None when there is nothing to remove.
        NOTE(review): the original computed the list but the removal call was
        commented out together with its print; it is restored here. Confirm
        that BASE_URL names the machine itself and not an alias before use.
        """
        try:
            from urllib.parse import urlparse  # Python 3
        except ImportError:
            from urlparse import urlparse  # Python 2
        current_server = urlparse(self._endpoints.BASE_URL).hostname.upper()
        others = [
            serv['machineName']
            for serv in self.get_machines_in_cluster(cluster_name)['machines']
            if serv['machineName'].upper() != current_server
        ]
        if not others:
            return None
        return self.remove_from_cluster(", ".join(others), cluster_name)

    def add_machines_to_cluster(self, machine_names, cluster_name='default'):
        """ Adds machine_names (posted as given) to the cluster. """
        url = "{}/clusters/{}/machines/add".format(self._endpoints.ADMIN, cluster_name)
        params = {"machineNames": machine_names}
        return self._post(url, params)

    def get_available_machines(self):
        """ Gets the response of clusters/getAvailableMachines. """
        url = "{}/clusters/getAvailableMachines".format(self._endpoints.ADMIN)
        return self._post(url)

    def add_available_machines_to_cluster(self, cluster_name='default'):
        """
        Adds every machine reported by get_available_machines to the cluster.
        Returns the add response, or the string "No servers to add".
        """
        l_machines = list(self.get_available_machines()['machines'])
        if not l_machines:
            return "No servers to add"
        return self.add_machines_to_cluster(", ".join(l_machines), cluster_name)

    def unregister_machine(self, machine_name):
        """ Unregisters a machine from the site. """
        url = "{}/machines/{}/unregister".format(self._endpoints.ADMIN, machine_name)
        return self._post(url)

    def register_machine(self, machine_name, machine_admin_url=None):
        """
        machine_admin_url is the url where the admin is running on the server to be added. Ex: http://gis.server.com:6080/arcgis/admin.
        If left blank, ArcGIS Server will attempt to reach it at <machineName>:6080/arcgis/admin
        """
        params = {"machineName": machine_name}
        if machine_admin_url is not None:
            # The original rewrote 'machineName' here and dropped the url.
            params['adminURL'] = machine_admin_url
        url = "{}/machines/register".format(self._endpoints.ADMIN)
        # The original posted without params, so no machine name was sent.
        return self._post(url, params)

    def register_machines(self, l_machine_names, admin_rel_path=r":6080/arcgis/admin"):
        """
        Registers each machine in l_machine_names.
        admin_rel_path -- appended to the machine name to form its admin url;
                          the default value means "let the server decide"
        """
        for m in l_machine_names:
            if admin_rel_path == r":6080/arcgis/admin":
                self.register_machine(m)
            else:
                # A value starting with ':' (port) or '/' joins directly;
                # the original always inserted '/', giving "host/:6443/...".
                sep = '' if admin_rel_path[:1] in (':', '/') else '/'
                self.register_machine(m, '{}{}{}'.format(m, sep, admin_rel_path))

    def unregister_machines(self, l_machine_names):
        """ Unregisters each machine in l_machine_names. """
        for m in l_machine_names:
            self.unregister_machine(m)

    def unregister_available_machines(self):
        """
        Unregisters all machines not currently participating in a site
        """
        l_machines = self.get_available_machines()['machines']
        self.unregister_machines(l_machines)

    def _gdb_item(self, item, item_name, managed_flag=None):
        """ Builds the data-item dict for an enterprise geodatabase connection. """
        info = {
            'connectionString': item['value'],
            'dataStoreConnectionType': 'shared',
        }
        if managed_flag is not None:
            info['isManaged'] = managed_flag
        return {
            'info': info,
            'path': '/enterpriseDatabases/' + item_name,
            'type': 'egdb'
        }

    def validate_data_item(self, item, item_name, type):
        """
        Posts a data item for validation and returns the response.
        item -- dict whose 'value' is the connection string
        type -- only 'gdb' builds a payload; anything else posts an empty item
        """
        import json
        p1 = self._gdb_item(item, item_name, managed_flag='false') if type == 'gdb' else {}
        params = {'item': json.dumps(p1)}
        return self._post(self._endpoints.VALIDATE_ITEM_DATA, params)

    def get_all_egdb(self):
        """
        Gets all SDE Connections
        """
        url = '{}/{}'.format(self._endpoints.ADMIN, 'data/findItems')
        params = {'types': 'egdb'}
        return self._post(url, params)

    def unregister_all_egdb(self):
        """
        Unregisters all SDE Connections
        """
        for x in self.get_all_egdb()['items']:
            gdb = x['path'].replace(r'/enterpriseDatabases/', '')
            self.unregister_gdb(gdb)

    def unregister_gdb(self, cxn_name):
        """ Unregisters the enterprise database item named cxn_name and returns the response. """
        return self.unregister_data_item(r'/enterpriseDatabases/{}'.format(cxn_name))

    def unregister_data_item(self, item_path):
        """ Unregisters the data item at item_path. """
        url = '{}/data/unregisterItem'.format(self._endpoints.ADMIN)
        params = {'itempath': item_path}
        return self._post(url, params)

    def register_data_item(self, item, item_name, type):
        """
        Validates the data item and registers it when validation succeeds.
        Returns the register response, or the validation response when its
        status is not 'success' (the original raised UnboundLocalError there).
        """
        import json
        p1 = self._gdb_item(item, item_name) if type == 'gdb' else {}
        params = {'item': json.dumps(p1)}
        validation = self.validate_data_item(item, item_name, type)
        if validation.get('status') != 'success':
            return validation
        return self._post(self._endpoints.REGISTER_DATA_ITEM, params)

    def upload(self, fpath, desc=''):
        """ Function to upload a file to the ArcGIS REST Admin
        fpath -- path of the file to send as 'itemFile'
        desc -- description stored with the upload
        """
        url = '{}/uploads/upload'.format(self._endpoints.ADMIN)
        params = {
            "file": 'fffffffile',
            "description": desc
        }
        # Close the handle once the request is done (it was leaked before).
        with open(fpath, 'rb') as item_file:
            return self._post(url, params, files={"itemFile": item_file})

    def registerDB(self, cxn_name, cxn_file, server_cxn='', pt_workspace='System', pt_svc_name='PublishingTools'):
        """
        Registers a database from a connection file: uploads the file, runs the
        'Get Database Connection String' task on it, then registers the result
        under cxn_name. server_cxn is accepted but not used.
        Returns the response of register_data_item.
        """
        gp = GP(self._endpoints.BASE_URL, username=self.username, password=self.password)
        uploaded = self.upload(cxn_file, cxn_name)
        task = 'Get Database Connection String'
        params = {
            'in_inputData': uploaded['item']['itemID'],
            'in_connDataType': 'UPLOADED_CONNECTION_FILE_ID'
        }
        job_status = gp.submit_job(pt_workspace, pt_svc_name, task, params)
        job_status = gp.wait_for_job_completion(pt_workspace, pt_svc_name, task, job_status['jobId'])
        result = gp.get_result(pt_workspace, pt_svc_name, task, job_status, 'out_connectionString')
        # register_data_item validates before registering; the extra validate
        # call made here before only tested a dict for truthiness.
        return self.register_data_item(result, cxn_name, 'gdb')
class User(Manager):
    '''
    USER CLASS
    this inherits from Manager and contains methods pertaining to user management
    '''

    def __init__(self, url=None, proxies=False, username=None, password=None):
        super(User, self).__init__(url, proxies, username, password)

    @staticmethod
    def _as_csv(values):
        ''' return a string unchanged, join any other iterable with commas '''
        # hasattr(..., 'strip') recognises str and (on Python 2) unicode.
        if hasattr(values, 'strip'):
            return values
        return ','.join(values)

    def addUser(self, username, password, params=None):
        ''' adds a user to arcgis server; params may carry extra fields and gets username/password written into it '''
        if params is None:
            params = {}
        params['username'] = username
        params['password'] = password
        return self._post(self._endpoints.ADD_USER, params)

    def getUsers(self, startIndex=0, pageSize=10):
        '''
        gets users from arcgis server
        Max of pageSize max is 1,000
        '''
        params = {'startIndex': startIndex, 'pageSize': pageSize}
        return self._post(self._endpoints.GET_USERS, params)

    def getAllUsers(self):
        """
        Iterates through getUsers() using the
        startIndex and pageSize to return all users.
        Returns the last page's response with 'users' replaced by the full list.
        """
        users = []
        start = 0
        step = 1000
        while True:
            response = self.getUsers(startIndex=start, pageSize=step)
            users.extend(response['users'])
            # Stop when 'hasMore' is absent or anything other than True.
            if response.get('hasMore') is not True:
                break
            start += step
        response['users'] = users
        return response

    def getRolesForUser(self, user):
        ''' gets all of the roles of a user '''
        params = {
            'username': user
        }
        return self._post(self._endpoints.GET_ROLES_FOR_USER, params)

    def doesUserHaveRole(self, user, role):
        ''' see if user belongs to a role '''
        return role in self.getRolesForUser(user)["roles"]

    def returnAvailableUsername(self, username):
        ''' return username, or username_<n> with the first n for which the user search comes back empty '''
        i = 1
        origUserName = username
        while self.searchUsers(username)['users']:
            username = origUserName + '_' + str(i)
            i += 1
        return username

    def getUser(self, username):
        ''' return the user whose username matches exactly, or {} when none of the first 50 search hits does '''
        users = self.searchUsers(username, maxcount=50)['users']
        for u in users:
            if u['username'] == username:
                return u
        return {}

    def searchUsers(self, searchfilter, maxcount=10):
        ''' search for users based on a search filter '''
        params = {
            'filter': searchfilter,
            'maxCount': maxcount
        }
        return self._post(self._endpoints.SEARCH_USER, params)

    def removeUser(self, username):
        ''' removes a user from arcgis server '''
        params = {
            'username': username
        }
        return self._post(self._endpoints.REMOVE_USER, params)

    def updateUser(self, username, password=None, fullname=None, description=None, email=None):
        ''' update a user on arcgis server; only truthy fields are sent '''
        params = {
            'username': username,
        }
        optional = (
            ('password', password),
            ('fullname', fullname),
            ('description', description),
            ('email', email),
        )
        for field, value in optional:
            if value:
                params[field] = value
        return self._post(self._endpoints.UPDATE_USER, params)

    def createKeyIfNotExists(self, dict, key):
        ''' if a key for a dictionary doesn't exist, create it. this is a helper method '''
        # The parameter name shadows the builtin but is part of the interface.
        dict.setdefault(key, '')
        return dict

    def updateUserPWD(self, username, password):
        ''' update a users password; returns 'error' when the user is not found '''
        u = self.getUser(username)
        if not u:
            return 'error'
        for field in ('fullname', 'description', 'email'):
            self.createKeyIfNotExists(u, field)
        return self.updateUser(username, password, u['fullname'], u['description'], u['email'])

    def addRoles(self, username, roles):
        ''' add a roles to a user. roles should be a list ['role1','role2','role3'] (a comma separated string is sent as-is) '''
        params = {
            'username': username,
            'roles': self._as_csv(roles)
        }
        return self._post(self._endpoints.ADD_ROLE, params)

    def removeRoles(self, username, roles):
        ''' remove roles from a user. roles should be a list ['role1','role2','role3'] (a comma separated string is sent as-is) '''
        params = {
            'username': username,
            'roles': self._as_csv(roles)
        }
        return self._post(self._endpoints.REMOVE_ROLE, params)

    def getPrivileges(self, username):
        ''' get a users privileges ADMINISTRATOR etc... '''
        params = {
            'username': username
        }
        return self._post(self._endpoints.GET_PRIVILEGE, params)

    def addUsersToRole(self, usernames, role):
        ''' adds users to a role; usernames may be a list or a comma separated string '''
        params = {
            'users': self._as_csv(usernames),
            'rolename': role
        }
        return self._post(self._endpoints.ADD_USER_TO_ROLE, params)
class Service(Manager):
    '''
    SERVICE CLASS
    this inherits from Manager and contains methods pertaining to services:
    listing, start/stop/status, tile export, replicas and job results
    '''

    def __init__(self, url=None, proxies=False, username=None, password=None):
        super(Service, self).__init__(url, proxies, username, password)

    def _rest_service_url(self, folder, servicename, servicetype):
        ''' build the rest url <services>/<folder>/<name>/<type> '''
        return '{}/{}/{}/{}'.format(self._endpoints._REST_SRVCS_URL, folder, servicename, servicetype)

    def _collectServices(self, d):
        ''' this method packages up our services that reside on arcgis server and returns a list of service dicts
        d -- services root response; only the folders listed in d['folders'] are visited
        '''
        data = []
        for folder in d['folders']:
            resp = self._post('{}/{}'.format(self._endpoints._SERVICE_URL, folder))
            for service in resp['services']:
                name = service['serviceName']
                svc_folder = service['folderName']
                svc_type = service['type']
                status = self.getServiceStatus(svc_folder, name, svc_type)['realTimeState']
                data.append(
                    {
                        'name': name,
                        'folder': svc_folder,
                        'type': svc_type,
                        'status': status,
                    }
                )
        return data

    def getAllServices(self):
        ''' returns all services found in the folders of our arcgis server '''
        data = self._post(self._endpoints._SERVICE_URL)
        return self._collectServices(data)

    def getServiceStatus(self, folder, servicename, servicetype):
        ''' gets the status of a server services '''
        requestURL = '{}/{}/{}.{}/status'.format(self._endpoints._SERVICE_URL, folder, servicename, servicetype)
        return self._post(requestURL)

    def startService(self, folder, servicename, servicetype):
        ''' starts a service '''
        requestURL = '{}/{}/{}.{}/start'.format(self._endpoints._SERVICE_URL, folder, servicename, servicetype)
        return self._post(requestURL)

    def stopService(self, folder, servicename, servicetype):
        ''' stops a service '''
        requestURL = '{}/{}/{}.{}/stop'.format(self._endpoints._SERVICE_URL, folder, servicename, servicetype)
        return self._post(requestURL)

    def getServiceScales(self, folder, servicename, servicetype):
        ''' returns the 'lods' list from the service's tileInfo '''
        data = self._post(self._rest_service_url(folder, servicename, servicetype))
        return data['tileInfo']['lods']

    def getServiceScalesAsString(self, folder, servicename, servicetype):
        ''' returns the scale of every lod as one comma separated string '''
        lods = self.getServiceScales(folder, servicename, servicetype)
        return ",".join(str(lod["scale"]) for lod in lods)

    def _format_extent(self, xmin, ymin, xmax, ymax, wkid=26915):
        ''' build an envelope dict with its spatial reference '''
        return {"xmin": xmin,
                "ymin": ymin,
                "xmax": xmax,
                "ymax": ymax,
                "spatialReference": {"wkid": wkid}
                }

    def exportTiles(self, folder, servicename, servicetype, extent, wkid=26915):
        ''' posts an exportTiles request for all scales of the service
        extent -- sequence (xmin, ymin, xmax, ymax)
        wkid -- spatial reference id of the extent
        '''
        import json
        envelope = self._format_extent(extent[0], extent[1], extent[2], extent[3], wkid)
        params = {
            'tilePackage': 'true',
            # json.dumps gives real JSON; str() produced a Python repr
            # (single quotes, u'' prefixes on Python 2).
            'exportExtent': json.dumps(envelope),
            'optimizeTilesForSize': 'true',
            'compressionQuality': 50,
            'exportBy': 'scale',
            'levels': self.getServiceScalesAsString(folder, servicename, servicetype),
        }
        requestURL = '{}/exportTiles'.format(self._rest_service_url(folder, servicename, servicetype))
        return self._post(requestURL, params)

    def createReplica(self, folder, servicename, servicetype, extent, wkid=26915):
        ''' posts a createReplica request covering all layers of the service
        NOTE: the geometry sent is the service's own fullExtent; the extent
        and wkid arguments are currently not used.
        '''
        service_details = self._post(self._rest_service_url(folder, servicename, servicetype))
        full_extent = service_details['fullExtent']
        # Fixed key order: iterating the dict gave the corners in arbitrary order.
        envelope = [str(full_extent[corner]) for corner in ('xmin', 'ymin', 'xmax', 'ymax')]
        params = {
            "replicaName": servicename,
            "layers": ",".join(str(layer['id']) for layer in service_details['layers']),
            "geometry": ",".join(envelope),
            "geometryType": "esriGeometryEnvelope",
            #"inSR" : 4326, #wkid,
            "transportType": "esriTransportTypeUrl",
            "returnAttachments": "false",
            "returnAttachmentDataByURL": "false",
            "syncModel": "perLayer",
            "async": "true",
            "dataFormat": "sqlite",
            "replicaOptions": ""
        }
        print(params)
        requestURL = '{}/createReplica'.format(self._rest_service_url(folder, servicename, servicetype))
        print(requestURL)
        return self._post(requestURL, params)

    def checkServiceJobStatus(self, folder, servicename, servicetype, task, jobid):
        ''' returns the response of <service>/<task>/jobs/<jobid> '''
        requestURL = '{}/{}/jobs/{}'.format(
            self._rest_service_url(folder, servicename, servicetype), task, jobid)
        return self._post(requestURL)

    def getJobResultsOutputUrl(self, folder, servicename, servicetype, task, jobid):
        ''' returns the job's results/out_service_url response '''
        requestURL = '{}/{}/jobs/{}/results/out_service_url'.format(
            self._rest_service_url(folder, servicename, servicetype), task, jobid)
        return self._post(requestURL)

    def getJobResults(self, folder, servicename, servicetype, task, jobid):
        ''' posts to the url found in the job's out_service_url result; returns (url, response) '''
        data = self.getJobResultsOutputUrl(folder, servicename, servicetype, task, jobid)
        requestURL = data['value']
        results = self._post(requestURL)
        return requestURL, results
class Status(Manager):
    '''
    STATUS CLASS
    this inherits from Manager and contains methods pertaining to server logs
    '''

    def __init__(self, url=None, proxies=False, username=None, password=None):
        super(Status, self).__init__(url, proxies, username, password)

    def countErrorReport(self, servername):
        ''' count errors report for server
        servername -- sent as the 'machine' parameter
        '''
        params = {'machine': servername}
        # The original read an attribute that Endpoint never defines
        # (COUNT_ERRORS) and built params without sending them. The url is
        # derived from ADMIN so the path does not repeat the "arcgis" segment.
        url = '{}/logs/countErrorReports'.format(self._endpoints.ADMIN)
        return self._post(url, params)
import os,json
class settings(object):
    """
    Shared settings for every other module: databases, environments, paths,
    and anything else that is reused across modules.
    Everything is read once, while this class body executes, from
    config.json and envir.json located next to this file.
    - - - - - - - -SET THE GLOBAL ENVIRONMENT HERE- - - - - - - - -
    """
    _module_dir = os.path.dirname(os.path.realpath(__file__))
    _config_fpath = os.path.join(_module_dir, 'config.json')
    _envir_fpath = os.path.join(_module_dir, 'envir.json')
    # NOTE(review): this public alias has always ended up holding the
    # envir.json path (it was assigned twice); kept as-is -- confirm intent.
    configs_fpath = _envir_fpath
    del _module_dir  # scratch name only, not part of the class namespace

    # Whole configuration document.
    with open(_config_fpath) as _config_file:
        config = json.load(_config_file)

    # Name of the active environment.
    with open(_envir_fpath) as _config_file:
        envir = json.load(_config_file)['envir']

    config['envir'] = envir
    env_vars = config['env_vars']
    # Database definitions of the active environment.
    dbo = env_vars[envir]['dbs']
{
"proxies": {
"http": "http://<username>:<pword>@proxy.<>.com:80",
"https": "https://<username>:<pword>@proxy.<>.com:80"
},
"email_address": "",
"smtp_server": "",
"email_group": "",
"nas_storage": "",
"env_vars": {
"dev": {
"dbs": {
"db1": {
"name": "exampledb",
"users": {
"omdba": { "user": "test", "password": "test" }
},
"conn_string": ""
},
"db2": {
"name": "exampledb2",
"users": {
"sde": { "user": "test", "password": "test" }
},
"conn_string": ""
}
},
"esri_storage": "",
"arcgis_server": {
"url": "gisserver.example.com",
"servers": [
"servername",
"servername2"
]
},
"arcgis_cache": { }
}
}
}
{"envir": "dev"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment