| # $Id: p3.py,v 1.2 2003/11/18 19:04:03 phr Exp phr $  | 
|   | 
| # Simple p3 encryption "algorithm": it's just SHA used as a stream  | 
| # cipher in output feedback mode.  | 
|   | 
| # Author: Paul Rubin, Fort GNOX Cryptography, <phr-crypto at nightsong.com>.  | 
| # Algorithmic advice from David Wagner, Richard Parker, Bryan  | 
| # Olson, and Paul Crowley on sci.crypt is gratefully acknowledged.  | 
|   | 
| # Copyright 2002,2003 by Paul Rubin  | 
| # Copying license: same as Python 2.3 license  | 
|   | 
| # Please include this revision number in any bug reports: $Revision: 1.2 $.  | 
|   | 
| from string import join  | 
| from array import array  | 
| try:  | 
|     import hashlib as sha  | 
| except:  | 
|     import sha  | 
| from time import time  | 
|   | 
| class CryptError(Exception): pass  | 
| #def _hash(str): return sha.new(str).digest()  | 
| def _hash(str):  | 
|     print str  | 
|     return sha.new(name="sha",string=str).digest()  | 
|   | 
| _ivlen = 16  | 
| _maclen = 8  | 
| _state = _hash(`time()`)  | 
|   | 
| try:  | 
|     import os  | 
|     _pid = `os.getpid()`  | 
| except ImportError, AttributeError:  | 
|     _pid = ''  | 
|   | 
| def _expand_key(key, clen):  | 
|     blocks = (clen+19)/20  | 
|     xkey=[]  | 
|     seed=key  | 
|     for i in xrange(blocks):  | 
|         seed=_hash(key+seed)  | 
|         xkey.append(seed)  | 
|     j = join(xkey,'')  | 
|     return array ('L', j)  | 
|   | 
| def p3_encrypt(plain,key):  | 
|     global _state  | 
|     H = _hash  | 
|   | 
|     # change _state BEFORE using it to compute nonce, in case there's  | 
|     # a thread switch between computing the nonce and folding it into  | 
|     # the state.  This way if two threads compute a nonce from the  | 
|     # same data, they won't both get the same nonce.  (There's still  | 
|     # a small danger of a duplicate nonce--see below).  | 
|     _state = 'X'+_state  | 
|   | 
|     # Attempt to make nlist unique for each call, so we can get a  | 
|     # unique nonce.  It might be good to include a process ID or  | 
|     # something, but I don't know if that's portable between OS's.  | 
|     # Since is based partly on both the key and plaintext, in the  | 
|     # worst case (encrypting the same plaintext with the same key in  | 
|     # two separate Python instances at the same time), you might get  | 
|     # identical ciphertexts for the identical plaintexts, which would  | 
|     # be a security failure in some applications.  Be careful.  | 
|     nlist = [`time()`, _pid, _state, `len(plain)`,plain, key]  | 
|     nonce = H(join(nlist,','))[:_ivlen]  | 
|     _state = H('update2'+_state+nonce)  | 
|     k_enc, k_auth = H('enc'+key+nonce), H('auth'+key+nonce)  | 
|     n=len(plain)                        # cipher size not counting IV  | 
|   | 
|     stream = array('L', plain+'0000'[n&3:]) # pad to fill 32-bit words  | 
|     xkey = _expand_key(k_enc, n+4)  | 
|     for i in xrange(len(stream)):  | 
|         stream[i] = stream[i] ^ xkey[i]  | 
|     ct = nonce + stream.tostring()[:n]  | 
|     auth = _hmac(ct, k_auth)  | 
|     return ct + auth[:_maclen]  | 
|   | 
| def p3_decrypt(cipher,key):  | 
|     H = _hash  | 
|     n=len(cipher)-_ivlen-_maclen        # length of ciphertext  | 
|     if n < 0:  | 
|         raise CryptError, "invalid ciphertext"  | 
|     nonce,stream,auth = \  | 
|       cipher[:_ivlen], cipher[_ivlen:-_maclen]+'0000'[n&3:],cipher[-_maclen:]  | 
|     k_enc, k_auth = H('enc'+key+nonce), H('auth'+key+nonce)  | 
|     vauth = _hmac (cipher[:-_maclen], k_auth)[:_maclen]  | 
|     if auth != vauth:  | 
|         raise CryptError, "invalid key or ciphertext"  | 
|   | 
|     stream = array('L', stream)  | 
|     xkey = _expand_key (k_enc, n+4)  | 
|     for i in xrange (len(stream)):  | 
|         stream[i] = stream[i] ^ xkey[i]  | 
|     plain = stream.tostring()[:n]  | 
|     return plain  | 
|   | 
| # RFC 2104 HMAC message authentication code  | 
| # This implementation is faster than Python 2.2's hmac.py, and also works in  | 
| # old Python versions (at least as old as 1.5.2).  | 
| from string import translate  | 
| def _hmac_setup():  | 
|     global _ipad, _opad, _itrans, _otrans  | 
|     _itrans = array('B',[0]*256)  | 
|     _otrans = array('B',[0]*256)  | 
|     for i in xrange(256):  | 
|         _itrans[i] = i ^ 0x36  | 
|         _otrans[i] = i ^ 0x5c  | 
|     _itrans = _itrans.tostring()  | 
|     _otrans = _otrans.tostring()  | 
|   | 
|     _ipad = '\x36'*64  | 
|     _opad = '\x5c'*64  | 
|   | 
| def _hmac(msg, key):  | 
|     if len(key)>64:  | 
|         key=_hash(key).digest()  | 
|     ki = (translate(key,_itrans)+_ipad)[:64] # inner  | 
|     ko = (translate(key,_otrans)+_opad)[:64] # outer  | 
|     return _hash(ko+_hash(ki+msg))  | 
|   | 
| #  | 
| # benchmark and unit test  | 
| #  | 
|   | 
| def _time_p3(n=1000,len=20):  | 
|     plain="a"*len  | 
|     t=time()  | 
|     for i in xrange(n):  | 
|         p3_encrypt(plain,"abcdefgh")  | 
|     dt=time()-t  | 
|     print "plain p3:", n,len,dt,"sec =",n*len/dt,"bytes/sec"  | 
|   | 
| def _speed():  | 
|     _time_p3(len=5)  | 
|     _time_p3()  | 
|     _time_p3(len=200)  | 
|     _time_p3(len=2000,n=100)  | 
|   | 
| def _test():  | 
|     e=p3_encrypt  | 
|     d=p3_decrypt  | 
|   | 
|     plain="test plaintext"  | 
|     key = "test key"  | 
|     c1 = e(plain,key)  | 
|     c2 = e(plain,key)  | 
|     assert c1!=c2  | 
|     assert d(c2,key)==plain  | 
|     assert d(c1,key)==plain  | 
|     c3 = c2[:20]+chr(1+ord(c2[20]))+c2[21:] # change one ciphertext character  | 
|   | 
|     try:  | 
|         print d(c3,key)         # should throw exception  | 
|         print "auth verification failure"  | 
|     except CryptError:  | 
|         pass  | 
|   | 
|     try:  | 
|         print d(c2,'wrong key')         # should throw exception  | 
|         print "test failure"  | 
|     except CryptError:  | 
|         pass  | 
|   | 
| _hmac_setup()  | 
| #_test()  | 
| # _speed()                                # uncomment to run speed test |