1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.filter.query;
21
22 import java.util.Map;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.Executors;
25 import java.util.concurrent.ScheduledExecutorService;
26 import java.util.concurrent.TimeUnit;
27
28 import org.apache.mina.api.AbstractIoFilter;
29 import org.apache.mina.api.IoFuture;
30 import org.apache.mina.api.IoSession;
31 import org.apache.mina.filterchain.ReadFilterChainController;
32 import org.apache.mina.session.AttributeKey;
33
34 /**
35 * A filter providing {@link IoFuture} for request/response protocol.
36 *
37 * You send a request to the connected end-point and a {@link IoFuture} is provided for handling the received request
38 * response.
39 *
40 * The filter find the received message matching the request, using {@link Request#requestId()} and
41 * {@link Response#requestId()}.
42 *
43 * <pre>
44 * RequestFilter rq = new RequestFilter();
45 *
46 * service.setFilters(.., rq);
47 *
48 * IoFuture<Response> future = rq.request(session, message, 10000);
49 *
50 * response.register(new AbstractIoFutureListener<Response>() {
51 * @Override
52 * public void completed(Response result) {
53 * System.err.println("request completed ! response : " + result);
54 * }
55 * });
56 * </pre>
57 *
58 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
59 */
60 public class RequestFilter<REQUEST extends Request, RESPONSE extends Response> extends AbstractIoFilter {
61
62 /**
63 * Send a request message and provide a {@link IoFuture} for handling the response. WARNING : for now timeout
64 * doesn't work (WIP).
65 *
66 * @param session the session where to write the request
67 * @param request the request to be issued
68 * @param timeoutInMs the timeout in milli-seconds (doesn't work Work-in-progress).
69 * @return the {@link IoFuture} for waiting or listening the completion of this request.
70 */
71 @SuppressWarnings({ "rawtypes", "unchecked" })
72 public IoFuture<RESPONSE> request(IoSession session, REQUEST request, long timeoutInMs) {
73 Map inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
74 RequestFuture<REQUEST, RESPONSE> future = new RequestFuture<REQUEST, RESPONSE>(session, request.requestId());
75
76 // schedule a timeout task
77 future.setTimeoutFuture(schedExec.schedule(future.timeout, timeoutInMs, TimeUnit.MILLISECONDS));
78
79 // save the future for completion
80 inFlight.put(request.requestId(), future);
81 session.write(request);
82 return future;
83 }
84
85 @SuppressWarnings("rawtypes")
86 static final AttributeKey<Map> IN_FLIGHT_REQUESTS = new AttributeKey<Map>(Map.class, "request.in.flight");
87
88 private ScheduledExecutorService schedExec = Executors.newScheduledThreadPool(1);
89
90 @SuppressWarnings("rawtypes")
91 @Override
92 public void sessionOpened(IoSession session) {
93 session.setAttribute(IN_FLIGHT_REQUESTS, new ConcurrentHashMap());
94 }
95
96 @SuppressWarnings("unchecked")
97 @Override
98 public void messageReceived(IoSession session, Object message, ReadFilterChainController controller) {
99 if (message instanceof Response) {
100 Object id = ((Response) message).requestId();
101 if (id != null) {
102 // got a response, let's find the query
103 Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
104 RequestFuture<REQUEST, RESPONSE> future = (RequestFuture<REQUEST, RESPONSE>) inFlight.remove(id);
105 if (future != null) {
106 future.complete((RESPONSE) message);
107 }
108 }
109 }
110
111 super.messageReceived(session, message, controller);
112 }
113
114 /**
115 * {@inheritDoc} cancel remaining requests
116 */
117 @Override
118 public void sessionClosed(IoSession session) {
119 Map<?, ?> inFlight = session.getAttribute(IN_FLIGHT_REQUESTS);
120 for (Object v : inFlight.values()) {
121 ((RequestFuture<?, ?>) v).cancel(true);
122 }
123 }
124 }