Skip to content

Instantly share code, notes, and snippets.

@hannelita
Last active August 29, 2015 14:25
Show Gist options
  • Save hannelita/9b15d257f9a847c58d4e to your computer and use it in GitHub Desktop.
Save hannelita/9b15d257f9a847c58d4e to your computer and use it in GitHub Desktop.
"""
Neo4j implementation for the DocManager. Receives documents and
communicates with Neo4j Server.
"""
import base64
import logging
from threading import Timer
import bson.json_util
from py2neo import Graph, exceptions as es_exceptions
from mongo_connector import errors
from mongo_connector.compat import u
from mongo_connector.constants import (DEFAULT_COMMIT_INTERVAL,
DEFAULT_MAX_BULK)
from mongo_connector.util import exception_wrapper, retry_until_ok
from mongo_connector.doc_managers.doc_manager_base import DocManagerBase
from mongo_connector.doc_managers.formatters import DefaultDocumentFormatter
# Decorator factory result: translates backend client exceptions into the
# mongo-connector error types, so callers only have to handle
# ``mongo_connector.errors``.
# NOTE(review): the attribute names below (ConnectionError, TransportError,
# NotFoundError, RequestError) are looked up on ``py2neo.exceptions`` via the
# ``es_exceptions`` alias -- confirm py2neo actually exposes them.
wrap_exceptions = exception_wrapper({
    # Could not reach the server at all.
    es_exceptions.ConnectionError: errors.ConnectionFailed,
    # Everything else is reported as a failed operation.
    es_exceptions.TransportError: errors.OperationFailed,
    es_exceptions.NotFoundError: errors.OperationFailed,
    es_exceptions.RequestError: errors.OperationFailed,
})

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class DocManager(DocManagerBase):
    """Neo4j implementation for the DocManager.

    Receives documents and communicates with a Neo4j server through a
    py2neo ``Graph``.

    NOTE(review): this class is still a skeleton. ``upsert`` and
    ``bulk_upsert`` are no-ops, and the remaining operations raise
    ``NotImplementedError`` until they are written.
    """

    def __init__(self, url, **kwargs):
        """Open a graph handle for the Neo4j server at ``url``.

        Recognised keyword arguments:

        * ``clientOptions`` -- optional dict expanded into keyword
          arguments for ``Graph``.
        * ``auto_commit_interval`` -- optional; stored on the instance and
          defaults to ``DEFAULT_COMMIT_INTERVAL``.

        Any other keyword arguments are accepted and ignored.
        """
        self.remote_graph = Graph(url, **kwargs.get('clientOptions', {}))
        # Initialise the attribute that stop() resets, so it always exists
        # on the instance instead of appearing only after stop() is called.
        self.auto_commit_interval = kwargs.get(
            'auto_commit_interval', DEFAULT_COMMIT_INTERVAL)

    def stop(self):
        """Disable auto-commit by clearing ``auto_commit_interval``.

        No commit thread or timer is created by this class, so there is
        nothing else to shut down here.
        """
        self.auto_commit_interval = None

    def upsert(self, doc, namespace, timestamp):
        """Inserts a document into Neo4j.

        TODO: not implemented yet -- currently does nothing and returns
        ``None``.
        """

    def bulk_upsert(self, docs, namespace, timestamp):
        """Insert multiple documents into Neo4j.

        TODO: not implemented yet -- currently does nothing and returns
        ``None``.
        """

    def update(self, document_id, update_spec, namespace, timestamp):
        """Apply ``update_spec`` to the document ``document_id``.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError

    def remove(self, document_id, namespace, timestamp):
        """Remove the document ``document_id`` from Neo4j.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError

    def search(self, start_ts, end_ts):
        """Query documents between ``start_ts`` and ``end_ts``.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError

    def commit(self):
        """Flush pending changes to Neo4j.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError

    def get_last_doc(self):
        """Return the most recently written document.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError

    def handle_command(self, doc, namespace, timestamp):
        """Handle a database command document.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment