Skip to content

Instantly share code, notes, and snippets.

Created April 18, 2012 16:04
Show Gist options
  • Save kopertop/2414564 to your computer and use it in GitHub Desktop.
Save kopertop/2414564 to your computer and use it in GitHub Desktop.
Modified version of the exfm CloudSearch client (`exfm.cloudsearch`), as provided by its original authors (the source links are missing from this copy).
from math import ceil
import time
import json
import re
import boto
import boto.jsonresponse
import boto.exception
from boto.connection import AWSQueryConnection
from boto.auth import QuerySignatureV2AuthHandler
import requests
import logging
log = logging.getLogger(__name__)

# A key starts with an ASCII letter or digit and continues with letters,
# digits or underscores. ``match`` anchors the start; ``$`` anchors the end
# (note: ``$`` also accepts a single trailing newline).
_valid_key_expr = re.compile(r'[a-zA-Z0-9][a-zA-Z0-9_]*$')


def is_valid_key(key):
    """Return True if *key* is a syntactically valid field key.

    :type key: str
    :param key: The key to validate.
    :rtype: bool
    :return: True when *key* matches ``_valid_key_expr``, False otherwise.
    """
    # Previously the pattern was matched against itself, which raised
    # TypeError for every call; match the caller's key instead.
    return _valid_key_expr.match(key) is not None
def escape_control_characters(s):
    """Percent-encode single quotes, double quotes, backslashes and ampersands.

    :type s: str
    :param s: Text to escape.
    :rtype: str
    :return: *s* with ``'`` -> ``%27``, ``"`` -> ``%22``, backslash ->
        ``%5C`` and ``&`` -> ``%26``.
    """
    substitutions = (
        ("'", '%27'),
        ('"', '%22'),
        ("\\", '%5C'),
        ('&', '%26'),
    )
    escaped = s
    for char, code in substitutions:
        escaped = escaped.replace(char, code)
    return escaped
class SearchServiceException(Exception):
    """Raised when CloudSearch reports an error for a search request.

    Raised with ``(message, query)``, where *query* is the query object that
    failed; both are available through ``args``.
    """
def get_document_service(domain=None, endpoint=None):
    """Build a connection for uploading documents.

    :param domain: A :class:`Domain` used to resolve the endpoint when
        *endpoint* is not supplied.
    :param endpoint: Explicit document service host name.
    :rtype: :class:`DocumentServiceConnection`
    """
    connection_cls = DocumentServiceConnection
    return connection_cls(endpoint=endpoint, domain=domain)
def get_search_service(domain=None, endpoint=None):
    """Build a connection for running search queries.

    :param domain: A :class:`Domain` used to resolve the endpoint when
        *endpoint* is not supplied.
    :param endpoint: Explicit search service host name.
    :rtype: :class:`SearchConnection`
    """
    connection_cls = SearchConnection
    return connection_cls(endpoint=endpoint, domain=domain)
def connect_cloudsearch(aws_access_key_id=None, aws_secret_access_key=None,
                        **kwargs):
    """Open a connection to the Amazon CloudSearch configuration API.

    :type aws_access_key_id: string
    :param aws_access_key_id: Your AWS Access Key ID
    :type aws_secret_access_key: string
    :param aws_secret_access_key: Your AWS Secret Access Key
    :param kwargs: Extra keyword arguments forwarded unchanged to
        :class:`CloudSearchConnection` (e.g. ``is_secure``, ``host``).
    :rtype: :class:`CloudSearchConnection`
    :return: A connection to Amazon's CloudSearch service
    """
    return CloudSearchConnection(aws_access_key_id, aws_secret_access_key,
                                 **kwargs)
class Domain(object):
    """A CloudSearch search domain as reported by the configuration API.

    The configuration connection builds instances by passing every field of
    the parsed response, plus ``connection``, as keyword arguments. The class
    attributes below are the values seen for fields the response omits.
    """

    created = False
    deleted = False
    doc_service = {}
    domain_id = None
    domain_name = None
    num_searchable_docs = 0
    requires_index_docs = False
    search_instance_count = 0
    search_instance_type = None
    search_partition_count = 0
    search_service = {}

    def __init__(self, **attrs):
        # Copy every response field (and the owning connection) onto self.
        for key, value in attrs.items():
            setattr(self, key, value)

    @property
    def name(self):
        """The domain's name (same as ``domain_name``)."""
        return self.domain_name

    def index_documents(self):
        """Ask CloudSearch to rebuild this domain's search index."""
        return self.connection.index_documents(self.name)

    def create_index_field(self, field_name, field_type, default='',
                           facet=False, result=False, searchable=False,
                           source_attributes=None):
        """Define or replace an index field on this domain.

        All arguments are forwarded to the connection's
        ``create_index_field`` together with this domain's name.
        *source_attributes* defaults to a fresh empty list per call.
        """
        if source_attributes is None:
            source_attributes = []
        return self.connection.create_index_field(
            self.name, field_name, field_type, default=default, facet=facet,
            result=result, searchable=searchable,
            source_attributes=source_attributes)

    def create_rank_expression(self, name, expression):
        """Define rank expression *name* as *expression* on this domain."""
        return self.connection.create_rank_expression(self.name, name,
                                                      expression)

    def get_document_service(self):
        """Return a :class:`DocumentServiceConnection` for this domain."""
        return DocumentServiceConnection(domain=self)

    def get_search_service(self):
        """Return a :class:`SearchConnection` for this domain."""
        return SearchConnection(domain=self)

    @staticmethod
    def _ip_statement(resource_arn, ip):
        """Build one access-policy statement allowing *ip* on *resource_arn*.

        Same shape as the statements returned by
        DescribeServiceAccessPolicies.
        """
        return {
            "Effect": "Allow",
            # Docs say use GET (search) / POST (doc), but requests are
            # denied unless the action is *.
            "Action": "*",
            "Resource": resource_arn,
            "Condition": {
                "IpAddress": {
                    "aws:SourceIp": [ip],
                },
            },
        }

    def allow_ip(self, ip):
        """Grant *ip* access to this domain's search and document services.

        If the domain has no access policy yet, a new one is created with one
        statement per service. Otherwise *ip* is appended to the
        ``aws:SourceIp`` list of every existing statement that lacks it.

        :type ip: str
        :param ip: Source IP address (or range) to allow.
        :return: The update response, or ``None`` when *ip* was already
            allowed by every statement (no update is sent).
        """
        policies = self.connection.get_service_access_policies(self.name)
        if not policies or not policies.get('options'):
            new_policy = {
                "Statement": [
                    self._ip_statement(self.search_service['arn'], ip),
                    self._ip_statement(self.doc_service['arn'], ip),
                ]
            }
            return self.connection.update_service_access_policies(
                self.name, json.dumps(new_policy))

        changed = False
        policy = json.loads(policies['options'])
        for statement in policy['Statement']:
            source_ips = statement['Condition']['IpAddress']['aws:SourceIp']
            if ip in source_ips:
                boto.log.warn('IpAddress %s already allowed for resource '
                              '%s. Noop.' % (ip, statement['Resource']))
            else:
                source_ips.append(ip)
                changed = True
        if changed:
            return self.connection.update_service_access_policies(
                self.name, json.dumps(policy))
        return None

    def disallow_ip(self, ip):
        """Revoke *ip*'s access in every statement of the access policy.

        :type ip: str
        :param ip: Source IP address (or range) to remove.
        :return: The update response, or ``None`` when there is no policy or
            *ip* was not listed anywhere (no update is sent).
        """
        policies = self.connection.get_service_access_policies(self.name)
        if not policies or not policies.get('options'):
            # No policy at all, so there is nothing to revoke.
            return None

        changed = False
        policy = json.loads(policies['options'])
        for statement in policy['Statement']:
            source_ips = statement['Condition']['IpAddress']['aws:SourceIp']
            if ip in source_ips:
                # Drop every occurrence so the address is fully revoked.
                source_ips[:] = [s for s in source_ips if s != ip]
                changed = True
            else:
                boto.log.warn('IpAddress %s already not allowed for resource '
                              '%s. Noop.' % (ip, statement['Resource']))
        if changed:
            return self.connection.update_service_access_policies(
                self.name, json.dumps(policy))
        return None

    def wait_for_acl_changes(self, poll_interval=5):
        """Block until the access-policy status is no longer 'Processing'.

        :type poll_interval: int or float
        :param poll_interval: Seconds to sleep between status checks.
            NOTE(review): the original interval is not visible in this copy;
            5 seconds is an assumed default.
        """
        while True:
            policies = self.connection.get_service_access_policies(self.name)
            if policies['status']['state'] != 'Processing':
                return
            time.sleep(poll_interval)

    def __repr__(self):
        return '<Domain: %s>' % self.domain_name
class SearchResults(object):
    """One page of hits returned by the CloudSearch search API.

    Built from the decoded JSON search response, augmented with the ``query``
    that produced it and the ``search_service`` callable that ran it.
    Iterating yields the raw hit dicts of this page.
    """

    def __init__(self, **attrs):
        info = attrs['info']
        hits = attrs['hits']

        self.rid = info['rid']
        self.cpu_time_ms = info['cpu-time-ms']
        self.time_ms = info['time-ms']
        # Total number of matches across all pages ...
        self.hits = hits['found']
        # ... versus the hit dicts actually returned on this page.
        self.docs = hits['hit']
        self.start = hits['start']
        self.rank = attrs['rank']
        self.match_expression = attrs['match-expr']
        self.query = attrs['query']
        self.search_service = attrs['search_service']

        # NOTE(review): with Python 2 int operands ``/`` floors, so this is
        # floor(hits / page size); the paging loops compare a zero-based page
        # counter against it with ``<=``. Confirm before changing either side.
        self.num_pages_needed = ceil(self.hits / self.query.real_size)

    def __len__(self):
        return len(self.docs)

    def __iter__(self):
        return iter(self.docs)

    def next_page(self):
        """Call Cloudsearch to get the next page of search results

        Advances the shared query's ``start`` by one page, bumps its ``page``
        counter and re-runs it through ``search_service``.

        :rtype: :class:`exfm.cloudsearch.SearchResults`
        :return: A cloudsearch SearchResults object
        :raises StopIteration: once the query's page counter has passed
            ``num_pages_needed``.
        """
        if self.query.page > self.num_pages_needed:
            raise StopIteration
        self.query.start += self.query.real_size
        self.query.page += 1
        return self.search_service(self.query)
class Query(object):
    """Parameters for one CloudSearch search request (2011-02-01 API)."""

    # Upper bound on documents requested per call; a size of 0 or anything
    # larger is clamped to this.
    # NOTE(review): the original value of this constant is not visible in this
    # copy; 500 is assumed — confirm against the service limit you rely on.
    RESULTS_PER_PAGE = 500

    def __init__(self, q=None, bq=None, rank=None, return_fields=None,
                 size=10, start=0, facet=None, facet_constraints=None,
                 facet_sort=None, facet_top_n=None, t=None):
        self.q = q
        self.bq = bq
        # Fresh containers per instance instead of shared mutable defaults.
        self.rank = rank if rank is not None else []
        self.return_fields = return_fields if return_fields is not None else []
        self.start = start
        self.facet = facet if facet is not None else []
        self.facet_constraints = (facet_constraints
                                  if facet_constraints is not None else {})
        self.facet_sort = facet_sort if facet_sort is not None else {}
        self.facet_top_n = facet_top_n if facet_top_n is not None else {}
        self.t = t if t is not None else {}
        # Zero-based page counter used when paging through results.
        self.page = 0
        self.update_size(size)

    def update_size(self, new_size):
        """Set the requested size and derive ``real_size`` (the size sent).

        ``real_size`` is ``RESULTS_PER_PAGE`` when *new_size* is 0 or larger
        than ``RESULTS_PER_PAGE``, otherwise *new_size* itself.
        """
        self.size = new_size
        self.real_size = Query.RESULTS_PER_PAGE if (self.size >
            Query.RESULTS_PER_PAGE or self.size == 0) else self.size

    def to_params(self):
        """Transform search parameters from instance properties to a dictionary

        ``start`` and ``size`` are always present; every other parameter is
        included only when set.

        :rtype: dict
        :return: search parameters
        """
        params = {'start': self.start, 'size': self.real_size}

        if self.q:
            params['q'] = self.q
        if self.bq:
            params['bq'] = self.bq
        if self.rank:
            params['rank'] = ','.join(self.rank)
        if self.return_fields:
            params['return-fields'] = ','.join(self.return_fields)
        if self.facet:
            params['facet'] = ','.join(self.facet)

        # Per-field options become one parameter per field, e.g.
        # facet_sort={'genre': 'alpha'} -> {'facet-genre-sort': 'alpha'}.
        per_field = (
            ('facet-%s-constraints', self.facet_constraints),
            ('facet-%s-sort', self.facet_sort),
            ('facet-%s-top-n', self.facet_top_n),
            ('t-%s', self.t),
        )
        for template, options in per_field:
            for field, value in (options or {}).items():
                params[template % field] = value

        return params
class SearchConnection(object):
    """Client for a domain's CloudSearch search endpoint (2011-02-01 API)."""

    def __init__(self, domain=None, endpoint=None):
        """
        :param domain: A :class:`Domain`; its ``search_service['endpoint']``
            is used when *endpoint* is not supplied.
        :param endpoint: Explicit search host name. A ``search-`` prefix is
            added if missing.
        """
        if endpoint:
            self.endpoint = endpoint
        else:
            # Must be the *search* service endpoint: the document service
            # endpoint ("doc-...") would turn into a bogus "search-doc-..."
            # host below.
            self.endpoint = domain.search_service['endpoint']
        if not self.endpoint.startswith('search-'):
            self.endpoint = "search-%s" % self.endpoint

    def build_query(self, q=None, bq=None, rank=None, return_fields=None,
                    size=10, start=0, facet=None, facet_constraints=None,
                    facet_sort=None, facet_top_n=None, t=None):
        """Build a :class:`Query` from keyword arguments (see :meth:`search`)."""
        return Query(q=q, bq=bq, rank=rank, return_fields=return_fields,
                     size=size, start=start, facet=facet,
                     facet_constraints=facet_constraints, facet_sort=facet_sort,
                     facet_top_n=facet_top_n, t=t)

    def search(self, q=None, bq=None, rank=None, return_fields=None, size=10,
               start=0, facet=None, facet_constraints=None, facet_sort=None,
               facet_top_n=None, t=None):
        """Query Cloudsearch

        :type q: str
        :param q: Free-text query, sent as ``q``.
        :type bq: str
        :param bq: Boolean query, sent as ``bq``.
        :type rank: list
        :param rank: Rank expression/field names, comma-joined into ``rank``.
        :type return_fields: list
        :param return_fields: Field names, comma-joined into ``return-fields``.
        :type size: int
        :param size: Number of hits wanted; 0 or values above
            ``Query.RESULTS_PER_PAGE`` are clamped to that limit.
        :type start: int
        :param start: Offset of the first hit to return.
        :type facet: list
        :param facet: Fields to facet on, comma-joined into ``facet``.
        :type facet_constraints: dict
        :param facet_constraints: field -> constraint, sent as
            ``facet-FIELD-constraints``.
        :type facet_sort: dict
        :param facet_sort: field -> sort order, sent as ``facet-FIELD-sort``.
        :type facet_top_n: dict
        :param facet_top_n: field -> count, sent as ``facet-FIELD-top-n``.
        :type t: dict
        :param t: field -> range, sent as ``t-FIELD``.
        :rtype: :class:`exfm.cloudsearch.SearchResults`
        :return: A cloudsearch SearchResults object
        """
        query = self.build_query(q=q, bq=bq, rank=rank,
                                 return_fields=return_fields, size=size,
                                 start=start, facet=facet,
                                 facet_constraints=facet_constraints,
                                 facet_sort=facet_sort,
                                 facet_top_n=facet_top_n, t=t)
        return self(query)

    def __call__(self, query):
        """Make a call to CloudSearch

        :type query: :class:`exfm.cloudsearch.Query`
        :param query: A fully specified Query instance
        :rtype: :class:`exfm.cloudsearch.SearchResults`
        :return: A cloudsearch SearchResults object
        :raises: :class:`SearchServiceException` when the response contains
            an ``error`` key.
        """
        url = "http://%s/2011-02-01/search" % (self.endpoint)
        params = query.to_params()

        r = requests.get(url, params=params)
        data = json.loads(r.content)
        data['query'] = query
        data['search_service'] = self

        if 'error' in data:
            # Report the first fatal message if there is one; otherwise still
            # fail here rather than crash later building SearchResults from
            # an error payload.
            for m in data.get('messages', []):
                if m['severity'] == 'fatal':
                    raise SearchServiceException("Error processing search %s "
                        "=> %s" % (params, m['message']), query)
            raise SearchServiceException("Unknown error processing search %s"
                % (params), query)

        return SearchResults(**data)

    def _iter_pages(self, query):
        """Yield successive result pages for *query*, advancing ``query.start``."""
        page = 0
        num_pages_needed = 0
        while page <= num_pages_needed:
            results = self(query)
            num_pages_needed = results.num_pages_needed
            yield results
            query.start += query.real_size
            page += 1

    def get_all_paged(self, query, per_page):
        """Get a generator to iterate over all pages of search results

        :type query: :class:`exfm.cloudsearch.Query`
        :param query: A fully specified Query instance (mutated: its size and
            ``start`` offset are updated as pages are fetched).
        :type per_page: int
        :param per_page: Number of docs in each SearchResults object.
        :rtype: generator
        :return: Generator containing :class:`exfm.cloudsearch.SearchResults`
        """
        # per_page was previously documented but ignored.
        query.update_size(per_page)
        for results in self._iter_pages(query):
            yield results

    def get_all_hits(self, query):
        """Get a generator to iterate over all search results

        Transparently handles the results paging from Cloudsearch search results
        so even if you have many thousands of results you can iterate over all
        results in a reasonably efficient manner.

        :type query: :class:`exfm.cloudsearch.Query`
        :param query: A fully specified Query instance (its ``start`` offset
            is advanced as pages are fetched).
        :rtype: generator
        :return: All docs matching query
        """
        for results in self._iter_pages(query):
            for doc in results:
                yield doc

    def get_num_hits(self, query):
        """Return the total number of hits for query

        :type query: :class:`exfm.cloudsearch.Query`
        :param query: A fully specified Query instance
        :rtype: int
        :return: Total number of hits for query
        """
        return self(query).hits
# Characters XML 1.0 does not allow: C0 controls other than tab, LF and CR,
# UTF-16 surrogates, and the U+FFFE / U+FFFF non-characters.
_illegal_xml_chars_pattern = re.compile(u'[\x00-\x08\x0b\x0c\x0e-\x1F\uD800-\uDFFF\uFFFE\uFFFF]')


def remove_illegal_xml_chars(s):
    """Return *s* with every character matched by the illegal-XML pattern removed.

    :type s: str
    :param s: Text to clean.
    :rtype: str
    """
    return _illegal_xml_chars_pattern.sub('', s)
class DocumentServiceConnection(object):
    """Batches SDF add/delete operations and posts them to a domain's
    document service endpoint (2011-02-01 API)."""

    endpoint = None

    def __init__(self, domain=None, endpoint=None):
        """
        :param domain: A :class:`Domain`; its ``doc_service['endpoint']`` is
            used when *endpoint* is not supplied.
        :param endpoint: Explicit document service host name. A ``doc-``
            prefix is added if missing.
        """
        if endpoint:
            self.endpoint = endpoint
        else:
            self.endpoint = domain.doc_service['endpoint']
        # Operations queued by add()/delete(); serialised by get_sdf().
        self.documents_batch = []
        # Pre-built SDF (e.g. loaded from S3); takes precedence when set.
        self._sdf = None
        if not self.endpoint.startswith('doc-'):
            self.endpoint = "doc-%s" % self.endpoint

    def add(self, _id, version, fields, lang='en'):
        """Queue an ``add`` operation for document *_id* at *version*."""
        d = {'type': 'add', 'id': _id, 'version': version, 'lang': lang,
             'fields': fields}
        self.documents_batch.append(d)

    def delete(self, _id, version):
        """Queue a ``delete`` operation for document *_id* at *version*."""
        d = {'type': 'delete', 'id': _id, 'version': version}
        self.documents_batch.append(d)

    def get_sdf(self):
        """Return the SDF to post: the pre-loaded SDF if any, else the
        queued operations encoded as a JSON array."""
        return self._sdf if self._sdf else json.dumps(self.documents_batch)

    def clear_sdf(self):
        """Forget any pre-loaded SDF and all queued operations."""
        self._sdf = None
        self.documents_batch = []

    def add_sdf_from_s3(self, key_obj):
        """Use the contents of an S3 key as the SDF for the next commit.

        :param key_obj: An object with ``get_contents_as_string()`` (e.g. a
            boto S3 Key).
        """
        # TODO(lucas): would be nice if this could just take an s3://uri...
        self._sdf = key_obj.get_contents_as_string()

    def commit(self):
        """POST the current SDF batch to the document service.

        :rtype: :class:`CommitResponse`
        """
        sdf = self.get_sdf()

        null_index = sdf.find(': null')
        if null_index != -1:
            # sdf is a JSON string, so it cannot be indexed by 'id' (that
            # raised TypeError); log the surrounding text instead, clamping
            # the slice start so it never goes negative and wraps around.
            log.error('null value in sdf detected. This will probably raise '
                      '500 error.')
            log.error(sdf[max(0, null_index - 100):null_index + 100])

        url = "http://%s/2011-02-01/documents/batch" % (self.endpoint)
        request_config = {
            'pool_connections': 20,
            'keep_alive': True,
            'max_retries': 5,
            'pool_maxsize': 50,
        }
        # NOTE(review): ``config=`` is only accepted by requests < 1.0.
        r = requests.post(url, data=sdf, config=request_config,
                          headers={'Content-Type': 'application/json'})
        return CommitResponse(r, self, sdf)
class CommitResponse(object):
    """Wrapper for response to Cloudsearch document batch commit.

    :type response: :class:`requests.models.Response`
    :param response: Response from Cloudsearch /documents/batch API
    :type doc_service: :class:`exfm.cloudsearch.DocumentServiceConnection`
    :param doc_service: The connection whose ``documents_batch`` was posted.
    :type sdf: str
    :param sdf: The SDF text that was posted.
    :raises: :class:`boto.exception.BotoServerError` if the response body is
        not valid JSON.
    """

    def __init__(self, response, doc_service, sdf):
        self.response = response
        self.doc_service = doc_service
        self.sdf = sdf

        try:
            self.content = json.loads(response.content)
        except ValueError:
            # Logged once (the old print statements duplicated this output
            # and are Python 2 only).
            log.error('Error indexing documents.\nResponse Content:\n{}\n\n'
                      'SDF:\n{}'.format(response.content, self.sdf))
            raise boto.exception.BotoServerError(self.response.status_code, '',
                                                 body=response.content)

        self.status = self.content['status']
        if self.status == 'error':
            self.errors = [e.get('message')
                           for e in self.content.get('errors', [])]
        else:
            self.errors = []

        self.adds = self.content['adds']
        self.deletes = self.content['deletes']
        self._check_num_ops('add', self.adds)
        self._check_num_ops('delete', self.deletes)

    def _check_num_ops(self, type_, response_num):
        """Log an error if the number of ops in the response doesn't match
        the number committed.

        NOTE(review): when the SDF was loaded via ``add_sdf_from_s3`` the
        ``documents_batch`` list is not populated, so a mismatch is logged.

        :type type_: str
        :param type_: Type of commit operation: 'add' or 'delete'
        :type response_num: int
        :param response_num: Number of adds or deletes in the response.
        """
        commit_num = sum(1 for d in self.doc_service.documents_batch
                         if d['type'] == type_)

        if response_num != commit_num:
            log.error('Incorrect number of %s returned. Commit: %s '
                      'Response: %s' % (type_, commit_num, response_num))
class CloudSearchConnection(AWSQueryConnection):
    """Client for the Amazon CloudSearch configuration API (2011-02-01).

    Manages domains, index fields, rank expressions, text options and access
    policies. Responses are parsed by :meth:`get_response`, which walks a
    dot-separated path of pythonized element names of the form
    ``<action>_response.<action>_result.<field>``.

    NOTE(review): several response paths and request parameters in this class
    were missing from the copy it was restored from and were filled in
    following that pattern — confirm against the 2011-02-01 API reference.
    """

    APIVersion = '2011-02-01'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None,
                 host='cloudsearch.us-east-1.amazonaws.com', debug=0,
                 https_connection_factory=None, path='/'):
        # Forwarded positionally: the order must match the base class
        # signature, with the secret key second and path last.
        AWSQueryConnection.__init__(self, aws_access_key_id,
                                    aws_secret_access_key,
                                    is_secure, port, proxy,
                                    proxy_port, proxy_user, proxy_pass,
                                    host, debug, https_connection_factory,
                                    path)

    def _required_auth_capability(self):
        # NOTE(review): confirm a boto auth handler advertises this
        # capability, otherwise requests cannot be signed.
        return ['cloudsearch']

    def _prepare_boolean(self, val):
        """Encode a flag as the API's ``'true'`` / ``'false'`` string."""
        return 'true' if val in [True, 1, '1', 'true'] else 'false'

    def index_documents(self, domain_name):
        """Tell CloudSearch to rebuild the search index of *domain_name*.

        :returns: The ``field_names`` list from the response.
        """
        return self.get_response(
            'index_documents_response.index_documents_result.field_names',
            dict, 'IndexDocuments', {'DomainName': domain_name}, verb='POST',
            list_marker='FieldNames')

    def create_domain(self, name):
        """Create a new search domain.

        :type name: string
        :param name: The name of the domain to create
        :rtype: :class:`Domain`
        :returns: Domain with status.
        """
        return self.get_response(
            'create_domain_response.create_domain_result.domain_status',
            Domain, 'CreateDomain', {'DomainName': name}, verb='POST')

    def delete_domain(self, name):
        """Delete a search domain.

        :type name: string
        :param name: The name of the domain to delete
        :rtype: :class:`Domain`
        :returns: Domain with status.
        """
        return self.get_response(
            'delete_domain_response.delete_domain_result.domain_status',
            Domain, 'DeleteDomain', {'DomainName': name}, verb='POST')

    def get_domain(self, name):
        """Return the domain called *name*, or ``None`` if there is none.

        :type name: string
        :param name: The name of the domain to get
        :rtype: :class:`Domain`
        :returns: Domain
        """
        for d in self.get_domains():
            if d.domain_name == name:
                return d
        return None

    def get_domains(self):
        """Describe the domains owned by this account.

        :rtype: list of :class:`Domain`
        :returns: A list of all domains
        """
        return self.get_response(
            'describe_domains_response.describe_domains_result.domain_status_list',
            Domain, 'DescribeDomains', {}, verb='POST',
            list_marker='DomainStatusList')

    def create_index_field(self, domain_name, field_name, field_type,
                           default='', facet=False, result=False,
                           searchable=False, source_attributes=None):
        """Define an IndexField, replacing an existing definition or
        creating a new one.

        TODO(lucas): Actually do something with source attributes.

        :type domain_name: string
        :param domain_name: The name of the domain to add the index field to
        :type field_name: string
        :param field_name: The name of a field in the search index.
        :type field_type: string
        :param field_type: The type of field: ``uint``, ``literal`` or
            ``text``. Only the options matching this type are sent.
        :param default: Default value (all types).
        :param facet: Facet enabled (literal and text).
        :param result: Result enabled (literal and text).
        :param searchable: Search enabled (literal only).
        :param source_attributes: Currently unused.
        :rtype: dict
        :returns: The new index field
        :raises Exception: if *field_type* is not one of the three types.
        """
        if field_type not in ['literal', 'uint', 'text']:
            raise Exception(
                'Invalid field type `%s`. Must be literal, uint, or text.'
                % field_type)

        params = {
            'DomainName': domain_name,
            'IndexField.IndexFieldName': field_name,
            'IndexField.IndexFieldType': field_type,
        }
        if field_type == 'literal':
            params['IndexField.LiteralOptions.DefaultValue'] = default
            params['IndexField.LiteralOptions.FacetEnabled'] = self._prepare_boolean(facet)
            params['IndexField.LiteralOptions.ResultEnabled'] = self._prepare_boolean(result)
            params['IndexField.LiteralOptions.SearchEnabled'] = self._prepare_boolean(searchable)
        elif field_type == 'uint':
            params['IndexField.UIntOptions.DefaultValue'] = default
        elif field_type == 'text':
            params['IndexField.TextOptions.DefaultValue'] = default
            params['IndexField.TextOptions.FacetEnabled'] = self._prepare_boolean(facet)
            params['IndexField.TextOptions.ResultEnabled'] = self._prepare_boolean(result)

        return self.get_response(
            'define_index_field_response.define_index_field_result.index_field',
            dict, 'DefineIndexField', params, verb='POST')

    def delete_index_field(self, domain_name, index_field_name):
        """Remove index field *index_field_name* from *domain_name*."""
        params = {'DomainName': domain_name, 'IndexFieldName': index_field_name}
        return self.get_response(
            'delete_index_field_response.delete_index_field_result.index_field',
            dict, 'DeleteIndexField', params, verb='POST')

    def get_index_fields(self, domain_name):
        """Describe the index fields of *domain_name* (a list)."""
        return self.get_response(
            'describe_index_fields_response.describe_index_fields_result.index_fields',
            dict, 'DescribeIndexFields', {'DomainName': domain_name},
            verb='POST', list_marker='IndexFields')

    def create_rank_expression(self, domain_name, name, expression):
        """Define (or replace) rank expression *name* on *domain_name*."""
        params = {
            'DomainName': domain_name,
            'RankExpression.RankName': name,
            'RankExpression.RankExpression': expression,
        }
        return self.get_response(
            'define_rank_expression_response.define_rank_expression_result.rank_expression',
            dict, 'DefineRankExpression', params, verb='POST')

    def delete_rank_expression(self, domain_name, rank_name):
        """Remove rank expression *rank_name* from *domain_name*."""
        params = {
            'DomainName': domain_name,
            'RankName': rank_name,
        }
        return self.get_response(
            'delete_rank_expression_response.delete_rank_expression_result.rank_expression',
            dict, 'DeleteRankExpression', params, verb='POST')

    def get_rank_expressions(self, domain_name):
        """Describe the rank expressions of *domain_name* (a list)."""
        return self.get_response(
            'describe_rank_expressions_response.describe_rank_expressions_result.rank_expressions',
            dict, 'DescribeRankExpressions', {'DomainName': domain_name},
            verb='POST', list_marker='RankExpressions')

    def get_default_search_field(self, domain_name):
        """Describe the default search field of *domain_name*."""
        return self.get_response(
            'describe_default_search_field_response.describe_default_search_field_result.default_search_field',
            dict, 'DescribeDefaultSearchField', {'DomainName': domain_name},
            verb='POST')

    def update_default_search_field(self, domain_name, field_name):
        """Make *field_name* the default search field of *domain_name*."""
        params = {
            'DomainName': domain_name,
            'DefaultSearchField': field_name,
        }
        return self.get_response(
            'update_default_search_field_response.update_default_search_field_result.default_search_field',
            dict, 'UpdateDefaultSearchField', params, verb='POST')

    def get_service_access_policies(self, domain_name):
        """Describe the access policies of *domain_name*'s services.

        :returns: A dict whose ``options`` value is the policy document as a
            JSON string, shaped like ``{"Statement": [{"Action": "*",
            "Resource": "arn:aws:cs:us-east-1:...:search/<domain>",
            "Effect": "Allow", "Condition": {"IpAddress": {"aws:SourceIp":
            [...]}}}, ...]}``, plus a ``status`` value.
        """
        return self.get_response(
            'describe_service_access_policies_response.describe_service_access_policies_result.access_policies',
            dict, 'DescribeServiceAccessPolicies', {'DomainName': domain_name},
            verb='POST')

    def update_service_access_policies(self, domain_name, policies):
        """Replace the access policies of *domain_name*.

        :type policies: str
        :param policies: Policy document as a JSON string (same shape as the
            ``options`` value of :meth:`get_service_access_policies`).
        """
        params = {
            'DomainName': domain_name,
            'AccessPolicies': policies,
        }
        return self.get_response(
            'update_service_access_policies_response.update_service_access_policies_result.access_policies',
            dict, 'UpdateServiceAccessPolicies', params,
            verb='POST', list_marker='AccessPolicies')

    def describe_stemming_options(self, domain_name):
        """Describe the stemming dictionary of *domain_name*."""
        return self.get_response(
            'describe_stemming_options_response.describe_stemming_options_result.stems',
            dict, 'DescribeStemmingOptions', {'DomainName': domain_name},
            verb='POST')

    def update_stemming_options(self, domain_name, stems=None):
        """Replace the stemming dictionary of *domain_name*.

        :type stems: dict
        :param stems: term -> stem mapping; sent as ``{"stems": {...}}`` JSON.
        """
        params = {
            'DomainName': domain_name,
            'Stems': json.dumps({'stems': stems if stems is not None else {}}),
        }
        return self.get_response(
            'update_stemming_options_response.update_stemming_options_result.stems',
            dict, 'UpdateStemmingOptions', params, verb='POST')

    def describe_stopword_options(self, domain_name):
        """Describe the stopword list of *domain_name*."""
        return self.get_response(
            'describe_stopword_options_response.describe_stopword_options_result.stopwords',
            dict, 'DescribeStopwordOptions', {'DomainName': domain_name},
            verb='POST')

    def update_stopword_options(self, domain_name, stopwords=None):
        """Replace the stopword list of *domain_name*.

        :type stopwords: list
        :param stopwords: Stopwords; sent as ``{"stopwords": [...]}`` JSON.
        """
        params = {
            'DomainName': domain_name,
            'Stopwords': json.dumps(
                {'stopwords': stopwords if stopwords is not None else []}),
        }
        return self.get_response(
            'update_stopword_options_response.update_stopword_options_result.stopwords',
            dict, 'UpdateStopwordOptions', params, verb='POST')

    def describe_synonym_options(self, domain_name):
        """Describe the synonym dictionary of *domain_name*."""
        return self.get_response(
            'describe_synonym_options_response.describe_synonym_options_result.synonyms',
            dict, 'DescribeSynonymOptions', {'DomainName': domain_name},
            verb='POST')

    def update_synonym_options(self, domain_name, synonyms=None):
        """Replace the synonym dictionary of *domain_name*.

        :type synonyms: dict
        :param synonyms: term -> synonyms mapping; sent as
            ``{"synonyms": {...}}`` JSON.
        """
        params = {
            'DomainName': domain_name,
            'Synonyms': json.dumps(
                {'synonyms': synonyms if synonyms is not None else {}}),
        }
        return self.get_response(
            'update_synonym_options_response.update_synonym_options_result.synonyms',
            dict, 'UpdateSynonymOptions', params, verb='POST')

    def get_audit_log(self, domain_name, size):
        """Fetch up to *size* audit records for *domain_name* (a list).

        NOTE(review): response path reconstructed — confirm.
        """
        return self.get_response(
            'get_audit_log_response.get_audit_log_result.audit_records',
            dict, 'GetAuditLog', {'DomainName': domain_name, 'Size': size},
            verb='GET', list_marker='AuditRecords')

    def get_response(self, obj_path, obj_cls, action, params, path='/',
                     parent=None, verb='GET', list_marker=None):
        """Issue *action* and unwrap the parsed XML response.

        :type obj_path: str
        :param obj_path: Dot-separated path of pythonized element names to
            walk from the response root to the payload.
        :param obj_cls: Callable used to wrap each payload mapping, called as
            ``obj_cls(connection=self, **mapping)``.
        :param list_marker: Element name(s) the parser treats as lists
            (defaults to ``'Set'``).
        :returns: ``None`` (``[]`` when *list_marker* is given) if the path is
            missing or empty; a list when the payload is a list (mapping items
            are wrapped, other items such as plain names are returned as-is);
            otherwise a single wrapped object.
        :raises: ``self.ResponseError`` on a non-200 HTTP status.
        """
        if not parent:
            parent = self
        response = self.make_request(action, params, path, verb)
        body = response.read()
        if response.status != 200:
            boto.log.error('%s %s' % (response.status, response.reason))
            boto.log.error('%s' % body)
            raise self.ResponseError(response.status, response.reason, body)

        e = boto.jsonresponse.Element(
            list_marker=list_marker if list_marker else 'Set',
            pythonize_name=True)
        h = boto.jsonresponse.XmlHandler(e, parent)
        h.parse(body)

        empty = None if list_marker is None else []
        inner = e
        for p in obj_path.split('.'):
            inner = inner.get(p)
            # Stop as soon as an element is missing instead of calling .get
            # on None at the next step.
            if not inner:
                return empty
        if isinstance(inner, list):
            return [obj_cls(connection=self, **i) if isinstance(i, dict) else i
                    for i in inner]
        return obj_cls(connection=self, **inner)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment