001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package org.apache.hadoop.mapreduce.security;
020
021 import java.io.IOException;
022 import java.util.List;
023
024 import org.apache.commons.logging.Log;
025 import org.apache.commons.logging.LogFactory;
026 import org.apache.hadoop.classification.InterfaceAudience;
027 import org.apache.hadoop.classification.InterfaceStability;
028 import org.apache.hadoop.conf.Configuration;
029 import org.apache.hadoop.fs.FileSystem;
030 import org.apache.hadoop.fs.Path;
031 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
032 import org.apache.hadoop.io.Text;
033 import org.apache.hadoop.mapred.JobConf;
034 import org.apache.hadoop.mapred.Master;
035 import org.apache.hadoop.mapreduce.MRJobConfig;
036 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
037 import org.apache.hadoop.security.Credentials;
038 import org.apache.hadoop.security.UserGroupInformation;
039 import org.apache.hadoop.security.token.Token;
040 import org.apache.hadoop.security.token.TokenIdentifier;
041
042
043 /**
044 * This class provides user facing APIs for transferring secrets from
045 * the job client to the tasks.
046 * The secrets can be stored just before submission of jobs and read during
047 * the task execution.
048 */
049 @InterfaceAudience.Public
050 @InterfaceStability.Evolving
051 public class TokenCache {
052
053 private static final Log LOG = LogFactory.getLog(TokenCache.class);
054
055
056 /**
057 * auxiliary method to get user's secret keys..
058 * @param alias
059 * @return secret key from the storage
060 */
061 public static byte[] getSecretKey(Credentials credentials, Text alias) {
062 if(credentials == null)
063 return null;
064 return credentials.getSecretKey(alias);
065 }
066
067 /**
068 * Convenience method to obtain delegation tokens from namenodes
069 * corresponding to the paths passed.
070 * @param credentials
071 * @param ps array of paths
072 * @param conf configuration
073 * @throws IOException
074 */
075 public static void obtainTokensForNamenodes(Credentials credentials,
076 Path[] ps, Configuration conf) throws IOException {
077 if (!UserGroupInformation.isSecurityEnabled()) {
078 return;
079 }
080 obtainTokensForNamenodesInternal(credentials, ps, conf);
081 }
082
083 /**
084 * Remove jobtoken referrals which don't make sense in the context
085 * of the task execution.
086 *
087 * @param conf
088 */
089 public static void cleanUpTokenReferral(Configuration conf) {
090 conf.unset(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
091 }
092
093 static void obtainTokensForNamenodesInternal(Credentials credentials,
094 Path[] ps, Configuration conf) throws IOException {
095 for(Path p: ps) {
096 FileSystem fs = FileSystem.get(p.toUri(), conf);
097 obtainTokensForNamenodesInternal(fs, credentials, conf);
098 }
099 }
100
101 /**
102 * get delegation token for a specific FS
103 * @param fs
104 * @param credentials
105 * @param p
106 * @param conf
107 * @throws IOException
108 */
109 @SuppressWarnings("deprecation")
110 static void obtainTokensForNamenodesInternal(FileSystem fs,
111 Credentials credentials, Configuration conf) throws IOException {
112 String delegTokenRenewer = Master.getMasterPrincipal(conf);
113 if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
114 throw new IOException(
115 "Can't get Master Kerberos principal for use as renewer");
116 }
117 boolean readFile = true;
118
119 String fsName = fs.getCanonicalServiceName();
120 if (TokenCache.getDelegationToken(credentials, fsName) == null) {
121 //TODO: Need to come up with a better place to put
122 //this block of code to do with reading the file
123 if (readFile) {
124 readFile = false;
125 String binaryTokenFilename =
126 conf.get(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
127 if (binaryTokenFilename != null) {
128 Credentials binary;
129 try {
130 binary = Credentials.readTokenStorageFile(
131 new Path("file:///" + binaryTokenFilename), conf);
132 } catch (IOException e) {
133 throw new RuntimeException(e);
134 }
135 credentials.addAll(binary);
136 }
137 if (TokenCache.getDelegationToken(credentials, fsName) != null) {
138 LOG.debug("DT for " + fsName + " is already present");
139 return;
140 }
141 }
142 List<Token<?>> tokens =
143 fs.getDelegationTokens(delegTokenRenewer, credentials);
144 if (tokens != null) {
145 for (Token<?> token : tokens) {
146 credentials.addToken(token.getService(), token);
147 LOG.info("Got dt for " + fs.getUri() + ";uri="+ fsName +
148 ";t.service="+token.getService());
149 }
150 }
151 //Call getDelegationToken as well for now - for FS implementations
152 // which may not have implmented getDelegationTokens (hftp)
153 if (tokens == null || tokens.size() == 0) {
154 Token<?> token = fs.getDelegationToken(delegTokenRenewer);
155 if (token != null) {
156 credentials.addToken(token.getService(), token);
157 LOG.info("Got dt for " + fs.getUri() + ";uri=" + fsName
158 + ";t.service=" + token.getService());
159 }
160 }
161 }
162 }
163
164 /**
165 * file name used on HDFS for generated job token
166 */
167 @InterfaceAudience.Private
168 public static final String JOB_TOKEN_HDFS_FILE = "jobToken";
169
170 /**
171 * conf setting for job tokens cache file name
172 */
173 @InterfaceAudience.Private
174 public static final String JOB_TOKENS_FILENAME = "mapreduce.job.jobTokenFile";
175 private static final Text JOB_TOKEN = new Text("ShuffleAndJobToken");
176
177 /**
178 *
179 * @param namenode
180 * @return delegation token
181 */
182 @SuppressWarnings("unchecked")
183 @InterfaceAudience.Private
184 public static Token<DelegationTokenIdentifier> getDelegationToken(
185 Credentials credentials, String namenode) {
186 //No fs specific tokens issues by this fs. It may however issue tokens
187 // for other filesystems - which would be keyed by that filesystems name.
188 if (namenode == null)
189 return null;
190 return (Token<DelegationTokenIdentifier>) credentials.getToken(new Text(
191 namenode));
192 }
193
194 /**
195 * load job token from a file
196 * @param conf
197 * @throws IOException
198 */
199 @InterfaceAudience.Private
200 public static Credentials loadTokens(String jobTokenFile, JobConf conf)
201 throws IOException {
202 Path localJobTokenFile = new Path ("file:///" + jobTokenFile);
203
204 Credentials ts = Credentials.readTokenStorageFile(localJobTokenFile, conf);
205
206 if(LOG.isDebugEnabled()) {
207 LOG.debug("Task: Loaded jobTokenFile from: "+
208 localJobTokenFile.toUri().getPath()
209 +"; num of sec keys = " + ts.numberOfSecretKeys() +
210 " Number of tokens " + ts.numberOfTokens());
211 }
212 return ts;
213 }
214 /**
215 * store job token
216 * @param t
217 */
218 @InterfaceAudience.Private
219 public static void setJobToken(Token<? extends TokenIdentifier> t,
220 Credentials credentials) {
221 credentials.addToken(JOB_TOKEN, t);
222 }
223 /**
224 *
225 * @return job token
226 */
227 @SuppressWarnings("unchecked")
228 @InterfaceAudience.Private
229 public static Token<JobTokenIdentifier> getJobToken(Credentials credentials) {
230 return (Token<JobTokenIdentifier>) credentials.getToken(JOB_TOKEN);
231 }
232 }