001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce;
020
021 import java.io.IOException;
022 import java.net.InetSocketAddress;
023 import java.security.PrivilegedExceptionAction;
024 import java.util.ArrayList;
025 import java.util.List;
026 import java.util.ServiceLoader;
027
028 import org.apache.commons.logging.Log;
029 import org.apache.commons.logging.LogFactory;
030 import org.apache.hadoop.classification.InterfaceAudience;
031 import org.apache.hadoop.classification.InterfaceStability;
032 import org.apache.hadoop.conf.Configuration;
033 import org.apache.hadoop.fs.FileSystem;
034 import org.apache.hadoop.fs.Path;
035 import org.apache.hadoop.io.Text;
036 import org.apache.hadoop.ipc.RemoteException;
037 import org.apache.hadoop.mapred.JobConf;
038 import org.apache.hadoop.mapred.Master;
039 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
040 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
041 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
042 import org.apache.hadoop.mapreduce.util.ConfigUtil;
043 import org.apache.hadoop.mapreduce.v2.LogParams;
044 import org.apache.hadoop.net.NetUtils;
045 import org.apache.hadoop.security.AccessControlException;
046 import org.apache.hadoop.security.UserGroupInformation;
047 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
048 import org.apache.hadoop.security.token.Token;
049
050 /**
051 * Provides a way to access information about the map/reduce cluster.
052 */
053 @InterfaceAudience.Public
054 @InterfaceStability.Evolving
055 public class Cluster {
056
057 @InterfaceStability.Evolving
058 public static enum JobTrackerStatus {INITIALIZING, RUNNING};
059
060 private ClientProtocolProvider clientProtocolProvider;
061 private ClientProtocol client;
062 private UserGroupInformation ugi;
063 private Configuration conf;
064 private FileSystem fs = null;
065 private Path sysDir = null;
066 private Path stagingAreaDir = null;
067 private Path jobHistoryDir = null;
068 private static final Log LOG = LogFactory.getLog(Cluster.class);
069
070 private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
071 ServiceLoader.load(ClientProtocolProvider.class);
072
073 static {
074 ConfigUtil.loadResources();
075 }
076
077 public Cluster(Configuration conf) throws IOException {
078 this(null, conf);
079 }
080
081 public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
082 throws IOException {
083 this.conf = conf;
084 this.ugi = UserGroupInformation.getCurrentUser();
085 initialize(jobTrackAddr, conf);
086 }
087
088 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
089 throws IOException {
090
091 synchronized (frameworkLoader) {
092 for (ClientProtocolProvider provider : frameworkLoader) {
093 LOG.debug("Trying ClientProtocolProvider : "
094 + provider.getClass().getName());
095 ClientProtocol clientProtocol = null;
096 try {
097 if (jobTrackAddr == null) {
098 clientProtocol = provider.create(conf);
099 } else {
100 clientProtocol = provider.create(jobTrackAddr, conf);
101 }
102
103 if (clientProtocol != null) {
104 clientProtocolProvider = provider;
105 client = clientProtocol;
106 LOG.debug("Picked " + provider.getClass().getName()
107 + " as the ClientProtocolProvider");
108 break;
109 }
110 else {
111 LOG.debug("Cannot pick " + provider.getClass().getName()
112 + " as the ClientProtocolProvider - returned null protocol");
113 }
114 }
115 catch (Exception e) {
116 LOG.info("Failed to use " + provider.getClass().getName()
117 + " due to error: " + e.getMessage());
118 }
119 }
120 }
121
122 if (null == clientProtocolProvider || null == client) {
123 throw new IOException(
124 "Cannot initialize Cluster. Please check your configuration for "
125 + MRConfig.FRAMEWORK_NAME
126 + " and the correspond server addresses.");
127 }
128 }
129
130 ClientProtocol getClient() {
131 return client;
132 }
133
134 Configuration getConf() {
135 return conf;
136 }
137
138 /**
139 * Close the <code>Cluster</code>.
140 */
141 public synchronized void close() throws IOException {
142 clientProtocolProvider.close(client);
143 }
144
145 private Job[] getJobs(JobStatus[] stats) throws IOException {
146 List<Job> jobs = new ArrayList<Job>();
147 for (JobStatus stat : stats) {
148 jobs.add(Job.getInstance(this, stat, new JobConf(stat.getJobFile())));
149 }
150 return jobs.toArray(new Job[0]);
151 }
152
153 /**
154 * Get the file system where job-specific files are stored
155 *
156 * @return object of FileSystem
157 * @throws IOException
158 * @throws InterruptedException
159 */
160 public synchronized FileSystem getFileSystem()
161 throws IOException, InterruptedException {
162 if (this.fs == null) {
163 try {
164 this.fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
165 public FileSystem run() throws IOException, InterruptedException {
166 final Path sysDir = new Path(client.getSystemDir());
167 return sysDir.getFileSystem(getConf());
168 }
169 });
170 } catch (InterruptedException e) {
171 throw new RuntimeException(e);
172 }
173 }
174 return fs;
175 }
176
177 /**
178 * Get job corresponding to jobid.
179 *
180 * @param jobId
181 * @return object of {@link Job}
182 * @throws IOException
183 * @throws InterruptedException
184 */
185 public Job getJob(JobID jobId) throws IOException, InterruptedException {
186 JobStatus status = client.getJobStatus(jobId);
187 if (status != null) {
188 return Job.getInstance(this, status, new JobConf(status.getJobFile()));
189 }
190 return null;
191 }
192
193 /**
194 * Get all the queues in cluster.
195 *
196 * @return array of {@link QueueInfo}
197 * @throws IOException
198 * @throws InterruptedException
199 */
200 public QueueInfo[] getQueues() throws IOException, InterruptedException {
201 return client.getQueues();
202 }
203
204 /**
205 * Get queue information for the specified name.
206 *
207 * @param name queuename
208 * @return object of {@link QueueInfo}
209 * @throws IOException
210 * @throws InterruptedException
211 */
212 public QueueInfo getQueue(String name)
213 throws IOException, InterruptedException {
214 return client.getQueue(name);
215 }
216
217 /**
218 * Get log parameters for the specified jobID or taskAttemptID
219 * @param jobID the job id.
220 * @param taskAttemptID the task attempt id. Optional.
221 * @return the LogParams
222 * @throws IOException
223 * @throws InterruptedException
224 */
225 public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
226 throws IOException, InterruptedException {
227 return client.getLogFileParams(jobID, taskAttemptID);
228 }
229
230 /**
231 * Get current cluster status.
232 *
233 * @return object of {@link ClusterMetrics}
234 * @throws IOException
235 * @throws InterruptedException
236 */
237 public ClusterMetrics getClusterStatus() throws IOException, InterruptedException {
238 return client.getClusterMetrics();
239 }
240
241 /**
242 * Get all active trackers in the cluster.
243 *
244 * @return array of {@link TaskTrackerInfo}
245 * @throws IOException
246 * @throws InterruptedException
247 */
248 public TaskTrackerInfo[] getActiveTaskTrackers()
249 throws IOException, InterruptedException {
250 return client.getActiveTrackers();
251 }
252
253 /**
254 * Get blacklisted trackers.
255 *
256 * @return array of {@link TaskTrackerInfo}
257 * @throws IOException
258 * @throws InterruptedException
259 */
260 public TaskTrackerInfo[] getBlackListedTaskTrackers()
261 throws IOException, InterruptedException {
262 return client.getBlacklistedTrackers();
263 }
264
265 /**
266 * Get all the jobs in cluster.
267 *
268 * @return array of {@link Job}
269 * @throws IOException
270 * @throws InterruptedException
271 * @deprecated Use {@link #getAllJobStatuses()} instead.
272 */
273 @Deprecated
274 public Job[] getAllJobs() throws IOException, InterruptedException {
275 return getJobs(client.getAllJobs());
276 }
277
278 /**
279 * Get job status for all jobs in the cluster.
280 * @return job status for all jobs in cluster
281 * @throws IOException
282 * @throws InterruptedException
283 */
284 public JobStatus[] getAllJobStatuses() throws IOException, InterruptedException {
285 return client.getAllJobs();
286 }
287
288 /**
289 * Grab the jobtracker system directory path where
290 * job-specific files will be placed.
291 *
292 * @return the system directory where job-specific files are to be placed.
293 */
294 public Path getSystemDir() throws IOException, InterruptedException {
295 if (sysDir == null) {
296 sysDir = new Path(client.getSystemDir());
297 }
298 return sysDir;
299 }
300
301 /**
302 * Grab the jobtracker's view of the staging directory path where
303 * job-specific files will be placed.
304 *
305 * @return the staging directory where job-specific files are to be placed.
306 */
307 public Path getStagingAreaDir() throws IOException, InterruptedException {
308 if (stagingAreaDir == null) {
309 stagingAreaDir = new Path(client.getStagingAreaDir());
310 }
311 return stagingAreaDir;
312 }
313
314 /**
315 * Get the job history file path for a given job id. The job history file at
316 * this path may or may not be existing depending on the job completion state.
317 * The file is present only for the completed jobs.
318 * @param jobId the JobID of the job submitted by the current user.
319 * @return the file path of the job history file
320 * @throws IOException
321 * @throws InterruptedException
322 */
323 public String getJobHistoryUrl(JobID jobId) throws IOException,
324 InterruptedException {
325 if (jobHistoryDir == null) {
326 jobHistoryDir = new Path(client.getJobHistoryDir());
327 }
328 return new Path(jobHistoryDir, jobId.toString() + "_"
329 + ugi.getShortUserName()).toString();
330 }
331
332 /**
333 * Gets the Queue ACLs for current user
334 * @return array of QueueAclsInfo object for current user.
335 * @throws IOException
336 */
337 public QueueAclsInfo[] getQueueAclsForCurrentUser()
338 throws IOException, InterruptedException {
339 return client.getQueueAclsForCurrentUser();
340 }
341
342 /**
343 * Gets the root level queues.
344 * @return array of JobQueueInfo object.
345 * @throws IOException
346 */
347 public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
348 return client.getRootQueues();
349 }
350
351 /**
352 * Returns immediate children of queueName.
353 * @param queueName
354 * @return array of JobQueueInfo which are children of queueName
355 * @throws IOException
356 */
357 public QueueInfo[] getChildQueues(String queueName)
358 throws IOException, InterruptedException {
359 return client.getChildQueues(queueName);
360 }
361
362 /**
363 * Get the JobTracker's status.
364 *
365 * @return {@link JobTrackerStatus} of the JobTracker
366 * @throws IOException
367 * @throws InterruptedException
368 */
369 public JobTrackerStatus getJobTrackerStatus() throws IOException,
370 InterruptedException {
371 return client.getJobTrackerStatus();
372 }
373
374 /**
375 * Get the tasktracker expiry interval for the cluster
376 * @return the expiry interval in msec
377 */
378 public long getTaskTrackerExpiryInterval() throws IOException,
379 InterruptedException {
380 return client.getTaskTrackerExpiryInterval();
381 }
382
383 /**
384 * Get a delegation token for the user from the JobTracker.
385 * @param renewer the user who can renew the token
386 * @return the new token
387 * @throws IOException
388 */
389 public Token<DelegationTokenIdentifier>
390 getDelegationToken(Text renewer) throws IOException, InterruptedException{
391 Token<DelegationTokenIdentifier> result =
392 client.getDelegationToken(renewer);
393
394 if (result == null) {
395 return result;
396 }
397
398 InetSocketAddress addr = Master.getMasterAddress(conf);
399 StringBuilder service = new StringBuilder();
400 service.append(NetUtils.normalizeHostName(addr.getAddress().
401 getHostAddress()));
402 service.append(':');
403 service.append(addr.getPort());
404 result.setService(new Text(service.toString()));
405 return result;
406 }
407
408 /**
409 * Renew a delegation token
410 * @param token the token to renew
411 * @return the new expiration time
412 * @throws InvalidToken
413 * @throws IOException
414 * @deprecated Use {@link Token#renew} instead
415 */
416 public long renewDelegationToken(Token<DelegationTokenIdentifier> token
417 ) throws InvalidToken, IOException,
418 InterruptedException {
419 try {
420 return client.renewDelegationToken(token);
421 } catch (RemoteException re) {
422 throw re.unwrapRemoteException(InvalidToken.class,
423 AccessControlException.class);
424 }
425 }
426
427 /**
428 * Cancel a delegation token from the JobTracker
429 * @param token the token to cancel
430 * @throws IOException
431 * @deprecated Use {@link Token#cancel} instead
432 */
433 public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
434 ) throws IOException,
435 InterruptedException {
436 try {
437 client.cancelDelegationToken(token);
438 } catch (RemoteException re) {
439 throw re.unwrapRemoteException(InvalidToken.class,
440 AccessControlException.class);
441 }
442 }
443
444 }