您的足迹:首页 > Linux&Python >python 连接数据库

python 连接数据库

import mysql.connector

class MySqlHelper(object):
    """Single-use helper around a ``mysql.connector`` connection.

    The constructor opens a connection and creates a cursor. Every public
    method (``select``, ``select2``, ``Update``, ``Insert``) closes both the
    cursor and the connection before it returns, so one instance serves one
    statement; create a new helper for each call.

    Errors are reported with ``print`` and converted into a neutral return
    value (``[]`` for queries, ``False`` for writes) instead of propagating.
    """

    def __init__(self, host="192.168.60.156", user="root", password="root",
                 database="deliverunion_callcenter", connect_timeout=10):
        """Open the connection and create a cursor.

        The defaults reproduce the previously hard-coded settings so that
        ``MySqlHelper()`` keeps working; callers should prefer passing
        values loaded from configuration rather than relying on them.

        Args:
            host: MySQL server host name or address.
            user: Login user name.
            password: Login password.
            database: Schema selected after connecting.
            connect_timeout: Connection timeout handed to the driver.

        If connecting fails, the error is printed and ``mydb`` / ``mycursor``
        stay ``None``; later calls then report an error and return their
        failure value.
        """
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        # Defined up front so the cleanup code can always inspect them, even
        # when connect() below raises.
        self.mydb = None
        self.mycursor = None
        try:
            self.mydb = mysql.connector.connect(
                host=self.host,
                user=self.user,
                passwd=self.password,
                database=self.database,
                connect_timeout=connect_timeout,
            )
            self.mycursor = self.mydb.cursor()
        except Exception as e:
            self._report('MySql', e)

    @staticmethod
    def _report(label, error):
        """Print ``error`` as ``'<label> Error : <details>'``.

        All of ``error.args`` are joined with spaces, so an exception whose
        args are ``(code, message)`` prints ``code message``. Unlike a fixed
        ``'%d %s'`` format this cannot fail for exceptions that carry fewer
        arguments or a non-integer first argument.
        """
        detail = ' '.join(str(arg) for arg in error.args)
        print('%s Error : %s' % (label, detail or type(error).__name__))

    def _close(self):
        """Close the cursor and the connection, skipping any that is None."""
        try:
            if self.mycursor is not None:
                self.mycursor.close()
        finally:
            # Still release the connection if closing the cursor raised.
            if self.mydb is not None:
                self.mydb.close()

    def _fetch(self, label, *execute_args):
        """Execute a query and return all rows, or ``[]`` on any error."""
        try:
            self.mycursor.execute(*execute_args)
            return self.mycursor.fetchall()
        except Exception as e:
            self._report(label, e)
            return []
        finally:
            self._close()

    def _write(self, label, mysql, na):
        """Execute and commit a statement; True if it affected any row."""
        try:
            self.mycursor.execute(mysql, na)
            self.mydb.commit()
            return self.mycursor.rowcount > 0
        except Exception as e:
            self._report(label, e)
            return False
        finally:
            self._close()

    # Query without parameters.
    def select(self, mysql):
        """Run the SQL text ``mysql`` and return every fetched row.

        Returns ``[]`` when the statement fails. The cursor and connection
        are closed afterwards.
        """
        return self._fetch('select', mysql)

    # Query with parameters.
    def select2(self, mysql, na):
        """Run ``mysql`` with the parameters ``na`` and return every row.

        Returns ``[]`` when the statement fails. The cursor and connection
        are closed afterwards.
        """
        return self._fetch('select2', mysql, na)

    # Update rows.
    def Update(self, mysql, na):
        """Execute and commit ``mysql`` with the parameters ``na``.

        Returns True when the cursor reports a positive row count, False
        otherwise or on error. The cursor and connection are closed
        afterwards.
        """
        return self._write('Update', mysql, na)

    # Insert rows.
    def Insert(self, mysql, na):
        """Execute and commit ``mysql`` with the parameters ``na``.

        Returns True when the cursor reports a positive row count, False
        otherwise or on error. The cursor and connection are closed
        afterwards.
        """
        return self._write('Insert', mysql, na)

            

使用数据:
from DAL import MySqlHelper
from Entity import TaskPoolEntity
import datetime

class TaskPoolDAL(object):
    """Data-access functions for the ``t_du_guiji_task`` table.

    Every function is a static method and creates its own ``MySqlHelper``,
    storing it on ``TaskPoolDAL.sqlHelper`` before use, so a fresh helper
    backs each statement.
    """

    # NOTE(review): this opens a connection when the class is defined, and
    # every method replaces it before use. Kept so the attribute's initial
    # value is unchanged; consider creating it lazily instead.
    sqlHelper = MySqlHelper.MySqlHelper()

    @staticmethod
    def _new_helper():
        """Create a helper, publish it as ``sqlHelper`` and return it."""
        helper = MySqlHelper.MySqlHelper()
        TaskPoolDAL.sqlHelper = helper
        return helper

    @staticmethod
    def _to_entities(rows):
        """Wrap each result row in a ``TaskPoolEntity``."""
        return [TaskPoolEntity.TaskPoolEntity(row) for row in rows]

    # Query all tasks.
    @staticmethod
    def selectTasks():
        """Return every row of ``t_du_guiji_task`` as ``TaskPoolEntity`` objects."""
        sql = 'select * from  t_du_guiji_task'
        rows = TaskPoolDAL._new_helper().select(sql)
        return TaskPoolDAL._to_entities(rows)

    # Query unfinished tasks, or tasks whose last call is over 2 hours old.
    @staticmethod
    def selectUnEndTasks():
        """Return at most one pending task as a list of ``TaskPoolEntity``.

        A task matches when ``task_statue`` is 0 or 1 and ``last_call_time``
        is either NULL or at least two hours in the past. The query ends in
        ``limit 1``, so the list holds zero or one entity.
        """
        sql = ('select * from t_du_guiji_task where '
               '(task_statue in (0,1) and last_call_time is null) or '
               '(task_statue in (0,1)  and last_call_time is not null '
               'and DATE_ADD(last_call_time,INTERVAL 2 HOUR) <= now()) limit 1')
        rows = TaskPoolDAL._new_helper().select(sql)
        return TaskPoolDAL._to_entities(rows)

    # Query a task by id.
    @staticmethod
    def selectTaskById(id):
        """Return the tasks whose ``id`` column equals ``id``.

        The id is passed as a bound parameter rather than concatenated into
        the SQL text, so its content cannot change the statement.
        """
        sql = 'select * from t_du_guiji_task where id = %s'
        rows = TaskPoolDAL._new_helper().select2(sql, (id,))
        return TaskPoolDAL._to_entities(rows)

    # Update a task by its id.
    @staticmethod
    def UpdateTaskById(totalCount, state, lastcallTime, id):
        """Set ``total_count``, ``task_statue`` and ``last_call_time`` for one task.

        ``update_time`` is set to the current local time. Every value is
        converted with ``str`` before being bound. Returns whatever the
        helper's ``Update`` returns.
        """
        sql = ("update t_du_guiji_task set total_count=%s,task_statue=%s,"
               "last_call_time=%s,update_time=%s where id=%s")
        na = (str(totalCount), str(state), str(lastcallTime),
              str(datetime.datetime.now()), str(id))
        return TaskPoolDAL._new_helper().Update(sql, na)

    # Update the batch name of a task.
    @staticmethod
    def UpdateTaskBatchById(batchName, id):
        """Set ``batch_name`` for one task and mark it ``task_statue=1``.

        ``update_time`` is set to the current local time. Returns whatever
        the helper's ``Update`` returns.
        """
        sql = ("update t_du_guiji_task set batch_name=%s,task_statue=1,"
               "update_time=%s where id=%s")
        na = (str(batchName), str(datetime.datetime.now()), str(id))
        return TaskPoolDAL._new_helper().Update(sql, na)
本博客所有文章如无特别注明均为原创。作者:cc。复制或转载请以超链接形式注明转自 .NET程序员之路。
原文地址:《python 连接数据库》

相关推荐

发表评论

路人甲 表情
Ctrl+Enter快速提交

网友评论(0)