数据库sqlite
sqlite3 数据库 2.0
import sqlite3
import re
from loguru import logger
""" sql封装2.0 更新时间:2025-04-04 """
class SQLDatabase:
def __init__(self, db_path, pragma="FULL"):
"""
pragma
OFF: 当设置为 OFF 时,SQLite 引擎不会等待数据实际写入磁盘;它只是将数据发送到操作系统,并立即继续执行。这种模式提供了最快的操作速度,因为减少了磁盘 I/O 操作。然而,这也意味着在系统崩溃或电源故障的情况下,最近的写操作可能会丢失,因为数据可能还没被实际写入磁盘。
NORMAL: 在 NORMAL 模式下,SQLite 会在关键时刻(如事务提交)等待数据写入磁盘,但不像 FULL 模式那样频繁。这提供了数据完整性和性能之间的平衡。
FULL: FULL 模式确保数据在继续之前写入磁盘,从而提供最高级别的数据完整性。在这种模式下,每次写操作都会同步到磁盘,但性能会因为增加的 I/O 操作而降低。
总的来说,PRAGMA synchronous = OFF; 是一个性能优化设置,它牺牲了一些数据完整性以换取更快的写操作速度。在不太关心数据丢失风险的场景中,比如临时数据库或缓存,这个设置可能是有益的。然而,在需要确保数据安全和完整性的应用中,使用 OFF 模式需要谨慎。
"""
self.db_path = db_path
self.pragma = pragma
self.conn = None
self.cursor = None
logger.info(f"数据库连接已建立,路径: {self.db_path}, 同步模式: {self.pragma}")
def __enter__(self):
"""进入上下文时自动连接数据库并创建游标"""
self.conn = sqlite3.connect(self.db_path, check_same_thread=True) # check_same_thread=True 限制单线程范围
self.conn.execute("PRAGMA synchronous = {}".format(self.pragma))
self.cursor = self.conn.cursor()
self.conn.create_function("REGEXP", 2, self.regexp)
logger.info("数据库上下文已进入")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文时自动提交事务并关闭连接"""
if self.conn:
if exc_type is None:
self.conn.commit() # 如果没有异常,提交事务
else:
self.conn.rollback() # 如果有异常,回滚事务
self.conn.close()
logger.info("数据库上下文已退出")
def regexp(self, expr, item):
reg = re.compile(expr)
return reg.search(item) is not None
def db_get(self, cmd=None, table='', columns='*', condition=None):
"""
查询函数:
cmd:sql脚本
table:表名
columns:返回的字段内容,默认为*查询字段全部
condition:条件str 名称='灯箱片'
"""
try:
if cmd:
self.cursor.execute(cmd)
else:
sql = f"SELECT {columns} FROM {table}"
if condition:
sql += f" WHERE {condition}"
self.cursor.execute(sql)
return self.cursor.fetchall()
except Exception as e:
logger.error(f"查询错误: {e} \n {sql}")
return []
def db_get_regexp(self, table, condition,regex,columns='*'):
"""
正则查询函数:
table:表名
columns:返回的字段内容,默认为*查询字段全部
condition:条件str , 查询的正则列
regex:正则表达式
"""
try:
sql = f"SELECT {columns} FROM {table} WHERE {condition} REGEXP '{regex}';"
self.cursor.execute(sql)
return self.cursor.fetchall()
except Exception as e:
logger.error(f"查询错误: {e} \n {sql}")
return []
def db_add(self, table, data):
"""
新增函数:
table:表名
data:新增字段dict格式
"""
keys = ', '.join(data.keys())
values = ', '.join(['?' for _ in data])
sql = f"INSERT INTO {table} ({keys}) VALUES ({values})"
try:
self.conn.execute(sql, tuple(data.values()))
except Exception as e:
logger.error(f"插入错误: {e} \n {sql}")
return []
def db_adds(self, table, data_list):
"""
批量新增函数:
table:表名
data_list:新增字段list格式,每个元素为dict
"""
keys = ', '.join(data_list[0].keys()) if data_list else ''
values = ', '.join(['?' for _ in data_list[0]]) if data_list else ''
sql = f"INSERT INTO {table} ({keys}) VALUES ({values})"
try:
self.conn.executemany(sql, [tuple(item.values()) for item in data_list])
except Exception as e:
logger.error(f"批量插入错误: {e} \n {sql}")
return []
def db_update(self, table, data, condition=False):
"""
更新函数:
table:表名
data:修改字段dict
condition:更新条件
"""
set_clause = ', '.join([f"{key} = ?" for key in data.keys()])
if condition:
sql = f"UPDATE {table} SET {set_clause} WHERE {condition}"
else:
sql = f"UPDATE {table} SET {set_clause}"
try:
self.conn.execute(sql, tuple(data.values()))
except Exception as e:
logger.error(f"更新错误: {e} \n {sql}")
return []
def db_del(self, table, condition):
"""
删除函数:
table:表名
condition:删除条件
"""
sql = f"DELETE FROM {table}" + (f" WHERE {condition}" if condition != '*' else '')
try:
self.conn.execute(sql)
except Exception as e:
logger.error(f"删除错误: {e} \n {sql}")
return []
def db_cmd(self, cmd):
"""
sql语句函数:
cmd:sql语法
"""
try:
self.cursor.execute(cmd)
return self.cursor
except Exception as e:
logger.error(f"sql错误: {e} \n {cmd}")
return []
if __name__ == "__main__":
# 示例:增删改查操作
with SQLDatabase("广告字.db") as db:
# row_data = {
# "名称": '123',
# "规格": '1',
# "归属": '',
# "物料单价": '',
# "物料单位": '',
# "合并": '',
# "计算方式": '',
# "图片": '',
# }
# 插入数据
# db.db_add("new_常规物料价格表", row_data)
# db.db_adds("new_常规物料价格表", [row_data,row_data])
# 更新数据
# db.db_update("new_常规物料价格表", {"物料单价": 0}, "合并 = '易诚:自定义- 单价0'")
# # 删除数据
# db.db_del("new_常规物料价格表", "合并 = '易诚:自定义- 单价0'")
db.db_del("new_常规物料价格表", "*")
sqlite3数据库1.1
import sqlite3
import re
""" sql封装1.1 更新时间:2024-10-03 """
class SQLDatabase:
def __init__(self, db_name, pragma='OFF'):
"""
pragma OFF: 当设置为 OFF 时,SQLite 引擎不会等待数据实际写入磁盘;它只是将数据发送到操作系统,并立即继续执行。这种模式提供了最快的操作速度,因为减少了磁盘 I/O 操作。然而,这也意味着在系统崩溃或电源故障的情况下,最近的写操作可能会丢失,因为数据可能还没被实际写入磁盘。
NORMAL: 在 NORMAL 模式下,SQLite 会在关键时刻(如事务提交)等待数据写入磁盘,但不像 FULL 模式那样频繁。这提供了数据完整性和性能之间的平衡。
FULL: FULL 模式确保数据在继续之前写入磁盘,从而提供最高级别的数据完整性。在这种模式下,每次写操作都会同步到磁盘,但性能会因为增加的 I/O 操作而降低。
总的来说,PRAGMA synchronous = OFF; 是一个性能优化设置,它牺牲了一些数据完整性以换取更快的写操作速度。在不太关心数据丢失风险的场景中,比如临时数据库或缓存,这个设置可能是有益的。然而,在需要确保数据安全和完整性的应用中,使用 OFF 模式需要谨慎。
"""
self.conn = sqlite3.connect(db_name, check_same_thread=False) # 默认的 check_same_thread=True 多线程访问回报错
self.conn.execute("PRAGMA synchronous = {}".format(pragma))
self.cursor = self.conn.cursor()
self.conn.create_function("REGEXP", 2, self.regexp)
def regexp(self, expr, item):
reg = re.compile(expr)
return reg.search(item) is not None
def query_regexp(self, table='', tiaojian='', re01=''):
"""
正则查询函数:
table:表名
tiaojian:条件 ,查询的正则列
re01:正则表达式
db.query_regexp(table='圣遗物权重表',tiaojian='角色名',re01='.*')
"""
sqls = self.cursor.execute("""SELECT * FROM {} WHERE {} REGEXP '{}';""".format(table,tiaojian,re01))
d = self.cursor.fetchall()
return d
def query(self, cmd=False, table='', columns=False, tiaojian=False):
"""
查询函数:
table:表名
colums:返回的字段内容,为空或者填写’*‘查询字段全部
tiaojian:条件
cmd:sql脚本
db.query(table='圣遗物权重表',columns='角色名,最近评分',tiaojian='元素属性="风"')
"""
with self.conn:
cursor = self.conn.cursor()
if cmd: # 如果cmd不是空
cursor.execute("{}".format(cmd))
else:
if not columns: # 如果columns为空
if tiaojian: # 如果tiaojiann不是空
cursor.execute("SELECT * FROM {} where {}".format(str(columns), str(table), str(tiaojian)))
else:
cursor.execute("SELECT * FROM {}".format(table))
else:
if tiaojian: # 如果tiaojiann不是空
cursor.execute("SELECT {} FROM {} where {}".format(str(columns), str(table), str(tiaojian)))
else:
cursor.execute("SELECT {} FROM {}".format(columns, table))
return cursor.fetchall()
def insert(self, table=False, data=None):
"""
新增函数:
table:表名
data:新增字段
db.insert('users', {'name': 'John', 'age': 25}) """
keys = ', '.join(data.keys())
values = ', '.join(['?' for _ in data.values()])
sql = f"INSERT INTO {table} ({keys}) VALUES ({values})"
with self.conn:
self.conn.execute(sql, tuple(data.values()))
def update(self, table, data, condition):
"""
更新函数:
table:表名
data:修改字段
condition:更新条件
db.update('cookies', {'网站':'qqqq'}, "名字 = 'cess'") """
set_clause = ', '.join([f"{key} = ?" for key in data.keys()])
sql = f"UPDATE {table} SET {set_clause} WHERE {condition}"
with self.conn:
self.conn.execute(sql, tuple(data.values()))
def delete(self, table, condition):
"""
更新函数:
table:表名
condition:删除条件
db.delete('cookies',"名字=5")
"""
if condition == '*':
sql = f"DELETE FROM {table}"
else:
sql = f"DELETE FROM {table} WHERE {condition}"
with self.conn:
self.conn.execute(sql)
def cmd(self, cmd=None):
"""
sql语句函数:
cmd:sql语法
"""
try:
with self.conn:
cursor = self.conn.cursor()
cursor.execute("{}".format(cmd))
return cursor.fetchall()
except Exception as e:
print(e)
return False
if __name__ == '__main__':
# 使用示例
db = SQLDatabase(r'E:\移动固态硬盘\03_tianzhenwuxie\python-bf\自动\下载\自动下载包子动漫\包子.db')
print(db.query(table='包子'))
# db.insert('users', {'name': 'John', 'age': 25})
# db.delete('cookies',"名字=5")
# print(db.query('cookies',['名字']))
# db.insert(table='cookies',data={"名字":'cess'})
# db.update('cookies', {'网站':'qqqq'}, "名字 = 'cess'") # db.delete('users', "name = 'John'")
mysql 数据库1.0
import pymysql
from loguru import logger
""" sql封装1.1 更新时间:2024-10-06 """
class SQLDatabase:
def __init__(self, db_name='ft_guanggaizi'):
self.conn = pymysql.connect(
host="sh-cynosdbmysql-grp-ijyo60u2.sql.tencentcdb.com",
port=27916,
user="root",
password="AGBdPreZQg4tF31j",
db=db_name,
charset="utf8mb4",
)
# self.cursor = self.conn.cursor()
def query(self, cmd=None, table="", columns="*", condition=None):
"""
查询函数:
table:表名
columns:返回的字段内容,默认为’*‘查询字段全部
condition:条件
cmd:sql脚本
"""
try:
with self.conn.cursor() as cursor:
if cmd:
cursor.execute(cmd)
else:
sql = f"SELECT {columns} FROM {table}"
if condition:
sql += f" WHERE {condition}"
cursor.execute(sql)
return cursor.fetchall()
except Exception as e:
logger.error(f"查询错误: {e}")
return []
def insert(self,table: str, data: dict) -> int:
try:
with self.conn.cursor() as cursor:
# 转义中文表名和字段名(关键步骤!)
escaped_table = f'`{table}`' # 用反引号包裹表名
escaped_columns = [f'`{k}`' for k in data.keys()] # 包裹字段名
# # 构建安全SQL
columns = ", ".join(escaped_columns)
placeholders = ", ".join(["%s"] * len(data))
sql = f"INSERT INTO {escaped_table} ({columns}) VALUES ({placeholders})"
# # 执行参数化查询
cursor.execute(sql, tuple(data.values()))
self.conn.commit() # 显式提交事务
# return cursor.lastrowid
except Exception as e:
logger.error(f"插入失败: {e}")
logger.error(sql)
self.conn.rollback() # 回滚事务
return -1
def update(self, table, data, condition=False):
"""
更新函数:
table:表名
data:修改字段
condition:更新条件
"""
set_clause = ", ".join([f"{key} = %s" for key in data.keys()])
if condition:
sql = f"UPDATE {table} SET {set_clause} WHERE {condition}"
else:
sql = f"UPDATE {table} SET {set_clause}"
try:
with self.conn.cursor() as cursor:
cursor.execute(sql, tuple(data.values()))
self.conn.commit()
except Exception as e:
logger.error(f"插入失败: {e}")
logger.error(sql)
self.conn.rollback()
def delete(self, table, condition='*'):
"""
删除函数:
table:表名
condition:删除条件
"""
sql = f"DELETE FROM {table}" + (f" WHERE {condition}" if condition != "*" else "")
try:
with self.conn.cursor() as cursor:
cursor.execute(sql)
self.conn.commit()
except Exception as e:
print(f"删除错误: {e}")
def cmd(self, cmd,params=None):
"""
sql语句函数:
cmd:sql语法
"""
try:
with self.conn.cursor() as cursor:
cursor.execute(cmd, params)
# 判断是否为查询语句(是否有结果集)
if cursor.description:
res = cursor.fetchall()
else:
res = [] # 非查询语句返回空列表,或返回rowcount
self.conn.commit() # 写操作后立即提交
return res
except Exception as e:
print(f"命令执行错误: {e}")
self.conn.rollback() # 异常时回滚事务
return []
if __name__ == "__main__":
# 使用示例
db = SQLDatabase(r"E:\移动固态硬盘\03_tianzhenwuxie\python-bf\自动\下载\自动下载包子动漫\包子.db")
print(db.query(table="包子"))
sqllite
import sqlite3
# 1.硬盘上创建连接
con = sqlite3.connect('D:/Python/StudyPython/Jupyter/数据库编程/sqlitedb/first.db')
# 获取cursor对象
cur = con.cursor()
# 执行sql创建表
sql = '''CREATE TABLE 表名字
(名字 TEXT,
网站 TEXT,
cookies TEXT);'''
try:
cur.execute(sql)
except Exception as e:
print(e)
print('创建表失败')
finally:
# 关闭游标
cur.close()
# 关闭连接
con.close()
创建一个关系表
语法:
CREATE TABLE 表名
(
列名1 数据类型 “约束”(有可无)
列名2 数据类型 “约束”(有可无)
列名3 数据类型 “约束”(有可无)
...
);//注意,SQL语句要以分号结尾
数据类型:
integer(size) 仅容纳整数,size规定数字的最大位数
int(size)
int
decimal(size,d);容纳带小数的数字size 规定带数字的最大位数,d规定小数点右侧的最大位数
real 实型
char(size) 固定长度的字符串,size规定字符串的长度
varchar(size) 可变长度的字符串,size规定字符串的最大字符数
text 文本
null 空
date 日期
2020/09/02
2020-09-02
blob:二进制
...
约束:
not null 不为空
unique 唯一
primary key 主键 (主键渐必须是唯一且不为空)
default 默认
eg: default "China"
check 用于限制列中值的范围
eg: check score >= 0 and score <= 100
添加数据
语法:
INSERT INTO 表名 VALUES(值1,值2,值3...);//给所有的列插入数据有多少列就要填多少值。
INSERT INTO 表名(列名1,列名2,....) VALUES (值1,值2,... );//给指定列插入数据
INSERT INTO 表名(列名1,列名2,....) VALUES (?,?, ...);//给先用?号,之后在传元祖
例如:
sql='insert into t_person(pname,age) values(?,?)'
cur.executemany(sql,[('小李',23),('小花',34),('小明',28)])
更新表数据
UPDATE 表名 set 列名 = 新值,...;
UPDATE 表名 set 列名 = 新值,...WHERE 条件;//条件用来定位要修改的某些行
条件:
列名 运算符 值
运算符: = != > >= <=
如果有多个条件,可以用and和or连接
eg:
update stu set age = 21;
update stu set age = 30 where num = 1001;
update stu set age = 30 where num > 60 and num < 999;
获取所有数据
语法:
SELECT 列名1,...FROM 表名;
SELECT *FROM 表名;//查询整个表的数据
SELECT *FROM 表名 where 条件;//查询整个表的数据
执行完成后要获取结果集,才能有结果
sql='select * from t_person'
cur.execute(sql)
#获取结果集
# fetchall 获取方法
person_all=cur.fetchall()
# person_all=cur.fetchone() 可以获取结果集中的第一条
# print(person_all)
for person in person_all:
print(person)
删除表、数据
语法: DROP TABLE 表名;
DELETE FROM 表名;//删除表中所有的数据
DELETE FROM 表名 WHERE 条件;//删除满足条件数据
eg:
delete from stu;
delete from stu where num = 1003;
例如:
sql='delete from t_person where pno=?'
#执行sql
try:
cur.execute(sql, (2,))
#提交事务
con.commit()
查询表、数据是否存在
drop table if exists table_name
create table if not exists table_name (column1, column2, …, columnN)
列出所以的表,行数
import sqlite3
con = sqlite3.connect('mydatabase.db')
def sql_fetch(con):
cursorObj = con.cursor()
cursorObj.execute('SELECT name from sqlite_master where type= "table"')
print(cursorObj.fetchall())
sql_fetch(con)
# 列出表内数据行数
cursorObj.execute('SELECT * FROM employees')
rows = cursorObj.fetchall()
print len (rows)
高级查找
模糊查询
列名 like 通配符 (_%)
_: 匹配单个字符
&:匹配任意单个字符
eg:
select *from stu where name like 'wang_';
select *from stu where name like '%wang%'
排序:
按指定的列名进行排序
order by 列名 asc/desc;//asc:升序 desc:降序
eg:
select *from stu order by age desc;//根据年龄降序排序
限制记录条数
limit 数字
eg:
select *from stu limit 2;//查看表中的前两条记录
排序+限制
eg:
select *from stu order by age desc limit 1;//查看年龄最大的记录
聚合函数
cout(列*名) //统计记录数量
sum(列名) //计算给定列的总和
avg(列名) //计算给定列的平均值
min(列名) //计算给定列的最小值
max(列名) //计算给定列*的最大值
eg:
select cout(*) from stu;
select max(age)-min(age) from stu;
别名
eg:
select num AS 学号,name AS 姓名 from stu;
分组:
group by
eg:
select age,cout(*) from stu group by age;//查询每个年龄的人数
范围查询:
between ... and ...
eg:
select *from stu where age between 10 and 20;
去重:
distinct
eg:
select distinct age from stu;
子查询:
in(值,...)
in(select ...)
eg:
select *from stu where age in (18,20,25);
select *from stu where age in (select age from stu where age < 30);
★★★★★★多表联接
select 列名,... 表1,表2,...where 条件
select 列名,... form 表1 join 表2 on 联接条件 ... where 过滤条件;
eg:
学生表:
学号 姓名 年龄 班级 学校
...
create table t_stu(num int primary key, name text,age int,class int school int);
intsrt into t_stu values(1001,'wb1',18,100101,2001);
insert into t_stu values(1002,'wb2',19,100101,2001);
insert into t_stu values(1003.'wb3',20,100101,2002);
学校表:
编号 校名 电话 地址
...
create table t_school(num int primary key,name text, tel text,addr text);
insert into t_school values(2001,'first',010-123456,'changsha');
insert into t_school values(2002,'second',010-0101011,'shanghai');
需求:
查询学生的学号,姓名,学校名
select t_stu.num,t_stu.name,t_school,name from t_stu,t_school);
===》学生表中的每一行都与学校表中的每一行进行联结,结果称之为 笛卡尔积
此时,需要指明两个表联结条件,在上述表中,联结条件是 学校的编号相同
两种联结写法:
第一种: select t_stu,num, t_stu.name,t_school.name from t_stu,t_school where t_stu.school = t_school.num;
第二种:select t_stu.num,t_stu.name,t_school.name from t_stu join t_school on t_stu.school = t_school.num;
select t_stu.num,t_stu.name,t_school.name from t_stu join t_school on t_stu.school = t_school.num where t_stu.num = 1002;
参考网址
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 程序员小航
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果