Skip to content

Instantly share code, notes, and snippets.

@johndpope
Last active August 21, 2023 11:44
Show Gist options
  • Save johndpope/33e6acec7505b3d7a539b0ad763f088b to your computer and use it in GitHub Desktop.
Save johndpope/33e6acec7505b3d7a539b0ad763f088b to your computer and use it in GitHub Desktop.
python elasticfeeds/tests/test_02_aggregators.py
test: [{'published': '2023-08-21T15:50:20.496417', 'published_date': '2023-08-21', 'published_time': '15:50:20', 'published_year': 2023, 'published_month': 8, 'actor': {'id': 'cquiros', 'type': 'person', 'extra': {'some_extra_data': 'test'}}, 'type': 'move', 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form', 'extra': {'some_extra_data': 'test'}}, 'origin': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'target': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'extra': {'some_extra_data': 'test'}}, {'published': '2023-08-21T15:50:20.481065', 'published_date': '2023-08-21', 'published_time': '15:50:20', 'published_year': 2023, 'published_month': 8, 'actor': {'id': 'cquiros', 'type': 'person'}, 'type': 'add', 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form'}, 'target': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.463093', 'published_date': '2023-08-21', 'published_time': '15:50:20', 'published_year': 2023, 'published_month': 8, 'actor': {'id': 'cquiros', 'type': 'person'}, 'type': 'add', 'object': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.359051', 'published_date': '2023-08-21', 'published_time': '15:50:20', 'published_year': 2023, 'published_month': 8, 'actor': {'id': 'cquiros', 'type': 'person'}, 'type': 'add', 'object': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]
test: [{'type': 'move', 'activities': [{'published': '2023-08-21T15:50:20.496417', 'actor': {'id': 'cquiros', 'type': 'person', 'extra': {'some_extra_data': 'test'}}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form', 'extra': {'some_extra_data': 'test'}}, 'origin': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'target': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'extra': {'some_extra_data': 'test'}}]}, {'type': 'add', 'activities': [{'published': '2023-08-21T15:50:20.481065', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form'}, 'target': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.463093', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.359051', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}]
test: [{'type': 'move', 'ids': [{'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'activities': [{'published': '2023-08-21T15:50:20.496417', 'actor': {'id': 'cquiros', 'type': 'person', 'extra': {'some_extra_data': 'test'}}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form', 'extra': {'some_extra_data': 'test'}}, 'origin': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'target': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'extra': {'some_extra_data': 'test'}}]}]}, {'type': 'add', 'ids': [{'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'activities': [{'published': '2023-08-21T15:50:20.481065', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form'}, 'target': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}, {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'activities': [{'published': '2023-08-21T15:50:20.463093', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project'}}]}, {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'activities': [{'published': '2023-08-21T15:50:20.359051', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}]}]
test: [{'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'types': [{'type': 'move', 'activities': [{'published': '2023-08-21T15:50:20.496417', 'actor': {'id': 'cquiros', 'type': 'person', 'extra': {'some_extra_data': 'test'}}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form', 'extra': {'some_extra_data': 'test'}}, 'origin': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'target': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'extra': {'some_extra_data': 'test'}}]}, {'type': 'add', 'activities': [{'published': '2023-08-21T15:50:20.481065', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form'}, 'target': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}]}, {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'types': [{'type': 'add', 'activities': [{'published': '2023-08-21T15:50:20.463093', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project'}}]}]}, {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'types': [{'type': 'add', 'activities': [{'published': '2023-08-21T15:50:20.359051', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}]}]
test: [{'date': '2023-08-21', 'activities': [{'published': '2023-08-21T15:50:20.496417', 'actor': {'id': 'cquiros', 'type': 'person', 'extra': {'some_extra_data': 'test'}}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form', 'extra': {'some_extra_data': 'test'}}, 'origin': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'target': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'extra': {'some_extra_data': 'test'}}, {'published': '2023-08-21T15:50:20.359051', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.463093', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.481065', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form'}, 'target': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}]
test: [{'year': 2023, 'months': [{'month': 8, 'types': [{'type': 'move', 'activities': [{'published': '2023-08-21T15:50:20.496417', 'actor': {'id': 'cquiros', 'type': 'person', 'extra': {'some_extra_data': 'test'}}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form', 'extra': {'some_extra_data': 'test'}}, 'origin': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'target': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'extra': {'some_extra_data': 'test'}}]}, {'type': 'add', 'activities': [{'published': '2023-08-21T15:50:20.481065', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form'}, 'target': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.463093', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.359051', 'actor': {'id': 'cquiros', 'type': 'person'}, 'object': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}]}]}]
test: [{'year': 2023, 'months': [{'month': 8, 'activities': [{'published': '2023-08-21T15:50:20.496417', 'actor': {'id': 'cquiros', 'type': 'person', 'extra': {'some_extra_data': 'test'}}, 'type': 'move', 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form', 'extra': {'some_extra_data': 'test'}}, 'origin': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'target': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project', 'extra': {'some_extra_data': 'test'}}, 'extra': {'some_extra_data': 'test'}}, {'published': '2023-08-21T15:50:20.481065', 'actor': {'id': 'cquiros', 'type': 'person'}, 'type': 'add', 'object': {'id': '326c1f4e-a489-4e36-9d0c-5638ef193f6f', 'type': 'form'}, 'target': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.463093', 'actor': {'id': 'cquiros', 'type': 'person'}, 'type': 'add', 'object': {'id': '152a3304-e78d-4fdf-9449-0943d6072596', 'type': 'project'}}, {'published': '2023-08-21T15:50:20.359051', 'actor': {'id': 'cquiros', 'type': 'person'}, 'type': 'add', 'object': {'id': '50a808d3-1227-4149-80e9-20922bded1cf', 'type': 'project'}}]}]}]
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError
from elasticfeeds.exceptions import (
LinkObjectError,
LinkExistError,
ActivityObjectError,
AggregatorObjectError,
MaxLinkError,
LinkNotExistError,
)
from elasticfeeds.network import Link, LinkedActivity
from elasticfeeds.activity import Activity
from elasticfeeds.aggregators import BaseAggregator
import uuid
import datetime
__all__ = ["Manager"]
def _get_feed_index_definition(number_of_shards, number_of_replicas):
"""
Constructs the Feed index with a given number of shards and replicas. Feeds are stored in an atomic form and
are based as much as possible on http://activitystrea.ms/
:param number_of_shards: Number of shards for the feeds index.
:param number_of_replicas: Number of replicas for the feeds index.
The index has the following parts:
published: Date when the feed was published. Stored in ISO 8601 format.
See https://docs.python.org/3.6/library/datetime.html#datetime.date.isoformat for more info.
published_date: Date section of the published date. YYY-MM-DD. E.g., 2002-12-04
published_time: Time section of the published date. HH:MM:SS. E.g., 13:00:23
type: Single word in infinitive. Also know as "verb", it describes some form of action that may happen,
is currently happening, or has already happened.
See https://www.w3.org/TR/activitystreams-vocabulary/#dfn-activity for more info.
actor: Describes the entity that is performing the activity.
See https://www.w3.org/TR/activitystreams-vocabulary/#dfn-actor for more info.
actor.id: The unique id of the actor. Only one ID is accepted.
actor.type: Single word. The type of actor performing the activity. E.g., Person, User, Member.
See https://www.w3.org/TR/activitystreams-vocabulary/#actor-types for more info.
actor.extra: Use this field to store extra information at actor level.
IMPORTANT NOTE: This field is "non-analyzable" which means that ES does not perform any
operations on it thus it cannot be used to order, aggregate, or filter query results.
object: Describes an object of any kind linked to the action itself.
See https://www.w3.org/TR/activitystreams-vocabulary/#dfn-object for more info.
object.id: Single ID. The unique id of the object
object.type: Single word. Provides some degree of specificity to the object. E.g., Document, Project, Form.
See https://www.w3.org/TR/activitystreams-vocabulary/#object-types for more info
object.extra: Use this field to store extra information at object level.
IMPORTANT NOTE: This field is "non-analyzable" which means that ES does not perform any
operations on it thus it cannot be used to order, aggregate, or filter query results.
origin [optional]: The origin is applicable to any type of activity for which the English preposition "from"
can be considered applicable in the sense of identifying the origin, source or provenance of
the activity's object. See https://www.w3.org/TR/activitystreams-vocabulary/#origin-target
for more information
origin.id: The unique ID the origin. Only one ID is accepted.
origin.type: Single word. Provides some degree of specificity to the origin. E.g., Collection
origin.extra: Use this field to store extra information at origin level.
IMPORTANT NOTE: This field is "non-analyzable" which means that ES does not perform any
operations on it thus it cannot be used to order, aggregate, or filter query results.
target [optional]: The target is applicable to any type of activity for which the English preposition
"to" can be considered applicable in the sense of identifying the indirect object or
destination of the activity's object.
See https://www.w3.org/TR/activitystreams-vocabulary/#origin-target for more information
target.id: The unique ID the target. Only one ID is accepted.
target.type: Single word. Provides some degree of specificity to the target. E.g., Collection
target.extra: Use this field to store extra information at target level.
IMPORTANT NOTE: This field is "non-analyzable" which means that ES does not perform any
operations on it thus it cannot be used to order, aggregate, or filter query results.
extra: Use this field to store extra information at activity level.
IMPORTANT NOTE: This field is "non-analyzable" which means that ES does not perform any
operations on it thus it cannot be used to order, aggregate, or filter query results.
:return: Dict.
"""
# noinspection SpellCheckingInspection
_json = {
"settings": {
"index": {
"number_of_shards": number_of_shards,
"number_of_replicas": number_of_replicas,
}
},
"mappings": {
"properties": {
"published": {"type": "date"},
"published_date": {"type": "date", "format": "yyyy-MM-dd"},
"published_time": {"type": "date", "format": "HH:mm:ss"},
"published_year": {"type": "integer"},
"published_month": {"type": "integer"},
"type": {"type": "keyword"},
"actor": {
"properties": {
"id": {"type": "keyword"},
"type": {"type": "keyword"},
"extra": {"type": "object", "enabled": "false"},
}
},
"object": {
"properties": {
"id": {"type": "keyword"},
"type": {"type": "keyword"},
"extra": {"type": "object", "enabled": "false"},
}
},
"origin": {
"properties": {
"id": {"type": "keyword"},
"type": {"type": "keyword"},
"extra": {"type": "object", "enabled": "false"},
}
},
"target": {
"properties": {
"id": {"type": "keyword"},
"type": {"type": "keyword"},
"extra": {"type": "object", "enabled": "false"},
}
},
"extra": {"type": "object", "enabled": "false"},
}
},
}
return _json
def _get_network_index_definition(number_of_shards, number_of_replicas):
"""
Constructs the Network index with a given number of shards and replicas.
Each connection is stored as individual ES documents
:param number_of_shards: Number of shards for the network index.
:param number_of_replicas: Number of replicas for the network index.
The index has the following parts:
linked: Date when the link was made. Stored in ISO 8601 format.
See https://docs.python.org/3.6/library/datetime.html#datetime.date.isoformat for more info.
actor_id: Single word. The actor who's connection is being declared. For example if the actor "mark" is
following two people then "mark" would appear twice, one per connection.
link_type: Single word in infinitive. The kind of link being declared. For example: Follow, Watch
linked_activity: Linked activity to this link in the network
linked_activity.activity_class: Single word. The type of activities that are being followed and watched.
This must be either "actor" or "object"
linked_activity.type: Single word. The type of feed component that is being followed or watched. For example
if the class is "actor" then it's type could be "Person", "User" or "Member".
If the class is "object" then its type could be "Document", or "Project".
linked_activity.id: Single ID. The ID that is being followed or watched.
link_weight: Numeric. Accept decimals. The wight of the connection. For example, if Mark follows Jane and
Katie depending on their interaction in social platform the connection between Mark and Jane
could be twice as strong as Mark and Katie, thus Mark and Katie will have a weight of 1 while
Mark and Jane will have a weight of 2. By default each connection has a weight of 1.
extra: Use this field to store extra information at link level.
IMPORTANT NOTE: This field is "non-analyzable" which means that ES does not perform any
operations on it thus it cannot be used to order, aggregate, or filter query results.
:return: A JSON object with the definition of the Network index.
"""
_json = {
"settings": {
"index": {
"number_of_shards": number_of_shards,
"number_of_replicas": number_of_replicas,
}
},
"mappings": {
"properties": {
"linked": {"type": "date"},
"actor_id": {"type": "keyword"},
"link_type": {"type": "keyword"},
"linked_activity": {
"properties": {
"activity_class": {"type": "keyword"},
"id": {"type": "keyword"},
"type": {"type": "keyword"},
}
},
"link_weight": {"type": "float"},
"extra": {"type": "object", "enabled": "false"},
}
},
}
return _json
class Manager(object):
"""
The Manager class handles all activity feed operations.
"""
def create_connection(self):
"""
Creates a connection to ElasticSearch and pings it.
:return: A tested (pinged) connection to ElasticSearch
"""
if not isinstance(self.port, int):
raise ValueError("Port must be an integer")
if not isinstance(self.host, str):
raise ValueError("Host must be string")
if self.url_prefix is not None:
if not isinstance(self.url_prefix, str):
raise ValueError("URL prefix must be string")
if not isinstance(self.use_ssl, bool):
raise ValueError("Use SSL must be boolean")
cnt_params = {"scheme":self.scheme,"host": self.host, "port": self.port}
if self.url_prefix is not None:
cnt_params["url_prefix"] = self.url_prefix
if self.use_ssl:
cnt_params["use_ssl"] = self.use_ssl
if self.basic_auth:
cnt_params["basic_auth"] = self.basic_auth
connection = Elasticsearch(
[cnt_params],
max_retries=100,
retry_on_timeout=True,
timeout=700
)
if connection.ping():
return connection
else:
return None
def __init__(
self,
feed_index= ["feeds"],
network_index="network",
host="localhost",
port=9200,
url_prefix=None,
use_ssl=False,
number_of_shards_in_feeds=5,
number_of_replicas_in_feeds=1,
number_of_shards_in_network=5,
number_of_replicas_in_network=1,
delete_feeds_if_exists=False,
delete_network_if_exists=False,
max_link_size=1000,
basic_auth=None,
scheme="http"
):
"""
The constructor of the Manager. It creates the feeds and network indices if they don't exist. See
https://www.elastic.co/guide/en/elasticsearch/reference/current/_basic_concepts.html#getting-started-shards-and-replicas
for more information about shards and replicas
:param feed_index: The name if the feed index. "feeds" by default
:param network_index: The name of the network index. "network" by default
:param host: ElasticSearch host name. "localhost" by default
:param port: ElasticSearch port. 9200 by default
:param url_prefix: URL prefix. None by default
:param use_ssl: Use SSL to connect to ElasticSearch. False by default
:param number_of_shards_in_feeds: Number of shards for the feeds index. 5 by default
:param number_of_replicas_in_feeds: Number of replicas for the feeds index. 1 by default
:param number_of_shards_in_network: Number of shards for the network index. 5 by default
:param number_of_replicas_in_network: Number of replicas for the network index. 1 by default
:param delete_feeds_if_exists: Delete the feeds index if already exist. False by default
:param delete_network_if_exists: Delete the network index if already exist. False by default
:param max_link_size: Maximum number of links to fetch from an actor
"""
self.host = host
self.port = port
self.url_prefix = url_prefix
self.use_ssl = use_ssl
self.feed_index = feed_index
self.network_index = network_index
self._max_link_size = max_link_size
self.basic_auth = basic_auth
self.scheme = scheme
connection = self.create_connection()
if connection is not None:
if not connection.indices.exists(index=feed_index):
try:
connection.indices.create(index=feed_index,
body=_get_feed_index_definition(
number_of_shards_in_feeds, number_of_replicas_in_feeds
),
)
except RequestError as e:
print("e:",e)
if e.status_code == 400:
if e.error.find("already_exists") >= 0:
if delete_feeds_if_exists:
self.delete_feeds_index()
connection.indices.create(index=feed_index,
body=_get_feed_index_definition(
number_of_shards_in_feeds,
number_of_replicas_in_feeds,
),
)
else:
pass
else:
raise e
else:
raise e
if not connection.indices.exists(index=network_index):
try:
connection.indices.create(
index=network_index,
body=_get_network_index_definition(
number_of_shards_in_network, number_of_replicas_in_network
),
)
except RequestError as e:
print("e:",e)
if e.status_code == 400:
if e.error.find("already_exists") >= 0:
if delete_network_if_exists:
self.delete_network_index()
connection.indices.create(
network_index,
body=_get_network_index_definition(
number_of_shards_in_network,
number_of_replicas_in_network,
),
)
else:
pass
else:
raise e
else:
raise e
else:
raise RequestError("Cannot connect to ElasticSearch")
@property
def max_link_size(self):
"""
Maximum number of links to return from an actor
:return:
"""
return self._max_link_size
@max_link_size.setter
def max_link_size(self, value):
if not isinstance(value, int):
raise MaxLinkError()
self._max_link_size = value
def delete_feeds_index(self):
"""
Deletes the feed index
:return: True if the index was deleted successfully
"""
connection = self.create_connection()
if connection is not None:
connection.indices.delete(index=self.feed_index)
return True
return False
def delete_network_index(
self,
):
"""
Deleted the network index
:return: True if the index was deleted successfully
"""
connection = self.create_connection()
if connection is not None:
connection.indices.delete(index=self.network_index)
return True
return False
def link_network_exists(self, link_object):
"""
Check whether a link object already exists in the network index
:param link_object: The link object to check if exists
:return: True if exists otherwise False
"""
if not isinstance(link_object, Link):
raise LinkObjectError()
connection = self.create_connection()
if connection is not None:
res = connection.search(
index=self.network_index, body=link_object.get_search_dict()
)
if res["hits"]["total"]["value"] > 0:
return True
else:
raise RequestError("Cannot connect to ElasticSearch")
return False
def add_network_link(self, link_object):
"""
Adds a link to the network index
:param link_object: The Link object being added to the index
:return: The unique ID give to the link
"""
if not isinstance(link_object, Link):
raise LinkObjectError()
if not self.link_network_exists(link_object):
connection = self.create_connection()
if connection is not None:
unique_id = str(uuid.uuid4())
connection.index(
index=self.network_index,
id=unique_id,
body=link_object.get_dict(),
)
return unique_id
else:
raise RequestError("Cannot connect to ElasticSearch")
else:
raise LinkExistError()
def remove_network_link(self, link_object):
"""
Removes a link from the network
:param link_object: The Link object being removed from the index.
:return: Bool
"""
if not isinstance(link_object, Link):
raise LinkObjectError()
if self.link_network_exists(link_object):
connection = self.create_connection()
if connection is not None:
connection.delete_by_query(
index=self.network_index,
body=link_object.get_search_dict(),
)
return True
else:
raise RequestError("Cannot connect to ElasticSearch")
else:
raise LinkNotExistError()
def follow(
self,
actor_id,
following,
linked=datetime.datetime.now(),
activity_type="person",
):
"""
A convenience function to declare a follow link
:param actor_id: Actor ID who's link is being declared in the network
:param following: The person that is being followed
:param linked: Datetime of the link
:param activity_type: String. Single word. The type of feed component that is being followed or watched.
For example, if the class is "actor" then it's type could be "Person", "User" or "Member".
If the class is "object" then its type could be "Document", or "Project".
:return: None
"""
a_linked_activity = LinkedActivity(following, activity_type=activity_type)
a_link = Link(actor_id, a_linked_activity, linked=linked)
self.add_network_link(a_link)
def un_follow(self, actor_id, following):
"""
A convenience function to un-follow a person
:param actor_id: Actor ID who's link is being declared in the network
:param following: The person that is being un-followed
:return: Bool
"""
a_linked_activity = LinkedActivity(following)
a_link = Link(actor_id, a_linked_activity)
return self.remove_network_link(a_link)
def watch(self, actor_id, watch_id, watch_type, linked=datetime.datetime.now()):
"""
A convenience function to declare a watch link
:param actor_id: Actor ID who's link is being declared in the network
:param watch_id: The object that is being watched
:param watch_type: The object type that is being watched
:param linked: Datetime of the link
:return: None
"""
a_linked_activity = LinkedActivity(watch_id, "object", watch_type)
a_link = Link(actor_id, a_linked_activity, linked=linked, link_type="watch")
self.add_network_link(a_link)
def un_watch(self, actor_id, watch_id, watch_type):
"""
A convenience function to un-watch an object
:param actor_id: Actor ID who's link is being declared in the network
:param watch_id: The object that is being un-watched
:param watch_type: The object type that is being un-watched
:return: Bool
"""
a_linked_activity = LinkedActivity(watch_id, "object", watch_type)
a_link = Link(actor_id, a_linked_activity, link_type="watch")
return self.remove_network_link(a_link)
def add_activity_feed(self, activity_object):
"""
Adds an activity to the feed index
:param activity_object: The activity object being added to the index
:return: The unique ID given to the activity
"""
if not isinstance(activity_object, Activity):
raise ActivityObjectError()
connection = self.create_connection()
if connection is not None:
unique_id = str(uuid.uuid4())
connection.index(
index=self.feed_index,
id=unique_id,
body=activity_object.get_dict(),
)
return unique_id
else:
raise RequestError("Cannot connect to ElasticSearch")
def get_search_dict(self, actor_id):
"""
Constructs a ES search that will be used to search for the network of actor_id
:param actor_id: The actor to search for its network links
:return: A dict that will be passes to ES
"""
_dict = {
"size": self.max_link_size,
"query": {"bool": {"must": {"term": {"actor_id": actor_id}}}},
"sort": [{"linked": {"order": "desc"}}],
}
return _dict
def get_network(self, actor_id):
"""
Creates an array of the current network.
:return: Dict array
"""
result = []
connection = self.create_connection()
if connection is not None:
es_result = connection.search(
index=self.network_index, body=self.get_search_dict(actor_id)
)
if es_result["hits"]["total"]["value"] > 0:
for hit in es_result["hits"]["hits"]:
result.append(hit["_source"])
return result
else:
raise RequestError("Cannot connect to ElasticSearch")
def get_feeds(self, aggregator):
"""
Return an array of feeds. The structure of the elements will depend of the aggregator
:param aggregator: Aggregator class
:return: Array of feeds or empty array of the actor_id does not have any network links
"""
if not isinstance(aggregator, BaseAggregator):
raise AggregatorObjectError()
connection = self.create_connection()
if connection is not None:
aggregator.connection = connection
aggregator.feed_index = self.feed_index
aggregator.network_array = self.get_network(aggregator.actor_id)
aggregator.set_query_dict()
if aggregator.query_dict is not None:
aggregator.set_aggregation_section()
aggregator.query_feeds()
return aggregator.get_feeds()
else:
return []
else:
raise RequestError("Cannot connect to ElasticSearch")
def execute_raw_network_query(self, query_dict):
connection = self.create_connection()
if connection is not None:
es_result = connection.search(index=self.network_index, body=query_dict)
return es_result
else:
raise RequestError("Cannot connect to ElasticSearch")
def execute_raw_feeds_query(self, query_dict):
connection = self.create_connection()
if connection is not None:
es_result = connection.search(index=self.feed_index, body=query_dict)
return es_result
else:
raise RequestError("Cannot connect to ElasticSearch")
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from elasticfeeds.manager import Manager
from elasticfeeds.network import Link, LinkedActivity
from elasticfeeds.activity import Actor, Object, Origin, Target, Activity
import datetime
import time
import requests
from elasticfeeds.aggregators import (
UnAggregated,
RecentTypeAggregator,
RecentTypeObjectAggregator,
RecentObjectTypeAggregator,
DateWeightAggregator,
YearMonthTypeAggregator,
YearMonthAggregator,
)
def test_manager():
es_host = "0.0.0.0"
es_port = 9200
use_ssl = "False"
ready = False
# username = 'elastic'
# password = 'changeme'
host = "http://{}:{}/_cluster/health".format(es_host, es_port) # noqa: E501
print("Waiting for ES to be ready check :",host)
while not ready:
try:
if use_ssl == "False":
resp = requests.get(host)
else:
resp = requests.get(host)
data = resp.json()
print("json:",data)
if data["status"] == "yellow" or data["status"] == "green":
ready = True
else:
time.sleep(30)
except Exception as e:
print(str(e))
time.sleep(30)
print("ES is ready")
now = datetime.datetime.now()
tst_manager = Manager(
"testfeeds",
"testnetwork",
delete_network_if_exists=True,
delete_feeds_if_exists=True
)
# Creates a linked activity
tst_linked_activity = LinkedActivity("cquiros")
# Testing properties
tst_linked_activity.activity_id = "cquiros"
tst_linked_activity.activity_class = "actor"
tst_linked_activity.activity_type = "person"
# Creates a link
tst_link = Link("cquiros", tst_linked_activity)
# Testing properties
tst_link.actor_id = "cquiros"
tst_link.linked_activity = tst_linked_activity
tst_link.linked = now
tst_link.link_type = "follow"
tst_link.link_weight = 1
tst_link.extra = {"some_extra_data": "test"}
# Adds the network link
# tst_manager.delete_feeds_index()
# tst_manager.delete_network_index()
tst_manager.add_network_link(tst_link)
# Carlos follow Eduardo. Test of convenience function
tst_manager.follow("cquiros", "edoquiros", now)
# --------------------------- Adds some activity feeds ------------------------------
# An actor called cquiros adds project A
# Creates an actor
tst_actor = Actor("cquiros", "person")
# Creates an object
tst_object = Object("50a808d3-1227-4149-80e9-20922bded1cf", "project")
# Creates an Activity
tst_activity = Activity(
"add", tst_actor, tst_object, published=now + datetime.timedelta(minutes=12)
)
# Adds the activity
tst_manager.add_activity_feed(tst_activity)
# cquiros adds project B
# Creates an object
tst_object = Object("152a3304-e78d-4fdf-9449-0943d6072596", "project")
# Creates an Activity
tst_activity = Activity(
"add", tst_actor, tst_object, published=now + datetime.timedelta(minutes=24)
)
# Adds the activity
tst_manager.add_activity_feed(tst_activity)
# cquiros adds Form 1 in project A
# Creates an object
tst_object = Object("326c1f4e-a489-4e36-9d0c-5638ef193f6f", "form")
# Creates a target
tst_target = Target("50a808d3-1227-4149-80e9-20922bded1cf", "project")
# Creates an Activity
tst_activity = Activity(
"add",
tst_actor,
tst_object,
activity_target=tst_target,
published=now + datetime.timedelta(minutes=48),
)
# Adds the activity
tst_manager.add_activity_feed(tst_activity)
# An actor called cquiros moves Form 1 from Project A to Project B
# Creates an actor
tst_actor = Actor("cquiros", "person", {"some_extra_data": "test"})
# Testing properties
tst_actor.actor_id = "cquiros"
tst_actor.actor_type = "person"
tst_actor.extra = {"some_extra_data": "test"}
# Creates an object
tst_object = Object(
"326c1f4e-a489-4e36-9d0c-5638ef193f6f", "form", {"some_extra_data": "test"}
)
# Testing properties
tst_object.object_id = "326c1f4e-a489-4e36-9d0c-5638ef193f6f"
tst_object.object_type = "form"
tst_object.extra = {"some_extra_data": "test"}
# Creates an origin
tst_origin = Origin(
"50a808d3-1227-4149-80e9-20922bded1cf", "project", {"some_extra_data": "test"}
)
# Testing properties
tst_origin.origin_id = "50a808d3-1227-4149-80e9-20922bded1cf"
tst_origin.origin_type = "project"
tst_origin.extra = {"some_extra_data": "test"}
# Creates a target
tst_target = Target(
"152a3304-e78d-4fdf-9449-0943d6072596", "project", {"some_extra_data": "test"}
)
# Testing properties
tst_target.target_id = "152a3304-e78d-4fdf-9449-0943d6072596"
tst_target.target_type = "project"
tst_target.extra = {"some_extra_data": "test"}
# Creates an Activity
tst_activity = Activity(
"move",
tst_actor,
tst_object,
activity_origin=tst_origin,
activity_target=tst_target,
extra={"some_extra_data": "test"},
)
# Testing properties
tst_activity.activity_type = "move"
tst_activity.activity_actor = tst_actor
tst_activity.activity_object = tst_object
tst_activity.published = datetime.datetime.now() + datetime.timedelta(minutes=72)
tst_activity.activity_origin = tst_origin
tst_activity.activity_target = tst_target
tst_activity.extra = {"some_extra_data": "test"}
# Adds the activity
tst_manager.add_activity_feed(tst_activity)
# Carlos Watches project A. Test of convenience function
tst_manager.watch("cquiros", "50a808d3-1227-4149-80e9-20922bded1cf", "project")
# Wait 2 seconds for ES to store previous data. This is only for this testing script
time.sleep(2)
# Test Un-aggregated aggregator
tst_base_aggregator = UnAggregated("cquiros")
# This will bring 5 records
test = tst_manager.get_feeds(tst_base_aggregator)
print("test:",test)
# Test recent type aggregator
tst_recent_type_aggregator = RecentTypeAggregator("cquiros")
test = tst_manager.get_feeds(tst_recent_type_aggregator)
print("test:",test)
# Test recent type object aggregator
tst_recent_type_object_aggregator = RecentTypeObjectAggregator("cquiros")
test = tst_manager.get_feeds(tst_recent_type_object_aggregator)
print("test:",test)
# Test recent object type aggregator
tst_recent_object_type_aggregator = RecentObjectTypeAggregator("cquiros")
test = tst_manager.get_feeds(tst_recent_object_type_aggregator)
print("test:",test)
# Test recent object type aggregator
tst_date_weight_aggregator = DateWeightAggregator("cquiros")
test = tst_manager.get_feeds(tst_date_weight_aggregator)
print("test:",test)
# Test year, month, type aggregator
tst_year_month_type_aggregator = YearMonthTypeAggregator("cquiros")
test = tst_manager.get_feeds(tst_year_month_type_aggregator)
print("test:",test)
# Test year, month, type aggregator
tst_year_month_aggregator = YearMonthAggregator("cquiros")
test = tst_manager.get_feeds(tst_year_month_aggregator)
print("test:",test)
if __name__ == "__main__":
test_manager()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment