1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.mina.util;
20
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.concurrent.CancellationException;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.TimeoutException;
28 import java.util.concurrent.atomic.AtomicReference;
29
30 import org.apache.mina.api.IoFuture;
31 import org.apache.mina.api.IoFutureListener;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 /**
36 * An abstract implementation of {@link IoFuture}. Owners of this future must implement {@link #cancelOwner(boolean)} to
37 * receive notifications of when the future should be canceled.
38 * <p>
39 * Concrete implementations of this abstract class should consider overriding the two methods
40 * {@link #scheduleResult(org.apache.mina.api.IoFutureListener, Object)} and
41 * {@link #scheduleException(org.apache.mina.api.IoFutureListener, Throwable)} so that listeners are called in a
42 * separate thread. The default implementations may end up calling the listener in the same thread that is registering
43 * the listener, before the registration has completed.
44 *
45 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46 */
47 public abstract class AbstractIoFuture<V> implements IoFuture<V> {
48
49 static final Logger LOG = LoggerFactory.getLogger(AbstractIoFuture.class);
50
51 private final CountDownLatch latch = new CountDownLatch(1);
52
53 private final List<IoFutureListener<V>> listeners = new ArrayList<IoFutureListener<V>>();
54
55 private final AtomicReference<Object> result = new AtomicReference<Object>();
56
57 /**
58 * {@inheritDoc}
59 */
60 @Override
61 @SuppressWarnings({ "unchecked" })
62 public IoFuture<V> register(IoFutureListener<V> listener) {
63
64 LOG.debug("registering listener {}", listener);
65
66 synchronized (latch) {
67 if (!isDone()) {
68 LOG.debug("future is not done, adding listener to listener set");
69 listeners.add(listener);
70 listener = null;
71 }
72 }
73
74 if (listener != null) {
75 LOG.debug("future is done calling listener");
76 Object object = result.get();
77
78 if (object instanceof Throwable) {
79 scheduleException(listener, (Throwable) object);
80 } else {
81 scheduleResult(listener, (V) object);
82 }
83 }
84
85 return this;
86 }
87
88 /**
89 * {@inheritDoc}
90 */
91 @Override
92 public boolean cancel(boolean mayInterruptIfRunning) {
93
94 LOG.debug("Attempting to cancel");
95
96 CancellationException ce = null;
97 synchronized (latch) {
98 if (!isCancelled() && !isDone() && cancelOwner(mayInterruptIfRunning)) {
99
100 LOG.debug("Successfully cancelled");
101
102 ce = new CancellationException();
103 result.set(ce);
104 } else {
105 LOG.debug("Unable to cancel");
106 }
107
108 latch.countDown();
109 }
110
111 if (ce != null) {
112 LOG.debug("Calling listeners");
113
114 for (IoFutureListener<V> listener : listeners) {
115 scheduleException(listener, ce);
116 }
117 }
118
119 return ce != null;
120 }
121
122 /**
123 * {@inheritDoc}
124 */
125 @Override
126 public boolean isCancelled() {
127 return result.get() instanceof CancellationException;
128 }
129
130 /**
131 * {@inheritDoc}
132 */
133 @Override
134 public boolean isDone() {
135 return latch.getCount() == 0;
136 }
137
138 /**
139 * {@inheritDoc}
140 */
141 @Override
142 @SuppressWarnings({ "unchecked" })
143 public V get() throws InterruptedException, ExecutionException {
144
145 LOG.trace("Entering wait");
146 latch.await();
147 LOG.trace("Wait completed");
148
149 if (isCancelled()) {
150 throw new CancellationException();
151 }
152
153 Object object = result.get();
154
155 if (object instanceof ExecutionException) {
156 throw (ExecutionException) object;
157 } else {
158 return (V) object;
159 }
160 }
161
162 /**
163 * {@inheritDoc}
164 */
165 @Override
166 @SuppressWarnings({ "unchecked" })
167 public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
168
169 LOG.trace("Entering wait");
170
171 if (!latch.await(timeout, unit)) {
172 throw new TimeoutException();
173 }
174
175 LOG.trace("Wait completed");
176
177 if (isCancelled()) {
178 throw new CancellationException();
179 }
180
181 Object object = result.get();
182
183 if (object instanceof ExecutionException) {
184 throw (ExecutionException) object;
185 } else {
186 return (V) object;
187 }
188 }
189
190 /**
191 * Notify the owner of this future that a client is attempting to cancel. This attempt will fail if the task has
192 * already completed, has already been cancelled, or could not be cancelled for some other reason. If successful,
193 * and this task has not started when <tt>cancel</tt> is called, this task should never run. If the task has already
194 * started, then the <tt>mayInterruptIfRunning</tt> parameter determines whether the thread executing this task
195 * should be interrupted in an attempt to stop the task.
196 * <p/>
197 * <p>
198 * After this method returns, subsequent calls to {@link #isDone} will always return <tt>true</tt>. Subsequent calls
199 * to {@link #isCancelled} will always return <tt>true</tt> if this method returned <tt>true</tt>.
200 * <p/>
201 * <b>Note:</b> implementations must never throw an exception.
202 *
203 * @param mayInterruptIfRunning <tt>true</tt> if the owner executing this task should be interrupted; otherwise,
204 * in-progress tasks are allowed to complete
205 * @return <tt>false</tt> if the task could not be cancelled, typically because it has already completed normally;
206 * <tt>true</tt> otherwise
207 */
208 protected abstract boolean cancelOwner(boolean mayInterruptIfRunning);
209
210 /**
211 * Default implementation to call a listener's {@link IoFutureListener#completed(Object)} method. Owners may
212 * override this method so that the listener is called from a thread pool.
213 *
214 * @param listener the listener to call
215 * @param result the result to pass to the listener
216 */
217 protected void scheduleResult(IoFutureListener<V> listener, V result) {
218 LOG.debug("Calling the default result scheduler");
219
220 try {
221 listener.completed(result);
222 } catch (Exception e) {
223 LOG.warn("Listener threw an exception", e);
224 }
225 }
226
227 /**
228 * Default implementation to call a listener's {@link IoFutureListener#exception(Throwable)} method. Owners may
229 * override this method so that the listener is called from a thread pool.
230 *
231 * @param listener the listener to call
232 * @param throwable the exception to pass to the listener
233 */
234 protected void scheduleException(IoFutureListener<V> listener, Throwable throwable) {
235 LOG.debug("Calling the default exception scheduler");
236
237 try {
238 listener.exception(throwable);
239 } catch (Exception e) {
240 LOG.warn("Listener threw an exception", e);
241 }
242 }
243
244 /**
245 * Set the future result of the executing task. Any {@link IoFutureListener}s are notified of the
246 *
247 * @param value the value returned by the executing task.
248 */
249 protected final void setResult(V value) {
250 assert !isDone();
251
252 synchronized (latch) {
253 result.set(value);
254 latch.countDown();
255 }
256
257 for (IoFutureListener<V> listener : listeners) {
258 scheduleResult(listener, value);
259 }
260
261 listeners.clear();
262 }
263
264 /**
265 * Set the future result as a {@link Throwable}, indicating that a throwable was thrown while executing the task.
266 * This value is usually set by the future result owner.
267 * <p/>
268 * Any {@link IoFutureListener}s are notified of the exception.
269 *
270 * @param t the throwable that was thrown while executing the task.
271 */
272 protected final void setException(Throwable t) {
273 assert !isDone();
274
275 ExecutionException ee = new ExecutionException(t);
276
277 synchronized (latch) {
278 result.set(ee);
279 latch.countDown();
280 }
281
282 for (IoFutureListener<V> listener : listeners) {
283 scheduleException(listener, ee);
284 }
285
286 listeners.clear();
287 }
288 }