-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathEventSourceClient.java
More file actions
91 lines (76 loc) · 3.45 KB
/
EventSourceClient.java
File metadata and controls
91 lines (76 loc) · 3.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package com.github.eventsource.client;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class EventSourceClient {
private final ClientBootstrap bootstrap;
private final Executor eventExecutor;
private final HashMap<Channel, ChannelUpstreamHandler> handlerMap = new HashMap<Channel, ChannelUpstreamHandler>();
public EventSourceClient() {
this(Executors.newSingleThreadExecutor());
}
public EventSourceClient(Executor eventExecutor) {
this.eventExecutor = eventExecutor;
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newSingleThreadExecutor(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("line", new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, Delimiters.lineDelimiter()));
pipeline.addLast("string", new StringDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
pipeline.addLast("es-handler", new Handler());
return pipeline;
}
});
}
public ChannelFuture connect(InetSocketAddress address, ChannelUpstreamHandler handler) {
synchronized (handlerMap) {
ChannelFuture channelFuture = bootstrap.connect(address);
handlerMap.put(channelFuture.getChannel(), handler);
return channelFuture;
}
}
private class Handler extends SimpleChannelUpstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
final ChannelUpstreamHandler handler;
synchronized (handlerMap) {
handler = handlerMap.get(ctx.getChannel());
}
if (handler == null) {
super.handleUpstream(ctx, e);
if (e instanceof ChannelStateEvent && ((ChannelStateEvent) e).getState() == ChannelState.OPEN) {
return; //Do nothing, this one will not be dispatched to handler, but it's ok
}
System.err.println("Something wrong with dispatching");
} else {
handler.handleUpstream(ctx, e);
if (e instanceof ChannelStateEvent) {
ChannelStateEvent stateEvent = (ChannelStateEvent) e;
if (stateEvent.getState() == ChannelState.BOUND && stateEvent.getValue() == null) {
synchronized (handlerMap) {
handlerMap.remove(ctx.getChannel());
}
}
}
}
}
}
public Executor getEventExecutor() {
return eventExecutor;
}
public void shutdown() {
bootstrap.releaseExternalResources();
}
}