001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapred.join;
020
021 import java.io.IOException;
022
023 import org.apache.hadoop.classification.InterfaceAudience;
024 import org.apache.hadoop.classification.InterfaceStability;
025 import org.apache.hadoop.io.Writable;
026 import org.apache.hadoop.io.WritableComparable;
027 import org.apache.hadoop.io.WritableComparator;
028 import org.apache.hadoop.io.WritableUtils;
029 import org.apache.hadoop.mapred.RecordReader;
030
031 /**
032 * Proxy class for a RecordReader participating in the join framework.
033 * This class keeps track of the "head" key-value pair for the
034 * provided RecordReader and keeps a store of values matching a key when
035 * this source is participating in a join.
036 */
037 @InterfaceAudience.Public
038 @InterfaceStability.Stable
039 public class WrappedRecordReader<K extends WritableComparable,
040 U extends Writable>
041 implements ComposableRecordReader<K,U> {
042
043 private boolean empty = false;
044 private RecordReader<K,U> rr;
045 private int id; // index at which values will be inserted in collector
046
047 private K khead; // key at the top of this RR
048 private U vhead; // value assoc with khead
049 private WritableComparator cmp;
050
051 private ResetableIterator<U> vjoin;
052
053 /**
054 * For a given RecordReader rr, occupy position id in collector.
055 */
056 WrappedRecordReader(int id, RecordReader<K,U> rr,
057 Class<? extends WritableComparator> cmpcl) throws IOException {
058 this.id = id;
059 this.rr = rr;
060 khead = rr.createKey();
061 vhead = rr.createValue();
062 try {
063 cmp = (null == cmpcl)
064 ? WritableComparator.get(khead.getClass())
065 : cmpcl.newInstance();
066 } catch (InstantiationException e) {
067 throw (IOException)new IOException().initCause(e);
068 } catch (IllegalAccessException e) {
069 throw (IOException)new IOException().initCause(e);
070 }
071 vjoin = new StreamBackedIterator<U>();
072 next();
073 }
074
075 /** {@inheritDoc} */
076 public int id() {
077 return id;
078 }
079
080 /**
081 * Return the key at the head of this RR.
082 */
083 public K key() {
084 return khead;
085 }
086
087 /**
088 * Clone the key at the head of this RR into the object supplied.
089 */
090 public void key(K qkey) throws IOException {
091 WritableUtils.cloneInto(qkey, khead);
092 }
093
094 /**
095 * Return true if the RR- including the k,v pair stored in this object-
096 * is exhausted.
097 */
098 public boolean hasNext() {
099 return !empty;
100 }
101
102 /**
103 * Skip key-value pairs with keys less than or equal to the key provided.
104 */
105 public void skip(K key) throws IOException {
106 if (hasNext()) {
107 while (cmp.compare(khead, key) <= 0 && next());
108 }
109 }
110
111 /**
112 * Read the next k,v pair into the head of this object; return true iff
113 * the RR and this are exhausted.
114 */
115 protected boolean next() throws IOException {
116 empty = !rr.next(khead, vhead);
117 return hasNext();
118 }
119
120 /**
121 * Add an iterator to the collector at the position occupied by this
122 * RecordReader over the values in this stream paired with the key
123 * provided (ie register a stream of values from this source matching K
124 * with a collector).
125 */
126 // JoinCollector comes from parent, which has
127 @SuppressWarnings("unchecked") // no static type for the slot this sits in
128 public void accept(CompositeRecordReader.JoinCollector i, K key)
129 throws IOException {
130 vjoin.clear();
131 if (0 == cmp.compare(key, khead)) {
132 do {
133 vjoin.add(vhead);
134 } while (next() && 0 == cmp.compare(key, khead));
135 }
136 i.add(id, vjoin);
137 }
138
139 /**
140 * Write key-value pair at the head of this stream to the objects provided;
141 * get next key-value pair from proxied RR.
142 */
143 public boolean next(K key, U value) throws IOException {
144 if (hasNext()) {
145 WritableUtils.cloneInto(key, khead);
146 WritableUtils.cloneInto(value, vhead);
147 next();
148 return true;
149 }
150 return false;
151 }
152
153 /**
154 * Request new key from proxied RR.
155 */
156 public K createKey() {
157 return rr.createKey();
158 }
159
160 /**
161 * Request new value from proxied RR.
162 */
163 public U createValue() {
164 return rr.createValue();
165 }
166
167 /**
168 * Request progress from proxied RR.
169 */
170 public float getProgress() throws IOException {
171 return rr.getProgress();
172 }
173
174 /**
175 * Request position from proxied RR.
176 */
177 public long getPos() throws IOException {
178 return rr.getPos();
179 }
180
181 /**
182 * Forward close request to proxied RR.
183 */
184 public void close() throws IOException {
185 rr.close();
186 }
187
188 /**
189 * Implement Comparable contract (compare key at head of proxied RR
190 * with that of another).
191 */
192 public int compareTo(ComposableRecordReader<K,?> other) {
193 return cmp.compare(key(), other.key());
194 }
195
196 /**
197 * Return true iff compareTo(other) retn true.
198 */
199 @SuppressWarnings("unchecked") // Explicit type check prior to cast
200 public boolean equals(Object other) {
201 return other instanceof ComposableRecordReader
202 && 0 == compareTo((ComposableRecordReader)other);
203 }
204
205 public int hashCode() {
206 assert false : "hashCode not designed";
207 return 42;
208 }
209
210 }