/*
 * GangliaContext.java
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics.ganglia;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics.ContextFactory;
import org.apache.hadoop.metrics.MetricsException;
import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.metrics.spi.Util;

/**
 * Context for sending metrics to Ganglia.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class GangliaContext extends AbstractMetricsContext {
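
  // The context is normally wired up through the metrics configuration file
  // (hadoop-metrics.properties). A minimal, illustrative configuration for a
  // context named "dfs" -- the context name and the values shown are examples
  // only, not requirements:
  //
  //   dfs.class=org.apache.hadoop.metrics.ganglia.GangliaContext
  //   dfs.period=10
  //   dfs.servers=localhost:8649
  //
  // Per-metric "units", "slope", "tmax" and "dmax" attributes are read from
  // the corresponding attribute tables in init().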

  private static final String PERIOD_PROPERTY = "period";
  private static final String SERVERS_PROPERTY = "servers";
  private static final String UNITS_PROPERTY = "units";
  private static final String SLOPE_PROPERTY = "slope";
  private static final String TMAX_PROPERTY = "tmax";
  private static final String DMAX_PROPERTY = "dmax";

  private static final String DEFAULT_UNITS = "";
  private static final String DEFAULT_SLOPE = "both";
  private static final int DEFAULT_TMAX = 60;
  private static final int DEFAULT_DMAX = 0;
  private static final int DEFAULT_PORT = 8649;
  private static final int BUFFER_SIZE = 1500; // as per libgmond.c

  private final Log LOG = LogFactory.getLog(this.getClass());

  private static final Map<Class,String> typeTable = new HashMap<Class,String>(5);

  static {
    typeTable.put(String.class, "string");
    typeTable.put(Byte.class, "int8");
    typeTable.put(Short.class, "int16");
    typeTable.put(Integer.class, "int32");
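    // Note: Long is mapped to Ganglia's "float" type; the gmetric type set
    // has no 64-bit integer, so longs trade precision for not overflowing
    // int32.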
    typeTable.put(Long.class, "float");
    typeTable.put(Float.class, "float");
  }

  protected byte[] buffer = new byte[BUFFER_SIZE];
  protected int offset;

  protected List<? extends SocketAddress> metricsServers;
  private Map<String,String> unitsTable;
  private Map<String,String> slopeTable;
  private Map<String,String> tmaxTable;
  private Map<String,String> dmaxTable;

  protected DatagramSocket datagramSocket;

  /** Creates a new instance of GangliaContext */
  @InterfaceAudience.Private
  public GangliaContext() {
  }

  @InterfaceAudience.Private
  public void init(String contextName, ContextFactory factory) {
    super.init(contextName, factory);
    parseAndSetPeriod(PERIOD_PROPERTY);

    metricsServers =
      Util.parse(getAttribute(SERVERS_PROPERTY), DEFAULT_PORT);

    unitsTable = getAttributeTable(UNITS_PROPERTY);
    slopeTable = getAttributeTable(SLOPE_PROPERTY);
    tmaxTable = getAttributeTable(TMAX_PROPERTY);
    dmaxTable = getAttributeTable(DMAX_PROPERTY);

    try {
      datagramSocket = new DatagramSocket();
    } catch (SocketException se) {
      LOG.error("Could not create datagram socket for metrics", se);
    }
  }

  /**
   * Closes the datagram socket.
   */
  @Override
  public void close() {
    super.close();
    if (datagramSocket != null) {
      datagramSocket.close();
    }
  }

  @InterfaceAudience.Private
  public void emitRecord(String contextName, String recordName,
      OutputRecord outRec) throws IOException {
    // Prefix metric names with the context name (and, for the "jvm" context,
    // the process name) so that they are unambiguous at the Ganglia level;
    // an example of the resulting names is sketched below.
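    // Illustrative only (actual record and metric names depend on the
    // source): for contextName "jvm", a "processName" tag of "NameNode",
    // recordName "metrics" and metric "memHeapUsedM", the emitted name is
    // "jvm.NameNode.metrics.memHeapUsedM".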
    StringBuilder sb = new StringBuilder();
    sb.append(contextName);
    sb.append('.');

    if (contextName.equals("jvm") && outRec.getTag("processName") != null) {
      sb.append(outRec.getTag("processName"));
      sb.append('.');
    }

    sb.append(recordName);
    sb.append('.');
    int sbBaseLen = sb.length();

    // emit each metric in turn
    for (String metricName : outRec.getMetricNames()) {
      Object metric = outRec.getMetric(metricName);
      String type = typeTable.get(metric.getClass());
      if (type != null) {
        sb.append(metricName);
        emitMetric(sb.toString(), type, metric.toString());
        sb.setLength(sbBaseLen);
      } else {
        LOG.warn("Unknown metrics type: " + metric.getClass());
      }
    }
  }

  protected void emitMetric(String name, String type, String value)
      throws IOException {
    String units = getUnits(name);
    int slope = getSlope(name);
    int tmax = getTmax(name);
    int dmax = getDmax(name);

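    // Serialize one metric in the field order used by Ganglia's gmetric wire
    // format (see libgmond.c): user-defined/magic flag, type, name, value,
    // units, slope, tmax, dmax. Strings are XDR-encoded; ints are big-endian.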
    offset = 0;
    xdr_int(0); // metric_user_defined
    xdr_string(type);
    xdr_string(name);
    xdr_string(value);
    xdr_string(units);
    xdr_int(slope);
    xdr_int(tmax);
    xdr_int(dmax);

    for (SocketAddress socketAddress : metricsServers) {
      DatagramPacket packet =
        new DatagramPacket(buffer, offset, socketAddress);
      datagramSocket.send(packet);
    }
  }

  protected String getUnits(String metricName) {
    String result = unitsTable.get(metricName);
    if (result == null) {
      result = DEFAULT_UNITS;
    }
    return result;
  }

  protected int getSlope(String metricName) {
    String slopeString = slopeTable.get(metricName);
    if (slopeString == null) {
      slopeString = DEFAULT_SLOPE;
    }
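    // Slope codes follow gmetric.c: 0 means "zero", 3 means "both"; other
    // slope values are not used by this context.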
    return ("zero".equals(slopeString) ? 0 : 3); // see gmetric.c
  }

  protected int getTmax(String metricName) {
    if (tmaxTable == null) {
      return DEFAULT_TMAX;
    }
    String tmaxString = tmaxTable.get(metricName);
    if (tmaxString == null) {
      return DEFAULT_TMAX;
    } else {
      return Integer.parseInt(tmaxString);
    }
  }

  protected int getDmax(String metricName) {
    String dmaxString = dmaxTable.get(metricName);
    if (dmaxString == null) {
      return DEFAULT_DMAX;
    } else {
      return Integer.parseInt(dmaxString);
    }
  }


  /**
   * Puts a string into the buffer by first writing the length of its UTF-8
   * encoding as an int, followed by the encoded bytes, padded if necessary
   * to a multiple of 4.
   */
  protected void xdr_string(String s) {
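    // Worked example (illustrative): xdr_string("int32") writes the length
    // 0x00 0x00 0x00 0x05, then the five bytes 'i' 'n' 't' '3' '2', then
    // three zero bytes of padding, advancing offset by 12 in total.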
    byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
    int len = bytes.length;
    xdr_int(len);
    System.arraycopy(bytes, 0, buffer, offset, len);
    offset += len;
    pad();
  }

  /**
   * Pads the buffer with zero bytes up to the nearest multiple of 4.
   */
  private void pad() {
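    // e.g. an offset of 9 is rounded up to 12; an offset of 8 is unchanged.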
    int newOffset = ((offset + 3) / 4) * 4;
    while (offset < newOffset) {
      buffer[offset++] = 0;
    }
  }

  /**
   * Puts an integer into the buffer as 4 bytes, big-endian.
   */
  protected void xdr_int(int i) {
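    // For example, xdr_int(8649) appends the bytes 0x00 0x00 0x21 0xC9.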
    buffer[offset++] = (byte)((i >> 24) & 0xff);
    buffer[offset++] = (byte)((i >> 16) & 0xff);
    buffer[offset++] = (byte)((i >> 8) & 0xff);
    buffer[offset++] = (byte)(i & 0xff);
  }
}