1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.transport.nio;
21
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.SelectableChannel;
25 import java.nio.channels.SocketChannel;
26 import java.util.Queue;
27 import java.util.concurrent.atomic.AtomicBoolean;
28
29 import org.apache.mina.api.IoFuture;
30 import org.apache.mina.api.IoService;
31 import org.apache.mina.api.IoSession;
32 import org.apache.mina.service.idlechecker.IdleChecker;
33 import org.apache.mina.session.AbstractIoSession;
34 import org.apache.mina.session.DefaultWriteFuture;
35 import org.apache.mina.session.DefaultWriteQueue;
36 import org.apache.mina.session.DefaultWriteRequest;
37 import org.apache.mina.session.WriteRequest;
38 import org.apache.mina.util.AbstractIoFuture;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 /**
43 * Common ancestor for NIO based {@link IoSession} implmentation.
44 *
45 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46 */
47 public abstract class AbstractNioSession extends AbstractIoSession {
48 /** The logger for this class */
49 private static final Logger LOG = LoggerFactory.getLogger(AbstractNioSession.class);
50
51 // A speedup for logs
52 private static final boolean IS_DEBUG = LOG.isDebugEnabled();
53
54 /** the NIO channel for this session */
55 protected final SelectableChannel channel;
56
57 /** is this session registered for being polled for write ready events */
58 private final AtomicBoolean registeredForWrite = new AtomicBoolean();
59
60 /** the queue of pending writes for the session, to be dequeued by the {@link SelectorLoop} */
61 private final Queue<WriteRequest> writeQueue = new DefaultWriteQueue();
62
63 public AbstractNioSession(IoService service, SelectableChannel channel, IdleChecker idleChecker) {
64 super(service, idleChecker);
65 this.channel = channel;
66 }
67
68 /**
69 * Writes the message immediately. If we can't write all the message, we will get back the number of written bytes.
70 *
71 * @param message the message to write
72 * @return the number of written bytes
73 */
74 protected abstract int writeDirect(Object message);
75
76 /**
77 * Copy the HeapBuffer into a DirectBuffer, if needed.
78 *
79 * @param writeRequest The request containing the HeapBuffer
80 * @param createNew A flag to force the creation of a DirectBuffer
81 * @return A DirectBuffer
82 */
83 protected abstract ByteBuffer convertToDirectBuffer(WriteRequest writeRequest, boolean createNew);
84
85 // ------------------------------------------------------------------------
86 // Close session management
87 // ------------------------------------------------------------------------
88
89 /** we pre-allocate a close future for lock-less {@link #close(boolean)} */
90 private final IoFuture<Void> closeFuture = new AbstractIoFuture<Void>() {
91
92 /**
93 * {@inheritDoc}
94 */
95 @Override
96 protected boolean cancelOwner(final boolean mayInterruptIfRunning) {
97 // we don't cancel close
98 return false;
99 }
100 };
101
102 /**
103 * {@inheritDoc}
104 */
105 @Override
106 public IoFuture<Void> close(final boolean immediately) {
107 switch (state) {
108 case CREATED:
109 LOG.error("Session {} not opened", this);
110 throw new IllegalStateException("cannot close an not opened session");
111 case CONNECTED:
112 state = SessionState.CLOSING;
113 if (immediately) {
114 channelClose();
115 processSessionClosed();
116 } else {
117 // flush this session the flushing code will close the session
118 flushWriteQueue();
119 }
120 break;
121 case CLOSING:
122 // return the same future
123 LOG.warn("Already closing session {}", this);
124 break;
125 case CLOSED:
126 LOG.warn("Already closed session {}", this);
127 break;
128 default:
129 throw new IllegalStateException("not implemented session state : " + state);
130 }
131
132 return closeFuture;
133 }
134
135 /**
136 * Close the inner socket channel
137 */
138 protected abstract void channelClose();
139
140 /**
141 * {@inheritDoc}
142 */
143 @Override
144 public WriteRequest enqueueWriteRequest(WriteRequest writeRequest) {
145 if (IS_DEBUG) {
146 LOG.debug("enqueueWriteRequest {}", writeRequest);
147 }
148
149 if (isConnectedSecured()) {
150 // SSL/TLS : we have to encrypt the message
151 SslHelper sslHelper = getAttribute(SSL_HELPER, null);
152
153 if (sslHelper == null) {
154 throw new IllegalStateException();
155 }
156
157 writeRequest = sslHelper.processWrite(this, writeRequest.getMessage(), writeQueue);
158 }
159
160 /*synchronized (writeQueue)*/{
161 ByteBuffer message = (ByteBuffer) writeRequest.getMessage();
162
163 if (writeQueue.isEmpty()) {
164 // Transfer the buffer in a DirectByteBuffer if it's a HeapByteBuffer and if it's too big
165 message = convertToDirectBuffer(writeRequest, false);
166
167 // We don't have anything in the writeQueue, let's try to write the
168 // data in the channel immediately if we can
169 int written = writeDirect(writeRequest.getMessage());
170
171 if (IS_DEBUG) {
172 LOG.debug("wrote {} bytes to {}", written, this);
173 }
174
175 if (written > 0) {
176 incrementWrittenBytes(written);
177 }
178
179 // Update the idle status for this session
180 idleChecker.sessionWritten(this, System.currentTimeMillis());
181 int remaining = message.remaining();
182
183 if ((written < 0) || (remaining > 0)) {
184 // Create a DirectBuffer unconditionally
185 convertToDirectBuffer(writeRequest, true);
186
187 // We have to push the request on the writeQueue
188 writeQueue.add(writeRequest);
189
190 // If it wasn't, we register this session as interested to write.
191 // It's done in atomic fashion for avoiding two concurrent registering.
192 if (!registeredForWrite.getAndSet(true)) {
193 flushWriteQueue();
194 }
195 } else {
196 // The message has been fully written : update the stats, and signal the handler
197 // generate the message sent event
198 // complete the future if we have one (we should...)
199 final DefaultWriteFuture future = (DefaultWriteFuture) writeRequest.getFuture();
200
201 if (future != null) {
202 future.complete();
203 }
204
205 final Object highLevel = ((DefaultWriteRequest) writeRequest).getOriginalMessage();
206
207 if (highLevel != null) {
208 processMessageSent(highLevel);
209 }
210 }
211 } else {
212 // Transfer the buffer in a DirectByteBuffer if it's a HeapByteBuffer
213 message = convertToDirectBuffer(writeRequest, true);
214
215 // We have to push the request on the writeQueue
216 writeQueue.add(writeRequest);
217 }
218 }
219
220 return writeRequest;
221 }
222
223 public abstract void flushWriteQueue();
224
225 public void setNotRegisteredForWrite() {
226 registeredForWrite.set(false);
227 }
228
229 protected boolean isRegisteredForWrite() {
230 return registeredForWrite.get();
231 }
232
233 /**
234 * Get the {@link Queue} of this session. The write queue contains the pending writes.
235 *
236 * @return the write queue of this session
237 */
238 public Queue<WriteRequest> getWriteQueue() {
239 return writeQueue;
240 }
241
242 /**
243 * Process a write operation. This will be executed only because the session has something to write into the
244 * channel.
245 */
246 public void processWrite(SelectorLoop selectorLoop) {
247 try {
248 if (IS_DEBUG) {
249 LOG.debug("ready for write");
250 LOG.debug("writable session : {}", this);
251 }
252
253 do {
254 // get a write request from the queue. We left it in the queue,
255 // just in case we can't write all of the message content into
256 // the channel : we will have to retrieve the message later
257 final WriteRequest writeRequest = writeQueue.peek();
258
259 if (writeRequest == null) {
260 // Nothing to write : we are done
261 break;
262 }
263
264 // The message is necessarily a ByteBuffer at this point
265 ByteBuffer buf = (ByteBuffer) writeRequest.getMessage();
266
267 // Note that if the connection is secured, the buffer
268 // already contains encrypted data.
269
270 // Try to write the data, and get back the number of bytes
271 // actually written
272 int written = ((SocketChannel) channel).write(buf);
273
274 if (IS_DEBUG) {
275 LOG.debug("wrote {} bytes to {}", written, this);
276 }
277
278 if (written > 0) {
279 incrementWrittenBytes(written);
280 }
281
282 // Update the idle status for this session
283 idleChecker.sessionWritten(this, System.currentTimeMillis());
284
285 // Ok, we may not have written everything. Check that.
286 if (buf.remaining() == 0) {
287 // completed write request, let's remove it (we use poll() instead
288 // of remove(), because remove() may throw an exception if the
289 // queue is empty.
290 writeQueue.poll();
291
292 // complete the future if we have one (we should...)
293 final DefaultWriteFuture future = (DefaultWriteFuture) writeRequest.getFuture();
294
295 if (future != null) {
296 future.complete();
297 }
298
299 // generate the message sent event
300 final Object highLevel = ((DefaultWriteRequest) writeRequest).getOriginalMessage();
301
302 if (highLevel != null) {
303 processMessageSent(highLevel);
304 }
305 } else {
306 // output socket buffer is full, we need
307 // to give up until next selection for
308 // writing.
309 break;
310 }
311 } while (!writeQueue.isEmpty());
312
313 // We may have exited from the loop for some other reason
314 // that an empty queue
315 // if the session is no more interested in writing, we need
316 // to stop listening for OP_WRITE events
317 //
318 // IMPORTANT : this section is synchronized so that the OP_WRITE flag
319 // can be set safely by both the selector thread and the writer thread.
320 synchronized (writeQueue) {
321 if (writeQueue.isEmpty()) {
322 if (isClosing()) {
323 if (IS_DEBUG) {
324 LOG.debug("closing session {} have empty write queue, so we close it", this);
325 }
326
327 // we was flushing writes, now we to the close
328 channelClose();
329 } else {
330 // no more write event needed
331 selectorLoop.modifyRegistration(false, !isReadSuspended(), false, (SelectorListener) this,
332 channel, false);
333
334 // Reset the flag in IoSession too
335 setNotRegisteredForWrite();
336 }
337 }
338 // if the queue is not empty, that means we have some more data to write :
339 // the channel OP_WRITE interest remains as it was.
340 }
341 } catch (final IOException e) {
342 LOG.error("Exception while writing : ", e);
343 processException(e);
344 }
345 }
346 }