Python使用PyMySQL操作MySQL
前言
在Python开发中,操作MySQL数据库是一项非常常见的技能。PyMySQL是Python中一个优秀的MySQL客户端库,它允许我们通过Python代码与MySQL数据库进行交互。本文将详细介绍如何使用PyMySQL进行数据库的增、删、查、改操作,并且会分别演示SQL拼接和参数化查询(预编译)两种方式,帮助初学者快速上手。
环境准备
在开始之前,请确保你已经安装了必要的软件和库:
- Python 3.x 环境
- MySQL 数据库
- PyMySQL库
如果尚未安装PyMySQL,可以使用pip进行安装:
pip install pymysql
连接数据库
在进行任何数据库操作之前,我们需要先建立与数据库的连接。下面是一个通用的连接函数:
import pymysql
from pymysql.cursors import DictCursor
def get_db_connection():
"""建立数据库连接并返回连接对象"""
try:
# 连接数据库
connection = pymysql.connect(
host='localhost', # 数据库主机地址
user='root', # 数据库用户名
password='123456', # 数据库密码
database='testdb', # 要连接的数据库名
port=3306, # 数据库端口,默认为3306
charset='utf8mb4', # 字符集
cursorclass=DictCursor # 使查询结果以字典形式返回
)
print("数据库连接成功")
return connection
except pymysql.Error as e:
print(f"数据库连接失败: {e}")
return None
注意:请根据你的实际数据库配置修改上述代码中的连接参数
创建测试表
为了演示数据库操作,我们先创建一个测试用的users
表:
def create_test_table():
"""创建测试表"""
connection = get_db_connection()
if not connection:
return
try:
with connection.cursor() as cursor:
# 创建users表
sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
age INT NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
cursor.execute(sql)
connection.commit()
print("测试表创建成功")
except pymysql.Error as e:
print(f"创建表失败: {e}")
connection.rollback()
finally:
# 关闭连接
if connection:
connection.close()
print("数据库连接已关闭")
# 执行创建表函数
create_test_table()
数据库操作详解
下面我们将详细介绍数据库的四种基本操作:增加(INSERT)、查询(SELECT)、更新(UPDATE)和删除(DELETE),每种操作都会分别演示SQL拼接和参数化查询两种方式。
1. 增加数据(INSERT)
方式一:SQL拼接(不推荐,存在SQL注入风险)
def insert_user_by_concat(name, age, email):
"""使用SQL拼接方式插入用户"""
connection = get_db_connection()
if not connection:
return
try:
with connection.cursor() as cursor:
# SQL拼接方式
sql = f"INSERT INTO users (name, age, email) VALUES ('{name}', {age}, '{email}')"
print(f"执行SQL: {sql}")
cursor.execute(sql)
connection.commit()
print(f"插入成功,新记录ID: {cursor.lastrowid}")
return cursor.lastrowid
except pymysql.Error as e:
print(f"插入失败: {e}")
connection.rollback()
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试插入
# insert_user_by_concat("张三", 25, "zhangsan@example.com")
方式二:参数化查询(预编译,推荐)
def insert_user_by_param(name, age, email):
"""使用参数化查询方式插入用户"""
connection = get_db_connection()
if not connection:
return
try:
with connection.cursor() as cursor:
# 参数化查询方式
sql = "INSERT INTO users (name, age, email) VALUES (%s, %s, %s)"
# 执行SQL,并传递参数
cursor.execute(sql, (name, age, email))
connection.commit()
print(f"插入成功,新记录ID: {cursor.lastrowid}")
return cursor.lastrowid
except pymysql.Error as e:
print(f"插入失败: {e}")
connection.rollback()
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试插入
# insert_user_by_param("李四", 30, "lisi@example.com")
# insert_user_by_param("王五", 28, "wangwu@example.com")
2. 查询数据(SELECT)
方式一:SQL拼接(不推荐)
def get_users_by_concat(age=None):
"""使用SQL拼接方式查询用户"""
connection = get_db_connection()
if not connection:
return []
try:
with connection.cursor() as cursor:
# 基础SQL
sql = "SELECT * FROM users"
# 如果提供了年龄参数,添加条件
if age is not None:
sql += f" WHERE age > {age}"
# 按ID排序
sql += " ORDER BY id"
print(f"执行SQL: {sql}")
cursor.execute(sql)
# 获取所有记录
results = cursor.fetchall()
print(f"查询到 {len(results)} 条记录")
return results
except pymysql.Error as e:
print(f"查询失败: {e}")
return []
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试查询
# users = get_users_by_concat(26)
# for user in users:
# print(user)
方式二:参数化查询(推荐)
def get_users_by_param(age=None):
"""使用参数化查询方式查询用户"""
connection = get_db_connection()
if not connection:
return []
try:
with connection.cursor() as cursor:
# 基础SQL和参数列表
sql = "SELECT * FROM users"
params = []
# 如果提供了年龄参数,添加条件
if age is not None:
sql += " WHERE age > %s"
params.append(age)
# 按ID排序
sql += " ORDER BY id"
print(f"执行SQL: {sql}")
cursor.execute(sql, params)
# 获取所有记录
results = cursor.fetchall()
print(f"查询到 {len(results)} 条记录")
return results
except pymysql.Error as e:
print(f"查询失败: {e}")
return []
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试查询
# users = get_users_by_param(26)
# for user in users:
# print(user)
3. 更新数据(UPDATE)
方式一:SQL拼接(不推荐)
def update_user_by_concat(user_id, new_age, new_email):
"""使用SQL拼接方式更新用户"""
connection = get_db_connection()
if not connection:
return False
try:
with connection.cursor() as cursor:
# SQL拼接方式
sql = f"UPDATE users SET age = {new_age}, email = '{new_email}' WHERE id = {user_id}"
print(f"执行SQL: {sql}")
affected_rows = cursor.execute(sql)
connection.commit()
if affected_rows > 0:
print(f"更新成功,影响行数: {affected_rows}")
return True
else:
print("未找到要更新的记录")
return False
except pymysql.Error as e:
print(f"更新失败: {e}")
connection.rollback()
return False
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试更新
# update_user_by_concat(1, 26, "zhangsan_new@example.com")
方式二:参数化查询(推荐)
def update_user_by_param(user_id, new_age, new_email):
"""使用参数化查询方式更新用户"""
connection = get_db_connection()
if not connection:
return False
try:
with connection.cursor() as cursor:
# 参数化查询方式
sql = "UPDATE users SET age = %s, email = %s WHERE id = %s"
affected_rows = cursor.execute(sql, (new_age, new_email, user_id))
connection.commit()
if affected_rows > 0:
print(f"更新成功,影响行数: {affected_rows}")
return True
else:
print("未找到要更新的记录")
return False
except pymysql.Error as e:
print(f"更新失败: {e}")
connection.rollback()
return False
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试更新
# update_user_by_param(2, 31, "lisi_new@example.com")
4. 删除数据(DELETE)
方式一:SQL拼接(不推荐)
def delete_user_by_concat(user_id):
"""使用SQL拼接方式删除用户"""
connection = get_db_connection()
if not connection:
return False
try:
with connection.cursor() as cursor:
# SQL拼接方式
sql = f"DELETE FROM users WHERE id = {user_id}"
print(f"执行SQL: {sql}")
affected_rows = cursor.execute(sql)
connection.commit()
if affected_rows > 0:
print(f"删除成功,影响行数: {affected_rows}")
return True
else:
print("未找到要删除的记录")
return False
except pymysql.Error as e:
print(f"删除失败: {e}")
connection.rollback()
return False
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试删除(谨慎操作!)
# delete_user_by_concat(3)
方式二:参数化查询(推荐)
def delete_user_by_param(user_id):
"""使用参数化查询方式删除用户"""
connection = get_db_connection()
if not connection:
return False
try:
with connection.cursor() as cursor:
# 参数化查询方式
sql = "DELETE FROM users WHERE id = %s"
affected_rows = cursor.execute(sql, (user_id,))
connection.commit()
if affected_rows > 0:
print(f"删除成功,影响行数: {affected_rows}")
return True
else:
print("未找到要删除的记录")
return False
except pymysql.Error as e:
print(f"删除失败: {e}")
connection.rollback()
return False
finally:
if connection:
connection.close()
print("数据库连接已关闭")
# 测试删除(谨慎操作!)
# delete_user_by_param(3)
SQL拼接 vs 参数化查询:为什么参数化更安全?
通过上面的示例,我们看到了两种执行SQL的方式,那么为什么推荐使用参数化查询呢?
主要原因是防止SQL注入攻击。
举个例子,假设我们使用SQL拼接方式实现一个登录功能:
# 不安全的做法
def unsafe_login(username, password):
sql = f"SELECT * FROM users WHERE username = '{username}' AND password = '{password}'"
# 执行SQL...
如果有人恶意输入用户名:' OR '1'='1
,密码任意,那么拼接后的SQL会变成:
SELECT * FROM users WHERE username = '' OR '1'='1' AND password = '任意值'
这个SQL会查询出所有用户,因为'1'='1'
永远为真,这就是典型的SQL注入攻击。
而使用参数化查询时,输入的内容会被当作纯数据处理,不会被解析为SQL命令,从而有效防止SQL注入。
共同學(xué)習(xí),寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章