Package repoze :: Package who :: Package plugins :: Module auth_tkt
[hide private]

Source Code for Module repoze.who.plugins.auth_tkt

  1  import datetime 
  2  from codecs import utf_8_decode 
  3  from codecs import utf_8_encode 
  4  import os 
  5  import time 
  6   
  7  from paste.request import get_cookies 
  8  from paste.auth import auth_tkt 
  9   
 10  from zope.interface import implements 
 11   
 12  from repoze.who.interfaces import IIdentifier 
 13   
 14  _NOW_TESTING = None  # unit tests can replace 
15 -def _now(): #pragma NO COVERAGE
16 if _NOW_TESTING is not None: 17 return _NOW_TESTING 18 return datetime.datetime.now() 19
class AuthTktCookiePlugin(object):
    """IIdentifier plugin that keeps the identity in an ``auth_tkt`` cookie.

    Tickets are built and parsed with ``paste.auth.auth_tkt`` using the
    configured ``secret``.  ``identify`` reads the ticket from the request
    cookies, ``remember`` returns ``Set-Cookie`` headers carrying a fresh
    ticket when one is needed, and ``forget`` returns headers that overwrite
    the cookie with an invalid, immediately expiring value.
    """

    implements(IIdentifier)

    # Keyed by the name found after 'userid_type:' in the ticket's user data;
    # the value converts the userid string read from the ticket back to its
    # original type.
    userid_type_decoders = {
        'int':int,
        'unicode':lambda x: utf_8_decode(x)[0],
        }

    # Keyed by the Python type of the userid being remembered; the value is
    # (type name recorded in the ticket's user data, encoder to a string).
    userid_type_encoders = {
        int: ('int', str),
        long: ('int', str),
        unicode: ('unicode', lambda x: utf_8_encode(x)[0]),
        }

    def __init__(self, secret, cookie_name='auth_tkt',
                 secure=False, include_ip=False,
                 timeout=None, reissue_time=None, userid_checker=None):
        """Configure the plugin.

        secret -- secret handed to ``auth_tkt`` to build and parse tickets.
        cookie_name -- name of the cookie holding the ticket.
        secure -- passed to ``AuthTicket``; when true the ``Set-Cookie``
            headers also carry the ``Secure`` attribute.
        include_ip -- when true the ticket is tied to
            ``environ['REMOTE_ADDR']``, otherwise to ``'0.0.0.0'``.
        timeout -- if set, tickets whose timestamp is more than this many
            seconds old are ignored by ``identify``.
        reissue_time -- if set, ``remember`` issues a new ticket once the
            current one is older than this many seconds.
        userid_checker -- optional callable taking the userid read from the
            ticket; a false result makes ``identify`` return None.

        Raises ValueError when ``timeout`` is given without a
        ``reissue_time`` that is lower than or equal to it.
        """
        self.secret = secret
        self.cookie_name = cookie_name
        self.include_ip = include_ip
        self.secure = secure
        if timeout and ( (not reissue_time) or (reissue_time > timeout) ):
            raise ValueError('When timeout is specified, reissue_time must '
                             'be set to a lower value')
        self.timeout = timeout
        self.reissue_time = reissue_time
        self.userid_checker = userid_checker

    # IIdentifier
    def identify(self, environ):
        """Return the identity stored in the request's ticket cookie.

        Returns None when the cookie is missing or empty, the ticket cannot
        be parsed, ``userid_checker`` rejects the userid, or the ticket is
        older than ``timeout``.  On success the ticket's tokens and user
        data are also published in ``environ`` and a dict with the keys
        'timestamp', 'repoze.who.userid', 'tokens' and 'userdata' is
        returned.
        """
        cookies = get_cookies(environ)
        cookie = cookies.get(self.cookie_name)

        if cookie is None or not cookie.value:
            return None

        if self.include_ip:
            remote_addr = environ['REMOTE_ADDR']
        else:
            remote_addr = '0.0.0.0'

        try:
            timestamp, userid, tokens, user_data = auth_tkt.parse_ticket(
                self.secret, cookie.value, remote_addr)
        except auth_tkt.BadTicket:
            return None

        if self.userid_checker and not self.userid_checker(userid):
            return None

        if self.timeout and ( (timestamp + self.timeout) < time.time() ):
            return None

        # The user data is a '|'-separated list; a 'userid_type:<name>' entry
        # names the decoder that restores the userid's original type.
        userid_typename = 'userid_type:'
        user_data_info = user_data.split('|')
        for datum in filter(None, user_data_info):
            if datum.startswith(userid_typename):
                userid_type = datum[len(userid_typename):]
                decoder = self.userid_type_decoders.get(userid_type)
                if decoder:
                    userid = decoder(userid)

        environ['REMOTE_USER_TOKENS'] = tokens
        environ['REMOTE_USER_DATA'] = user_data
        environ['AUTH_TYPE'] = 'cookie'

        identity = {}
        identity['timestamp'] = timestamp
        identity['repoze.who.userid'] = userid
        identity['tokens'] = tokens
        identity['userdata'] = user_data
        return identity

    def _get_cookies(self, environ, value, max_age=None):
        """Build the ``Set-Cookie`` headers that store ``value``.

        Three headers are returned: one without a Domain, one for the
        current host and one for the '.'-prefixed ("wildcard") host.  When
        ``max_age`` is not None, Max-Age and Expires attributes are added;
        when the plugin is configured as secure, so is ``Secure``.
        """
        if max_age is not None:
            later = _now() + datetime.timedelta(seconds=int(max_age))
            # Wdy, DD-Mon-YY HH:MM:SS GMT
            # NOTE(review): ``later`` comes from ``_now()`` and no zone
            # suffix is appended, so this may not be a GMT timestamp as the
            # format comment above suggests -- confirm before relying on it.
            expires = later.strftime('%a, %d %b %Y %H:%M:%S')
            # the Expires header is *required* at least for IE7 (IE7 does
            # not respect Max-Age)
            max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
        else:
            max_age = ''

        # Honour the ``secure`` option on the headers themselves: only the
        # ticket value is taken from AuthTicket, so the flag has to be
        # emitted here.
        if self.secure:
            secure = '; Secure'
        else:
            secure = ''

        cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
        wild_domain = '.' + cur_domain
        cookies = [
            ('Set-Cookie', '%s="%s"; Path=/%s%s' % (
            self.cookie_name, value, max_age, secure)),
            ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s%s' % (
            self.cookie_name, value, cur_domain, max_age, secure)),
            ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s%s' % (
            self.cookie_name, value, wild_domain, max_age, secure))
            ]
        return cookies

    # IIdentifier
    def forget(self, environ, identity):
        """Return headers that replace the ticket cookie with 'INVALID'."""
        # return a set of expires Set-Cookie headers
        return self._get_cookies(environ, 'INVALID', 0)

    # IIdentifier
    def remember(self, environ, identity):
        """Return ``Set-Cookie`` headers for ``identity`` when needed.

        A new ticket is issued when the identity differs from the one in the
        current cookie, or when ``reissue_time`` is set and the current
        ticket is older than that.  Returns None when no new cookie has to
        be set.  ``identity['max_age']``, if present, controls the cookie
        lifetime attributes.
        """
        if self.include_ip:
            remote_addr = environ['REMOTE_ADDR']
        else:
            remote_addr = '0.0.0.0'

        cookies = get_cookies(environ)
        existing = cookies.get(self.cookie_name)
        old_cookie_value = getattr(existing, 'value', None)
        max_age = identity.get('max_age', None)

        # Defaults used when there is no (valid) previous ticket.
        timestamp, userid, tokens, userdata = None, '', '', ''

        if old_cookie_value:
            try:
                timestamp,userid,tokens,userdata = auth_tkt.parse_ticket(
                    self.secret, old_cookie_value, remote_addr)
            except auth_tkt.BadTicket:
                # An unparsable old ticket is treated like no ticket at all.
                pass

        who_userid = identity['repoze.who.userid']
        who_tokens = identity.get('tokens', '')
        who_userdata = identity.get('userdata', '')

        encoding_data = self.userid_type_encoders.get(type(who_userid))
        if encoding_data:
            encoding, encoder = encoding_data
            who_userid = encoder(who_userid)
            # Records the userid's type so identify() can decode it; this
            # replaces any 'userdata' supplied in the identity.
            who_userdata = 'userid_type:%s' % encoding

        if not isinstance(tokens, basestring):
            tokens = ','.join(tokens)
        if not isinstance(who_tokens, basestring):
            who_tokens = ','.join(who_tokens)
        old_data = (userid, tokens, userdata)
        new_data = (who_userid, who_tokens, who_userdata)

        # ``timestamp`` stays None when no old ticket was parsed; without the
        # guard the age check would raise TypeError in that case.
        needs_reissue = (self.reissue_time and timestamp is not None and
                         ( (timestamp + self.reissue_time) < time.time() ))

        if old_data != new_data or needs_reissue:
            ticket = auth_tkt.AuthTicket(
                self.secret,
                who_userid,
                remote_addr,
                tokens=who_tokens,
                user_data=who_userdata,
                cookie_name=self.cookie_name,
                secure=self.secure)
            new_cookie_value = ticket.cookie_value()

            if old_cookie_value != new_cookie_value:
                # return a set of Set-Cookie headers
                return self._get_cookies(environ, new_cookie_value, max_age)

    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__,
                            id(self)) #pragma NO COVERAGE
182
def _bool(value):
    """Interpret a config-style string as a boolean.

    Strings yield True only when they equal 'yes', 'true' or '1'
    (case-insensitively) and False otherwise.  Any non-string value is
    returned unchanged.
    """
    if not isinstance(value, basestring):
        return value
    truthy_words = ('yes', 'true', '1')
    return value.lower() in truthy_words
187
def make_plugin(secret=None,
                secretfile=None,
                cookie_name='auth_tkt',
                secure=False,
                include_ip=False,
                timeout=None,
                reissue_time=None,
                userid_checker=None,
               ):
    """Build an AuthTktCookiePlugin from (typically string) config values.

    secret / secretfile -- exactly one must be given; ``secretfile`` names a
        file whose stripped contents are used as the secret (``~`` is
        expanded and the path made absolute).
    cookie_name -- passed through unchanged.
    secure, include_ip -- booleans, or strings converted with ``_bool``.
    timeout, reissue_time -- converted with ``int`` when truthy.
    userid_checker -- optional dotted name, resolved with
        ``repoze.who.utils.resolveDotted``.

    Raises ValueError when neither or both of ``secret`` and ``secretfile``
    are given, or when ``secretfile`` does not exist.
    """
    if (secret is None and secretfile is None):
        raise ValueError("One of 'secret' or 'secretfile' must not be None.")
    if (secret is not None and secretfile is not None):
        raise ValueError("Specify only one of 'secret' or 'secretfile'.")
    if secretfile:
        secretfile = os.path.abspath(os.path.expanduser(secretfile))
        if not os.path.exists(secretfile):
            raise ValueError("No such 'secretfile': %s" % secretfile)
        # Close the handle explicitly instead of leaving it to the garbage
        # collector.
        secret_fp = open(secretfile)
        try:
            secret = secret_fp.read().strip()
        finally:
            secret_fp.close()
    if timeout:
        timeout = int(timeout)
    if reissue_time:
        reissue_time = int(reissue_time)
    if userid_checker is not None:
        # Imported here because it is only needed on this path.
        from repoze.who.utils import resolveDotted
        userid_checker = resolveDotted(userid_checker)
    plugin = AuthTktCookiePlugin(secret,
                                 cookie_name,
                                 _bool(secure),
                                 _bool(include_ip),
                                 timeout,
                                 reissue_time,
                                 userid_checker,
                                 )
    return plugin
222