自己动手写一个ORM

Published On 四月 01, 2016

category python | tags python ORM MySQL


前言

Model层最基本的功能是操作数据库,一般的框架(比如java web中的SSH)都具有ORM(对象关系映射)的功能,也就是不需要使用sql语句而是直接通过面向对象的方式来读写数据。为了简单起见,下面将要实现的model层只提供了底层的数据库接口以及简单的ORM功能,当然可以通过配置使用SQLAlchemy,使得它支持更全面的ORM功能。

使用mysql-connector-python

最流行的数据库是mysql,python操作mysql需要调用mysql的python驱动(MySQL Connector/Python),可以从这里下载安装。也可以使用pip install mysql-connector-python安装。先看几个使用该驱动的例子:

1. 建立连接

import mysql.connector
# 默认连接的是localhost:3307,use_unicode=True,让MySQL的DB-API始终返回Unicode
conn = mysql.connector.connect(user='root', password='', database='test',  use_unicode=True)

2. 创建表

所有的DDL语句都是通过游标来执行的

cursor = conn.cursor()
cursor.execute('create table user (id int unsigned primary key auto_increment, name varchar(20), birthday date)')

此时test数据库中就已经创建好了user表

3. 插入数据

DML语句也是通过游标来执行的,excute函数的第一个参数是要执行的语句,使用%s作为占位符,python驱动会自动将python的数据类型转换成mysql的类型并加上必要的引号,下面的日期会转化成'199308-09';参数通过list或tuple传进去

cursor.execute('insert into user (name,birthday) values (%s, %s)', ['Asoka',date(1993, 8, 9)])

同样的可以一次插入多条记录

cursor.executemany('insert into user (name,birthday) values (%s, %s)', [('Kitty',date(1993, 8, 10)),('Michael',date(1993,8,11))])

返回受影响的行数

cursor.rowcount

也可以使用dict作为参数

args={'name':'Mila','birth':date(1993,8,8)}
cursor.execute('insert into user (name,birthday) values (%(name)s, %(birth)s)', args)

可以通过cursor的lastrowid获得自增的id

cursor.lastrowid

执行修改后一定要执行commit方法,因为mysql默认使用的数据表是事务类型的InnoDB,而且mysql驱动的auto-commit是关闭的,此时user表中还没有这条记录,不信可以在mysql.exe中执行select * from user;结果为empty。

conn.commit()

MySQL的官网上给出了如下说明:

When you use a transactional storage engine such as InnoDB (the default in MySQL 5.5 and later), you must commit the data after a sequence of INSERT, DELETE, and UPDATE statements.

更新和删除操作同上。

4. 查询

cursor.execute('select * from user')

从cursor中读取查询结果有多种方式:

  • 取出游标指向的当前行,返回的结果是一个tuple

    cursor.fetchone()
    

    此时返回第一个结果,如果再次执行fetchOne,返回下一条结果

  • 还可以返回所有行(从游标当前位置开始),返回的结果是一个由tuple组成的list,每个tuple代表一条记录

    cursor.fetchall()
    

    如果要指定返回的行数可以调用fetchmany

  • 还可以对cursor直接迭代所有行

    for (id, name, birth) in cursor:
      print "%s(id:%s) was born on %s"%(name, id, birth)
    

注意:如果当前cursor没有读完就执行execute或close方法将会抛出异常: mysql.connector.errors.InternalError: Unread result found 避免这种情况的方法是创建CursorBuffered,这种cursor将查询结果一次性传输到客户端,而不是每次执行fetch的时候再进行数据传输

cursor = conn.cursor(buffered=True)

最后关闭游标

cursor.close()

5. 事务回滚

撤销当前事务所做的所有更改

cursor.execute('insert into user (name,birthday) values (%s, %s)', ['Anonymous',date(1993, 8, 9)])
conn.rollback()

6. 关闭连接

conn.close()

封装DB

由上可见,很多操作都是固定的,比如数据库建立连接,关闭连接,获得游标,关闭游标、提交等,如果再抽象一层,可以将这些固定的操作省去。我将实现一个mysql_db模块,该模块提供select、update和insert三个函数,当然调用这些函数还是需要sql语句的,这个模块带来的好处是我们只需要建立一个数据库的实例,就可以对他执行任何sql语句,不需要考虑其他的事情,而且它还支持事务处理。
先从需求入手,我希望提供一个数据库名就可以得到一个数据库实例,之后调用该实例的方法可以轻松的操作数据库,比如:

db = DbCtx('default')
db.update('insert into user (name,birthday) values (%s, %s)', 'Kitty', date(1993, 8, 10))

1. DbCtx

构造函数和析构函数如下:

class DbCtx:

    '''
    A database context which use lazy connection.
    '''

    def __init__(self, db_name):
        '''
        common args are user, password, database, host, port
        '''
        # paras holds the connection info
        if not db_name:
            db_name='default'
        self.params = DATABASES[db_name]
        # turn auto commit on
        self.params.update(autocommit=True)
        self._conn = None
        self.transactions = None

    def __del__(self):
        self.close()
        print 'connection closed'

DATABASES配置信息保存在config.py配置文件中,

# config database info
DATABASES = {
    'default': {
        'user': 'root',
        'password': '',
        'database': 'test'
    }
}

autocommit=True意味着每次执行update都及时生效,无需调用commit,mysql的驱动默认是False。
该类并非单实例类,因为数据库连接需要并发,所以在不同线程中创建该类的示例就建立了多个连接。
我们的构造函数并未创建连接,而是仅仅保存了配置信息。操作数据库是通过cursor完成的,所以第一次获取cursor时才真正连接数据库,这就是所谓的lazy connection,目的是节省资源,代码如下:

2. connect

@property
def connection(self):
    if self._conn is None:
        self._conn = mysql.connector.connect(**self.params)
    return self._conn

def cursor(self, buf=False):
    return self.connection.cursor(buffered=buf)

获取了游标,下面让我们来完善该类的功能

3. select

def select(self, sql, *args, **kw):
    '''
    The raw select method which return cursor directly
    if you specific the buffered to be False then the cursor must be closed after use
    '''
    buffered = kw.get('buffered', True)
    curs = self.cursor(buffered)
    curs.execute(sql, args)
    return curs

def select_dict(self, sql,*args,**kw):
    '''
    Return list of dict
    '''
    try:
        cursor=self.select(sql,*args)
        names = [x[0] for x in cursor.description]
        first=kw.get('first',False)
        if first:
            values = cursor.fetchone()
            if not values:
                return None
            return dict(zip(names, values))
        return [dict(zip(names, x)) for x in cursor.fetchall()]
    finally:
        if cursor:
            cursor.close()

第一个select返回的是游标对象,如果创建cursor时buffered参数为False,则意味着用户使用完查询的结果必须要手动关闭该游标,否则发起下一次查询将会报错,因为此时的cursor还在和数据库服务器建立者连接。如果使用了buffered cursor则可以不必关闭。
第二个select返回的是包含dict的list的查询结果,可以通过传入first=True来限定值获取第一个记录。

4. update

def update(self, sql, *args):
    '''
    For update and delete
    Return the row number affected
    '''
    try:
        cur = self.cursor()
        cur.execute(sql, args)
        return cur.rowcount
    finally:
        cur.close()

def insert(self,table,**kw):
    keys=kw.keys()
    values=kw.values()
    import pdb
    sql='INSERT INTO %s(%s) VALUES(%s)'%(table, ','.join(keys), ','.join('%s' for i in range(len(keys))))
    try:
        cur=self.cursor()
        cur.execute(sql,values)
        return cur.lastrowid
    finally:
        cur.close()

update返回受影响的行数,insert返回插入的记录的id,这个非常有用。 insert的第一个参数是表名,然后传入键值对。

5. close

def close(self):
    self.connection.close()
    self._conn = None

该方法不需要手动调用,在Dbctx实例销毁的时候会被自动调用。

6. transaction

class TransactionCtx:

'''
This class can be used by 'with' statement to execute transaction
'''

def __init__(self, db_ctx):
    self.db = db_ctx

def __enter__(self):
    logging.info('start transaction...')
    self.db.connection.start_transaction()

def __exit__(self, exctype, excvalue, traceback):
    if exctype is None:
        logging.info('commit transaction...')
        self.db.connection.commit()
        logging.info('commit ok.')
    else:
        self.db.connection.rollback()
        logging.warning('rollback ok.')

在事务开始之前,发送start_trasction命令,如果遇到异常,则调用rollback回滚。
使用很简单:

db = DbCtx()
with TransactionCtx(db):
    db.update(
        'insert into user (name,birthday) values (%s, %s)', 'Asoka', date.today())
    db.update(
        'insert into user (name,birthday) values (%s, %s)', 'Alice', date.today())

说明

为什么connection、cursor、close函数前面没有_?将这些函数直接暴露给用户是为了提供最大的灵活性,总会有些情况是必须要直接操作连接对象或游标对象才能完成的。

编写ORM

直接写sql的缺点是工作量大,而且容易出粗,尤其是查询结果转化成对象,或者将对象保存到数据库都很麻烦,于是ORM应运而生。 ORM(对象关系映射)能让我们直接调用对象的方法来操作数据库,而无须小心的构造每一条sql语句。 我将实现一个Model类,web应用中的实体类只要继承该类就和数据库中的表对应起来了。 先来看Model类的定义吧:

class Model(dict):
    __metaclass__ = ModelMetaclass

    def __init__(self, **kw):
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError("'%s' instance has no attribute '%s'" % (self.__class__.__name__, key))

    def __setattr__(self, key, value):
        self[key]=value

Model类继承了dict,使得可以像dict构造函数一样初始化model实例,比如User(name='yxr',age=22) 通过__metaclass__指定了它的元类,一般元指的就是用来定义一种东西的东西。类似于装饰器用来增强函数的功能一样,元类可以修改类的行为。 ModelMetaclass的定义如下:

class ModelMetaclass(type):

    def __new__(cls, name, bases, attrs):
        # skip base Model class:
        if name == 'Model':
            return type.__new__(cls, name, bases, attrs)
        if not '__database__' in attrs:
            attrs['__database__'] = 'default'
        if not '__table__' in attrs:
            attrs['__table__'] = name.lower()
        attrs['db'] = DbCtx(attrs['__database__'])
        return type.__new__(cls, name, bases, attrs)

这个类有什么用呢?它在实体类中保存了有关数据库连接的信息。在创建一个实体类的时候,比如:

class User(Model):
    __table__='user'
    __database__='default'
    __fields__=('name', 'birthday')

通过以下三个参数控制ORM的对应关系:

  • __table__:指定了该实体对应的表明,默认为小写的实体名
  • __database__:指定了数据库名,默认为default
  • __fields__:必填,指定了该实体类所使用的字段

ModelMetaclass类会在User类中保存四个类变量,除了以上三还包括db变量,该变量就是该实体类的数据库连接上下文(DbCtx),但是并未真正建立连接。 在Model类中定义的方法将被所有实体类继承,让我们来丰富它的功能吧:

@classmethod
def get(cls, id):
    '''
    Fetch from database by primary key id
    '''
    d=cls.db.select_dict(
        'select * from %s where id=%s' % (cls.__table__, id), first=True)
    return cls(**d) if d else None

@classmethod
def filter(cls, where='', *args):
    '''
    Fetch a list of object according to the condition
    '''
    sql='select * from `%s` %s' % (cls.__table__,'where %s'%where if where else '')
    L=cls.db.select_dict(sql,*args)
    return [cls(**d) for d in L]

@classmethod
def filter_one(cls, where='', *args):
    '''
    Fetch the first object according to the condition
    '''
    L=cls.filter(where,*args)
    if len(L)>0:
        return L[0]
    return None

def save(self):
    '''
    Persisted to the database
    '''
    params={}
    for k in self.__fields__:
        if hasattr(self, k):
            params[k]=getattr(self, k)
    # update or insert
    if hasattr(self, 'id'):
        sql='UPDATE %s SET %s WHERE id=%s' % (self.__table__,
                                            ','.join(
                                                ['{}=%s'.format(key) for key in params.keys()]),
                                            self.id)
        self.db.update(sql, *params.values())
    else:
        self.id=self.db.insert(self.__table__, **params)

def remove(self):
    sql='DELETE FROM {} WHERE id=%s'.format(self.__table__)
    self.db.update(sql,self.id)
    self=None

每个方法都有注释,这里不再解释。


qq email facebook github
© 2024 - Xurui Yan. All rights reserved
Built using pelican