001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs;
019
020 import java.io.*;
021
022 import org.apache.hadoop.classification.InterfaceAudience;
023 import org.apache.hadoop.classification.InterfaceStability;
024
025 /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
026 * and buffers input through a {@link BufferedInputStream}. */
027 @InterfaceAudience.Public
028 @InterfaceStability.Stable
029 public class FSDataInputStream extends DataInputStream
030 implements Seekable, PositionedReadable, Closeable {
031
032 public FSDataInputStream(InputStream in)
033 throws IOException {
034 super(in);
035 if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
036 throw new IllegalArgumentException(
037 "In is not an instance of Seekable or PositionedReadable");
038 }
039 }
040
041 /**
042 * Seek to the given offset.
043 *
044 * @param desired offset to seek to
045 */
046 public synchronized void seek(long desired) throws IOException {
047 ((Seekable)in).seek(desired);
048 }
049
050 /**
051 * Get the current position in the input stream.
052 *
053 * @return current position in the input stream
054 */
055 public long getPos() throws IOException {
056 return ((Seekable)in).getPos();
057 }
058
059 /**
060 * Read bytes from the given position in the stream to the given buffer.
061 *
062 * @param position position in the input stream to seek
063 * @param buffer buffer into which data is read
064 * @param offset offset into the buffer in which data is written
065 * @param length maximum number of bytes to read
066 * @return total number of bytes read into the buffer, or <code>-1</code>
067 * if there is no more data because the end of the stream has been
068 * reached
069 */
070 public int read(long position, byte[] buffer, int offset, int length)
071 throws IOException {
072 return ((PositionedReadable)in).read(position, buffer, offset, length);
073 }
074
075 /**
076 * Read bytes from the given position in the stream to the given buffer.
077 * Continues to read until <code>length</code> bytes have been read.
078 *
079 * @param position position in the input stream to seek
080 * @param buffer buffer into which data is read
081 * @param offset offset into the buffer in which data is written
082 * @param length the number of bytes to read
083 * @throws EOFException If the end of stream is reached while reading.
084 * If an exception is thrown an undetermined number
085 * of bytes in the buffer may have been written.
086 */
087 public void readFully(long position, byte[] buffer, int offset, int length)
088 throws IOException {
089 ((PositionedReadable)in).readFully(position, buffer, offset, length);
090 }
091
092 /**
093 * See {@link #readFully(long, byte[], int, int)}.
094 */
095 public void readFully(long position, byte[] buffer)
096 throws IOException {
097 ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
098 }
099
100 /**
101 * Seek to the given position on an alternate copy of the data.
102 *
103 * @param targetPos position to seek to
104 * @return true if a new source is found, false otherwise
105 */
106 public boolean seekToNewSource(long targetPos) throws IOException {
107 return ((Seekable)in).seekToNewSource(targetPos);
108 }
109
110 /**
111 * Get a reference to the wrapped input stream. Used by unit tests.
112 *
113 * @return the underlying input stream
114 */
115 @InterfaceAudience.LimitedPrivate({"HDFS"})
116 public InputStream getWrappedStream() {
117 return in;
118 }
119 }