001    /**
002     * Copyright (c) 2000-2012 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.io.unsync;
016    
017    import java.io.IOException;
018    import java.io.InputStream;
019    
020    /**
021     * <p>
022     * See http://issues.liferay.com/browse/LPS-6648.
023     * </p>
024     *
025     * @author Shuyang Zhou
026     */
027    public class UnsyncBufferedInputStream extends UnsyncFilterInputStream {
028    
029            public UnsyncBufferedInputStream(InputStream inputStream) {
030                    this(inputStream, _DEFAULT_BUFFER_SIZE);
031            }
032    
033            public UnsyncBufferedInputStream(InputStream inputStream, int size) {
034                    super(inputStream);
035    
036                    if (size <= 0) {
037                            throw new IllegalArgumentException("Size is less than 0");
038                    }
039    
040                    buffer = new byte[size];
041            }
042    
043            @Override
044            public int available() throws IOException {
045                    if (inputStream == null) {
046                            throw new IOException("Input stream is null");
047                    }
048    
049                    return inputStream.available() + (firstInvalidIndex - index);
050            }
051    
052            @Override
053            public void close() throws IOException {
054                    if (inputStream != null) {
055                            inputStream.close();
056    
057                            inputStream = null;
058                            buffer = null;
059                    }
060            }
061    
062            @Override
063            public void mark(int readLimit) {
064                    if (readLimit <= 0) {
065                            return;
066                    }
067    
068                    markLimitIndex = readLimit;
069    
070                    if (index > 0) {
071    
072                            int available = firstInvalidIndex - index;
073    
074                            if (available > 0) {
075    
076                                    // Shuffle mark beginning to buffer beginning
077    
078                                    System.arraycopy(buffer, index, buffer, 0, available);
079    
080                                    index = 0;
081    
082                                    firstInvalidIndex = available;
083                            }
084                            else {
085    
086                                    // Reset buffer states
087    
088                                    index = firstInvalidIndex = 0;
089                            }
090                    }
091            }
092    
093            @Override
094            public boolean markSupported() {
095                    return true;
096            }
097    
098            @Override
099            public int read() throws IOException {
100                    if (inputStream == null) {
101                            throw new IOException("Input stream is null");
102                    }
103    
104                    if (index >= firstInvalidIndex) {
105                            fillInBuffer();
106    
107                            if (index >= firstInvalidIndex) {
108                                    return -1;
109                            }
110                    }
111    
112                    return buffer[index++] & 0xff;
113            }
114    
115            @Override
116            public int read(byte[] bytes) throws IOException {
117                    return read(bytes, 0, bytes.length);
118            }
119    
120            @Override
121            public int read(byte[] bytes, int offset, int length)
122                    throws IOException {
123    
124                    if (inputStream == null) {
125                            throw new IOException("Input stream is null");
126                    }
127    
128                    if (length <= 0) {
129                            return 0;
130                    }
131    
132                    int read = 0;
133    
134                    while (true) {
135    
136                            // Try to at least read some data
137    
138                            int currentRead = readOnce(bytes, offset + read, length - read);
139    
140                            if (currentRead <= 0) {
141                                    if (read == 0) {
142                                            read = currentRead;
143                                    }
144    
145                                    break;
146                            }
147    
148                            read += currentRead;
149    
150                            if ((read >= length) || (inputStream.available() <= 0)) {
151    
152                                    // Read enough or further reading may be blocked, stop reading
153    
154                                    break;
155                            }
156                    }
157    
158                    return read;
159            }
160    
161            @Override
162            public void reset() throws IOException {
163                    if (inputStream == null) {
164                            throw new IOException("Input stream is null");
165                    }
166    
167                    if (markLimitIndex < 0) {
168                            throw new IOException("Resetting to invalid mark");
169                    }
170    
171                    index = 0;
172            }
173    
174            @Override
175            public long skip(long skip) throws IOException {
176                    if (inputStream == null) {
177                            throw new IOException("Input stream is null");
178                    }
179    
180                    if (skip <= 0) {
181                            return 0;
182                    }
183    
184                    long available = firstInvalidIndex - index;
185    
186                    if (available <= 0) {
187                            if (markLimitIndex < 0) {
188    
189                                    // No mark required, skip the underlying input stream
190    
191                                    return inputStream.skip(skip);
192                            }
193                            else {
194    
195                                    // Mark required, save the skipped data
196    
197                                    fillInBuffer();
198    
199                                    available = firstInvalidIndex - index;
200    
201                                    if (available <= 0) {
202                                            return 0;
203                                    }
204                            }
205                    }
206    
207                    // Skip the data in buffer
208    
209                    if (available < skip) {
210                            skip = available;
211                    }
212    
213                    index += skip;
214    
215                    return skip;
216            }
217    
218            protected void fillInBuffer() throws IOException {
219                    if (markLimitIndex < 0) {
220    
221                            // No mark required, fill the buffer
222    
223                            index = firstInvalidIndex = 0;
224    
225                            int number = inputStream.read(buffer);
226    
227                            if (number > 0) {
228                                    firstInvalidIndex = number;
229                            }
230    
231                            return;
232                    }
233    
234                    // Mark required
235    
236                    if (index >= markLimitIndex) {
237    
238                            // Passed mark limit indexs, get rid of all cache data
239    
240                            markLimitIndex = -1;
241    
242                            index = firstInvalidIndex = 0;
243                    }
244                    else if (index == buffer.length) {
245    
246                            // Cannot get rid of cache data and there is no room to read in any
247                            // more data, so grow the buffer
248    
249                            int newBufferSize = buffer.length * 2;
250    
251                            if (newBufferSize > markLimitIndex) {
252                                    newBufferSize = markLimitIndex;
253                            }
254    
255                            byte[] newBuffer = new byte[newBufferSize];
256    
257                            System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
258    
259                            buffer = newBuffer;
260                    }
261    
262                    // Read underlying input stream since the buffer has more space
263    
264                    firstInvalidIndex = index;
265    
266                    int number = inputStream.read(buffer, index, buffer.length - index);
267    
268                    if (number > 0) {
269                            firstInvalidIndex += number;
270                    }
271            }
272    
273            protected int readOnce(byte[] bytes, int offset, int length)
274                    throws IOException {
275    
276                    int available = firstInvalidIndex - index;
277    
278                    if (available <= 0) {
279    
280                            // Buffer is empty, read from under input stream
281    
282                            if ((markLimitIndex < 0) && (length >= buffer.length)) {
283    
284                                    // No mark required, left read block is no less than buffer,
285                                    // read through buffer is inefficient, so directly read from
286                                    // underlying input stream
287    
288                                    return inputStream.read(bytes, offset, length);
289                            }
290                            else {
291    
292                                    // Mark is required, has to read through the buffer to remember
293                                    // data
294    
295                                    fillInBuffer();
296    
297                                    available = firstInvalidIndex - index;
298    
299                                    if (available <= 0) {
300                                            return -1;
301                                    }
302                            }
303                    }
304    
305                    if (length > available) {
306                            length = available;
307                    }
308    
309                    System.arraycopy(buffer, index, bytes, offset, length);
310    
311                    index += length;
312    
313                    return length;
314            }
315    
316            protected byte[] buffer;
317            protected int firstInvalidIndex;
318            protected int index;
319            protected int markLimitIndex = -1;
320    
321            private static final int _DEFAULT_BUFFER_SIZE = 8192;
322    
323    }