/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.compress;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.compress.bzip2.BZip2Constants;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyCompressor;
import org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor;
import org.apache.hadoop.io.compress.bzip2.CBZip2InputStream;
import org.apache.hadoop.io.compress.bzip2.CBZip2OutputStream;

/**
 * This class provides CompressionOutputStream and CompressionInputStream for
 * BZip2 compression and decompression. There is currently no real
 * implementation of the Compressor and Decompressor interfaces, so the
 * methods of CompressionCodec that take a Compressor or Decompressor
 * argument simply ignore it, and the dummy implementations returned by
 * createCompressor() and createDecompressor() throw
 * UnsupportedOperationException from their methods.
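 *
 * <p>
 * A minimal usage sketch (a non-normative illustration; the file names,
 * local file streams, and {@code someBytes} below are assumptions, not part
 * of this class):
 *
 * <pre>{@code
 * BZip2Codec codec = new BZip2Codec();
 *
 * // Compress: wrap any OutputStream; the codec writes the "BZ" header itself.
 * try (CompressionOutputStream cout =
 *     codec.createOutputStream(new FileOutputStream("data.bz2"))) {
 *   cout.write(someBytes);
 * }
 *
 * // Decompress: wrap the compressed InputStream and read plain bytes back.
 * try (CompressionInputStream cin =
 *     codec.createInputStream(new FileInputStream("data.bz2"))) {
 *   byte[] buf = new byte[4096];
 *   int n;
 *   while ((n = cin.read(buf, 0, buf.length)) > 0) {
 *     // consume n uncompressed bytes from buf
 *   }
 * }
 * }</pre>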
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class BZip2Codec implements SplittableCompressionCodec {

  private static final String HEADER = "BZ";
  private static final int HEADER_LEN = HEADER.length();
  private static final String SUB_HEADER = "h9";
  private static final int SUB_HEADER_LEN = SUB_HEADER.length();

  /**
   * Creates a new instance of BZip2Codec
   */
  public BZip2Codec() { }

  /**
   * Creates a CompressionOutputStream for BZip2.
   *
   * @param out
   *          the output stream
   * @return a BZip2 CompressionOutputStream
   * @throws java.io.IOException
   *           if an I/O error occurs
   */
  public CompressionOutputStream createOutputStream(OutputStream out)
      throws IOException {
    return new BZip2CompressionOutputStream(out);
  }

  /**
   * Creates a CompressionOutputStream for BZip2. The given Compressor is
   * ignored, since no real bzip2 Compressor implementation exists yet.
   *
   * @param out
   *          the output stream
   * @param compressor
   *          ignored
   * @return a BZip2 CompressionOutputStream
   * @throws java.io.IOException
   *           if an I/O error occurs
   */
  public CompressionOutputStream createOutputStream(OutputStream out,
      Compressor compressor) throws IOException {
    return createOutputStream(out);
  }

  /**
   * This functionality is currently not supported.
   *
   * @return BZip2DummyCompressor.class
   */
  public Class<? extends org.apache.hadoop.io.compress.Compressor> getCompressorType() {
    return BZip2DummyCompressor.class;
  }

  /**
   * This functionality is currently not supported.
   *
   * @return Compressor
   */
  public Compressor createCompressor() {
    return new BZip2DummyCompressor();
  }

  /**
   * Creates a CompressionInputStream that reads compressed BZip2 data from
   * the given stream and returns uncompressed data.
   *
   * @param in
   *          the input stream of compressed data
   * @return a BZip2 CompressionInputStream
   * @throws java.io.IOException
   *           if an I/O error occurs
   */
  public CompressionInputStream createInputStream(InputStream in)
      throws IOException {
    return new BZip2CompressionInputStream(in);
  }

  /**
   * Creates a CompressionInputStream for BZip2. The given Decompressor is
   * ignored, since no real bzip2 Decompressor implementation exists yet.
   *
   * @param in
   *          the input stream of compressed data
   * @param decompressor
   *          ignored
   * @return a BZip2 CompressionInputStream
   * @throws java.io.IOException
   *           if an I/O error occurs
   */
  public CompressionInputStream createInputStream(InputStream in,
      Decompressor decompressor) throws IOException {
    return createInputStream(in);
  }

  /**
   * Creates a CompressionInputStream to read uncompressed data in one of the
   * two reading modes, i.e. CONTINUOUS or BYBLOCK, starting from an
   * arbitrary byte range of the compressed stream.
   *
   * @param seekableIn the input stream; must implement Seekable
   * @param decompressor ignored
   * @param start the start offset into the compressed stream
   * @param end the end offset into the compressed stream
   * @param readMode controls whether progress is reported continuously or
   *                 only at block boundaries
   *
   * @return a SplitCompressionInputStream for BZip2, aligned at a block
   *         boundary
   * @throws java.io.IOException if seekableIn is not Seekable, or if an I/O
   *           error occurs
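   *
   * <p>
   * A sketch of how a split consumer might obtain such a stream (an
   * illustration only; {@code fs}, {@code path}, and the split offsets are
   * assumptions, and the opened stream must be Seekable, e.g. an
   * FSDataInputStream):
   *
   * <pre>{@code
   * BZip2Codec codec = new BZip2Codec();
   * FSDataInputStream fileIn = fs.open(path);   // Seekable stream
   * SplitCompressionInputStream in = codec.createInputStream(
   *     fileIn, codec.createDecompressor(), splitStart, splitEnd,
   *     SplittableCompressionCodec.READ_MODE.BYBLOCK);
   * long adjustedStart = in.getAdjustedStart(); // aligned to a block marker
   * long adjustedEnd = in.getAdjustedEnd();
   * }</pre>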
   */
  public SplitCompressionInputStream createInputStream(InputStream seekableIn,
      Decompressor decompressor, long start, long end, READ_MODE readMode)
      throws IOException {

    if (!(seekableIn instanceof Seekable)) {
      throw new IOException("seekableIn must be an instance of " +
          Seekable.class.getName());
    }

    // Find the position of the first BZip2 start-of-block marker.
    ((Seekable)seekableIn).seek(0);

    // BZip2 start-of-block markers are 6 bytes long. But the very first
    // block is also preceded by the "BZh9" stream header, making it 10
    // bytes. This is the common case. At times, though, the stream might
    // start without a leading BZ.
    final long FIRST_BZIP2_BLOCK_MARKER_POSITION =
        CBZip2InputStream.numberOfBytesTillNextMarker(seekableIn);
    long adjStart = Math.max(0L, start - FIRST_BZIP2_BLOCK_MARKER_POSITION);

    ((Seekable)seekableIn).seek(adjStart);
    SplitCompressionInputStream in =
        new BZip2CompressionInputStream(seekableIn, adjStart, end, readMode);

    // The following if clause handles the following case:
    // Assume the following scenario in BZip2 compressed stream where
    // . represent compressed data.
    // .....[48 bit Block].....[48 bit Block].....[48 bit Block]...
    // ........................[47 bits][1 bit].....[48 bit Block]...
    // ................................^[Assume a Byte alignment here]
    // ........................................^^[current position of stream]
    // .....................^^[We go back 10 Bytes in stream and find a Block marker]
    // ........................................^^[We align at wrong position!]
    // ...........................................................^^[While this pos is correct]

    if (in.getPos() <= start) {
      ((Seekable)seekableIn).seek(start);
      in = new BZip2CompressionInputStream(seekableIn, start, end, readMode);
    }

    return in;
  }

  /**
   * This functionality is currently not supported.
   *
   * @return BZip2DummyDecompressor.class
   */
  public Class<? extends org.apache.hadoop.io.compress.Decompressor> getDecompressorType() {
    return BZip2DummyDecompressor.class;
  }

  /**
   * This functionality is currently not supported.
   *
   * @return Decompressor
   */
  public Decompressor createDecompressor() {
    return new BZip2DummyDecompressor();
  }

  /**
   * .bz2 is recognized as the default extension for compressed BZip2 files.
   *
   * @return the default BZip2 file extension, i.e. ".bz2"
   */
  public String getDefaultExtension() {
    return ".bz2";
  }

  private static class BZip2CompressionOutputStream extends
      CompressionOutputStream {

    // class data starts here//
    private CBZip2OutputStream output;
    private boolean needsReset;
    // class data ends here//

    public BZip2CompressionOutputStream(OutputStream out)
        throws IOException {
      super(out);
      needsReset = true;
    }

    private void writeStreamHeader() throws IOException {
      if (super.out != null) {
        // The compressed bzip2 stream should start with the
        // identifying characters BZ. Caller of CBZip2OutputStream
        // i.e. this class must write these characters.
        out.write(HEADER.getBytes());
      }
    }

    public void finish() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.finish();
      needsReset = true;
    }

    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        writeStreamHeader();
        this.output = new CBZip2OutputStream(out);
      }
    }

    public void resetState() throws IOException {
      // Cannot write to out at this point because out might not be ready
      // yet, as in SequenceFile.Writer implementation.
      needsReset = true;
    }

    public void write(int b) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b);
    }

    public void write(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }
      this.output.write(b, off, len);
    }

    public void close() throws IOException {
      if (needsReset) {
        // In the case that nothing is written to this stream, we still need to
        // write out the header before closing, otherwise the stream won't be
        // recognized by BZip2CompressionInputStream.
        internalReset();
      }
      this.output.flush();
      this.output.close();
      needsReset = true;
    }

  }// end of class BZip2CompressionOutputStream

  /**
   * This class can decompress BZip2 data in two modes, CONTINUOUS and
   * BYBLOCK. BYBLOCK mode makes it possible to start decompression at any
   * arbitrary position in the stream.
   *
   * So this facility can easily be used to parallelize decompression of a
   * large BZip2 file for performance reasons. (This is exactly what the
   * Hadoop framework does; see LineRecordReader for an example.) One can
   * break the file, logically, into chunks for parallel processing. These
   * "splits" should be like default Hadoop splits (e.g. as produced by
   * FileInputFormat's getSplits method). Note that this code is designed
   * and tested for FileInputFormat's way of splitting only.
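   *
   * <p>
   * A rough sketch of that FileInputFormat-style decomposition (the split
   * size, {@code fileLength}, and the task hand-off are illustrative
   * assumptions only):
   *
   * <pre>{@code
   * long splitSize = 128L * 1024 * 1024;
   * for (long off = 0; off < fileLength; off += splitSize) {
   *   long splitStart = off;
   *   long splitEnd = Math.min(off + splitSize, fileLength);
   *   // Each [splitStart, splitEnd) range is handed to a separate task,
   *   // which opens a Seekable stream and calls
   *   // BZip2Codec#createInputStream(..., READ_MODE.BYBLOCK) to align
   *   // itself to the next BZip2 block marker.
   * }
   * }</pre>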
   */
  private static class BZip2CompressionInputStream extends
      SplitCompressionInputStream {

    // class data starts here//
    private CBZip2InputStream input;
    boolean needsReset;
    private BufferedInputStream bufferedIn;
    private boolean isHeaderStripped = false;
    private boolean isSubHeaderStripped = false;
    private READ_MODE readMode = READ_MODE.CONTINUOUS;
    private long startingPos = 0L;

    // Following state machine handles different states of compressed stream
    // position
    // HOLD : Don't advertise compressed stream position
    // ADVERTISE : Read 1 more character and advertise stream position
    // See more comments about it before updatePos method.
    private enum POS_ADVERTISEMENT_STATE_MACHINE {
      HOLD, ADVERTISE
    };

    POS_ADVERTISEMENT_STATE_MACHINE posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
    long compressedStreamPosition = 0;

    // class data ends here//

    public BZip2CompressionInputStream(InputStream in) throws IOException {
      this(in, 0L, Long.MAX_VALUE, READ_MODE.CONTINUOUS);
    }

    public BZip2CompressionInputStream(InputStream in, long start, long end,
        READ_MODE readMode) throws IOException {
      super(in, start, end);
      needsReset = false;
      bufferedIn = new BufferedInputStream(super.in);
      this.startingPos = super.getPos();
      this.readMode = readMode;
      if (this.startingPos == 0) {
        // We only strip header if it is start of file
        bufferedIn = readStreamHeader();
      }
      input = new CBZip2InputStream(bufferedIn, readMode);
      if (this.isHeaderStripped) {
        input.updateReportedByteCount(HEADER_LEN);
      }

      if (this.isSubHeaderStripped) {
        input.updateReportedByteCount(SUB_HEADER_LEN);
      }

      this.updatePos(false);
    }

    private BufferedInputStream readStreamHeader() throws IOException {
      // We are flexible enough to allow the compressed stream not to start
      // with the "BZ" header, so this works whether or not the header is
      // present.
      if (super.in != null) {
        bufferedIn.mark(HEADER_LEN);
        byte[] headerBytes = new byte[HEADER_LEN];
        int actualRead = bufferedIn.read(headerBytes, 0, HEADER_LEN);
        if (actualRead != -1) {
          String header = new String(headerBytes);
          if (header.compareTo(HEADER) != 0) {
            bufferedIn.reset();
          } else {
            this.isHeaderStripped = true;
            // In case of BYBLOCK mode, we also want to strip off
            // the remaining two characters of the header.
            if (this.readMode == READ_MODE.BYBLOCK) {
              actualRead = bufferedIn.read(headerBytes, 0,
                  SUB_HEADER_LEN);
              if (actualRead != -1) {
                this.isSubHeaderStripped = true;
              }
            }
          }
        }
      }

      if (bufferedIn == null) {
        throw new IOException("Failed to read bzip2 stream.");
      }

      return bufferedIn;

    }// end of method

    public void close() throws IOException {
      if (!needsReset) {
        input.close();
        needsReset = true;
      }
    }

    /**
     * This method updates the compressed stream position exactly when the
     * client of this code has read at least one byte past any BZip2
     * end-of-block marker.
     *
     * This mechanism is very helpful for dealing with record boundaries in
     * the data. Please see the constructor and next() methods of
     * org.apache.hadoop.mapred.LineRecordReader for an example usage of this
     * feature. We elaborate on it with an example in the following:
     *
     * Assume two different scenarios of a BZip2 compressed stream, where
     * [m] represents an end-of-block marker, \n is the line delimiter and .
     * represents compressed data.
     *
     * ............[m]......\n.......
     *
     * ..........\n[m]......\n.......
     *
     * Assume that end is right after [m]. In the first case reading will
     * stop at the \n and there is no need to read one more line. (The reason
     * for reading one more line in next() is explained in LineRecordReader.)
     * In the second example, however, LineRecordReader needs to read one
     * more line (up to the second \n). Since BZip2Codec only updates the
     * position once at least one byte past a marker has been read, it is
     * straightforward to differentiate between the two cases.
     *
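     * <p>
     * A condensed sketch of how a record reader can consume this behaviour
     * (the {@code readLine} helper and variable names are illustrative
     * assumptions, not actual LineRecordReader code):
     *
     * <pre>{@code
     * long end = in.getAdjustedEnd();
     * long pos = in.getPos();
     * // getPos() only jumps once a byte past an end-of-block marker has
     * // been read, so stopping when it passes the split end ensures the
     * // record straddling the boundary is read by only one of the two
     * // adjacent splits.
     * while (pos <= end) {
     *   int recordLength = readLine(buffer);   // hypothetical helper
     *   if (recordLength <= 0) {
     *     break;                               // end of stream
     *   }
     *   pos = in.getPos();
     * }
     * }</pre>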
     */

    public int read(byte[] b, int off, int len) throws IOException {
      if (needsReset) {
        internalReset();
      }

      int result = this.input.read(b, off, len);
      if (result == BZip2Constants.END_OF_BLOCK) {
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE;
      }

      if (this.posSM == POS_ADVERTISEMENT_STATE_MACHINE.ADVERTISE) {
        // Read exactly one more byte past the end-of-block marker.
        result = this.input.read(b, off, 1);
        // This is the precise time to update compressed stream position
        // to the client of this code.
        this.updatePos(true);
        this.posSM = POS_ADVERTISEMENT_STATE_MACHINE.HOLD;
      }

      return result;

    }

    public int read() throws IOException {
      byte[] b = new byte[1];
      int result = this.read(b, 0, 1);
      return (result < 0) ? result : (b[0] & 0xff);
    }


    private void internalReset() throws IOException {
      if (needsReset) {
        needsReset = false;
        BufferedInputStream bufferedIn = readStreamHeader();
        input = new CBZip2InputStream(bufferedIn, this.readMode);
      }
    }

    public void resetState() throws IOException {
      // Cannot read from bufferedIn at this point because bufferedIn
      // might not be ready yet, as in SequenceFile.Reader implementation.
      needsReset = true;
    }

    public long getPos() {
      return this.compressedStreamPosition;
    }

    /*
     * As the comments before the read method explain, the compressed stream
     * position is advertised once at least one byte past an EOB marker has
     * been read. But there is an exception to this rule: when we construct
     * the stream we advertise the position exactly at the EOB. In the
     * following method the shouldAddOn boolean captures this exception.
     */
    private void updatePos(boolean shouldAddOn) {
      int addOn = shouldAddOn ? 1 : 0;
      this.compressedStreamPosition = this.startingPos
          + this.input.getProcessedByteCount() + addOn;
    }

  }// end of BZip2CompressionInputStream

}