@Filirom1
Last active March 22, 2019 17:21
Extend Ambari

Introduction

Hortonworks wrote a great article on how to add new components to Ambari:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=38571133

Unfortunately, that article alone is not enough if you really want to add your own component.

Add rpm dependencies

Your component must be available as an rpm in your CentOS repo.

If you want Ambari to install the rpm, declare the OS dependency in COMPONENT_NAME/metainfo.xml:

  <osSpecifics>
    <osSpecific>
      <osFamily>any</osFamily>
      <packages>
        <package>
          <name>storm</name>
        </package>
      </packages>
    </osSpecific>
  </osSpecifics>
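To make the mapping concrete, here is a small stdlib-only Python sketch that reads an <osSpecifics> block and returns the package names that apply to a given OS family. It is not Ambari's code: the helper name `packages_for_os` is hypothetical, the `<metainfo>` wrapper is simplified (the real file nests <osSpecifics> deeper, which `iter()` still finds), and the rule that `any` matches every OS family is an assumption based on the example above.

```python
import xml.etree.ElementTree as ET

# Simplified metainfo.xml excerpt (real files nest this inside <services><service>).
METAINFO = """
<metainfo>
  <osSpecifics>
    <osSpecific>
      <osFamily>any</osFamily>
      <packages>
        <package>
          <name>storm</name>
        </package>
      </packages>
    </osSpecific>
  </osSpecifics>
</metainfo>
"""


def packages_for_os(metainfo_xml, os_family):
    """Return package names from <osSpecifics> that apply to os_family.

    Assumption: an <osFamily> of "any" matches every OS family.
    """
    root = ET.fromstring(metainfo_xml)
    names = []
    for spec in root.iter("osSpecific"):
        family = (spec.findtext("osFamily") or "").strip()
        if family in ("any", os_family):
            for pkg in spec.iter("package"):
                name = pkg.findtext("name")
                if name:
                    names.append(name.strip())
    return names


if __name__ == "__main__":
    print(packages_for_os(METAINFO, "redhat6"))
```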

The listed packages are installed when self.install_packages(env) is called in COMPONENT_NAME/package/scripts/ui_server.py:

import sys
from resource_management import *
from storm import storm
from service import service
from service_check import ServiceCheck


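class UiServer(Script):
  def install(self, env):
    # Installs the packages declared under <osSpecifics> in metainfo.xml
    self.install_packages(env)
    self.configure(env)

  def configure(self, env):
    import params
    env.set_params(params)
    # storm() comes from storm.py in the same scripts directory and sets up the Storm config
    storm()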

stop/start scripts

If your component ships an init.d script, Ambari can easily start and stop it.

The start/stop logic is also defined in COMPONENT_NAME/package/scripts/ui_server.py:

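  def start(self, env):
    import params
    env.set_params(params)
    self.configure(env)

    # service() comes from service.py in the same scripts directory
    service("ui", action="start")

  def stop(self, env):
    import params
    env.set_params(params)

    service("ui", action="stop")

if __name__ == "__main__":
  UiServer().execute()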
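Ambari does not call these methods from your own code: the agent runs the script with a command name (INSTALL, START, STOP, ...) and `Script.execute()` dispatches to the method of the same name. The toy model below illustrates only that dispatch; `ToyScript`, `ToyUiServer`, the lowercase name mapping and the empty dict standing in for `env` are simplifications, not Ambari's implementation.

```python
class ToyScript:
    """Hypothetical stand-in for Ambari's Script base class (dispatch only)."""

    def execute(self, argv):
        # argv[1] is the command name, e.g. "START"; map it to a method name.
        command = argv[1].lower()
        method = getattr(self, command, None)
        if method is None:
            raise SystemExit("script has no method '%s'" % command)
        env = {}  # placeholder for Ambari's Environment object
        return method(env)


class ToyUiServer(ToyScript):
    def install(self, env):
        return "installed"

    def start(self, env):
        return "started"

    def stop(self, env):
        return "stopped"
```

For example, `ToyUiServer().execute(["ui_server.py", "START"])` calls `start()`, while an unknown command such as RESTART fails instead of silently doing nothing.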

service_check after install

Ambari calls service_check after installation to verify that everything works. It is defined in COMPONENT_NAME/package/scripts/service_check.py.

You can run any command that checks that your component works properly.

from resource_management import *
from resource_management.libraries.functions import get_unique_id_and_date

class ServiceCheck(Script):
  def service_check(self, env):
    import params
    env.set_params(params)

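    # Unique suffix so that repeated checks never clash on the topology name
    unique = get_unique_id_and_date()

    # StaticFile reads wordCount.jar from COMPONENT_NAME/package/files
    File("/tmp/wordCount.jar",
         content=StaticFile("wordCount.jar")
    )

    # Submit a test topology; format() fills {java64_home}, {nimbus_host} and {unique}
    cmd = format("env JAVA_HOME={java64_home} storm jar /tmp/wordCount.jar storm.starter.WordCountTopology WordCount{unique} -c nimbus.host={nimbus_host}")

    Execute(cmd,
            logoutput=True
    )

    # Remove the test topology again
    Execute(format("env JAVA_HOME={java64_home} storm kill WordCount{unique}"))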

if __name__ == "__main__":
  ServiceCheck().execute()
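The check follows a simple pattern: submit a throwaway topology under a unique name, then kill it. The stdlib sketch below reproduces that pattern outside Ambari so you can try it by hand. Assumptions: `unique_suffix()` is a hypothetical replacement for `get_unique_id_and_date()` (whose exact format is not shown here), commands run through `subprocess`, and the kill sits in a `finally` block so the topology is cleaned up even when submission fails, which the Ambari script above does not do.

```python
import shlex
import subprocess
import uuid
from datetime import datetime


def unique_suffix():
    # Hypothetical stand-in for get_unique_id_and_date(): date plus random hex.
    return datetime.now().strftime("%Y%m%d") + uuid.uuid4().hex[:8]


def build_commands(java64_home, nimbus_host, unique):
    """Return (submit, kill) argument lists mirroring the format() strings above."""
    submit = shlex.split(
        "env JAVA_HOME={0} storm jar /tmp/wordCount.jar "
        "storm.starter.WordCountTopology WordCount{1} -c nimbus.host={2}".format(
            java64_home, unique, nimbus_host))
    kill = shlex.split(
        "env JAVA_HOME={0} storm kill WordCount{1}".format(java64_home, unique))
    return submit, kill


def run_service_check(java64_home, nimbus_host, run=subprocess.run):
    unique = unique_suffix()
    submit, kill = build_commands(java64_home, nimbus_host, unique)
    try:
        # check=True: a non-zero exit code raises CalledProcessError, failing the check.
        run(submit, check=True)
    finally:
        # Best-effort cleanup; ignore failures (e.g. the topology was never submitted).
        run(kill, check=False)
```

The `run` parameter is only there so the commands can be inspected without a Storm cluster; by default it executes them.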

global variables

Your component may need to contact other installed components; global variables are the way to do that.

Here is an example of reading a global variable in COMPONENT_NAME/package/scripts/params.py:

from resource_management import *

config = Script.get_config()

storm_user = config['configurations']['global']['storm_user']
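`config` holds the command JSON that the Ambari agent hands to the script, as nested dictionaries, so a missing key makes the script fail. If a global is optional, a small helper with a default avoids that. The helper `lookup()` and the sample dict below are hypothetical; only the `configurations/global/storm_user` path comes from the example above.

```python
def lookup(config, path, default=None):
    """Walk a slash-separated path through nested dicts; return default if absent."""
    node = config
    for key in path.strip("/").split("/"):
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


# Hypothetical excerpt of the command JSON that params.py receives as `config`.
config = {
    "configurations": {
        "global": {"storm_user": "storm"},
    }
}

storm_user = lookup(config, "configurations/global/storm_user", "storm")
```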

You can also define your own global variables in COMPONENT_NAME/configuration/global.xml:

<configuration>
  <property>
    <name>storm_user</name>
    <value>storm</value>
    <description></description>
  </property>
</configuration>
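As the two snippets suggest, a <property> declared in global.xml is what params.py later reads under `configurations/global`. The sketch below only parses such a file into a name-to-value dict, for example to review your defaults before deploying the stack; `read_global_defaults` is a hypothetical helper, not part of Ambari.

```python
import xml.etree.ElementTree as ET

GLOBAL_XML = """
<configuration>
  <property>
    <name>storm_user</name>
    <value>storm</value>
    <description></description>
  </property>
</configuration>
"""


def read_global_defaults(xml_text):
    """Return {name: value} for every <property> in a configuration file."""
    root = ET.fromstring(xml_text.strip())
    defaults = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        if name:
            # A missing or empty <value> becomes "" rather than None.
            defaults[name.strip()] = (prop.findtext("value") or "").strip()
    return defaults
```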

alerting via Nagios

Alerts for all components are defined in a single file, NAGIOS/package/templates/hadoop-services.cfg.j2.

You will have to patch the Nagios component if you want Nagios to check your new component :-( For example, here are the two checks for the Storm UI:

{% if hostgroup_defs['storm_ui'] %}
# STORM UI Checks
define service {
        hostgroup_name          storm_ui
        use                     hadoop-service
        service_description     STORM_UI_SERVER::Storm UI on {{ hostgroup_defs['storm_ui'][0] }}
        servicegroups           STORM
        check_command           check_webui!storm_ui!{{ storm_ui_port }}
        normal_check_interval   1
        retry_check_interval    1
        max_check_attempts      3
}
{% endif %}

{% if hostgroup_defs['storm_ui'] %}
# STORM UI Checks
define service {
        hostgroup_name          storm_ui
        use                     hadoop-service
        service_description     STORM_UI_SERVER::Storm UI Server process
        servicegroups           STORM
        check_command           check_tcp_wrapper!{{ storm_ui_port }}!-w 1 -c 1
        normal_check_interval   0.5
        retry_check_interval    0.25
        max_check_attempts      3
}
{% endif %}
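The second definition is a plain TCP probe of the UI port, with `-w 1 -c 1` presumably passed through to the standard `check_tcp` plugin as warning and critical response-time thresholds in seconds. To see what such a check actually tests, the stdlib sketch below times a TCP connect and maps the result to the Nagios plugin exit codes (0 OK, 1 WARNING, 2 CRITICAL). The function name and threshold handling are illustrative assumptions, not Ambari's `check_tcp_wrapper`.

```python
import socket
import time

OK, WARNING, CRITICAL = 0, 1, 2  # standard Nagios plugin exit codes


def check_tcp(host, port, warn=1.0, crit=1.0):
    """Connect to host:port and return (exit_code, message), check_tcp-style."""
    start = time.monotonic()
    try:
        # The connect itself times out at the critical threshold.
        with socket.create_connection((host, port), timeout=crit):
            elapsed = time.monotonic() - start
    except OSError as exc:  # refused, unreachable or timed out
        return CRITICAL, "TCP CRITICAL - %s:%d: %s" % (host, port, exc)
    if elapsed >= crit:
        return CRITICAL, "TCP CRITICAL - %.3f s response time" % elapsed
    if elapsed >= warn:
        return WARNING, "TCP WARNING - %.3f s response time" % elapsed
    return OK, "TCP OK - %.3f s response time on port %d" % (elapsed, port)
```

A Nagios plugin built on this would print the message and exit with the returned code.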

metrics

Metrics are ingested by Ganglia and are defined in COMPONENT_NAME/metrics.json

"metrics/api/cluster/summary/nimbus.uptime": {
   "metric": "nimbus.uptime",
   "pointInTime": true,
   "temporal": false
}
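One way to read such an entry, judging from the key above and the Storm REST resource quoted further down, is that the path between `metrics/` and the last segment names the REST endpoint (`/api/cluster/summary`) and the `metric` field names the key in that endpoint's JSON response. The sketch below resolves entries that way against an already-fetched response. The path-splitting rule and the sample response are assumptions for illustration, not Ambari's property provider.

```python
import json

METRICS_JSON = """
{
  "metrics/api/cluster/summary/nimbus.uptime": {
    "metric": "nimbus.uptime",
    "pointInTime": true,
    "temporal": false
  }
}
"""


def resolve(metric_defs, responses):
    """Map each Ambari metric path to a value taken from a REST response.

    Assumption: the path minus its leading "metrics" and its last segment is
    the REST endpoint, and "metric" is the key in that endpoint's JSON body.
    """
    values = {}
    for path, spec in metric_defs.items():
        segments = path.split("/")
        endpoint = "/" + "/".join(segments[1:-1])  # e.g. /api/cluster/summary
        body = responses.get(endpoint, {})
        if spec["metric"] in body:
            values[path] = body[spec["metric"]]
    return values


# Hypothetical response body; real values come from GET /api/cluster/summary.
responses = {"/api/cluster/summary": {"nimbus.uptime": 3600, "supervisors": 2}}
print(resolve(json.loads(METRICS_JSON), responses))
```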

Every metric has a path of the form metrics/TYPE/ID. The Hadoop metric types are documented at http://aajisaka.github.io/hadoop-project/hadoop-project-dist/hadoop-common/Metrics.html

Storm metrics, however, are quite different and are not covered by that documentation.

I found the source code that produces this metric in ./contrib/storm-rest/src/main/java/org/apache/storm/rest/resources/NimbusResource.java:

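package org.apache.storm.rest.resources;

import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.Nimbus;
import backtype.storm.generated.SupervisorSummary;
import backtype.storm.generated.TopologySummary;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

@Path("/api")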
@Produces(MediaType.APPLICATION_JSON)
public class NimbusResource {

    private Nimbus.Client client;

    public NimbusResource(Nimbus.Client client){
        this.client = client;
    }

    @GET
    @Path("/cluster/summary")
    public Object clusterData() throws Exception {
        ClusterSummary cs = this.client.getClusterInfo();
        Map<String, Object> retval = new HashMap<String, Object>();
        retval.put("nimbus.uptime", cs.get_nimbus_uptime_secs());
        retval.put("supervisors", cs.get_supervisors_size());
        retval.put("topologies", cs.get_topologies_size());

        List<SupervisorSummary> sups = cs.get_supervisors();
        int totalSlots = 0;
        int usedSlots = 0;
        for(SupervisorSummary ssum : sups){
            totalSlots += ssum.get_num_workers();
            usedSlots += ssum.get_num_used_workers();
        }
        int freeSlots = totalSlots - usedSlots;

        retval.put("slots.total", totalSlots);
        retval.put("slots.used", usedSlots);
        retval.put("slots.free", freeSlots);

        List<TopologySummary> topos = cs.get_topologies();
        int totalExecutors = 0;
        int totalTasks = 0;
        for(TopologySummary topo : topos){
            totalExecutors += topo.get_num_executors();
            totalTasks += topo.get_num_tasks();  // accumulate across topologies
        }
        retval.put("executors.total", totalExecutors);
        retval.put("tasks.total", totalTasks);

        return retval;
    }
}
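When checking the numbers Ambari displays, it can help to recompute the derived values by hand. The Python sketch below mirrors the aggregation in `clusterData()` on plain data; the input keys (`num_workers`, `num_used_workers`, `num_executors`, `num_tasks`) are hypothetical names chosen to echo the Thrift getters, and tasks are summed across topologies, just like executors.

```python
def cluster_summary_metrics(nimbus_uptime_secs, supervisors, topologies):
    """Recompute the /api/cluster/summary values from plain Python data.

    supervisors: list of dicts with hypothetical keys num_workers, num_used_workers
    topologies:  list of dicts with hypothetical keys num_executors, num_tasks
    """
    total_slots = sum(s["num_workers"] for s in supervisors)
    used_slots = sum(s["num_used_workers"] for s in supervisors)
    return {
        "nimbus.uptime": nimbus_uptime_secs,
        "supervisors": len(supervisors),
        "topologies": len(topologies),
        "slots.total": total_slots,
        "slots.used": used_slots,
        "slots.free": total_slots - used_slots,
        "executors.total": sum(t["num_executors"] for t in topologies),
        "tasks.total": sum(t["num_tasks"] for t in topologies),
    }
```

For example, two supervisors with 4 slots each (1 and 3 used) give 8 total, 4 used and 4 free slots.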

An example output is available here: https://github.com/apache/ambari/blob/branch-1.6.1/ambari-server/src/test/resources/storm_rest_api_jmx.json

Good luck.

This gist is a work in progress and may change in the near future.
