/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapred.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;

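/**
 * An old-API (mapred) {@link InputFormat} that reads rows from a SQL table
 * or query over JDBC by delegating to the new-API
 * {@link org.apache.hadoop.mapreduce.lib.db.DBInputFormat}. Emits
 * {@link LongWritable} record numbers as keys and {@link DBWritable}
 * instances as values. Use one of the static {@code setInput} overloads to
 * configure the input for a job.
 */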
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBInputFormat<T extends DBWritable>
    extends org.apache.hadoop.mapreduce.lib.db.DBInputFormat<T>
    implements InputFormat<LongWritable, T>, JobConfigurable {
  /**
   * A RecordReader that reads records from a SQL table.
   * Emits LongWritables containing the record number as
   * key and DBWritables as value.
   */
  protected class DBRecordReader extends
      org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>
      implements RecordReader<LongWritable, T> {
    /**
     * @param split The InputSplit to read data for
     * @param inputClass the class of the DBWritable value objects
     * @param job the job configuration
     * @param conn an open JDBC connection to the database
     * @param dbConfig the database configuration for the job
     * @param cond the condition used in the WHERE clause of the query
     * @param fields the names of the fields to select
     * @param table the name of the table to read from
     * @throws SQLException if the record reader cannot be created
     */
    protected DBRecordReader(DBInputSplit split, Class<T> inputClass,
        JobConf job, Connection conn, DBConfiguration dbConfig, String cond,
        String [] fields, String table) throws SQLException {
      super(split, inputClass, job, conn, dbConfig, cond, fields, table);
    }

    /** {@inheritDoc} */
    public LongWritable createKey() {
      return new LongWritable();
    }

    /** {@inheritDoc} */
    public T createValue() {
      return super.createValue();
    }

    public long getPos() throws IOException {
      return super.getPos();
    }

    /** {@inheritDoc} */
    public boolean next(LongWritable key, T value) throws IOException {
      return super.next(key, value);
    }
  }

  /**
   * A RecordReader implementation that just passes through to a wrapped
   * RecordReader built with the new API.
   */
  private static class DBRecordReaderWrapper<T extends DBWritable>
      implements RecordReader<LongWritable, T> {

    private org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> rr;

    public DBRecordReaderWrapper(
        org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T> inner) {
      this.rr = inner;
    }

    public void close() throws IOException {
      rr.close();
    }

    public LongWritable createKey() {
      return new LongWritable();
    }

    public T createValue() {
      return rr.createValue();
    }

    public float getProgress() throws IOException {
      return rr.getProgress();
    }

    public long getPos() throws IOException {
      return rr.getPos();
    }

    public boolean next(LongWritable key, T value) throws IOException {
      return rr.next(key, value);
    }
  }

  /**
   * A class that does nothing, implementing DBWritable.
   */
  public static class NullDBWritable extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable
      implements DBWritable, Writable {
  }

  /**
   * An InputSplit that spans a set of rows.
   */
  protected static class DBInputSplit extends
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit
      implements InputSplit {
    /**
     * Default constructor.
     */
    public DBInputSplit() {
    }

    /**
     * Convenience constructor.
     * @param start the index of the first row to select
     * @param end the index of the last row to select
     */
    public DBInputSplit(long start, long end) {
      super(start, end);
    }
  }

  /** {@inheritDoc} */
  public void configure(JobConf job) {
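    // The old API configures input formats through JobConfigurable; delegate
    // to the new-API Configurable hook so the shared implementation sees the
    // JobConf.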
    super.setConf(job);
  }

  /** {@inheritDoc} */
  @SuppressWarnings("unchecked")
  public RecordReader<LongWritable, T> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter) throws IOException {

    // Wrap the new-API DBRecordReader in a shim class to deal with API
    // differences.
    return new DBRecordReaderWrapper<T>(
        (org.apache.hadoop.mapreduce.lib.db.DBRecordReader<T>)
        createDBRecordReader(
            (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) split, job));
  }

  /** {@inheritDoc} */
  public InputSplit[] getSplits(JobConf job, int chunks) throws IOException {
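    // Compute splits with the new-API implementation, then convert each
    // result back into an old-API DBInputSplit.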
    List<org.apache.hadoop.mapreduce.InputSplit> newSplits =
        super.getSplits(new Job(job));
    InputSplit[] ret = new InputSplit[newSplits.size()];
    int i = 0;
    for (org.apache.hadoop.mapreduce.InputSplit s : newSplits) {
      org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit split =
          (org.apache.hadoop.mapreduce.lib.db.DBInputFormat.DBInputSplit) s;
      ret[i++] = new DBInputSplit(split.getStart(), split.getEnd());
    }
    return ret;
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
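   * <p>A minimal usage sketch. The driver class, JDBC driver, connection
   * URL, credentials, table, columns, and the {@code MyRecord} value class
   * below are illustrative placeholders, not part of this API; configure
   * them for your own database.</p>
   * <pre>{@code
   * JobConf job = new JobConf(MyDriver.class);
   * // Register the JDBC driver and connection details for the job.
   * DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
   *     "jdbc:mysql://localhost/mydb", "user", "password");
   * // Read the id and name columns of the employees table, keeping only
   * // rows with id > 0, ordered by id. MyRecord must implement DBWritable.
   * DBInputFormat.setInput(job, MyRecord.class, "employees",
   *     "id > 0", "id", "id", "name");
   * }</pre>
   *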
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   *        Java object holding tuple fields.
   * @param tableName The table to read data from
   * @param conditions The condition used to select data, e.g.
   *        '(updated > 20070101 AND length > 0)'
   * @param orderBy the field names used in the ORDER BY clause
   * @param fieldNames The field names in the table
   * @see #setInput(JobConf, Class, String, String)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String tableName, String conditions, String orderBy, String... fieldNames) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputTableName(tableName);
    dbConf.setInputFieldNames(fieldNames);
    dbConf.setInputConditions(conditions);
    dbConf.setInputOrderBy(orderBy);
  }

  /**
   * Initializes the map-part of the job with the appropriate input settings.
   *
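   * <p>A minimal usage sketch. As above, the driver class, connection
   * settings, query text, and {@code MyRecord} value class are illustrative
   * placeholders only.</p>
   * <pre>{@code
   * JobConf job = new JobConf(MyDriver.class);
   * DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver",
   *     "jdbc:mysql://localhost/mydb", "user", "password");
   * // Provide an arbitrary select query plus a matching count query that
   * // reports how many records the select will return.
   * DBInputFormat.setInput(job, MyRecord.class,
   *     "SELECT id, name FROM employees ORDER BY id",
   *     "SELECT COUNT(id) FROM employees");
   * }</pre>
   *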
   * @param job The job
   * @param inputClass the class object implementing DBWritable, which is the
   *        Java object holding tuple fields.
   * @param inputQuery the input query to select fields. Example:
   *        "SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
   * @param inputCountQuery the input query that returns the number of records
   *        in the table. Example: "SELECT COUNT(f1) FROM Mytable"
   * @see #setInput(JobConf, Class, String, String, String, String...)
   */
  public static void setInput(JobConf job, Class<? extends DBWritable> inputClass,
      String inputQuery, String inputCountQuery) {
    job.setInputFormat(DBInputFormat.class);

    DBConfiguration dbConf = new DBConfiguration(job);
    dbConf.setInputClass(inputClass);
    dbConf.setInputQuery(inputQuery);
    dbConf.setInputCountQuery(inputCountQuery);
  }
}