VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > temp > python入门教程 >
  • 参悟python元类(又称metaclass)系列实战(四)

创建全局连接池, 以求避免频繁地打开和关闭数据库连接

  • 因为查询耗时, 计划引入异步, 即async/await(不懂没关系, 就当是普通代码, 只是调用方式特别而已)

    $ pip3 install aiomysql    # 通过pip安装
    
  • 创建Mysql类, 定义静态方法createPool

    import aiomysql
    
    class Mysql:
        @staticmethod
        async def createPool():
            '''连接池由全局变量__pool存储,缺省情况下将编码设置为utf8,自动提交事务'''
            try:
                global __pool
                __pool = await aiomysql.create_pool(
                    host='localhost',        # mysql服务器地址
                    port=3306,               # mysql监听端口
                    user='root',             # mysql用户名
                    password='passwd',       # mysql密码
                    db='ormdemo',            # mysql实例名(database)
                    charset='utf8',          # 编码设置
                    autocommit=True,         # 自动提交insert/update/delete
                    maxsize=10,              # 连接池上限
                    minsize=1                # 连接池下限
                )
                print('创建成功')
            except Exception as e:
                print(f'连接 mysql 出错:{e}')
    
    # 测试代码
    if __name__ == '__main__':
        import asyncio
        loop = asyncio.get_event_loop()
        loop.run_until_complete(Mysql.createPool())
    
  • Mysql类新增静态方法selectexecute

    @staticmethod
    async def select(sql, args, size=None):
        '''传入SQL语句和SQL参数
        @sql :  sql语句
        @args:  条件值列表
        @size:  查询的结果集大小
        '''
        print(f'{sql} ==> {args}')
        # 使用with上下文管理器, 自动执行close
        async with __pool.acquire() as conn:
            # acquire: 从空闲池获得连接。如果需要,并且池的大小小于maxsize,则创建新连接。
            async with conn.cursor(aiomysql.DictCursor) as cur:
                # cursor: 连接的游标
                # DictCursor: 作为字典返回结果的游标
                await cur.execute(sql.replace('?', '%s'), args or ())
                # sql.replace('?', '%s'): 'SELECT * FROM users WHERE uid=?'.replace('?', '%s')
                # args or (): 'SELECT * FROM users WHERE uid=%s' % (101,)
                if size:
                    rs = await cur.fetchmany(size)
                else:
                    rs = await cur.fetchall()
            print(f'rows returned: {len(rs)}')
            return rs
    
    
    @staticmethod
    async def execute(sql, args, autocommit=True):
        '''
        INSERT、UPDATE、DELETE操作,可以通用的execute函数,
        因为这3种SQL的执行都需要相同的参数, 返回一个整数表示影响的行数
        '''
        print(f'{sql} ==> {args}')
        async with __pool.acquire() as conn:
            if not autocommit:
                await conn.begin()           # 开始处理
            try:
                async with conn.cursor(aiomysql.DictCursor) as cur:
                    await cur.execute(sql.replace('?', '%s'), args)
                    affected = cur.rowcount  # 受影响的行数
                if not autocommit:
                    await conn.commit()      # 提交更改, 非自动提交时才需要
            except Exception as e:
                if not autocommit:
                    await conn.rollback()    # 回滚更改
                raise Exception(e)
            return affected
    

测试下select

if __name__ == '__main__':
    import asyncio
    loop = asyncio.get_event_loop()
    loop.run_until_complete(Mysql.createPool())
    rs = loop.run_until_complete(Mysql.select('select uid, name from users where uid=?', [101]))
    print(rs)
  • 控制台输出
创建成功
select * from users where uid=? ==> [101]
rows returned: 1
[{'uid': 101, 'name': 'z417'}]

总结

  1. 定义了Mysql类, 其中有3个静态方法

  2. 引入了异步, 调用方式共分两步
     

    出处:https://www.cnblogs.com/z417/p/13993100.html


相关教程