001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.hadoop.io.compress;
019
020 import java.util.ArrayList;
021 import java.util.HashMap;
022 import java.util.List;
023 import java.util.Map;
024
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.hadoop.classification.InterfaceAudience;
028 import org.apache.hadoop.classification.InterfaceStability;
029 import org.apache.hadoop.conf.Configuration;
030 import org.apache.hadoop.util.ReflectionUtils;
031
032 /**
033 * A global compressor/decompressor pool used to save and reuse
034 * (possibly native) compression/decompression codecs.
035 */
036 @InterfaceAudience.Public
037 @InterfaceStability.Evolving
038 public class CodecPool {
039 private static final Log LOG = LogFactory.getLog(CodecPool.class);
040
041 /**
042 * A global compressor pool used to save the expensive
043 * construction/destruction of (possibly native) decompression codecs.
044 */
045 private static final Map<Class<Compressor>, List<Compressor>> compressorPool =
046 new HashMap<Class<Compressor>, List<Compressor>>();
047
048 /**
049 * A global decompressor pool used to save the expensive
050 * construction/destruction of (possibly native) decompression codecs.
051 */
052 private static final Map<Class<Decompressor>, List<Decompressor>> decompressorPool =
053 new HashMap<Class<Decompressor>, List<Decompressor>>();
054
055 private static <T> T borrow(Map<Class<T>, List<T>> pool,
056 Class<? extends T> codecClass) {
057 T codec = null;
058
059 // Check if an appropriate codec is available
060 synchronized (pool) {
061 if (pool.containsKey(codecClass)) {
062 List<T> codecList = pool.get(codecClass);
063
064 if (codecList != null) {
065 synchronized (codecList) {
066 if (!codecList.isEmpty()) {
067 codec = codecList.remove(codecList.size()-1);
068 }
069 }
070 }
071 }
072 }
073
074 return codec;
075 }
076
077 private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
078 if (codec != null) {
079 Class<T> codecClass = ReflectionUtils.getClass(codec);
080 synchronized (pool) {
081 if (!pool.containsKey(codecClass)) {
082 pool.put(codecClass, new ArrayList<T>());
083 }
084
085 List<T> codecList = pool.get(codecClass);
086 synchronized (codecList) {
087 codecList.add(codec);
088 }
089 }
090 }
091 }
092
093 /**
094 * Get a {@link Compressor} for the given {@link CompressionCodec} from the
095 * pool or a new one.
096 *
097 * @param codec the <code>CompressionCodec</code> for which to get the
098 * <code>Compressor</code>
099 * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
100 * @return <code>Compressor</code> for the given
101 * <code>CompressionCodec</code> from the pool or a new one
102 */
103 public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
104 Compressor compressor = borrow(compressorPool, codec.getCompressorType());
105 if (compressor == null) {
106 compressor = codec.createCompressor();
107 LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
108 } else {
109 compressor.reinit(conf);
110 if(LOG.isDebugEnabled()) {
111 LOG.debug("Got recycled compressor");
112 }
113 }
114 return compressor;
115 }
116
117 public static Compressor getCompressor(CompressionCodec codec) {
118 return getCompressor(codec, null);
119 }
120
121 /**
122 * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
123 * pool or a new one.
124 *
125 * @param codec the <code>CompressionCodec</code> for which to get the
126 * <code>Decompressor</code>
127 * @return <code>Decompressor</code> for the given
128 * <code>CompressionCodec</code> the pool or a new one
129 */
130 public static Decompressor getDecompressor(CompressionCodec codec) {
131 Decompressor decompressor = borrow(decompressorPool, codec.getDecompressorType());
132 if (decompressor == null) {
133 decompressor = codec.createDecompressor();
134 LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
135 } else {
136 if(LOG.isDebugEnabled()) {
137 LOG.debug("Got recycled decompressor");
138 }
139 }
140 return decompressor;
141 }
142
143 /**
144 * Return the {@link Compressor} to the pool.
145 *
146 * @param compressor the <code>Compressor</code> to be returned to the pool
147 */
148 public static void returnCompressor(Compressor compressor) {
149 if (compressor == null) {
150 return;
151 }
152 // if the compressor can't be reused, don't pool it.
153 if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
154 return;
155 }
156 compressor.reset();
157 payback(compressorPool, compressor);
158 }
159
160 /**
161 * Return the {@link Decompressor} to the pool.
162 *
163 * @param decompressor the <code>Decompressor</code> to be returned to the
164 * pool
165 */
166 public static void returnDecompressor(Decompressor decompressor) {
167 if (decompressor == null) {
168 return;
169 }
170 // if the decompressor can't be reused, don't pool it.
171 if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
172 return;
173 }
174 decompressor.reset();
175 payback(decompressorPool, decompressor);
176 }
177 }