python编程_python编程软件
文章标签:
python编程是啥
实时读取mysql数据库中的数据,数据库中数据修改后,python也跟着自动更新。
import pymysql
import time
from datetime import datetime
class MySQLRealtimeReader:
def __init__(self, host, database, user, password, table, poll_interval=5):
"""
初始化实时数据库读取器
:param host: 数据库主机地址
:param database: 数据库名称
:param user: 数据库用户名
:param password: 数据库密码
:param table: 要监控的表名
:param poll_interval: 轮询间隔时间(秒)
"""
self.host = host
self.database = database
self.user = user
self.password = password
self.table = table
self.poll_interval = poll_interval
self.last_data = None
self.connection = None
self.connect()
def connect(self):
"""建立数据库连接"""
try:
self.connection = pymysql.connect(
host=self.host,
database=self.database,
user=self.user,
password=self.password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
if self.connection.open:
print(f"成功连接到MySQL数据库: {self.database}")
return True
except pymysql.MySQLError as e:
print(f"连接数据库时发生错误: {e}")
return False
def fetch_data(self):
"""从数据库获取数据"""
try:
if not self.connection or not self.connection.open:
self.connect()
else:
self.connection.ping(reconnect=True)
with self.connection.cursor() as cursor:
# 强制读取最新数据
cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;")
query = f"SELECT * FROM {self.table};"
cursor.execute(query)
data = cursor.fetchall()
return data
except pymysql.MySQLError as e:
print(f"读取数据时发生错误: {e}")
self.connection = None
return None
def check_for_changes(self):
"""检查数据是否有变化"""
current_data = self.fetch_data()
if current_data is None:
return False, None, None
# 首次运行时初始化数据
if self.last_data is None:
self.last_data = current_data
return True, None, current_data
# 更可靠的比较方式:转换为字符串后比较
if str(current_data) != str(self.last_data):
changes = {
'old': self.last_data,
'new': current_data
}
self.last_data = current_data.copy() if current_data else None
return True, changes, current_data
return False, None, current_data
def start_monitoring(self):
"""开始监控数据库变化"""
print(f"开始监控表 {self.table},轮询间隔 {self.poll_interval} 秒...")
try:
while True:
has_changed, changes, current_data = self.check_for_changes()
if has_changed:
if changes:
print(f"\n[{datetime.now()}] 数据发生变化:")
print("旧数据:", changes['old'])
print("新数据:", changes['new'])
else:
print(f"\n[{datetime.now()}] 初始数据加载完成:")
print("当前数据:", current_data)
time.sleep(self.poll_interval)
except KeyboardInterrupt:
print("\n用户中断,停止监控")
finally:
if self.connection and self.connection.open:
self.connection.close()
print("数据库连接已关闭")
if __name__ == "__main__":
# 配置数据库连接信息
db_config = {
'host': 'localhost', # 数据库主机地址
'database': 'test', # 数据库名称
'user': 'root', # 数据库用户名
'password': '123456', # 数据库密码
'table': 'emp', # 要监控的表名
'poll_interval': 3 # 轮询间隔(秒)
}
# 创建并启动实时读取器
reader = MySQLRealtimeReader(
host=db_config['host'],
database=db_config['database'],
user=db_config['user'],
password=db_config['password'],
table=db_config['table'],
poll_interval=db_config['poll_interval']
)
reader.start_monitoring()