#!/usr/bin/python # -*- coding: GBK -*- #------------------------------------------------------------------------------- # #------------------------------------------------------------------------------- # import pymongo from pymongo.son_manipulator import SONManipulator import base64 import traceback from functools import wraps from time import (sleep) from Common import (CommFuncEx) #Ä£ÄâSQLµÄIDENT def seq(db, collectionName, fieldName, feed, increment): try: result = 0 collection = db['%s_seq'%collectionName] resultObj = collection.find_and_modify(query={'_id':fieldName}, update={'$inc':{'seq':increment}}, new=True) if resultObj: result = resultObj['seq'] else: resultObj = collection.find_and_modify(query={'_id':fieldName}, update={'$set':{'seq':feed}}, new=True, upsert=True) if resultObj: result = resultObj['seq'] else: return False, None except Exception, e: return False, None return True, result class ObjectIdRemover(SONManipulator): def transform_outgoing(self, son, collection): if '_id' in son: del son['_id'] return son class EncodeStringManipulator(SONManipulator): def __init__(self, encoding): self.encoding = encoding def transform_incoming(self, son, collection): def transform_value(value): if isinstance(value, dict): return transform_dict(value) elif isinstance(value, list): return [transform_value(v) for v in value] elif isinstance(value, basestring): result, value = CommFuncEx.EncodingToUnicode(self.encoding, value) return value return value def transform_dict(object): for (key, value) in object.items(): object[key] = transform_value(value) return object def transform_list(container): for item in container: transform_dict(item) return container if isinstance(son, dict): return transform_dict(son) elif isinstance(son, list): return transform_list(son) return son def transform_outgoing(self, son, collection): def transform_value(value): if isinstance(value, dict): return transform_dict(value) elif isinstance(value, list): return [transform_value(v) for v in value] elif isinstance(value, basestring): result, value =CommFuncEx.UnicodeToEncoding(self.encoding, value) return value return value def transform_dict(object): for (key, value) in object.items(): object[key] = transform_value(value) return object def transform_list(container): for item in container: transform_dict(item) return container if isinstance(son, dict): return transform_dict(son) elif isinstance(son, list): return transform_list(son) return son class Base64StringManipulator(SONManipulator): def transform_incoming(self, son, collection): def transform_value(value): if isinstance(value, dict): return transform_dict(value) elif isinstance(value, list): return [transform_value(v) for v in value] elif isinstance(value, basestring): return base64.b64encode(value) return value def transform_dict(object): for (key, value) in object.items(): object[key] = transform_value(value) return object def transform_list(container): for item in container: transform_dict(item) return container if isinstance(son, dict): return transform_dict(son) elif isinstance(son, list): return transform_list(son) return son def transform_outgoing(self, son, collection): def transform_value(value): if isinstance(value, dict): return transform_dict(value) elif isinstance(value, list): return [transform_value(v) for v in value] elif isinstance(value, basestring): return base64.b64decode(value) return value def transform_dict(object): for (key, value) in object.items(): object[key] = transform_value(value) return object def transform_list(container): for item in container: transform_dict(item) return container if isinstance(son, dict): return transform_dict(son) elif isinstance(son, list): return transform_list(son) return son #ÓÃÓÚÐÞÊÎDBControllerµÄÊý¾Ý¿â²Ù×÷º¯Êý #¶ÏÏß×Ô¶¯ÖØÊÔ def reconnect_decorator(func): @wraps(func) def wrapper(*args, **kwds): MAX_RECONNECT = 10 RECONNECT_INTERVAL = 0.1 failCnt = 0 while True: try: #È¥µôself return func(*args, **kwds) except pymongo.errors.AutoReconnect, e: failCnt += 1 sleep(RECONNECT_INTERVAL) if failCnt > MAX_RECONNECT: raise e return wrapper class DBController: def __init__(self, host, port, dbName, user, pwd, encoding): self.host = host self.port = port self.dbName = dbName self.user = user self.pwd = pwd self.connected = False self.con = None self.db = None self.lastError = None if encoding == 'base64': self.translator = Base64StringManipulator() else: self.translator = EncodeStringManipulator(encoding) self.initialize() def initialize(self): if not self.connected: if not self.doConnect(self.host, self.port): return False authResult = self.doAuthentication(self.dbName, self.user, self.pwd) if self.db: self.db.add_son_manipulator(ObjectIdRemover()) return authResult return True def doConnect(self, ip, port): try: self.con = pymongo.Connection(ip, port) except TypeError, typeError: raise except pymongo.errors.ConnectionFailure, failure: self.lastError = failure return False except Exception, e: self.lastError = e return False except: self.lastError = 'Unknown exception occur!' return False self.connected = True return True def doAuthentication(self, dbName, user, pwd): if not self.connected or not self.con: self.lastError = 'Not connected yet!' return False self.db = self.con[dbName] authDB = self.con['admin'] try: return authDB.authenticate(user, pwd) # return self.db.authenticate(user, pwd) except TypeError, typeError: self.lastError = typeError return False except Exception, e: self.lastError = e return False except: self.lastError = 'Unknown exception occur!' return False def find_one(self, colName, spec, filter = None): result, recList = self.find(colName, spec, filter, 1) if not result: return False, None for rec in recList: return True, rec return True, None @reconnect_decorator def find(self, colName, spec = None, filter = None, maxCnt = 0, sortBy = None): if not self.connected: if not self.initialize(): return False, [] result = False resultDictList = [] col = self.db[colName] if self.translator: spec = self.translator.transform_incoming(spec, None) try: resultCollection = col.find(spec, filter, limit = maxCnt, sort = sortBy) if self.translator: resultDictList = self.translator.transform_outgoing(list(resultCollection), None) else: resultDictList = list(resultCollection) return True, resultDictList except TypeError, typeError: self.lastError = typeError return result, resultDictList except pymongo.errors.OperationFailure, err: self.lastError = err return result, resultDictList except Exception, e: self.lastError = e return result, resultDictList except: self.lastError = 'Unknown exception occur!' return result, resultDictList @reconnect_decorator def insert(self, colName, doc_or_docs, isSafe = True): if not self.connected: if not self.initialize(): return False col = self.db[colName] if self.translator: doc_or_docs = self.translator.transform_incoming(doc_or_docs, None) try: col.insert(doc_or_docs, safe = isSafe) except pymongo.errors.OperationFailure, err: self.lastError = err return False except Exception, e: self.lastError = e return False except: self.lastError = 'Unknown exception occur!' return False return True @reconnect_decorator def update(self, colName, spec, doc, isUpsert = False, isSafe = True, isMulti = False): if not self.connected: if not self.initialize(): return False col = self.db[colName] #ÐèÒªÏȶÔdoc½øÐд¦Àí£¬µ«Óɲ»ÄÜ¿ªÆôcollection.updateµÄmanipulate,ÒòΪÄÇ»áÓ¦ÓÃËùÓд¦Àí if self.translator: spec = self.translator.transform_incoming(spec, None) doc = self.translator.transform_incoming(doc, None) try: col.update(spec, doc, upsert = isUpsert, safe = isSafe, multi = isMulti) except TypeError, typeError: self.lastError = typeError return False except pymongo.errors.OperationFailure, err: self.lastError = err return False except Exception, e: self.lastError = e return False except: self.lastError = 'Unknown exception occur!' return False return True @reconnect_decorator def save(self, colName, doc, isSafe = True): if not self.connected: if not self.initialize(): return False col = self.db[colName] if self.translator: doc = self.translator.transform_incoming(doc, None) try: col.save(doc, safe = isSafe) except TypeError, typeError: self.lastError = typeError return False except pymongo.errors.OperationFailure, err: self.lastError = err return False except Exception, e: self.lastError = e return False except: self.lastError = 'Unknown exception occur!' return False return True @reconnect_decorator def remove(self, colName, spec = None, isSafe = True): if not self.connected: if not self.initialize(): return False col = self.db[colName] if self.translator: spec = self.translator.transform_incoming(spec, None) try: col.remove(spec, safe = isSafe) except pymongo.errors.OperationFailure, err: self.lastError = err return False except Exception, e: self.lastError = e return False except: self.lastError = 'Unknown exception occur!' return False return True @reconnect_decorator def drop(self, colName): if not self.connected: if not self.initialize(): return False col = self.db[colName] try: col.drop() except TypeError, typeError: self.lastError = typeError return False except Exception, e: self.lastError = e return False except: self.lastError = 'Unknown exception occur!' return False return True @reconnect_decorator def count(self, colName): if not self.connected: if not self.initialize(): return False, 0 col = self.db[colName] try: cnt = col.count() except pymongo.errors.OperationFailure, err: self.lastError = err return False, 0 except Exception, e: self.lastError = e return False, 0 except: self.lastError = 'Unknown exception occur!' return False, 0 return True, cnt def test_seq(): con = pymongo.Connection() db = con.admin if not db.authenticate('sa', 'sa'): print 'auth failed!' return colName = 'tagSeqTest' fieldName = 'ID' db = con['test'] db.drop_collection(colName) db.drop_collection('%s_seq'%colName) result, ID = seq(db, colName, fieldName, 1, 1) assert (result and ID == 1) result, ID = seq(db, colName, fieldName, 1, 1) assert (result and ID == 2) def test_StringManipulator(): translator = Base64StringManipulator() son = [] result = translator.transform_incoming(son, None) assert (son == result) result = translator.transform_outgoing(son, None) assert (son == result) son = [{'a':1}] result = translator.transform_incoming(son, None) assert (son == result) result = translator.transform_outgoing(son, None) assert (son == result) son = [{'a':'a'}] result = translator.transform_incoming(son, None) assert (result and result == [{'a':base64.b64encode('a')}]) result = translator.transform_outgoing(result, None) assert (result and result == son) son = [{'a':[{'b':'b'}, {'c':'c'}]}] result = translator.transform_incoming(son, None) assert (result and result == [{'a':[{'b':base64.b64encode('b')}, {'c':base64.b64encode('c')}]}]) result = translator.transform_outgoing(result, None) assert (result and result == son) def test_DBController(): testColName = 'tagTestController' dbController = DBController('localhost', 27017, 'test', 'test', '1') result = dbController.drop(testColName) assert result result, cnt = dbController.count(testColName) assert (result and cnt == 0) doc = {'a':1} result = dbController.insert(testColName, doc) assert result result, recs = dbController.find(testColName) assert (result and len(recs) == 1) rec = recs[0] # del rec['_id'] # print 'rec = %s\r\ndoc = %s'%(rec, doc) assert (rec == doc) spec = {'a':1} updateDoc = {'a':2} updateDocWithModifier = {'$set':updateDoc} result = dbController.update(testColName, spec, updateDocWithModifier) assert result result, recs = dbController.find(testColName) assert (result and len(recs) == 1) rec = recs[0] del rec['_id'] # print 'rec = %s\r\nupdateDoc = %s'%(rec, updateDoc) assert (rec == updateDoc) result = dbController.remove(testColName) assert result result, recs = dbController.find(testColName) assert (result and recs == []) saveDoc = {'b':3} result = dbController.save(testColName, saveDoc) assert result result, recs = dbController.find(testColName) assert (result and len(recs) == 1) rec = recs[0] # del rec['_id'] assert (rec == saveDoc) def test(): test_seq() test_StringManipulator() test_DBController() print 'test ok!' if __name__ == '__main__': test()