2022年 11月 7日

Python 操作 MySQL 数据库的三个模块

python使用MySQL主要有两个模块,pymysql(MySQLdb)和SQLAchemy。

  • pymysql(MySQLdb)为原生模块,直接执行sql语句,其中pymysql模块支持python 2和python3,MySQLdb只支持python2,两者使用起来几乎一样。

  • SQLAchemy为一个ORM框架,将数据对象转换成SQL,然后使用数据API执行SQL并获取执行结果

  • 另外DBUtils模块提供了一个数据库连接池,方便多线程场景中python操作数据库。

1.pymysql模块

安装:pip install pymysql

创建表格操作(注意中文格式设置)

  1. #coding:utf-8
  2. import pymysql
  3. #关于中文问题
  4. #1. mysql命令行创建数据库,设置编码为gbk:create databse demo2 character set utf8; 
  5. #2. python代码中连接时设置charset="gbk"
  6. #3. 创建表格时设置default charset=utf8
  7. #连接数据库
  8. conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
  9. #创建游标
  10. cursor = conn.cursor()
  11. #执行sql语句
  12. cursor.execute("""create table if not exists t_sales(
  13.                 id int primary key auto_increment not null,
  14.                  nickName varchar(128) not null,
  15.                  color varchar(128) not null,
  16.                   size varchar(128) not null, 
  17.                   comment text not null,
  18.                   saledate varchar(128) not null)engine=InnoDB default charset=utf8;""")
  19.                   
  20. # cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate) 
  21.                 # values('%s','%s','%s','%s','%s');""" % ("zack", "黑色", "L", "大小合适", "2019-04-20"))
  22.                 
  23. cursor.execute("""insert into t_sales(nickName,color,size,comment,saledate) 
  24.                 values(%s,%s,%s,%s,%s);""" , ("zack""黑色""L""大小合适""2019-04-20"))
  25. #提交
  26. conn.commit()
  27. #关闭游标
  28. cursor.close()
  29. #关闭连接
  30. conn.close()

增删改查:

注意execute执行sql语句参数的两种情况:

  • execute("insert into t_sales(nickName, size) values('%s','%s');" % ("zack","L") )  #此时的%s为字符窜拼接占位符,需要引号加’%s’  (有sql注入风险)

  • execute("insert into t_sales(nickName, size) values(%s,%s);" , ("zack","L") ) #此时的%s为sql语句占位符,不需要引号%s

  1. #***************************增删改查******************************************************
  2. conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
  3. #创建游标
  4. cursor = conn.cursor()
  5. insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
  6. #返回受影响的行数
  7. row1 = cursor.execute(insert_sql,("Bob""黑色""XL""便宜实惠""2019-04-20"))
  8. update_sql = "update t_sales set color='白色' where id=%s;"
  9. #返回受影响的行数
  10. row2 = cursor.execute(update_sql,(1,))
  11. select_sql = "select * from t_sales where id>%s;"
  12. #返回受影响的行数
  13. row3 = cursor.execute(select_sql,(1,))
  14. delete_sql = "delete from t_sales where id=%s;"
  15. #返回受影响的行数
  16. row4 = cursor.execute(delete_sql,(4,))
  17. #提交,不然无法保存新建或者修改的数据(增删改得提交)
  18. conn.commit()
  19. cursor.close()
  20. conn.close()

批量插入和自增id

  1. #***************************批量插入******************************************************
  2. conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
  3. #创建游标
  4. cursor = conn.cursor()
  5. insert_sql = "insert into t_sales(nickName,color,size,comment,saledate) values(%s,%s,%s,%s,%s);"
  6. data = [("Bob""黑色""XL""便宜实惠""2019-04-20"),("Ted""黄色""M""便宜实惠""2019-04-20"),("Gary""黑色""M""穿着舒服""2019-04-20")]
  7. row1 = cursor.executemany(insert_sql, data)
  8. conn.commit()
  9. #为插入的第一条数据的id,即插入的为5,6,7,new_id=5
  10. new_id = cursor.lastrowid
  11. print(new_id)
  12. cursor.close()
  13. conn.close()

获取查询数据

  1. #***************************获取查找sql的查询数据******************************************************
  2. conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
  3. #创建游标
  4. cursor = conn.cursor()
  5. select_sql = "select id,nickname,size from t_sales where id>%s;"
  6. cursor.execute(select_sql, (3,))
  7. row1 = cursor.fetchone()      #获取第一条数据,获取后游标会向下移动一行
  8. row_n = cursor.fetchmany(3)  #获取前n条数据,获取后游标会向下移动n行
  9. row_all = cursor.fetchall()  #获取所有数据,获取后游标会向下移动到末尾
  10. print(row1)
  11. print(row_n)
  12. print(row_all)
  13. #conn.commit()
  14. cursor.close()
  15. conn.close()

注:在fetch数据时按照顺序进行,可以使用cursor.scroll(num,mode)来移动游标位置,如:

  • cursor.scroll(1,mode=’relative’)  # 相对当前位置移动

  • cursor.scroll(2,mode=’absolute’) # 相对绝对位置移动

fetch获取数据类型

fetch获取的数据默认为元组格式,还可以获取字典类型的,如下:

  1. #***************************获取字典格式数据******************************************************
  2. conn = pymysql.connect(host="localhost", user="root", passwd="", db='learningsql', charset='utf8', port=3306)  #和mysql服务端设置格式一样(还可设置为gbk, gb2312)
  3. #创建游标
  4. cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
  5. select_sql = "select id,nickname,size from t_sales where id>%s;"
  6. cursor.execute(select_sql, (3,))
  7. row1 = cursor.fetchall()      
  8. print(row1)
  9. conn.commit()
  10. cursor.close()
  11. conn.close()

2.SQLAlchmy框架

SQLAlchemy的整体架构如下,建立在第三方的DB API上,将类和对象操作转换为数据库sql,然后利用DB API执sql语句得到结果。其适用于多种数据库。另外其内部实现了数据库连接池,方便进行多线程操作。

  • Engine,框架的引擎

  • Connection Pooling ,数据库连接池

  • Dialect(http://docs.sqlalchemy.org/en/latest/dialects/index.html),选择连接数据库的DB API种类,(pymysql,mysqldb等)“

  • Schema/Types,架构和类型

  • SQL Exprression Language,SQL表达式语言

  • DB API(https://www.python.org/dev/peps/pep-0249/):Python Database API Specification

2.1 执行原生sql

安装:pip install sqlalchemy

SQLAlchmy也可以不利用ORM,使用数据库连接池,类似pymysql模块执行原生sql

  1. #coding:utf-8
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy import Column, String, Integer
  5. import threading
  6. engine = create_engine(
  7.             "mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",
  8.             max_overflow = 0,  #超过连接池大小外最多创建的连接,为0表示超过5个连接后,其他连接请求会阻塞 (默认为10)
  9.             pool_size = 5,    #连接池大小(默认为5)
  10.             pool_timeout = 30,  #连接线程池中,没有连接时最多等待的时间,不设置无连接时直接报错 (默认为30)
  11.             pool_recycle = -1)  #多久之后对线程池中的线程进行一次连接的回收(重置) (默认为-1)
  12.             
  13. # def task():
  14.     # conn= engine.raw_connection() #建立原生连接,和pymysql的连接一样
  15.     # cur = conn.cursor()
  16.     # cur.execute("select * from t_sales where id>%s",(2,))
  17.     # result = cur.fetchone()
  18.     # cur.close()
  19.     # conn.close()
  20.     # print(result)
  21.     
  22. # def task():
  23.     # conn = engine.contextual_connect() #建立上下文管理器连接,自动打开和关闭
  24.     # with conn:
  25.         # cur = conn.execute("select * from t_sales where id>%s",(2,))
  26.         # result = cur.fetchone()
  27.     # print(result)
  28.     
  29.     
  30. def task():
  31.     cur =engine.execute("select * from t_sales where id>%s",(2,))  #engine直接执行
  32.     result = cur.fetchone()
  33.     cur.close()
  34.     print(result)
  35. if __name__=="__main__":
  36.     for i in range(10):
  37.         t = threading.Thread(target=task)
  38.         t.start()

2.2 执行ORM语句

A. 创建和删除表

  1. #coding:utf-8
  2. import datetime
  3. from sqlalchemy import create_engine
  4. from sqlalchemy.ext.declarative import declarative_base
  5. from sqlalchemy import Column, String, Integer, DateTime, Text
  6. Base = declarative_base()
  7. class User(Base):
  8.     __tablename__="users"
  9.     id = Column(Integer,primary_key=True)
  10.     name = Column(String(32),index=True, nullable=False#创建索引,不为空
  11.     email = Column(String(32),unique=True)
  12.     ctime = Column(DateTime, default = datetime.datetime.now)  #传入方法名datetime.datetime.now
  13.     extra = Column(Text,nullable=True)  
  14.     
  15.     __table_args__ = {
  16.     
  17.         # UniqueConstraint('id', 'name', name='uix_id_name'), #设置联合唯一约束
  18.         # Index('ix_id_name', 'name', 'email'),               # 创建索引
  19.     }
  20. def create_tbs():
  21.     engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5)
  22.     Base.metadata.create_all(engine)   #创建所有定义的表
  23. def drop_dbs():
  24.     engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=2,pool_size=5)
  25.     Base.metadata.drop_all(engine)   #删除所有创建的表
  26. if __name__=="__main__":
  27.     create_tbs() #创建表
  28.     #drop_dbs()   #删除表
  29.     

B.表中定义外键关系(一对多,多对多)

思考:下面代码中的一对多关系,relationship 定义在了 customer 表中,应该定义在 PurchaseOrder 更合理?

注意:mysql 数据库中避免使用 order做为表的名字,order 为一个 mysql 关键字,做为表名字时必须用反引号order (键盘数字1旁边的符号)

  1. #coding:utf-8
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float
  5. from sqlalchemy.orm import relationship
  6. import datetime
  7. engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")  #数据库有密码时,//root:12345678@127.0.0.1:3306/
  8. Base = declarative_base()
  9. class Customer(Base):
  10.     __tablename__="customer"  #数据库中保存的表名字
  11.     
  12.     id = Column(Integer,primary_key=True)
  13.     name = Column(String(64),index=True,nullable=False)
  14.     phone = Column(String(16),nullable=False)
  15.     address = Column(String(256),nullable=False)
  16.     purchase_order_id = Column(Integer,ForeignKey("purchase_order.id"))  #关键关系,关联表的__tablename__="purchase_order"
  17.     
  18.     # 和建立表结构无关,方便外键关系查询,backref反向查询时使用order_obj.customer
  19.     purchase_order = relationship("PurchaseOrder",backref="customer")
  20.     
  21.     
  22. class PurchaseOrder(Base):  
  23.     __tablename__ = "purchase_order"   #mysql数据库中表的名字避免使用order,order为一个关键字,使用时必须用反引号`order` (键盘数字1旁边的符号)
  24.     id=Column(Integer,primary_key=True)
  25.     cost = Column(Float,nullable=True)
  26.     ctime = Column(DateTime,default =datetime.datetime.now)
  27.     desc = Column(String(528))
  28.     
  29.     #多对多关系时,secondary为中间表
  30.     product = relationship("Product",secondary="order_to_product",backref="purchase_order")
  31.     
  32. class Product(Base):
  33.     __tablename__ = "product"
  34.     id = Column(Integer,primary_key=True)
  35.     name = Column(String(256))
  36.     price = Column(Float,nullable=False)
  37.     
  38. class OrdertoProduct(Base):
  39.     __tablename__ = "order_to_product"
  40.     id = Column(Integer,primary_key=True)
  41.     product_id = Column(Integer,ForeignKey("product.id"))
  42.     purchase_order_id  = Column(Integer,ForeignKey("purchase_order.id"))
  43.     
  44. if __name__ == "__main__":
  45.     Base.metadata.create_all(engine)
  46.     #Base.metadata.drop_all(engine)

C.增删改查操作

增删改查

  1. #coding:utf-8
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.ext.declarative import declarative_base
  4. from sqlalchemy import Column,Integer,String,Text,DateTime,ForeignKey,Float
  5. from sqlalchemy.orm import relationship,sessionmaker
  6. from sqlalchemy.sql import text
  7. import datetime
  8. engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")  #数据库有密码时,//root:12345678@127.0.0.1:3306/, 设置utf8防止中文乱码
  9. Base = declarative_base()
  10. class Customer(Base):
  11.     __tablename__="customer"  #数据库中保存的表名字
  12.     
  13.     id = Column(Integer,primary_key=True)
  14.     name = Column(String(64),index=True,nullable=False)
  15.     phone = Column(String(16),nullable=False)
  16.     address = Column(String(256),nullable=False)
  17.     purchase_order_id = Column(Integer,ForeignKey("purchase_order.id"))  #关键关系,关联表的__tablename__="purchase_order"
  18.     
  19.     # 和建立表结构无关,方便外键关系查询,backref反向查询时使用order_obj.customer
  20.     purchase_order = relationship("PurchaseOrder",backref="customer")
  21.     
  22.     
  23. class PurchaseOrder(Base):  
  24.     __tablename__ = "purchase_order"   #mysql数据库中表的名字避免使用order,order为一个关键字,使用时必须用反引号`order` (键盘数字1旁边的符号)
  25.     id=Column(Integer,primary_key=True)
  26.     cost = Column(Float,nullable=True)
  27.     ctime = Column(DateTime,default =datetime.datetime.now)
  28.     desc = Column(String(528))
  29.     
  30.     #多对多关系时,secondary为中间表
  31.     product = relationship("Product",secondary="order_to_product",backref="purchase_order")
  32.     
  33. class Product(Base):
  34.     __tablename__ = "product"
  35.     id = Column(Integer,primary_key=True)
  36.     name = Column(String(256))
  37.     price = Column(Float,nullable=False)
  38.     
  39. class OrdertoProduct(Base):
  40.     __tablename__ = "order_to_product"
  41.     id = Column(Integer,primary_key=True)
  42.     product_id = Column(Integer,ForeignKey("product.id"))
  43.     purchase_order_id  = Column(Integer,ForeignKey("purchase_order.id"))
  44. if __name__ == "__main__":
  45.     #Base.metadata.create_all(engine)
  46.     #Base.metadata.drop_all(engine)
  47.     
  48.     Session = sessionmaker(bind=engine)
  49.     #每次进行数据库操作时都要创建session
  50.     session = Session()
  51.     #*****************增加数据********************
  52.     # pur_order = PurchaseOrder(cost=19.7,desc="python编程之路")
  53.     # session.add(pur_order)
  54.     # session.add_all(
  55.                 # [PurchaseOrder(cost=39.7,desc="linux操作系统"),
  56.                 # PurchaseOrder(cost=59.6,desc="python cookbook")])
  57.     # session.commit()
  58.     
  59.     #*****************修改数据********************
  60.     
  61.     #session.query(PurchaseOrder).filter(PurchaseOrder.id>2).update({"cost":29.7})
  62.     #session.query(PurchaseOrder).filter(PurchaseOrder.id==2).update({"cost":PurchaseOrder.cost+40.1},synchronize_session=False)  #synchronize_session用于query在进行delete or update操作时,对session的同步策略。
  63.     #session.commit()
  64.     
  65.     #*****************删除数据********************
  66.     #session.query(PurchaseOrder).filter(PurchaseOrder.id==1).delete()
  67.     #session.commit()
  68.     
  69.     #*****************查询数据********************
  70.     #ret = session.query(PurchaseOrder).all()
  71.     # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).all()  #包含对象的列表
  72.     # ret = session.query(PurchaseOrder).filter(PurchaseOrder.id==2).first() #单个对象
  73.     # ret = session.query(PurchaseOrder).filter_by(id=2).all()  #通过列名字的表达式
  74.     # ret = session.query(PurchaseOrder).filter_by(id=2).first()
  75.     #ret = session.query(PurchaseOrder).filter(text("id<:value and cost>:price")).params(value=6,price=15).order_by(PurchaseOrder.cost).all()
  76.     #ret = session.query(PurchaseOrder).from_statement(text("SELECT * FROM purchase_order WHERE cost>:price")).params(price=40).all()
  77.     # print ret
  78.     # for i in ret:
  79.         # print i.id, i.cost, i.ctime,i.desc
  80.         
  81.     #ret2 = session.query(PurchaseOrder.id,PurchaseOrder.cost.label('totalcost')).all()  #只查询两列,ret2为列表
  82.     #print ret2
  83.     
  84.     #关闭session
  85.     session.close()

查询语句

  1. # 条件
  2. ret = session.query(Users).filter_by(name='alex').all()
  3. ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
  4. ret = session.query(Users).filter(Users.id.between(13), Users.name == 'eric').all()
  5. ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
  6. ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
  7. ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
  8. from sqlalchemy import and_, or_
  9. ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
  10. ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
  11. ret = session.query(Users).filter(
  12.     or_(
  13.         Users.id < 2,
  14.         and_(Users.name == 'eric', Users.id > 3),
  15.         Users.extra != ""
  16.     )).all()
  17. # 通配符
  18. ret = session.query(Users).filter(Users.name.like('e%')).all()
  19. ret = session.query(Users).filter(~Users.name.like('e%')).all()
  20. # 限制
  21. ret = session.query(Users)[1:2]
  22. # 排序
  23. ret = session.query(Users).order_by(Users.name.desc()).all()
  24. ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
  25. # 分组
  26. from sqlalchemy.sql import func
  27. ret = session.query(Users).group_by(Users.extra).all()
  28. ret = session.query(
  29.     func.max(Users.id),
  30.     func.sum(Users.id),
  31.     func.min(Users.id)).group_by(Users.name).all()
  32. ret = session.query(
  33.     func.max(Users.id),
  34.     func.sum(Users.id),
  35.     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
  36. # 连表
  37. ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
  38. ret = session.query(Person).join(Favor).all()
  39. ret = session.query(Person).join(Favor, isouter=True).all()
  40. # 组合
  41. q1 = session.query(Users.name).filter(Users.id > 2)
  42. q2 = session.query(Favor.caption).filter(Favor.nid < 2)
  43. ret = q1.union(q2).all()
  44. q1 = session.query(Users.name).filter(Users.id > 2)
  45. q2 = session.query(Favor.caption).filter(Favor.nid < 2)
  46. ret = q1.union_all(q2).all()

补充

  1. #coding:utf-8
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.orm import sessionmaker
  4. from sqlalchemy.sql import text, func
  5. from sqlalchemy_orm2 import PurchaseOrder  #导入定义的PurchaseOrder表格类
  6. engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")
  7. Session = sessionmaker(bind=engine)
  8. session = Session()
  9. #查询
  10. ret = session.execute("select * from purchase_order where id=:value",params={"value":3})
  11. print ret
  12. for i in ret:
  13.     print i.id, i.cost, i.ctime,i.desc
  14. #插入
  15. purchase_order  = PurchaseOrder.__table__  #拿到PurchaseOrder表格对象
  16. ret=session.execute(purchase_order.insert(),
  17.                 [{"cost":46.3,"desc":'python2'},
  18.                 {"cost":43.3,"desc":'python3'}])
  19. session.commit()
  20. print(ret.lastrowid)
  21. session.close()
  22. # 关联子查询
  23. subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
  24. result = session.query(Group.name, subqry)
  25. """
  26. SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid 
  27. FROM server 
  28. WHERE server.id = `group`.id) AS anon_1 
  29. FROM `group`
  30. """

D.多线程操作

  1. #coding:utf-8
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.orm import sessionmaker
  4. from sqlalchemy_orm2 import Product
  5. from threading import Thread
  6. engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8",max_overflow=0,pool_size=5)
  7. Session = sessionmaker(bind=engine)
  8. def task(name,price):
  9.     session = Session()
  10.     pro = Product(name=name,price=price)
  11.     session.add(pro)
  12.     session.commit()
  13.     session.close()
  14. if __name__=="__main__":
  15.     for i in range(6):
  16.         t = Thread(target=task,args=("pro"+str(i),i*5))
  17.         t.start()
  18.     

E. 通过relationship操纵一对多和多对多关系

一对多

  1. #coding:utf-8
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.orm import sessionmaker
  4. from sqlalchemy.sql import text, func
  5. from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer  #导入定义的表格类
  6. engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")
  7. Session = sessionmaker(bind=engine)
  8. session = Session()
  9. # #通过定义的关键关系添加(id值)
  10. # cus1 = Customer(name="zack",phone="13567682333",address="Nanjing",purchase_order_id=3)
  11. # session.add(cus1)
  12. # #通过relationship正向添加
  13. # cus2 = Customer(name="zack2",phone="13567682333",address="Nanjing",purchase_order=PurchaseOrder(cost=53,desc="java"))
  14. # session.add(cus2)
  15. # session.commit()
  16. #通过relationship反向添加
  17. # purchase_order=PurchaseOrder(cost=53,desc="php")
  18. # cus3 = Customer(name="zack3",phone="13567682333",address="Nanjing")
  19. # cus4 = Customer(name="zack4",phone="13567682333",address="Nanjing")
  20. # purchase_order.customer=[cus3,cus4]  #cus3,cus4的purchase_order_id都是purchase_order.id值,即同时添加了两组外键关系
  21. # session.add(purchase_order)
  22. # session.commit()
  23. ##通过relationship正向查询
  24. cus = session.query(Customer).first()
  25. print(cus.purchase_order_id)
  26. print(cus.purchase_order.desc)
  27. #通过relationship反向查询
  28. pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==3).first()
  29. print(pur.desc)
  30. print(pur.customer) #返回一个list

多对多

  1. #coding:utf-8
  2. from sqlalchemy import create_engine
  3. from sqlalchemy.orm import sessionmaker
  4. from sqlalchemy.sql import text, func
  5. from sqlalchemy_orm2 import PurchaseOrder,Product,OrdertoProduct,Customer  #导入定义的表格类
  6. engine = create_engine("mysql+pymysql://root@127.0.0.1:3306/learningsql?charset=utf8")
  7. Session = sessionmaker(bind=engine)
  8. session = Session()
  9. # session.add_all([Product(name="java",price=24),
  10.                 # Product(name="python",price=34),
  11.                 # Product(name="php",price=27)])
  12. # session.commit()
  13. # #通过定义的关键关系添加(id值)
  14. # op = OrdertoProduct(product_id=1,purchase_order_id=16)
  15. # session.add(op)
  16. # session.commit()
  17. # #通过relationship添加
  18. # pur = PurchaseOrder(cost=27,desc="xxxx")
  19. # pur.product = [Product(name="C++",price=60),]  #正向
  20. # session.add(pur)
  21. # pro = Product(name="C",price=40)
  22. # pro.purchase_order=[PurchaseOrder(cost=27,desc="xxxx"),]  #反向
  23. # session.add(pro)
  24. # session.commit()
  25. #通过relationship正向查询
  26. pur = session.query(PurchaseOrder).filter(PurchaseOrder.id==19).first()
  27. print(pur.desc)
  28. print(pur.product) #结果为列表
  29. #通过relationship反向查询
  30. pro = session.query(Product).filter(Product.id==5).first()
  31. print(pro.name)
  32. print(pro.purchase_order) #结果为列表
  33. session.close()

3.数据库连接池

对于ORM框架,其内部维护了链接池,可以直接通过多线程操控数据库。对于pymysql模块,通过多线程操控数据库容易出错,得加锁串行执行。进行并发时,可以利用DBUtils模块来维护数据库连接池。

3.1 多线程操控pymysql

不采用DBUtils连接池时, pymysql多线程代码如下:

每个线程创建链接

  1. import pymysql
  2. import threadind
  3. #**************************无连接池*******************************    
  4. #每个线程都要创立一次连接,线程并发操作间可能有问题?
  5. def func():
  6.     conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8")
  7.     cursor = conn.cursor()
  8.     cursor.execute("select * from user where nid>1;")
  9.     result = cursor.fetchone()
  10.     print(result)
  11.     cursor.close()
  12.     conn.close()
  13.     
  14. if __name__=="__main__":
  15.     for i in range(5):
  16.         t = threading.Thread(target=func,name="thread-%s"%i)
  17.         t.start()

一个连接串行执行

  1. #**************************无连接池*******************************
  2. #创建一个连接,加锁串行执行
  3. from threading import Lock
  4. import pymysql
  5. import threading
  6. conn = pymysql.connect(host="127.0.0.1",port=3306,db="learningsql",user="root",passwd="",charset="utf8")    
  7. lock = Lock()    
  8. def func():
  9.     with lock:
  10.         cursor = conn.cursor()
  11.         cursor.execute("select * from user where nid>1;")
  12.         result = cursor.fetchone()
  13.         print(result)
  14.         cursor.close()
  15.         
  16.         #conn.close()不能在线程中关闭连接,否则其他线程不可用了
  17.         
  18. if __name__=="__main__":
  19.     threads = []
  20.     for i in range(5):
  21.         t = threading.Thread(target=func,name="thread-%s"%i)
  22.         threads.append(t)
  23.         t.start()
  24.         
  25.     for t in threads:
  26.         t.join()
  27.     
  28.     conn.close()

3.2 DBUtils连接池

DBUtils连接池有两种连接模式:PersistentDB和PooledDB

官网文档:https://cito.github.io/DBUtils/UsersGuide.html

模式一(DBUtils.PersistentDB):

为每个线程创建一个连接,线程即使调用了close方法,也不会关闭,只是把连接重新放到连接池,供自己线程再次使用。当线程终止时,连接自动关闭。

PersistentDB使用代码如下:

  1. #coding:utf-8
  2. from DBUtils.PersistentDB import PersistentDB
  3. import pymysql
  4. import threading
  5. pool = PersistentDB(
  6.     creator = pymysql,  # 使用链接数据库的模块
  7.     maxusage = None,    # 一个链接最多被重复使用的次数,None表示无限制
  8.     setsession=[],     # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
  9.     ping = 0,           # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
  10.     closeable = False,    # 如果为False时, conn.close() 实际上被忽略,供下次使用,再线程关闭时,才会自动关闭链接。如果为True时, conn.close()则关闭链接,那么再次调用pool.connection时就会报错,因为已经真的关闭了连接(pool.steady_connection()可以获取一个新的链接)
  11.     threadlocal = None,    # 本线程独享值得对象,用于保存链接对象,如果链接对象被重置
  12.     host="127.0.0.1",
  13.     port = 3306,
  14.     user = "root",
  15.     password="",
  16.     database="learningsql",
  17.     charset = "utf8"
  18. )
  19. def func():
  20.     conn = pool.connection()
  21.     cursor = conn.cursor()
  22.     cursor.execute("select * from user where nid>1;")
  23.     result = cursor.fetchone()
  24.     print(result)
  25.     cursor.close()
  26.     conn.close()
  27.     
  28. if __name__ == "__main__":
  29.     for i in range(5):
  30.         t = threading.Thread(target=func,name="thread-%s"%i)
  31.         t.start()

模式二(DBUtils.PooledDB):

创建一批连接到连接池,供所有线程共享使用。

(由于pymysql、MySQLdb等threadsafety值为1,所以该模式连接池中的线程会被所有线程共享。)

PooledDB使用代码如下:

  1. from DBUtils.PooledDB import PooledDB
  2. import pymysql
  3. import threading
  4. import time
  5. pool = PooledDB(
  6.     creator = pymysql,  # 使用链接数据库的模块
  7.     maxconnections = 6,  # 连接池允许的最大连接数,0和None表示不限制连接数
  8.     mincached = 2,   # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
  9.     maxcached = 5,   # 链接池中最多闲置的链接,0和None不限制
  10.     maxshared = 3,   # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
  11.     blocking = True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
  12.     maxusage = None,   # 一个链接最多被重复使用的次数,None表示无限制
  13.     setsession = [],   # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
  14.     ping = 0,           # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
  15.     host="127.0.0.1",
  16.     port = 3306,
  17.     user="root",
  18.     password="",
  19.     database = "learningsql",
  20.     charset = "utf8"
  21. )
  22. def func():
  23.     conn = pool.connection()
  24.     cursor = conn.cursor()
  25.     cursor.execute("select * from user where nid>1;")
  26.     result = cursor.fetchone()
  27.     print(result)
  28.     time.sleep(5)  #为了查看mysql端的线程数量
  29.     cursor.close()
  30.     conn.close()
  31.     
  32. if __name__=="__main__":
  33.     for i in range(5):
  34.         t = threading.Thread(target=func,name="thread-%s"%i)
  35.         t.start()

上述代码中加入了sleep(5)使线程连接数据库时间延长,方便查看mysql数据库连接线程情况,下图分别为代码执行中和执行后的线程连接情况,可以发现,代码执行时,同时有6个线程连接上了数据库(有一个为mysql命令客户端),代码执行后,只有一个线程连接数据库,但仍有5个线程等待连接。

(show status like “Threads%” 查看线程连接情况)