001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce;
020
021 import java.io.IOException;
022 import java.net.InetSocketAddress;
023 import java.security.PrivilegedExceptionAction;
024 import java.util.ArrayList;
025 import java.util.List;
026 import java.util.ServiceLoader;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FileSystem;
034 import org.apache.hadoop.fs.Path;
035 import org.apache.hadoop.io.Text;
036 import org.apache.hadoop.ipc.RemoteException;
037 import org.apache.hadoop.mapred.JobConf;
038 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
039 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
040 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
041 import org.apache.hadoop.mapreduce.util.ConfigUtil;
042 import org.apache.hadoop.mapreduce.v2.LogParams;
043 import org.apache.hadoop.security.AccessControlException;
044 import org.apache.hadoop.security.UserGroupInformation;
045 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
046 import org.apache.hadoop.security.token.Token;
047
048 /**
049 * Provides a way to access information about the map/reduce cluster.
050 */
051 @InterfaceAudience.Public
052 @InterfaceStability.Evolving
053 public class Cluster {
054
055 @InterfaceStability.Evolving
056 public static enum JobTrackerStatus {INITIALIZING, RUNNING};
057
  // Provider that created {@link #client}; retained so close() can hand the
  // protocol back to the provider that knows how to release it.
  private ClientProtocolProvider clientProtocolProvider;
  // Active protocol proxy used for every request to the cluster.
  private ClientProtocol client;
  // Identity of the user that constructed this Cluster; used for doAs()
  // file-system access and history-file naming.
  private UserGroupInformation ugi;
  private Configuration conf;
  // Lazily initialized, cached values -- see the corresponding getters.
  private FileSystem fs = null;
  private Path sysDir = null;
  private Path stagingAreaDir = null;
  private Path jobHistoryDir = null;
  private static final Log LOG = LogFactory.getLog(Cluster.class);

  // Discovers ClientProtocolProvider implementations on the classpath.
  // Iteration over this loader is synchronized in initialize() because
  // ServiceLoader instances are not thread-safe.
  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

  static {
    // Register the MapReduce configuration resources before any Cluster
    // instance reads its Configuration.
    ConfigUtil.loadResources();
  }
074
  /**
   * Build a cluster client from the given configuration alone; the server
   * address is derived from the configuration by the chosen provider.
   *
   * @param conf the cluster configuration
   * @throws IOException if no client protocol provider can serve this cluster
   */
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }
078
  /**
   * Build a cluster client for an explicit jobtracker address.
   *
   * @param jobTrackAddr the jobtracker address, or null to let the provider
   *                     derive it from the configuration
   * @param conf the cluster configuration
   * @throws IOException if no client protocol provider can serve this cluster
   */
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {
    this.conf = conf;
    // Capture the caller's identity once, at construction time.
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }
085
086 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
087 throws IOException {
088
089 synchronized (frameworkLoader) {
090 for (ClientProtocolProvider provider : frameworkLoader) {
091 LOG.debug("Trying ClientProtocolProvider : "
092 + provider.getClass().getName());
093 ClientProtocol clientProtocol = null;
094 try {
095 if (jobTrackAddr == null) {
096 clientProtocol = provider.create(conf);
097 } else {
098 clientProtocol = provider.create(jobTrackAddr, conf);
099 }
100
101 if (clientProtocol != null) {
102 clientProtocolProvider = provider;
103 client = clientProtocol;
104 LOG.debug("Picked " + provider.getClass().getName()
105 + " as the ClientProtocolProvider");
106 break;
107 }
108 else {
109 LOG.debug("Cannot pick " + provider.getClass().getName()
110 + " as the ClientProtocolProvider - returned null protocol");
111 }
112 }
113 catch (Exception e) {
114 LOG.info("Failed to use " + provider.getClass().getName()
115 + " due to error: " + e.getMessage());
116 }
117 }
118 }
119
120 if (null == clientProtocolProvider || null == client) {
121 throw new IOException(
122 "Cannot initialize Cluster. Please check your configuration for "
123 + MRConfig.FRAMEWORK_NAME
124 + " and the correspond server addresses.");
125 }
126 }
127
  /**
   * @return the protocol client used to communicate with the cluster
   */
  ClientProtocol getClient() {
    return client;
  }
131
  /**
   * @return the configuration this cluster was created with
   */
  Configuration getConf() {
    return conf;
  }
135
  /**
   * Close the <code>Cluster</code>.
   * Delegates to the provider that created the client, so provider-specific
   * resources are released as well.
   * @throws IOException if closing the underlying client fails
   */
  public synchronized void close() throws IOException {
    clientProtocolProvider.close(client);
  }
142
143 private Job[] getJobs(JobStatus[] stats) throws IOException {
144 List<Job> jobs = new ArrayList<Job>();
145 for (JobStatus stat : stats) {
146 jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
147 }
148 return jobs.toArray(new Job[0]);
149 }
150
151 /**
152 * Get the file system where job-specific files are stored
153 *
154 * @return object of FileSystem
155 * @throws IOException
156 * @throws InterruptedException
157 */
158 public synchronized FileSystem getFileSystem()
159 throws IOException, InterruptedException {
160 if (this.fs == null) {
161 try {
162 this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
163 public FileSystem run() throws IOException, InterruptedException {
164 final Path sysDir = new Path(client.getSystemDir());
165 return sysDir.getFileSystem(getConf());
166 }
167 });
168 } catch (InterruptedException e) {
169 throw new RuntimeException(e);
170 }
171 }
172 return fs;
173 }
174
175 /**
176 * Get job corresponding to jobid.
177 *
178 * @param jobId
179 * @return object of {@link Job}
180 * @throws IOException
181 * @throws InterruptedException
182 */
183 public Job getJob(JobID jobId) throws IOException, InterruptedException {
184 JobStatus status = client.getJobStatus(jobId);
185 if (status != null) {
186 return Job.getInstance(this, status, new JobConf(status.getJobFile()));
187 }
188 return null;
189 }
190
  /**
   * Get all the queues in the cluster.
   *
   * @return array of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getQueues() throws IOException, InterruptedException {
    return client.getQueues();
  }
201
  /**
   * Get queue information for the specified name.
   *
   * @param name the queue name
   * @return object of {@link QueueInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo getQueue(String name)
      throws IOException, InterruptedException {
    return client.getQueue(name);
  }
214
  /**
   * Get log parameters for the specified jobID or taskAttemptID.
   *
   * @param jobID the job id
   * @param taskAttemptID the task attempt id; optional (may be null)
   * @return the LogParams
   * @throws IOException
   * @throws InterruptedException
   */
  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
      throws IOException, InterruptedException {
    return client.getLogFileParams(jobID, taskAttemptID);
  }
227
  /**
   * Get current cluster status.
   *
   * @return object of {@link ClusterMetrics}
   * @throws IOException
   * @throws InterruptedException
   */
  public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
    return client.getClusterMetrics();
  }
238
  /**
   * Get all active trackers in the cluster.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getActiveTaskTrackers()
      throws IOException, InterruptedException {
    return client.getActiveTrackers();
  }
250
  /**
   * Get blacklisted trackers.
   *
   * @return array of {@link TaskTrackerInfo}
   * @throws IOException
   * @throws InterruptedException
   */
  public TaskTrackerInfo[] getBlackListedTaskTrackers()
      throws IOException, InterruptedException {
    return client.getBlacklistedTrackers();
  }
262
  /**
   * Get all the jobs in the cluster, wrapped as {@link Job} handles.
   *
   * @return array of {@link Job}
   * @throws IOException
   * @throws InterruptedException
   * @deprecated Use {@link #getAllJobStatuses()} instead.
   */
  @Deprecated
  public Job[] getAllJobs() throws IOException, InterruptedException {
    return getJobs(client.getAllJobs());
  }
275
  /**
   * Get job status for all jobs in the cluster.
   *
   * @return job status for all jobs in cluster
   * @throws IOException
   * @throws InterruptedException
   */
  public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
    return client.getAllJobs();
  }
285
  /**
   * Grab the jobtracker system directory path where
   * job-specific files will be placed.
   *
   * Fetched once and cached; note the cache is not guarded by
   * synchronization, unlike {@link #getFileSystem()}.
   *
   * @return the system directory where job-specific files are to be placed.
   * @throws IOException
   * @throws InterruptedException
   */
  public Path getSystemDir() throws IOException, InterruptedException {
    if (sysDir == null) {
      sysDir = new Path(client.getSystemDir());
    }
    return sysDir;
  }
298
  /**
   * Grab the jobtracker's view of the staging directory path where
   * job-specific files will be placed.
   *
   * Fetched once and cached (unsynchronized, like {@link #getSystemDir()}).
   *
   * @return the staging directory where job-specific files are to be placed.
   * @throws IOException
   * @throws InterruptedException
   */
  public Path getStagingAreaDir() throws IOException, InterruptedException {
    if (stagingAreaDir == null) {
      stagingAreaDir = new Path(client.getStagingAreaDir());
    }
    return stagingAreaDir;
  }
311
312 /**
313 * Get the job history file path for a given job id. The job history file at
314 * this path may or may not be existing depending on the job completion state.
315 * The file is present only for the completed jobs.
316 * @param jobId the JobID of the job submitted by the current user.
317 * @return the file path of the job history file
318 * @throws IOException
319 * @throws InterruptedException
320 */
321 public String getJobHistoryUrl(JobID jobId) throws IOException,
322 InterruptedException {
323 if (jobHistoryDir == null) {
324 jobHistoryDir = new Path(client.getJobHistoryDir());
325 }
326 return new Path(jobHistoryDir, jobId.toString() + "_"
327 + ugi.getShortUserName()).toString();
328 }
329
  /**
   * Gets the Queue ACLs for the current user.
   *
   * @return array of QueueAclsInfo object for current user.
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueAclsInfo[] getQueueAclsForCurrentUser()
      throws IOException, InterruptedException {
    return client.getQueueAclsForCurrentUser();
  }
339
  /**
   * Gets the root level queues.
   *
   * @return array of {@link QueueInfo} objects.
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
    return client.getRootQueues();
  }
348
  /**
   * Returns immediate children of queueName.
   *
   * @param queueName name of the parent queue
   * @return array of {@link QueueInfo} which are children of queueName
   * @throws IOException
   * @throws InterruptedException
   */
  public QueueInfo[] getChildQueues(String queueName)
      throws IOException, InterruptedException {
    return client.getChildQueues(queueName);
  }
359
  /**
   * Get the JobTracker's status.
   *
   * @return {@link JobTrackerStatus} of the JobTracker
   * @throws IOException
   * @throws InterruptedException
   */
  public JobTrackerStatus getJobTrackerStatus() throws IOException,
      InterruptedException {
    return client.getJobTrackerStatus();
  }
371
  /**
   * Get the tasktracker expiry interval for the cluster.
   *
   * @return the expiry interval in msec
   * @throws IOException
   * @throws InterruptedException
   */
  public long getTaskTrackerExpiryInterval() throws IOException,
      InterruptedException {
    return client.getTaskTrackerExpiryInterval();
  }
380
  /**
   * Get a delegation token for the user from the JobTracker.
   *
   * @param renewer the user who can renew the token
   * @return the new token
   * @throws IOException
   * @throws InterruptedException
   */
  public Token<DelegationTokenIdentifier>
      getDelegationToken(Text renewer) throws IOException, InterruptedException{
    // The client has already set the token's service address.
    return client.getDelegationToken(renewer);
  }
392
393 /**
394 * Renew a delegation token
395 * @param token the token to renew
396 * @return the new expiration time
397 * @throws InvalidToken
398 * @throws IOException
399 * @deprecated Use {@link Token#renew} instead
400 */
401 public long renewDelegationToken(Token<DelegationTokenIdentifier> token
402 ) throws InvalidToken, IOException,
403 InterruptedException {
404 try {
405 return client.renewDelegationToken(token);
406 } catch (RemoteException re) {
407 throw re.unwrapRemoteException(InvalidToken.class,
408 AccessControlException.class);
409 }
410 }
411
412 /**
413 * Cancel a delegation token from the JobTracker
414 * @param token the token to cancel
415 * @throws IOException
416 * @deprecated Use {@link Token#cancel} instead
417 */
418 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
419 ) throws IOException,
420 InterruptedException {
421 try {
422 client.cancelDelegationToken(token);
423 } catch (RemoteException re) {
424 throw re.unwrapRemoteException(InvalidToken.class,
425 AccessControlException.class);
426 }
427 }
428
429 }