-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMovieFinderServiceImpl.java
More file actions
124 lines (106 loc) · 5.47 KB
/
MovieFinderServiceImpl.java
File metadata and controls
124 lines (106 loc) · 5.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package server;
import com.proto.moviefinder.MovieFinderServiceGrpc;
import com.proto.moviefinder.MovieRequest;
import com.proto.moviefinder.MovieResponse;
import com.proto.moviestore.MovieStoreRequest;
import com.proto.moviestore.MovieStoreServiceGrpc;
import com.proto.recommender.RecommenderRequest;
import com.proto.recommender.RecommenderResponse;
import com.proto.recommender.RecommenderServiceGrpc;
import com.proto.userpreferences.UserPreferencesRequest;
import com.proto.userpreferences.UserPreferencesResponse;
import com.proto.userpreferences.UserPreferencesServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Orchestrates logic needed to find, filter, recommend movies.
*/
public class MovieFinderServiceImpl extends MovieFinderServiceGrpc.MovieFinderServiceImplBase {
private static final int DEFAULT_MOVIE_STORE_SERVER_PORT = 50052;
private static final int DEFAULT_USER_PREFERENCES_SERVER_PORT = 50053;
private static final int DEFAULT_RECOMMENDER_SERVER_PORT = 50054;
private static final String DEFAULT_SERVER_HOST = "localhost";
private final String movieStoreEndpoint;
private final String userPreferencesEndpoint;
private final String recommenderEndpoint;
public MovieFinderServiceImpl() {
this.movieStoreEndpoint = Optional.ofNullable(System.getenv("MOVIE_STORE_SERVER_ENDPOINT"))
.orElseGet(() -> DEFAULT_SERVER_HOST + ":" + DEFAULT_MOVIE_STORE_SERVER_PORT);
this.userPreferencesEndpoint = Optional.ofNullable(System.getenv("USER_PREFERENCES_SERVER_ENDPOINT"))
.orElseGet(() -> DEFAULT_SERVER_HOST + ":" + DEFAULT_USER_PREFERENCES_SERVER_PORT);
this.recommenderEndpoint = Optional.ofNullable(System.getenv("RECOMMENDER_SERVER_ENDPOINT"))
.orElseGet(() -> DEFAULT_SERVER_HOST + ":" + DEFAULT_RECOMMENDER_SERVER_PORT);
System.out.println("Using movie-store endpoint " + this.movieStoreEndpoint);
System.out.println("Using user-preferences endpoint " + this.userPreferencesEndpoint);
System.out.println("Using recommender endpoint " + this.recommenderEndpoint);
}
/**
* Returns movies based on:
* - the genre specified in the request
* - the user's preferences
* - recommendations engine
* <p>
* <p>
* The order of filtering operations is:
* 1. get all movies by genre
* 2. filter by user preferences
* 3. recommends one of them
*/
@Override
public void getMovie(MovieRequest request, StreamObserver<MovieResponse> responseObserver) {
String userId = request.getUserid();
MovieStoreServiceGrpc.MovieStoreServiceBlockingStub movieStoreClient = MovieStoreServiceGrpc.newBlockingStub(getChannel(movieStoreEndpoint));
UserPreferencesServiceGrpc.UserPreferencesServiceStub userPreferencesClient = UserPreferencesServiceGrpc.newStub(getChannel(userPreferencesEndpoint));
RecommenderServiceGrpc.RecommenderServiceStub recommenderClient = RecommenderServiceGrpc.newStub(getChannel(recommenderEndpoint));
StreamObserver<RecommenderRequest> recommenderRequestObserver = recommenderClient.getRecommendedMovie(
new StreamObserver<>() {
@Override
public void onNext(RecommenderResponse value) {
responseObserver.onNext(MovieResponse.newBuilder().setMovie(value.getMovie()).build());
System.out.println("Recommended movie: " + value.getMovie());
}
@Override
public void onError(Throwable t) {
responseObserver.onError(t);
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
});
StreamObserver<UserPreferencesRequest> userPreferencesRequestObserver = userPreferencesClient.getShortlistedMovies(
new StreamObserver<>() {
@Override
public void onNext(UserPreferencesResponse value) {
recommenderRequestObserver.onNext(
RecommenderRequest.newBuilder().setUserid(userId).setMovie(value.getMovie()).build()
);
}
@Override
public void onError(Throwable t) {
recommenderRequestObserver.onError(t);
}
@Override
public void onCompleted() {
recommenderRequestObserver.onCompleted();
}
});
movieStoreClient.getMovies(MovieStoreRequest.newBuilder().setGenre(request.getGenre()).build())
.forEachRemaining(response -> {
userPreferencesRequestObserver.onNext(
UserPreferencesRequest.newBuilder().setMovie(response.getMovie()).setUserid(userId).build()
);
});
userPreferencesRequestObserver.onCompleted();
}
private ManagedChannel getChannel(String endpoint) {
var tokens = endpoint.split(":");
var host = tokens[0];
var port = Integer.parseInt(tokens[1]);
return ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
}
}