python编程_python编程软件

实时读取mysql数据库中的数据,数据库中数据修改后,python也跟着自动更新。

import pymysql
import time
from datetime import datetime


class MySQLRealtimeReader:
    def __init__(self, host, database, user, password, table, poll_interval=5):
        """
        初始化实时数据库读取器
        :param host: 数据库主机地址
        :param database: 数据库名称
        :param user: 数据库用户名
        :param password: 数据库密码
        :param table: 要监控的表名
        :param poll_interval: 轮询间隔时间(秒)
        """
        self.host = host
        self.database = database
        self.user = user
        self.password = password
        self.table = table
        self.poll_interval = poll_interval
        self.last_data = None
        self.connection = None
        self.connect()

    def connect(self):
        """建立数据库连接"""
        try:
            self.connection = pymysql.connect(
                host=self.host,
                database=self.database,
                user=self.user,
                password=self.password,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor
            )
            if self.connection.open:
                print(f"成功连接到MySQL数据库: {self.database}")
                return True
        except pymysql.MySQLError as e:
            print(f"连接数据库时发生错误: {e}")
            return False

    def fetch_data(self):
        """从数据库获取数据"""
        try:
            if not self.connection or not self.connection.open:
                self.connect()
            else:
                self.connection.ping(reconnect=True)

            with self.connection.cursor() as cursor:
                # 强制读取最新数据
                cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;")
                query = f"SELECT * FROM {self.table};"
                cursor.execute(query)
                data = cursor.fetchall()
            return data
        except pymysql.MySQLError as e:
            print(f"读取数据时发生错误: {e}")
            self.connection = None
            return None

    def check_for_changes(self):
        """检查数据是否有变化"""
        current_data = self.fetch_data()

        if current_data is None:
            return False, None, None

        # 首次运行时初始化数据
        if self.last_data is None:
            self.last_data = current_data
            return True, None, current_data

        # 更可靠的比较方式:转换为字符串后比较
        if str(current_data) != str(self.last_data):
            changes = {
                'old': self.last_data,
                'new': current_data
            }
            self.last_data = current_data.copy() if current_data else None
            return True, changes, current_data
        return False, None, current_data

    def start_monitoring(self):
        """开始监控数据库变化"""
        print(f"开始监控表 {self.table},轮询间隔 {self.poll_interval} 秒...")
        try:
            while True:
                has_changed, changes, current_data = self.check_for_changes()

                if has_changed:
                    if changes:
                        print(f"\n[{datetime.now()}] 数据发生变化:")
                        print("旧数据:", changes['old'])
                        print("新数据:", changes['new'])
                    else:
                        print(f"\n[{datetime.now()}] 初始数据加载完成:")
                        print("当前数据:", current_data)

                time.sleep(self.poll_interval)

        except KeyboardInterrupt:
            print("\n用户中断,停止监控")
        finally:
            if self.connection and self.connection.open:
                self.connection.close()
                print("数据库连接已关闭")


if __name__ == "__main__":
    # 配置数据库连接信息
    db_config = {
        'host': 'localhost',  # 数据库主机地址
        'database': 'test',  # 数据库名称
        'user': 'root',  # 数据库用户名
        'password': '123456',  # 数据库密码
        'table': 'emp',  # 要监控的表名
        'poll_interval': 3  # 轮询间隔(秒)
    }

    # 创建并启动实时读取器
    reader = MySQLRealtimeReader(
        host=db_config['host'],
        database=db_config['database'],
        user=db_config['user'],
        password=db_config['password'],
        table=db_config['table'],
        poll_interval=db_config['poll_interval']
    )

    reader.start_monitoring()