sqlite3 数据库 2.0

import sqlite3
import re
from loguru import logger

"""  sql封装2.0  更新时间:2025-04-04  """  
class SQLDatabase:
    def __init__(self, db_path, pragma="FULL"):
        """
        pragma
        OFF: 当设置为 OFF 时,SQLite 引擎不会等待数据实际写入磁盘;它只是将数据发送到操作系统,并立即继续执行。这种模式提供了最快的操作速度,因为减少了磁盘 I/O 操作。然而,这也意味着在系统崩溃或电源故障的情况下,最近的写操作可能会丢失,因为数据可能还没被实际写入磁盘。
        NORMAL: 在 NORMAL 模式下,SQLite 会在关键时刻(如事务提交)等待数据写入磁盘,但不像 FULL 模式那样频繁。这提供了数据完整性和性能之间的平衡。
        FULL: FULL 模式确保数据在继续之前写入磁盘,从而提供最高级别的数据完整性。在这种模式下,每次写操作都会同步到磁盘,但性能会因为增加的 I/O 操作而降低。
        总的来说,PRAGMA synchronous = OFF; 是一个性能优化设置,它牺牲了一些数据完整性以换取更快的写操作速度。在不太关心数据丢失风险的场景中,比如临时数据库或缓存,这个设置可能是有益的。然而,在需要确保数据安全和完整性的应用中,使用 OFF 模式需要谨慎。
        """
        self.db_path = db_path
        self.pragma = pragma
        self.conn = None
        self.cursor = None
        logger.info(f"数据库连接已建立,路径: {self.db_path}, 同步模式: {self.pragma}")


    def __enter__(self):
        """进入上下文时自动连接数据库并创建游标"""
        self.conn = sqlite3.connect(self.db_path, check_same_thread=True)  #  check_same_thread=True 限制单线程范围
        self.conn.execute("PRAGMA synchronous = {}".format(self.pragma))
        self.cursor = self.conn.cursor()
        self.conn.create_function("REGEXP", 2, self.regexp)
        logger.info("数据库上下文已进入")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """退出上下文时自动提交事务并关闭连接"""
        if self.conn:
            if exc_type is None:
                self.conn.commit()  # 如果没有异常,提交事务
            else:
                self.conn.rollback()  # 如果有异常,回滚事务
            self.conn.close()
        logger.info("数据库上下文已退出")

    def regexp(self, expr, item):  
        reg = re.compile(expr)  
        return reg.search(item) is not None  

    def db_get(self, cmd=None, table='', columns='*', condition=None):
        """  
        查询函数:  
        cmd:sql脚本  
        table:表名  
        columns:返回的字段内容,默认为*查询字段全部  
        condition:条件str  名称='灯箱片'
        """
        try:
            if cmd:  
                self.cursor.execute(cmd)  
            else:  
                sql = f"SELECT {columns} FROM {table}"
                if condition:
                    sql += f" WHERE {condition}"
                self.cursor.execute(sql)  
            return self.cursor.fetchall()  
        except Exception as e:
            logger.error(f"查询错误: {e} \n {sql}")
            return []
    def db_get_regexp(self,  table, condition,regex,columns='*'):  
        """  
        正则查询函数:  
        table:表名  
        columns:返回的字段内容,默认为*查询字段全部  
        condition:条件str , 查询的正则列  
        regex:正则表达式  
        """
        try:
            sql = f"SELECT {columns} FROM {table} WHERE {condition} REGEXP '{regex}';"
            self.cursor.execute(sql)  
            return self.cursor.fetchall()  
        except Exception as e:
            logger.error(f"查询错误: {e} \n {sql}")
            return []

    def db_add(self, table, data):  
        """  
        新增函数:  
        table:表名  
        data:新增字段dict格式
        """
        keys = ', '.join(data.keys())  
        values = ', '.join(['?' for _ in data])  
        sql = f"INSERT INTO {table} ({keys}) VALUES ({values})"  
        try:
            self.conn.execute(sql, tuple(data.values()))  
        except Exception as e:
            logger.error(f"插入错误: {e} \n {sql}")
            return []

    def db_adds(self, table, data_list):
        """  
        批量新增函数:  
        table:表名  
        data_list:新增字段list格式,每个元素为dict
        """
        keys = ', '.join(data_list[0].keys()) if data_list else ''
        values = ', '.join(['?' for _ in data_list[0]]) if data_list else ''
        sql = f"INSERT INTO {table} ({keys}) VALUES ({values})"  
        try:
            self.conn.executemany(sql, [tuple(item.values()) for item in data_list])
        except Exception as e:
            logger.error(f"批量插入错误: {e} \n {sql}")
            return []

    def db_update(self, table, data, condition=False):
        """  
        更新函数:  
        table:表名  
        data:修改字段dict
        condition:更新条件  
        """
        set_clause = ', '.join([f"{key} = ?" for key in data.keys()])  
        if condition:
            sql = f"UPDATE {table} SET {set_clause} WHERE {condition}"  
        else:
            sql = f"UPDATE {table} SET {set_clause}"  
        try:
            self.conn.execute(sql, tuple(data.values()))  
        except Exception as e:
            logger.error(f"更新错误: {e} \n {sql}")
            return []

    def db_del(self, table, condition):  
        """  
        删除函数:  
        table:表名  
        condition:删除条件  
        """
        sql = f"DELETE FROM {table}" + (f" WHERE {condition}" if condition != '*' else '')  
        try:
            self.conn.execute(sql)  
        except Exception as e:
            logger.error(f"删除错误: {e} \n {sql}")
            return []

    def db_cmd(self, cmd):  
        """  
        sql语句函数:  
        cmd:sql语法  
        """
        try:
            self.cursor.execute(cmd)  
            return self.cursor
        except Exception as e:
            logger.error(f"sql错误: {e} \n {cmd}")
            return []


if __name__ == "__main__":

    # 示例:增删改查操作
    with SQLDatabase("广告字.db") as db:
        # row_data = {
        #         "名称": '123',
        #         "规格": '1',
        #         "归属": '',
        #         "物料单价": '',
        #         "物料单位": '',
        #         "合并": '',
        #         "计算方式": '',
        #         "图片": '',
        #     }
        # 插入数据
        # db.db_add("new_常规物料价格表", row_data)
        # db.db_adds("new_常规物料价格表", [row_data,row_data])


        # 更新数据
        # db.db_update("new_常规物料价格表", {"物料单价": 0}, "合并 = '易诚:自定义-  单价0'")

        # # 删除数据
        # db.db_del("new_常规物料价格表", "合并 = '易诚:自定义-  单价0'")
        db.db_del("new_常规物料价格表", "*")

sqlite3数据库1.1

import sqlite3  
import re  
  
"""  sql封装1.1  更新时间:2024-10-03  """  
  
  
class SQLDatabase:  
    def __init__(self, db_name, pragma='OFF'):  
        """  
        pragma        OFF: 当设置为 OFF 时,SQLite 引擎不会等待数据实际写入磁盘;它只是将数据发送到操作系统,并立即继续执行。这种模式提供了最快的操作速度,因为减少了磁盘 I/O 操作。然而,这也意味着在系统崩溃或电源故障的情况下,最近的写操作可能会丢失,因为数据可能还没被实际写入磁盘。  
        NORMAL: 在 NORMAL 模式下,SQLite 会在关键时刻(如事务提交)等待数据写入磁盘,但不像 FULL 模式那样频繁。这提供了数据完整性和性能之间的平衡。  
        FULL: FULL 模式确保数据在继续之前写入磁盘,从而提供最高级别的数据完整性。在这种模式下,每次写操作都会同步到磁盘,但性能会因为增加的 I/O 操作而降低。  
        总的来说,PRAGMA synchronous = OFF; 是一个性能优化设置,它牺牲了一些数据完整性以换取更快的写操作速度。在不太关心数据丢失风险的场景中,比如临时数据库或缓存,这个设置可能是有益的。然而,在需要确保数据安全和完整性的应用中,使用 OFF 模式需要谨慎。  
        """
        self.conn = sqlite3.connect(db_name, check_same_thread=False)  # 默认的 check_same_thread=True  多线程访问回报错  
        self.conn.execute("PRAGMA synchronous = {}".format(pragma))  
        self.cursor = self.conn.cursor()  
        self.conn.create_function("REGEXP", 2, self.regexp)  
  
    def regexp(self, expr, item):  
        reg = re.compile(expr)  
        return reg.search(item) is not None  
  
    def query_regexp(self, table='', tiaojian='', re01=''):  
        """  
        正则查询函数:  
        table:表名  
        tiaojian:条件  ,查询的正则列  
        re01:正则表达式  
        db.query_regexp(table='圣遗物权重表',tiaojian='角色名',re01='.*')  
        """
        sqls = self.cursor.execute("""SELECT * FROM {} WHERE {} REGEXP '{}';""".format(table,tiaojian,re01))  
        d = self.cursor.fetchall()  
        return d  
  
    def query(self, cmd=False, table='', columns=False, tiaojian=False):  
        """  
        查询函数:  
        table:表名  
        colums:返回的字段内容,为空或者填写’*‘查询字段全部  
        tiaojian:条件  
        cmd:sql脚本  
        db.query(table='圣遗物权重表',columns='角色名,最近评分',tiaojian='元素属性="风"')  
        """
        with self.conn:  
            cursor = self.conn.cursor()  
            if cmd:  # 如果cmd不是空  
                cursor.execute("{}".format(cmd))  
            else:  
                if not columns:  # 如果columns为空  
                    if tiaojian:  # 如果tiaojiann不是空  
                        cursor.execute("SELECT * FROM {} where {}".format(str(columns), str(table), str(tiaojian)))  
                    else:  
                        cursor.execute("SELECT * FROM {}".format(table))  
                else:  
                    if tiaojian:  # 如果tiaojiann不是空  
                        cursor.execute("SELECT {} FROM {} where {}".format(str(columns), str(table), str(tiaojian)))  
                    else:  
                        cursor.execute("SELECT {} FROM {}".format(columns, table))  
            return cursor.fetchall()  
  
    def insert(self, table=False, data=None):  
        """  
        新增函数:  
        table:表名  
        data:新增字段  
        db.insert('users', {'name': 'John', 'age': 25})        """
        keys = ', '.join(data.keys())  
        values = ', '.join(['?' for _ in data.values()])  
        sql = f"INSERT INTO {table} ({keys}) VALUES ({values})"  
        with self.conn:  
            self.conn.execute(sql, tuple(data.values()))  
  
    def update(self, table, data, condition):  
        """  
        更新函数:  
        table:表名  
        data:修改字段  
        condition:更新条件  
        db.update('cookies', {'网站':'qqqq'}, "名字 = 'cess'")        """
        set_clause = ', '.join([f"{key} = ?" for key in data.keys()])  
        sql = f"UPDATE {table} SET {set_clause} WHERE {condition}"  
        with self.conn:  
            self.conn.execute(sql, tuple(data.values()))  
  
    def delete(self, table, condition):  
        """  
        更新函数:  
        table:表名  
        condition:删除条件  
        db.delete('cookies',"名字=5")  
        """
        if condition == '*':  
            sql = f"DELETE FROM {table}"  
        else:  
            sql = f"DELETE FROM {table} WHERE {condition}"  
        with self.conn:  
            self.conn.execute(sql)  
  
    def cmd(self, cmd=None):  
        """  
        sql语句函数:  
        cmd:sql语法  
        """
        try:
            with self.conn:  
                cursor = self.conn.cursor()  
                cursor.execute("{}".format(cmd))  
            return cursor.fetchall() 
        except Exception as e:
            print(e)
            return False  
  
  
if __name__ == '__main__':  
    # 使用示例  
    db = SQLDatabase(r'E:\移动固态硬盘\03_tianzhenwuxie\python-bf\自动\下载\自动下载包子动漫\包子.db')  
    print(db.query(table='包子'))  
    # db.insert('users', {'name': 'John', 'age': 25})  
    # db.delete('cookies',"名字=5")  
    # print(db.query('cookies',['名字']))  
  
    # db.insert(table='cookies',data={"名字":'cess'})  
    # db.update('cookies', {'网站':'qqqq'}, "名字 = 'cess'")    # db.delete('users', "name = 'John'")

mysql 数据库1.0

import pymysql
from loguru import logger

"""  sql封装1.1  更新时间:2024-10-06  """


class SQLDatabase:
    def __init__(self, db_name='ft_guanggaizi'):
        self.conn = pymysql.connect(
            host="sh-cynosdbmysql-grp-ijyo60u2.sql.tencentcdb.com",
            port=27916,
            user="root",
            password="AGBdPreZQg4tF31j",
            db=db_name,
            charset="utf8mb4",
        )
        # self.cursor = self.conn.cursor()

    def query(self, cmd=None, table="", columns="*", condition=None):
        """
        查询函数:
        table:表名
        columns:返回的字段内容,默认为’*‘查询字段全部
        condition:条件
        cmd:sql脚本
        """
        try:
            with self.conn.cursor() as cursor:
                if cmd:
                    cursor.execute(cmd)
                else:
                    sql = f"SELECT {columns} FROM {table}"
                    if condition:
                        sql += f" WHERE {condition}"
                    cursor.execute(sql)
                return cursor.fetchall()
        except Exception as e:
            logger.error(f"查询错误: {e}")
            return []

    def insert(self,table: str, data: dict) -> int:
        try:
            with self.conn.cursor() as cursor:
                # 转义中文表名和字段名(关键步骤!)
                escaped_table = f'`{table}`'  # 用反引号包裹表名
                escaped_columns = [f'`{k}`' for k in data.keys()]  # 包裹字段名
                
                # # 构建安全SQL
                columns = ", ".join(escaped_columns)
                placeholders = ", ".join(["%s"] * len(data))
                sql = f"INSERT INTO {escaped_table} ({columns}) VALUES ({placeholders})"
                
                # # 执行参数化查询
                cursor.execute(sql, tuple(data.values()))
                self.conn.commit()  # 显式提交事务
                # return cursor.lastrowid
        except Exception as e:
            logger.error(f"插入失败: {e}")
            logger.error(sql)
            self.conn.rollback()  # 回滚事务
            return -1

    def update(self, table, data, condition=False):
        """
        更新函数:
        table:表名
        data:修改字段
        condition:更新条件
        """
        set_clause = ", ".join([f"{key} = %s" for key in data.keys()])
        if condition:
            sql = f"UPDATE {table} SET {set_clause} WHERE {condition}"
        else:
            sql = f"UPDATE {table} SET {set_clause}"
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql, tuple(data.values()))
                self.conn.commit()
        except Exception as e:
            logger.error(f"插入失败: {e}")
            logger.error(sql)
            self.conn.rollback()


    def delete(self, table, condition='*'):
        """
        删除函数:
        table:表名
        condition:删除条件
        """
        sql = f"DELETE FROM {table}" + (f" WHERE {condition}" if condition != "*" else "")
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql)
                self.conn.commit()
        except Exception as e:
            print(f"删除错误: {e}")

    def cmd(self, cmd,params=None):
        """
        sql语句函数:
        cmd:sql语法
        """
        try:
            with self.conn.cursor() as cursor:
                cursor.execute(cmd, params)
                # 判断是否为查询语句(是否有结果集)
                if cursor.description:
                    res = cursor.fetchall()
                else:
                    res = []  # 非查询语句返回空列表,或返回rowcount
                    self.conn.commit()  # 写操作后立即提交
                return res
        except Exception as e:
            print(f"命令执行错误: {e}")
            self.conn.rollback()  # 异常时回滚事务
            return []

if __name__ == "__main__":
    # 使用示例
    db = SQLDatabase(r"E:\移动固态硬盘\03_tianzhenwuxie\python-bf\自动\下载\自动下载包子动漫\包子.db")
    print(db.query(table="包子"))

sqllite

import sqlite3
# 1.硬盘上创建连接
con = sqlite3.connect('D:/Python/StudyPython/Jupyter/数据库编程/sqlitedb/first.db')
# 获取cursor对象
cur = con.cursor()
# 执行sql创建表
sql = '''CREATE TABLE 表名字  
		(名字 TEXT,
		网站 TEXT,
		cookies TEXT);'''
try:
    cur.execute(sql)
except Exception as e:
    print(e)
    print('创建表失败')
finally:
    # 关闭游标
    cur.close()
    # 关闭连接
    con.close()

创建一个关系表

语法:
		CREATE TABLE 表名
		(
				列名1	数据类型	“约束”(有可无)
				列名2	数据类型	“约束”(有可无)
				列名3	数据类型	“约束”(有可无)
				...
		);//注意,SQL语句要以分号结尾
数据类型:
	integer(size)	仅容纳整数,size规定数字的最大位数
	int(size)
	int 
	decimal(size,d);容纳带小数的数字size 规定带数字的最大位数,d规定小数点右侧的最大位数
	real 实型
	char(size) 固定长度的字符串,size规定字符串的长度
	varchar(size) 可变长度的字符串,size规定字符串的最大字符数
	text	文本
	null 空
	date	日期
		2020/09/02
		2020-09-02
	blob:二进制
	...
	约束:
		not null 不为空
		unique 唯一
		primary  key	主键		(主键渐必须是唯一且不为空)
		default 	默认
			eg: default "China"
		check	用于限制列中值的范围
			eg: check score >= 0 and score <= 100

添加数据

语法:

	INSERT INTO 表名 VALUES(值1,值2,值3...);//给所有的列插入数据有多少列就要填多少值。
	INSERT INTO 表名(列名1,列名2,....) VALUES (值1,值2,... );//给指定列插入数据
	INSERT INTO 表名(列名1,列名2,....) VALUES (?,?, ...);//给先用?号,之后在传元祖
	例如:
	sql='insert into t_person(pname,age) values(?,?)'
    cur.executemany(sql,[('小李',23),('小花',34),('小明',28)])

更新表数据

UPDATE 表名 set 列名 = 新值,...;
UPDATE 表名 set 列名 = 新值,...WHERE 条件;//条件用来定位要修改的某些行
条件:
	列名 运算符 值
	运算符: = 	!= 	>	 >=	 <=
	如果有多个条件,可以用and和or连接
eg:
	update stu set age = 21;
	update stu set age = 30 where num = 1001;
	update stu set age = 30 where num > 60 and num < 999;

获取所有数据

语法:
	SELECT	列名1,...FROM 表名;
	SELECT *FROM 表名;//查询整个表的数据
	SELECT *FROM 表名 where 条件;//查询整个表的数据

执行完成后要获取结果集,才能有结果

sql='select * from t_person'
    cur.execute(sql)
    #获取结果集
    # fetchall  获取方法
    person_all=cur.fetchall()
    # person_all=cur.fetchone()  可以获取结果集中的第一条
    # print(person_all)
    for person in person_all:
        print(person)

删除表、数据

语法: DROP TABLE 表名;

DELETE FROM  表名;//删除表中所有的数据
DELETE FROM 表名 WHERE 条件;//删除满足条件数据
	eg:
		delete from stu;
		delete from stu where num = 1003;

例如:
sql='delete from t_person where pno=?'
#执行sql
try:
    cur.execute(sql, (2,))
    #提交事务
    con.commit()

查询表、数据是否存在

drop table if exists table_name
create table if not exists table_name (column1, column2, …, columnN)

列出所以的表,行数

import sqlite3
con = sqlite3.connect('mydatabase.db')
def sql_fetch(con):
    cursorObj = con.cursor()
    cursorObj.execute('SELECT name from sqlite_master where type= "table"')
    print(cursorObj.fetchall())
sql_fetch(con)

# 列出表内数据行数

cursorObj.execute('SELECT * FROM employees')
rows = cursorObj.fetchall()
 
print len (rows)

高级查找

模糊查询
	列名	like	通配符	(_%)
	_:	匹配单个字符
	&:匹配任意单个字符
	eg:
		select *from stu where name like 'wang_';
		select	*from stu where name like	'%wang%'
排序:
	按指定的列名进行排序
	order by 列名 asc/desc;//asc:升序	desc:降序
		eg:
			select *from stu order by age desc;//根据年龄降序排序
	限制记录条数
		limit 数字
				eg:
					select *from stu limit 2;//查看表中的前两条记录
	排序+限制
		eg:
			select *from stu order by age desc limit 1;//查看年龄最大的记录
	聚合函数
		cout(列*名)	//统计记录数量
		sum(列名)	//计算给定列的总和
		avg(列名)		//计算给定列的平均值
		min(列名)		//计算给定列的最小值
		max(列名)	//计算给定列*的最大值
	eg:
		select	cout(*)  from stu;
		select max(age)-min(age) from stu;
	别名
		eg:
			select num AS 学号,name AS	姓名 from stu;
	分组:
		group by
			eg:
				select age,cout(*) from stu group by age;//查询每个年龄的人数
	范围查询:
		between ... and ...
			eg:	
				select *from stu where age between 10 and 20;
	去重:
		distinct
			eg:
				select distinct age from stu;
	子查询:
		in(值,...)
		in(select ...)
			eg:
				select *from stu where age in (18,20,25);
				select *from stu where age in (select age from stu where age < 30);		
		★★★★★★多表联接
			select	列名,... 表1,表2,...where 条件
			select 列名,... form 表1	join	表2 on	联接条件  ... where	过滤条件;
					eg:
						学生表:	
						学号 	姓名 	年龄		班级		学校	
						...
						create table t_stu(num int primary key, name text,age int,class int school int);
						intsrt into t_stu values(1001,'wb1',18,100101,2001);
						insert into t_stu values(1002,'wb2',19,100101,2001);
						insert into t_stu values(1003.'wb3',20,100101,2002);
						学校表:
						编号		校名		电话		地址
						...
						create table t_school(num int primary key,name text, tel text,addr text);
						insert into t_school values(2001,'first',010-123456,'changsha');
						insert into t_school values(2002,'second',010-0101011,'shanghai');
				需求:
					查询学生的学号,姓名,学校名
						select  t_stu.num,t_stu.name,t_school,name from t_stu,t_school);
				===》学生表中的每一行都与学校表中的每一行进行联结,结果称之为 笛卡尔积
				此时,需要指明两个表联结条件,在上述表中,联结条件是 学校的编号相同
	两种联结写法:
		第一种: select t_stu,num, t_stu.name,t_school.name from t_stu,t_school where	t_stu.school = t_school.num;
		第二种:select t_stu.num,t_stu.name,t_school.name from t_stu join t_school on t_stu.school = t_school.num;
		select t_stu.num,t_stu.name,t_school.name from t_stu join t_school on t_stu.school = t_school.num where t_stu.num = 1002;

参考网址

时间日期
时间日期2
SQLite数据库基本语法介绍
Python SQLite3 教程