ambari二次开发分享 (三)
2024-06-20 11:52:58 # Ambari # 二次开发

ambari二次开发分享(三)

目录

quicklinks也就是ambari组件右侧的跳转链接,对应的是组件的web ui

添加metainfo.xml

1
2
3
4
5
6
7
<!-- <quickLinksConfigurations-dir>quicklinks-es</quickLinksConfigurations-dir> -->
<quickLinksConfigurations>
<quickLinksConfiguration>
<fileName>quicklinks.json</fileName>
<default>true</default>
</quickLinksConfiguration>
</quickLinksConfigurations>

quickjson.json文件的默认目录实在服务根目录的quicklinks目录下

先确定协议(HTTP 与 HTTPS),然后“links”部分用于显示,在 Ambari Web UI 上显示的每个快速链接的元信息。 JSON 文件的顶部有一个 “name” 属性,用于标识快速链接 JSON文件的名称。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"name": "default", // 这里的值为default,前提是metainfo.xml文件里面的quickLinksConfiguration.default为true。
"description": "default quick links configuration",
"configuration": {
"protocol": {
"type":"https", // type告诉Ambari Web UI如果所有下面的checks都满足使用https协议。该属性值可为 http 或者 https
"checks":[
{
"property":"es.head.http.policy", ---该属性的值为HTTP_ONLY或是HTTPS_ONLY
"desired":"HTTPS_ONLY",
"site":"elastic-config" ---property属性所在的文件
}
]//这里也就是看一下这个checks中的所有属性值这里写的HTTPS_ONLY是否和site里面写的文件中设置的这个参数的属性值一致,如果一致就说明满足检查,根据上面的配置,使用https协议
},
"links": [
{
"name": "elasticsearch_ui", // 快速链接的名称
"label": "Elasticsearch-head UI", // 快链UI上的显示名称
"component_name": "ELASTICSEARCH_HEAD", // 指定该link所关联的组件名称,这个组件名称必须是在metainfo.xml文件中配置过的
"url":"%@://%@:%@", // 第一个%@为通信协议,第二个是主机名,第三个是端口号,这个一般就写死是这个格式
"port":{
"http_property": "elasticsearch_head_port", // 端口号,动态获取
"http_default_port": "9100",//如果没有获取到就使用这个默认的9100端口号
"https_property": "elasticsearch_head_port", // 端口号,动态获取
"https_default_port": "9100",//如果没有获取到就使用这个默认的9100端口号
"regex": "^(\\d+)$", // 正则表达式,只支持多个数字组合。第一个\为转义符
"site": "elastic-config" // http_property或https_property参数所在的文件名称,省略了”.xml“
}
}
]
}
}

为自定义服务添加监控指标并展示

metainfo.xml

这里应该在metainfo.xml里面配置好监控哪个组件,在需要监控的组件下添加

1
<timelineAppid>elasticsearch</timelineAppid>

比如下面的写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<component>
<name>ELASTICSEARCH_SERVICE</name>
<!-- ambariWeb页面上显示的服务名称 -->
<displayName>Elasticsearch Service</displayName>
<!-- 组件的类别,MASTER、SLAVE、CLIENT -->
<category>MASTER</category>
<cardinality>1+</cardinality>
<!-- 监控指标 -->
<timelineAppid>elasticsearch</timelineAppid>
<!-- 服务主程序 -->
<commandScript>
<script>scripts/master.py</script>
<scriptType>PYTHON</scriptType>
<timeout>1800</timeout>
</commandScript>
<customCommands>
<customCommand>
<name>test_master_check</name>
<background>true</background>
<commandScript>
<script>scripts/master.py</script>
<scriptType>PYTHON</scriptType>
</commandScript>
</customCommand>
</customCommands>
</component>

这里注意这里的timeLineAppid是唯一的,不区分大小写,写的都是服务的名称Metrics Collector会根据这个来区分各个服务的监控指标信息

添加自定义监控指标

步骤总结

  1. 通过 widgets.json 文件定义服务 widgets 。
  2. 通过 metrics.json 文件声明服务指标。
  3. 将服务指标推送到 Ambari Metrics Collector 中。

服务仪表板 widgets.json

公共部分

1
2
3
4
5
6
7
8
9
10
11
12
{
"layouts": [
{
"layout_name": "default_elasticsearch_dashboard",//唯一名称
"display_name": "Standard Elasticsearch Dashboard",//显示名称
"section_name": "ELASTICSEARCH_SUMMARY",//唯一名称
"widgetLayoutInfo": [
//...widget组件
]
}
]
}

注意

  • “layout_name”: “default_${服务名小写}_dashboard”
  • “section_name”: “${服务名大写}_SUMMARY”

这俩属性的值不能乱定义。

widget支持类型
  1. Graph类型
    alt text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
{
"widget_name": "NameNode GC count",//最上面显示的指标名称
"description": "Count of total garbage collections and count of major type garbage collections of the JVM.",
"widget_type": "GRAPH",//类型
"is_visible": true,//都写成true,默认开启
"metrics": [
{
"name": "jvm.JvmMetrics.GcCount._rate", // 数据推送到collector的实际名称。
"metric_path": "metrics/jvm/gcCount._rate", // 这个路径和metrics.json文件的metricKey要保持一致(去掉._rate的部分)
"service_name": "HDFS", // 服务名
"component_name": "NAMENODE", // 组件名
"host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active" // 可选参数,该值为主机指标,不加这个参数的话,就是服务指标,这里下面的开发都统一是服务指标,不加上这个主机指标
},
{
"name": "jvm.JvmMetrics.GcCountConcurrentMarkSweep._rate",//这里的_rate表示的是速率,表示每秒的速率
"metric_path": "metrics/jvm/GcCountConcurrentMarkSweep._rate",
"service_name": "HDFS",
"component_name": "NAMENODE",
"host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
}
],
"values": [
{
"name": "GC total count", // 该字段仅用于“GRAPH图形”窗口小部件类型。作为放大图例中的标签名称。
"value": "${jvm.JvmMetrics.GcCount._rate}" // 这是用来计算数据集值的表达式。表达式包含对声明的度量名称和作为有效操作数的常量的引用。表达式还包含一组有效的运算符{+、-、*、/},可以与有效的操作数一起使用。表达式中也允许使用括号。
},
{
"name": "GC count of type major collection",
"value": "${jvm.JvmMetrics.GcCountConcurrentMarkSweep._rate}"
}
],
"properties": { // 包含显示单位、阈值标识等等
"graph_type": "LINE",//线型图
"time_range": "1"//1个时间单位
}
}

Gauge支持类型

alt text

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
"widget_name": "heap使用内存的百分比",
"description": "heap使用内存的百分比,当值达到75%的时候开始GC",
"widget_type": "GAUGE",
"is_visible": true,
"metrics": [
{
"name": "heap.used.memory",
"metric_path": "metrics/heap/used/memory",
"service_name": "ELASTICSEARCH",
"component_name": "ELASTICSEARCH_SERVICE"
},
{
"name": "heap.max.memory",
"metric_path": "metrics/heap/max/memory",
"service_name": "ELASTICSEARCH",
"component_name": "ELASTICSEARCH_SERVICE"
}
],
"values": [
{
"name": "heap used memory",
"value": "${heap.used.memory/heap.max.memory}"
}
],
"properties": {
"error_threshold": "0.9",//这里设置的是当达到了90%的时候,就显示红色
"warning_threshold": "0.75"//这里设置的是当达到了75%的时候,就显示黄色
}
}

Number支持类型

alt text

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
"widget_name": "Under Replicated Blocks",
"description": "Number represents file blocks that does not meet the replication factor criteria. Its indicative of HDFS bad health.",
"widget_type": "NUMBER",
"is_visible": true,
"metrics": [
{
"name": "Hadoop:service=NameNode,name=FSNamesystem.UnderReplicatedBlocks",
"metric_path": "metrics/dfs/FSNamesystem/UnderReplicatedBlocks",
"service_name": "HDFS",
"component_name": "NAMENODE",
"host_component_criteria": "host_components/metrics/dfs/FSNamesystem/HAState=active"
}
],
"values": [
{
"name": "Under Replicated Blocks",
"value": "${Hadoop:service=NameNode,name=FSNamesystem.UnderReplicatedBlocks}"
}
],
"properties": {
"warning_threshold": "0",
"error_threshold": "50"
//这里可以添加单位进行单位显示
}
}

Number 支持单位显示:常用的有 ms,min,d 等,用 display_unit 定义。

Template支持类型

alt text

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
{
"widget_name": "Elasticsearch集群节点的存活占比",
"description": "Elasticsearch集群节点的存活占比,存活个数/总个数",
"widget_type": "TEMPLATE",
"is_visible": true,
"metrics": [
{
"name": "nodes.number._max",
"metric_path": "metrics/nodes/number._max",
"service_name": "ELASTICSEARCH",
"component_name": "ELASTICSEARCH_SERVICE"
},
{
"name": "total.nodes.number._max",
"metric_path": "metrics/total/nodes/number._max",
"service_name": "ELASTICSEARCH",
"component_name": "ELASTICSEARCH_SERVICE"
}
],
"values": [
{
"name": "the number of nodes",
"value": "${nodes.number._max} / ${total.nodes.number._max}"
}
]
}

指标的聚合函数

  • max:所有主机组件的指标最大值
  • min:所有主机组件中指标的最小值
  • avg:所有主机组件的指标平均值
  • sum:所有每个主机组件的度量值总和

metrics.json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"<服务组件名称>": {
"Component": [
{
"type": "ganglia", // ganglia是负责收集指标数据的
"metrics": {
"default": {
"<metricKey>": { // 这里的metricKey值为widgets.json里面的metrics[i].metric_path的值。去掉._max、._min之类的聚合标识
"metric": "<metricName>", // 这里的metricName值为widgets.json里面的metrics[i].name的值。去掉._max、._min之类的聚合标识
"pointInTime": true, // 表示该 Metric 属性是否允许时间段的查询,如果为 false 则代表不允许,这样就会只取最后一次的值。
"temporal": true // 代表是否支持时间段的查询请求,这里一般都是 true。
},
...
}
}
}
]
}
}

启动服务

最后当我们将上述的文件都改好后,还要将这个文件放到 /var/lib/ambari-server/resources/stacks/HDP/3.1/services/ELASTICSEARCH 目录下。然后重启 ambari-server,重新安装服务才行

向 Ambari Metrics Collector 发送指标数据

这里还没有数据,因为我们还没有写关于监控对应的数据,然后将监控的数据根据发送请求的方式来将监控的数据定期通过curl发送给Collector

POSTMAN测试

首先我们通过postman向Collector发送一下测试的指标数据,测试一下前面的配置是否成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"metrics": [
{
"metricname": "indices.count2",//这里的指标名对应的是metrics.json中的metrics[i].default.metric中的值
"appid": "elasticsearch",
"hostname": "hdp2.com",//安装服务的主机名
"timestamp": 1612608282778,//时间戳
"starttime": 1612608282778,
"metrics": {
"1612608282778": 333//这里的key就是时间戳,value就是指标值
}
}
]
}

alt text
这里我们看到了standalone:"yes"就说明已经将指标发送成功

正式编写监控指标脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#!/usr/bin/env python
# -*- coding: utf-8 -*--

import json
import logging
import os
import sys
import time
import socket
import requests

##############################
# 相关ES监控指标脚本逻辑如下:
# 1、判断ES是否启动,如果服务宕机,则停止向metrcis collector发送指标数据
# 2、通过requests模块,读取ES api接口,解析json,获取需要的指标数据。
# 3、组装请求参数,并向etrcis collector发送POST请求
# 4、过程中,将发送的指标数据打印到日志文件中,方便运维人员排查问题。
# 5、每隔10s向metrics collector发送一系列POST请求。
# 6、以上是一个while循环,循环条件就是判断ES进程是否存在,如果ES进程不存在,则终止循环。
# 7、把该脚本放在生命周期start()方法里面执行。
##############################

# '''
# 需要提供的参数有:
# {
# "metrics_collector": {
# "ip": "hdp1.com",
# "port": "6188"
# },
# "es": {
# "pid_file": "/var/run/elasticsearch/elasticsearch.pid",
# "ip": "hdp1.com",
# "port": "9200",
# "log_dir": "/var/log/elasticsearch"
# }
# }
# '''


# 从命令行参数中获取数据
params_data = sys.argv[1]
params_json = json.loads(params_data)

# 解析数据
mc_ip = params_json['metrics_collector']['ip']
mc_port = params_json['metrics_collector']['port']
es_pid_file = params_json['es']['pid_file']
es_ip = params_json['es']['ip']
es_port = params_json['es']['port']
es_log_dir = params_json['es']['log_dir']

# 设置日志文件目录和文件名
elastic_log_dir = "{0}".format(es_log_dir)
metrics_log_file = "es_metrics.log"
metrics_log = os.path.join(elastic_log_dir, metrics_log_file)

# Elasticsearch API URLs
es_cluster_stats_api = "http://{0}:{1}/_cluster/stats".format(es_ip, es_port)
es_cluster_health_api = "http://{0}:{1}/_cluster/health".format(es_ip, es_port)

# Metrics Collector API URL
metrics_collector_api = "http://{0}:{1}/ws/v1/timeline/metrics".format(mc_ip, mc_port)

# 检查日志目录是否存在,不存在则创建
if not os.path.exists(elastic_log_dir):
os.makedirs(elastic_log_dir, 0o644) # 注意:在Python 3.x中,文件权限应使用八进制表示法0o644

# 配置日志输出到控制台
ch = logging.StreamHandler()
ch.setLevel(logging.INFO) # 设置控制台日志级别为INFO

# 配置日志输出到文件
logging.basicConfig(level=logging.INFO, # 设置日志级别为INFO
filename=metrics_log, # 设置日志文件名
filemode='a', # 文件模式为追加
format='%(asctime)s - %(filename)s [line:%(lineno)d] %(levelname)s: %(message)s', # 日志格式
)

# 创建日志记录器
logger = logging.getLogger()
logger.addHandler(ch)

# 记录参数数据的类型和内容
logger.info("data type: {0}, params_data: {1}".format(type(params_data), params_data))


# 从PID文件中读取PID
def read_file(pid_file, encoding=None):
with open(pid_file, "rb") as fp:
content = fp.read()
content = content.decode(encoding) if encoding else content
return content


# 根据PID文件检查进程是否存在
def check_process(pid_file):
if not pid_file or not os.path.isfile(pid_file):
logging.error("Pid file {0} is empty or does not exist".format(str(pid_file)))
return False
else:
pid = -1
try:
pid = int(read_file(pid_file))
except RuntimeError:
logging.error("Pid file {0} does not exist or does not contain a process id number".format(pid_file))

try:
# 使用信号0检查进程是否存在
os.kill(pid, 0)
logging.info("process already exists! ")
process_is_exist = True
except OSError:
logging.error("Process with pid {0} is not running. Stale pid file"
" at {1}".format(pid, pid_file))
process_is_exist = False
return process_is_exist


# 获取API数据
def get_api_data():
try:
logging.info("elasticsearch cluster stats api: {0}".format(es_cluster_stats_api))
stats_resp = requests.get(es_cluster_stats_api)
stats_content = json.loads(stats_resp.content)
indices_count = stats_content['indices']['count']
heap_used_memory = stats_content['nodes']['jvm']['mem']['heap_used_in_bytes']
heap_max_memory = stats_content['nodes']['jvm']['mem']['heap_max_in_bytes']
nodes_number = stats_content['_nodes']['successful']
total_nodes_number = stats_content['_nodes']['total']
nodes_mem_percent = stats_content['nodes']['os']['mem']['used_percent']

# 发送各个指标到Metrics Collector
send_metric_to_collector("indices.count", indices_count)
send_metric_to_collector("heap.used.memory", heap_used_memory)
send_metric_to_collector("heap.max.memory", heap_max_memory)
send_metric_to_collector("nodes.number", nodes_number)
send_metric_to_collector("nodes.total", total_nodes_number)
send_metric_to_collector("nodes.mem.percent", nodes_mem_percent)

logging.info("elasticsearch cluster health api: {0}".format(es_cluster_health_api))
health_resp = requests.get(es_cluster_health_api)
health_content = json.loads(health_resp.content)
unassigned_shards = health_content['unassigned_shards']
send_metric_to_collector("unassigned.shards", unassigned_shards)
except Exception as e:
logging.error(e)


# 发送指标数据到Metrics Collector
def send_metric_to_collector(metric_name, metric_data):
appid = "elasticsearch"
millon_time = int(time.time() * 1000)
hostname = socket.gethostname()
header = {
"Content-Type": "application/json"
}
metrics_json = {
"metrics": [
{
"metricname": metric_name,
"appid": appid,
"hostname": hostname,
"timestamp": millon_time,
"starttime": millon_time,
"metrics": {
millon_time: metric_data
}
}
]
}
logging.info("[{0}] send metrics to collector data: {1}".format(metric_name, metrics_json))
try:
resp = requests.post(metrics_collector_api, json=metrics_json, headers=header)
logging.info("send metrics result: {0}".format(resp.content))
except Exception as e:
logging.error("send metrics failure: {0}".format(e))
pass


# 检测进程并发送数据
def check_and_send():
# 当Elasticsearch进程存在时,不断获取并发送数据
while check_process(es_pid_file):
get_api_data()
time.sleep(10)


if __name__ == "__main__":
check_and_send()

根据api接口查看指标详情

http://192.168.6.128:6188/ws/v1/timeline/metrics?metricNames=indices.count1&appId=elasticsearch

注意大小写,是 appId 。

  • indices.count1 为 metrics.json 文件的 metric 属性值。

  • elasticsearch 为 metainfo.xml 文件的 component -> timelineAppid 属性值。

FAQ

  1. 页面展示乱码

    需要查看mysql数据库的编码,是不是 utf8。

  2. 如何查看日志

    成功部署之前看ambari-server.log

    成功部署之后看ambari-metrics-collector.log

  3. 注意widgets.json的两种写法

    alt text

    alt text

    这里的具体区别,详细看一下es的ambari更改的源码

  4. 控制台爆出js报错

    这里需要注意下面两个参数是不是写法正确

    “layout_name”: “default_${服务名小写}_dashboard”

    “section_name”: “${服务名大写}_SUMMARY”

调试步骤

更改完成后需要重启ambari-server和重装服务才能生效

这里需要注意的是:生效之后需要在Actions中点击Browse Widgets添加最新的Widget部件,添加新部件后才会生效,但是这里ambari有一个bug,就是旧的之前的监控指标信息会保存在 widget表中,如果要不显示之前的指标只能手动删除 widget表之前的数据,这里可能要更改一下源码再调试一下ambari,看看这里的逻辑应该在新增之前清理一下旧数据