-
Notifications
You must be signed in to change notification settings - Fork 136
Expand file tree
/
Copy pathNonBlockingStatsDClient.java
More file actions
303 lines (270 loc) · 11.7 KB
/
NonBlockingStatsDClient.java
File metadata and controls
303 lines (270 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
package com.timgroup.statsd;
import java.nio.charset.Charset;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
/**
* A simple StatsD client implementation facilitating metrics recording.
*
* <p>Upon instantiation, this client will establish a socket connection to a StatsD instance
* running on the specified host and port. Metrics are then sent over this connection as they are
* received by the client.
* </p>
*
* <p>Three key methods are provided for the submission of data-points for the application under
* scrutiny:
* <ul>
* <li>{@link #incrementCounter} - adds one to the value of the specified named counter</li>
* <li>{@link #recordGaugeValue} - records the latest fixed value for the specified named gauge</li>
* <li>{@link #recordExecutionTime} - records an execution time in milliseconds for the specified named operation</li>
* </ul>
* From the perspective of the application, these methods are non-blocking, with the resulting
* IO operations being carried out in a separate thread. Furthermore, these methods are guaranteed
* not to throw an exception which may disrupt application execution.
* </p>
*
* <p>As part of a clean system shutdown, the {@link #stop()} method should be invoked
* on any StatsD clients.</p>
*
* @author Tom Denley
*
*/
public final class NonBlockingStatsDClient extends ConvenienceMethodProvidingStatsDClient {
private static final Charset STATS_D_ENCODING = Charset.forName("UTF-8");
private static final String STATS_D_TAG_PREFIX = "|#";
private static final StatsDClientErrorHandler NO_OP_HANDLER = new StatsDClientErrorHandler() {
@Override public void handle(Exception e) { /* No-op */ }
};
private final String prefix;
private final NonBlockingUdpSender sender;
private String clientTags = "";
/**
* Create a new StatsD client communicating with a StatsD instance on the
* specified host and port. All messages send via this client will have
* their keys prefixed with the specified string. The new client will
* attempt to open a connection to the StatsD server immediately upon
* instantiation, and may throw an exception if that a connection cannot
* be established. Once a client has been instantiated in this way, all
* exceptions thrown during subsequent usage are consumed, guaranteeing
* that failures in metrics will not affect normal code execution.
*
* @param prefix
* the prefix to apply to keys sent via this client (can be null or empty for no prefix)
* @param hostname
* the host name of the targeted StatsD server
* @param port
* the port of the targeted StatsD server
* @throws StatsDClientException
* if the client could not be started
*/
public NonBlockingStatsDClient(String prefix, String hostname, int port) throws StatsDClientException {
this(prefix, hostname, port, NO_OP_HANDLER);
}
/**
* Create a new StatsD client communicating with a StatsD instance on the
* specified host and port. All messages send via this client will have
* their keys prefixed with the specified string. The new client will
* attempt to open a connection to the StatsD server immediately upon
* instantiation, and may throw an exception if that a connection cannot
* be established. Once a client has been instantiated in this way, all
* exceptions thrown during subsequent usage are passed to the specified
* handler and then consumed, guaranteeing that failures in metrics will
* not affect normal code execution.
*
* @param prefix
* the prefix to apply to keys sent via this client (can be null or empty for no prefix)
* @param hostname
* the host name of the targeted StatsD server
* @param port
* the port of the targeted StatsD server
* @param errorHandler
* handler to use when an exception occurs during usage
* @throws StatsDClientException
* if the client could not be started
*/
public NonBlockingStatsDClient(String prefix, String hostname, int port, StatsDClientErrorHandler errorHandler) throws StatsDClientException {
this.prefix = (prefix == null || prefix.trim().isEmpty()) ? "" : (prefix.trim() + ".");
try {
this.sender = new NonBlockingUdpSender(hostname, port, STATS_D_ENCODING, errorHandler);
} catch (Exception e) {
throw new StatsDClientException("Failed to start StatsD client", e);
}
}
/**
* Cleanly shut down this StatsD client. This method may throw an exception if
* the socket cannot be closed.
*/
@Override
public void stop() {
sender.stop();
}
/**
* Adjusts the specified counter by a given delta.
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the counter to adjust
* @param delta
* the amount to adjust the counter by
* @param sampleRate
* the sampling rate being employed. For example, a rate of 0.1 would tell StatsD that this counter is being sent
* sampled every 1/10th of the time.
*/
@Override
public void count(String aspect, long delta, double sampleRate) {
send(messageFor(aspect, Long.toString(delta), "c", sampleRate));
}
/**
* Adjusts the specified counter by a given delta.
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the counter to adjust
* @param delta
* the amount to adjust the counter by
* @param sampleRate
* the sampling rate being employed. For example, a rate of 0.1 would tell StatsD that this counter is being sent
* sampled every 1/10th of the time.
* @param tags
* A string array containing one or more tags. Each tag can be in the format of key:value, e.g. key1:value1. Or it can be just a key, e.g. key3.
*/
public void count(String aspect, long delta, double sampleRate, String[] tags) {
send(messageFor(aspect, Long.toString(delta), "c", sampleRate, tags));
}
/**
* Records the latest fixed value for the specified named gauge.
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the gauge
* @param value
* the new reading of the gauge
*/
@Override
public void recordGaugeValue(String aspect, long value) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, false);
}
@Override
public void recordGaugeValue(String aspect, double value) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, false);
}
@Override
public void recordGaugeDelta(String aspect, long value) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, true);
}
@Override
public void recordGaugeDelta(String aspect, double value) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, true);
}
public void recordGaugeValue(String aspect, long value, String[] tags) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, false, tags);
}
public void recordGaugeValue(String aspect, double value, String[] tags) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, false, tags);
}
public void recordGaugeDelta(String aspect, long value, String[] tags) {
recordGaugeCommon(aspect, Long.toString(value), value < 0, true, tags);
}
public void recordGaugeDelta(String aspect, double value, String[] tags) {
recordGaugeCommon(aspect, stringValueOf(value), value < 0, true, tags);
}
private void recordGaugeCommon(String aspect, String value, boolean negative, boolean delta) {
final StringBuilder message = new StringBuilder();
if (!delta && negative) {
message.append(messageFor(aspect, "0", "g")).append('\n');
}
message.append(messageFor(aspect, (delta && !negative) ? ("+" + value) : value, "g"));
send(message.toString());
}
private void recordGaugeCommon(String aspect, String value, boolean negative, boolean delta, String[] tags) {
final StringBuilder message = new StringBuilder();
if (!delta && negative) {
message.append(messageFor(aspect, "0", "g", 1.0, tags)).append('\n');
}
message.append(messageFor(aspect, (delta && !negative) ? ("+" + value) : value, "g", 1.0, tags));
send(message.toString());
}
/**
* StatsD supports counting unique occurrences of events between flushes, Call this method to records an occurrence
* of the specified named event.
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the set
* @param eventName
* the value to be added to the set
*/
@Override
public void recordSetEvent(String aspect, String eventName) {
send(messageFor(aspect, eventName, "s"));
}
public void recordSetEvent(String aspect, String eventName, String[] tags) {
send(messageFor(aspect, eventName, "s", 1.0, tags));
}
/**
* Records an execution time in milliseconds for the specified named operation.
*
* <p>This method is non-blocking and is guaranteed not to throw an exception.</p>
*
* @param aspect
* the name of the timed operation
* @param timeInMs
* the time in milliseconds
*/
@Override
public void recordExecutionTime(String aspect, long timeInMs, double sampleRate) {
send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate));
}
public void recordExecutionTime(String aspect, long timeInMs, double sampleRate, String[] tags) {
send(messageFor(aspect, Long.toString(timeInMs), "ms", sampleRate, tags));
}
/**
* Set tags at the client level. These tags will be added to all metrics.
*
* @param tags
* A string array containing one or more tags. Each tag can be in the format of key:value, e.g. key1:value1. Or it can be just a key, e.g. key3.
*/
public void setClientTags(String[] tags) {
List<String> tagsList = Arrays.asList(tags);
clientTags += String.join(",", tagsList);
}
private String messageFor(String aspect, String value, String type) {
return messageFor(aspect, value, type, 1.0);
}
private String messageFor(String aspect, String value, String type, double sampleRate) {
return messageFor(aspect, value, type, sampleRate, null);
}
private String messageFor(String aspect, String value, String type, double sampleRate, String[] tags) {
final String message = prefix + aspect + ':' + value + '|' + type;
return addTags(tags, (sampleRate == 1.0)
? message
: (message + "|@" + stringValueOf(sampleRate)));
}
private void send(final String message) {
sender.send(message);
}
private String stringValueOf(double value) {
NumberFormat formatter = NumberFormat.getInstance(Locale.US);
formatter.setGroupingUsed(false);
formatter.setMaximumFractionDigits(19);
return formatter.format(value);
}
private String addTags(final String[] tags, String message) {
if(tags == null && clientTags.isEmpty()) return message;
StringBuilder sb = new StringBuilder(message);
sb.append(STATS_D_TAG_PREFIX);
if(!clientTags.isEmpty()) {
sb.append(clientTags);
if(tags != null) {
sb.append(",");
}
}
sb.append(String.join(",",Arrays.asList(tags)));
return sb.toString();
}
}