Skip to content

Instantly share code, notes, and snippets.

@sagaraivale
Created February 16, 2018 10:29
Show Gist options
  • Save sagaraivale/418e8b8a12998c0c5974ae5bd6b175ad to your computer and use it in GitHub Desktop.
Save sagaraivale/418e8b8a12998c0c5974ae5bd6b175ad to your computer and use it in GitHub Desktop.
import etcd
from etcd import Client
import __builtin__
import maps
import mock
from mock import patch, Mock
from tendrl.gluster_integration.objects.volume import Volume
import tendrl.commons.objects.node_context as node
from tendrl.commons import TendrlNS
import pytest
from tendrl.commons.message import ExceptionMessage
import importlib
from tendrl.gluster_integration import sds_sync
from tendrl.gluster_integration.sds_sync import \
GlusterIntegrationSdsSyncStateThread, brick_status_alert, \
sync_volumes
def test_thread_init():
data = GlusterIntegrationSdsSyncStateThread()
assert isinstance(data, GlusterIntegrationSdsSyncStateThread)
def test_volume_profiling():
obj = GlusterIntegrationSdsSyncStateThread()
assert hasattr(obj, "_enable_disable_volume_profiling" ) is True
@patch.object(etcd, "Client")
@patch.object(Client, "read")
@patch.object(node.NodeContext, '_get_node_id')
def init(patch_get_node_id, patch_read, patch_client):
patch_get_node_id.return_value = 1
patch_read.return_value = etcd.Client()
patch_client.return_value = etcd.Client()
setattr(__builtin__, "NS", maps.NamedDict())
setattr(NS, "_int", maps.NamedDict())
NS._int.etcd_kwargs = {
'port': 1,
'host': 2,
'allow_reconnect': True}
NS._int.client = etcd.Client(**NS._int.etcd_kwargs)
NS["config"] = maps.NamedDict()
NS.config["data"] = maps.NamedDict()
NS.config.data['tags'] = "test"
NS.state_sync_thread = mock.MagicMock()
NS.sds_sync_thread = mock.MagicMock()
NS.message_handler_thread = mock.MagicMock()
tendrlNS = TendrlNS()
return tendrlNS
def test_enable_disable_volume_profiling():
init()
setattr(NS, "publisher_id", "gluster-integration")
setattr(NS, "gluster", maps.NamedDict())
NS.gluster["objects"] = maps.NamedDict()
obj = importlib.import_module(
'tendrl.gluster_integration.objects.volume'
)
assert isinstance(NS, maps.NamedDict)
def test_brick_status_alert():
init()
setattr(NS, "publisher_id", "gluster-integration")
setattr(NS, "gluster", maps.NamedDict())
NS.gluster["objects"] = maps.NamedDict()
obj = importlib.import_module(
'tendrl.gluster_integration.objects.brick'
)
with pytest.raises(Exception):
ExceptionMessage(priority="info", publisher="node_context", payload={"message": "Test Exception Message"})
ex = Exception("Test Exception")
ExceptionMessage(priority="error", publisher=NS.publisher_id, payload={ "message": "Unable to raise an brick status "
"alert for host %s" % hostname, "exception": ex })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment