001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.fs;
019
020 import java.io.*;
021
022 import org.apache.hadoop.classification.InterfaceAudience;
023 import org.apache.hadoop.classification.InterfaceStability;
024
025 /** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream},
026 * buffers output through a {@link BufferedOutputStream} and creates a checksum
027 * file. */
028 @InterfaceAudience.Public
029 @InterfaceStability.Stable
030 public class FSDataOutputStream extends DataOutputStream implements Syncable {
031 private OutputStream wrappedStream;
032
033 private static class PositionCache extends FilterOutputStream {
034 private FileSystem.Statistics statistics;
035 long position;
036
037 public PositionCache(OutputStream out,
038 FileSystem.Statistics stats,
039 long pos) throws IOException {
040 super(out);
041 statistics = stats;
042 position = pos;
043 }
044
045 public void write(int b) throws IOException {
046 out.write(b);
047 position++;
048 if (statistics != null) {
049 statistics.incrementBytesWritten(1);
050 }
051 }
052
053 public void write(byte b[], int off, int len) throws IOException {
054 out.write(b, off, len);
055 position += len; // update position
056 if (statistics != null) {
057 statistics.incrementBytesWritten(len);
058 }
059 }
060
061 public long getPos() throws IOException {
062 return position; // return cached position
063 }
064
065 public void close() throws IOException {
066 out.close();
067 }
068 }
069
070 @Deprecated
071 public FSDataOutputStream(OutputStream out) throws IOException {
072 this(out, null);
073 }
074
075 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
076 throws IOException {
077 this(out, stats, 0);
078 }
079
080 public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
081 long startPosition) throws IOException {
082 super(new PositionCache(out, stats, startPosition));
083 wrappedStream = out;
084 }
085
086 /**
087 * Get the current position in the output stream.
088 *
089 * @return the current position in the output stream
090 */
091 public long getPos() throws IOException {
092 return ((PositionCache)out).getPos();
093 }
094
095 /**
096 * Close the underlying output stream.
097 */
098 public void close() throws IOException {
099 out.close(); // This invokes PositionCache.close()
100 }
101
102 /**
103 * Get a reference to the wrapped output stream. Used by unit tests.
104 *
105 * @return the underlying output stream
106 */
107 @InterfaceAudience.LimitedPrivate({"HDFS"})
108 public OutputStream getWrappedStream() {
109 return wrappedStream;
110 }
111
112 @Override // Syncable
113 @Deprecated
114 public void sync() throws IOException {
115 if (wrappedStream instanceof Syncable) {
116 ((Syncable)wrappedStream).sync();
117 }
118 }
119
120 @Override // Syncable
121 public void hflush() throws IOException {
122 if (wrappedStream instanceof Syncable) {
123 ((Syncable)wrappedStream).hflush();
124 } else {
125 wrappedStream.flush();
126 }
127 }
128
129 @Override // Syncable
130 public void hsync() throws IOException {
131 if (wrappedStream instanceof Syncable) {
132 ((Syncable)wrappedStream).hsync();
133 } else {
134 wrappedStream.flush();
135 }
136 }
137 }