001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.io.compress;
020
021 import java.io.EOFException;
022 import java.io.IOException;
023 import java.io.InputStream;
024
025 import org.apache.hadoop.classification.InterfaceAudience;
026 import org.apache.hadoop.classification.InterfaceStability;
027 import org.apache.hadoop.io.compress.Decompressor;
028
029 @InterfaceAudience.Public
030 @InterfaceStability.Evolving
031 public class DecompressorStream extends CompressionInputStream {
032 protected Decompressor decompressor = null;
033 protected byte[] buffer;
034 protected boolean eof = false;
035 protected boolean closed = false;
036 private int lastBytesSent = 0;
037
038 public DecompressorStream(InputStream in, Decompressor decompressor,
039 int bufferSize)
040 throws IOException {
041 super(in);
042
043 if (in == null || decompressor == null) {
044 throw new NullPointerException();
045 } else if (bufferSize <= 0) {
046 throw new IllegalArgumentException("Illegal bufferSize");
047 }
048
049 this.decompressor = decompressor;
050 buffer = new byte[bufferSize];
051 }
052
053 public DecompressorStream(InputStream in, Decompressor decompressor)
054 throws IOException {
055 this(in, decompressor, 512);
056 }
057
058 /**
059 * Allow derived classes to directly set the underlying stream.
060 *
061 * @param in Underlying input stream.
062 * @throws IOException
063 */
064 protected DecompressorStream(InputStream in) throws IOException {
065 super(in);
066 }
067
068 private byte[] oneByte = new byte[1];
069 public int read() throws IOException {
070 checkStream();
071 return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
072 }
073
074 public int read(byte[] b, int off, int len) throws IOException {
075 checkStream();
076
077 if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
078 throw new IndexOutOfBoundsException();
079 } else if (len == 0) {
080 return 0;
081 }
082
083 return decompress(b, off, len);
084 }
085
086 protected int decompress(byte[] b, int off, int len) throws IOException {
087 int n = 0;
088
089 while ((n = decompressor.decompress(b, off, len)) == 0) {
090 if (decompressor.needsDictionary()) {
091 eof = true;
092 return -1;
093 }
094
095 if (decompressor.finished()) {
096 // First see if there was any leftover buffered input from previous
097 // stream; if not, attempt to refill buffer. If refill -> EOF, we're
098 // all done; else reset, fix up input buffer, and get ready for next
099 // concatenated substream/"member".
100 int nRemaining = decompressor.getRemaining();
101 if (nRemaining == 0) {
102 int m = getCompressedData();
103 if (m == -1) {
104 // apparently the previous end-of-stream was also end-of-file:
105 // return success, as if we had never called getCompressedData()
106 eof = true;
107 return -1;
108 }
109 decompressor.reset();
110 decompressor.setInput(buffer, 0, m);
111 lastBytesSent = m;
112 } else {
113 // looks like it's a concatenated stream: reset low-level zlib (or
114 // other engine) and buffers, then "resend" remaining input data
115 decompressor.reset();
116 int leftoverOffset = lastBytesSent - nRemaining;
117 assert (leftoverOffset >= 0);
118 // this recopies userBuf -> direct buffer if using native libraries:
119 decompressor.setInput(buffer, leftoverOffset, nRemaining);
120 // NOTE: this is the one place we do NOT want to save the number
121 // of bytes sent (nRemaining here) into lastBytesSent: since we
122 // are resending what we've already sent before, offset is nonzero
123 // in general (only way it could be zero is if it already equals
124 // nRemaining), which would then screw up the offset calculation
125 // _next_ time around. IOW, getRemaining() is in terms of the
126 // original, zero-offset bufferload, so lastBytesSent must be as
127 // well. Cheesy ASCII art:
128 //
129 // <------------ m, lastBytesSent ----------->
130 // +===============================================+
131 // buffer: |1111111111|22222222222222222|333333333333| |
132 // +===============================================+
133 // #1: <-- off -->|<-------- nRemaining --------->
134 // #2: <----------- off ----------->|<-- nRem. -->
135 // #3: (final substream: nRemaining == 0; eof = true)
136 //
137 // If lastBytesSent is anything other than m, as shown, then "off"
138 // will be calculated incorrectly.
139 }
140 } else if (decompressor.needsInput()) {
141 int m = getCompressedData();
142 if (m == -1) {
143 throw new EOFException("Unexpected end of input stream");
144 }
145 decompressor.setInput(buffer, 0, m);
146 lastBytesSent = m;
147 }
148 }
149
150 return n;
151 }
152
153 protected int getCompressedData() throws IOException {
154 checkStream();
155
156 // note that the _caller_ is now required to call setInput() or throw
157 return in.read(buffer, 0, buffer.length);
158 }
159
160 protected void checkStream() throws IOException {
161 if (closed) {
162 throw new IOException("Stream closed");
163 }
164 }
165
166 public void resetState() throws IOException {
167 decompressor.reset();
168 }
169
170 private byte[] skipBytes = new byte[512];
171 public long skip(long n) throws IOException {
172 // Sanity checks
173 if (n < 0) {
174 throw new IllegalArgumentException("negative skip length");
175 }
176 checkStream();
177
178 // Read 'n' bytes
179 int skipped = 0;
180 while (skipped < n) {
181 int len = Math.min(((int)n - skipped), skipBytes.length);
182 len = read(skipBytes, 0, len);
183 if (len == -1) {
184 eof = true;
185 break;
186 }
187 skipped += len;
188 }
189 return skipped;
190 }
191
192 public int available() throws IOException {
193 checkStream();
194 return (eof) ? 0 : 1;
195 }
196
197 public void close() throws IOException {
198 if (!closed) {
199 in.close();
200 closed = true;
201 }
202 }
203
204 public boolean markSupported() {
205 return false;
206 }
207
208 public synchronized void mark(int readlimit) {
209 }
210
211 public synchronized void reset() throws IOException {
212 throw new IOException("mark/reset not supported");
213 }
214
215 }