|
数据表[data_test]结构:
CREATE TABLE `data_test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`datestr` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP,
`count` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

工具类:
# coding:utf-8
# @Time : 2022/9/23 10:47
# @Author : KeenLeung
# @Remark :
import arrow
import pandas as pd
from sqlalchemy import create_engine
class DBHandleTool(object):
    """Convenience wrapper around a SQLAlchemy engine for a MySQL database.

    Originally written against SQLAlchemy==1.4.22.

    Three groups of helpers are offered: raw SQL execution, dict-driven
    CRUD helpers that build the statement for the caller, and pandas
    DataFrame import/export.

    Only *values* are sent as bound parameters.  Table and column names
    are interpolated into the SQL text as-is, so they must come from
    trusted code, never from user input.
    """

    def __init__(self, db_config):
        """
        Build the engine from a connection-settings dict.

        :param db_config: dict with the keys ``user``, ``password``,
            ``hostname``, ``port`` and ``dbname``
        """
        self.db_config = db_config
        mysql_url = "mysql://{user}:{password}@{hostname}:{port}/{dbname}?charset=utf8".format(**self.db_config)
        # The ``encoding`` keyword is deprecated in SQLAlchemy 1.4 (it is not
        # needed on Python 3) and is rejected by 2.0, so it is not passed.
        self.engine = create_engine(mysql_url)

    # ================================================== helpers ==================================================
    @staticmethod
    def _run(conn, sql, params=None):
        """
        Execute a statement on an open connection and return its result.

        Plain strings go through ``exec_driver_sql`` so the DBAPI ``%s``
        placeholders keep working; passing a bare string to
        ``Connection.execute`` is deprecated in SQLAlchemy 1.4 and is no
        longer accepted in 2.0.  Any other object (for example a
        SQLAlchemy construct) is handed to ``conn.execute`` unchanged.

        :param conn: connection obtained from ``self.engine``
        :param sql: SQL string or SQLAlchemy executable
        :param params: tuple of values, or list of tuples for a bulk run
        :return: the result object returned by the connection
        """
        if isinstance(sql, str):
            # An empty parameter container means "no parameters".
            return conn.exec_driver_sql(sql, params or None)
        if params:
            return conn.execute(sql, params)
        return conn.execute(sql)

    @staticmethod
    def _build_insert(table, value_dicts):
        """
        Build an INSERT statement and its parameter tuples.

        The column order is taken from the first dict; every row is then
        read *by key* in that order, so dicts whose keys are ordered
        differently still bind each value to the right column.

        :param table: table name (already cleaned)
        :param value_dicts: non-empty list of ``{column: value}`` dicts
        :return: ``(sql, params)`` where ``params`` is a list of tuples
        """
        keys = list(value_dicts[0])
        fields_str = ','.join(keys)
        placestr = ','.join(['%s'] * len(keys))
        sql = f"insert into {table}({fields_str}) values({placestr})"
        params = [tuple(row[key] for key in keys) for row in value_dicts]
        return sql, params

    # ================================================== raw SQL ==================================================
    def execute(self, sql):
        """
        Execute a statement inside a transaction, discarding any result.

        :param sql: statement to run
        :return: None
        """
        with self.engine.begin() as conn:
            self._run(conn, sql)

    def fetchcolumn(self, sql):
        """
        Return the first column of the first row of a query.

        :param sql: query to run
        :return: the value, or None when the query yields no row
        """
        row = self.fetchone(sql)
        if row is None:
            return None
        return row[0]

    def fetchone(self, sql):
        """
        Return the first row of a query.

        :param sql: query to run
        :return: the row, or None when the query yields no row
        """
        with self.engine.begin() as conn:
            return self._run(conn, sql).fetchone()

    def fetchall(self, sql):
        """
        Return every row of a query.

        :param sql: query to run
        :return: list of rows
        """
        with self.engine.begin() as conn:
            return self._run(conn, sql).fetchall()

    # ================================================== built SQL ==================================================
    def __correct_table__(self, table):
        """
        Reduce a table argument to a single table name.

        Everything from the first ``,`` or ``;`` onwards is dropped, in
        whichever order the two separators appear.

        :param table: table name
        :return: the part of ``table`` before any ``,`` or ``;``
        """
        for separator in (',', ';'):
            table = table.split(separator, 1)[0]
        return table

    def __build_query_condition__(self, filter_dict):
        """
        Build a WHERE clause with ``%s`` placeholders.

        :param filter_dict: equality conditions (dict), for example
            ``{"datestr": "2022-09-16"}``; None or an empty dict means no
            condition
        :return: ``(where_clause, params)``; the clause is ``""`` when
            there is no condition
        """
        filter_dict = filter_dict or {}
        conditions = ' and '.join(f"{key}=%s" for key in filter_dict)
        placestr = f"where {conditions}" if conditions else ""
        return placestr, tuple(filter_dict.values())

    def __build_update_condition__(self, value_dict, filter_dict):
        """
        Build the SET list and WHERE clause of an UPDATE statement.

        :param value_dict: ``{column: new_value}`` dict
        :param filter_dict: equality conditions (dict); None or empty
            means no condition
        :return: ``(set_list, where_clause, params)`` with the SET values
            first in ``params``, followed by the WHERE values
        """
        filter_dict = filter_dict or {}
        placestr1 = ', '.join(f"{key} = %s" for key in value_dict)
        conditions = ' and '.join(f"{key} = %s" for key in filter_dict)
        placestr2 = f"where {conditions}" if conditions else ""
        params = tuple(value_dict.values()) + tuple(filter_dict.values())
        return placestr1, placestr2, params

    def row_exists(self, table, filter_dict=None):
        """
        Tell whether at least one row matches the conditions.

        :param table: table name
        :param filter_dict: equality conditions (dict), for example
            ``{"datestr": "2022-09-16"}``
        :return: True when a matching row exists, otherwise False
        """
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        # Only existence matters, so do not ask for whole rows.
        sql = f"select 1 from {table} {placestr} limit 1"
        with self.engine.begin() as conn:
            row = self._run(conn, sql, params).fetchone()
        return row is not None

    def insert_row(self, table, value_dict):
        """
        Insert a single row.

        :param table: table name
        :param value_dict: ``{column: value}`` dict
        :return: None
        """
        table = self.__correct_table__(table)
        sql, params = self._build_insert(table, [value_dict])
        with self.engine.begin() as conn:
            self._run(conn, sql, params[0])

    def insert_many(self, table, value_dicts):
        """
        Insert several rows with one bulk statement.

        :param table: table name
        :param value_dicts: list of ``{column: value}`` dicts sharing the
            same keys; an empty list is a no-op
        :return: None
        :raises KeyError: when a row lacks a column of the first row
        """
        if not value_dicts:
            return
        table = self.__correct_table__(table)
        sql, params = self._build_insert(table, value_dicts)
        with self.engine.begin() as conn:
            self._run(conn, sql, params)

    def delete(self, table, filter_dict=None):
        """
        Delete the rows matching the conditions.

        With no conditions every row of the table is deleted.

        :param table: table name
        :param filter_dict: equality conditions (dict)
        :return: None
        """
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        sql = f"delete from {table} {placestr}"
        with self.engine.begin() as conn:
            self._run(conn, sql, params)

    def count(self, table, filter_dict=None, column=None):
        """
        Count the rows matching the conditions.

        :param table: table name
        :param filter_dict: equality conditions (dict)
        :param column: column to count (str); ``*`` when omitted
        :return: the value of ``count(...)``
        """
        column = str(column) if column else "*"
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        sql = f"select count({column}) from {table} {placestr}"
        with self.engine.begin() as conn:
            row = self._run(conn, sql, params).fetchone()
        return row[0]

    def get_column(self, table, column, filter_dict=None):
        """
        Return one column of the first matching row.

        :param table: table name
        :param column: column name (str)
        :param filter_dict: equality conditions (dict)
        :return: the value, or None when no row matches
        """
        table = self.__correct_table__(table)
        placestr, params = self.__build_query_condition__(filter_dict)
        sql = f"select {column} from {table} {placestr}"
        with self.engine.begin() as conn:
            row = self._run(conn, sql, params).fetchone()
        if row is None:
            return None
        return row[0]

    def update(self, table, value_dict, filter_dict=None):
        """
        Update the rows matching the conditions.

        With no conditions every row of the table is updated.

        :param table: table name
        :param value_dict: ``{column: new_value}`` dict
        :param filter_dict: equality conditions (dict)
        :return: None
        """
        table = self.__correct_table__(table)
        placestr1, placestr2, params = self.__build_update_condition__(value_dict, filter_dict)
        sql = f"update {table} set {placestr1} {placestr2}"
        with self.engine.begin() as conn:
            self._run(conn, sql, params)

    def set_column_zero(self, table, column, filter_dict=None):
        """
        Set one column to 0 on the rows matching the conditions.

        :param table: table name
        :param column: column name
        :param filter_dict: equality conditions (dict)
        :return: None
        """
        self.update(
            table=table,
            value_dict={column: 0},
            filter_dict=filter_dict
        )

    def clear(self, table):
        """
        Empty a table with ``truncate table``.

        :param table: table name
        :return: None
        """
        table = self.__correct_table__(table)
        sql = "truncate table %s" % table
        with self.engine.begin() as conn:
            self._run(conn, sql)

    # ================================================== DataFrame ==================================================
    def get_df(self, sql):
        """
        Run a query and return the result as a DataFrame.

        :param sql: query handed to ``pandas.read_sql``
        :return: DataFrame with the query result
        """
        with self.engine.begin() as conn:
            df = pd.read_sql(sql, con=conn)
        return df

    def save_df(self, df, tablename, if_exists='append', index=False):
        """
        Write a DataFrame to a table.

        :param df: DataFrame to store
        :param tablename: table name
        :param if_exists: forwarded to ``DataFrame.to_sql``
        :param index: forwarded to ``DataFrame.to_sql``
        :return: True
        """
        with self.engine.begin() as conn:
            df.to_sql(name=tablename, con=conn, if_exists=if_exists, index=index)
        return True


# Example usage:
if __name__ == '__main__':
    # Demo script: exercises every group of DBHandleTool helpers against the
    # `data_test` table.  Fill in real connection settings before running.
    db_config = {
        "user": "***",  # user name
        "password": "***",  # password
        "hostname": "***.***.***.***",  # IP address
        "port": "3306",  # port
        "dbname": "***"  # database name
    }
    db = DBHandleTool(db_config=db_config)
    todaystr = arrow.now().format('YYYY-MM-DD')
    tb_name = 'data_test'

    # Dates one, two and three days after today, computed once and reused.
    day1 = arrow.get(todaystr).shift(days=+1).format('YYYY-MM-DD')
    day2 = arrow.get(todaystr).shift(days=+2).format('YYYY-MM-DD')
    day3 = arrow.get(todaystr).shift(days=+3).format('YYYY-MM-DD')

    # ================================================== raw SQL ==================================================
    # Insert
    db.execute(f"insert into {tb_name}(`datestr`, `count`) values('{todaystr}', 1)")

    # Query
    # A single value
    result = db.fetchcolumn(f"select count from {tb_name} where datestr = '{todaystr}'")
    print(result)

    # A single row
    row = db.fetchone(f"select * from {tb_name} where datestr = '{todaystr}'")
    print(row)

    # All rows
    rows = db.fetchall(f"select * from {tb_name} where datestr = '{todaystr}'")
    print(rows)

    # ================================================== built SQL ==================================================
    # Does a row exist?
    exists = db.row_exists(
        table=tb_name,
        filter_dict={"datestr": todaystr}
    )
    print(exists)

    # Insert one row
    db.insert_row(
        table=tb_name,
        value_dict={"datestr": day1, "count": 2}
    )

    # Insert several rows
    db.insert_many(
        table=tb_name,
        value_dicts=[
            {"datestr": day2, "count": 3},
            {"datestr": day3, "count": 4},
        ]
    )

    # Delete rows
    db.delete(
        table=tb_name,
        filter_dict={"datestr": day3}
    )

    # Count rows
    row_nums = db.count(
        table=tb_name
    )
    print(row_nums)

    # Read one column value
    count_num = db.get_column(
        table=tb_name,
        column="count",
        filter_dict={"datestr": day2},
    )
    print(count_num)

    # Update rows
    db.update(
        table=tb_name,
        value_dict={"count": 4},
        filter_dict={"datestr": day2}
    )

    # ================================================== DataFrame ==================================================
    # Load a DataFrame
    df = db.get_df(f"select * from {tb_name}")
    print(df)

# Note from the original author: test passed!
|