PHP:MySQL 主从(master/slave)数据差异对比与同步脚本

<?php
/**
 * Compares the tables of a source (master) MySQL database with the tables of
 * the same name on a target (slave) database and copies over rows the target
 * appears to be missing.
 *
 * Built on the legacy ext/mysql API (mysql_* functions), so it requires a PHP
 * build that still ships that extension. Most helpers stop the script with
 * die() on failure instead of returning an error value.
 */
class data_migration{
        /** Link to the source (master) server, as returned by mysql_connect(). */
        public $source_connect;
        /** Link to the target (slave) server, as returned by mysql_connect(). */
        public $target_connect;
        /** A table is re-synced only when its max-key gap or row-count gap exceeds this value. */
        public $mig_num = 100;
        /** Number of rows read from the source per batch. */
        public $limit = 500;
        /** Seconds to pause before each follow-up batch, to throttle load on both servers. */
        public $batch_pause = 2;

        /**
         * Opens both connections, selects the databases and sets the connection charset.
         *
         * @param array $source Keys: host, user, password, database and optionally charset.
         * @param array $target Same keys as $source.
         */
        public function __construct($source,$target){
                $this->source_connect = $this->open_database($source);
                $this->target_connect = $this->open_database($target);
        }

        /** Placeholder hook; intentionally does nothing. */
        public function init(){

        }

        /**
         * Connects to a MySQL server; dies when the connection fails.
         *
         * @param string $host     Host, optionally with ":port".
         * @param string $user
         * @param string $password
         * @return resource Link identifier.
         */
        public function connect($host,$user,$password){
                // new_link=true: without it mysql_connect() hands back the SAME link when it is
                // called twice with identical arguments, so source and target would share one
                // connection (and one selected database).
                $conn = mysql_connect($host,$user,$password,true) or die("连接数据库失败".mysql_error());
                return $conn;
        }

        /**
         * Selects the current database on a link; dies on failure.
         *
         * @param string   $tablename Name of the database to select (despite the parameter name).
         * @param resource $link
         * @return bool Always true (failure terminates the script).
         */
        public function select_db($tablename,&$link){
                mysql_select_db($tablename,$link) or die("选择表失败或不存在!".mysql_error());
                return true;
        }

        /**
         * Closes a connection.
         *
         * @param resource $link
         * @return bool Result of mysql_close().
         */
        public function close_connect($link){
                return mysql_close($link);
        }

        /**
         * Echoes the name of every database visible on the link, each followed by
         * a newline and a comment-like marker.
         *
         * @param resource $link
         */
        public function show_Database($link){
                $db_list = mysql_list_dbs($link);
                while($db=mysql_fetch_object($db_list)){
                        echo $db->Database;
                        echo "\n<!--  --!>";
                }
        }

        /**
         * Lists the tables of a database; dies when the list cannot be read.
         *
         * @param string   $dbname
         * @param resource $link
         * @return array Table names; an empty array when the database has no tables.
         */
        public function show_tables($dbname,$link){
                // Initialised up front so an empty database yields array() rather than an
                // undefined variable that callers would then try to iterate.
                $table_list = array();
                $db_table_list = mysql_list_tables($dbname,$link) or die("获取数据表列表失败!".mysql_error($link));
                while($row=mysql_fetch_row($db_table_list)){
                        $table_list[] = $row[0];
                }
                mysql_free_result($db_table_list);
                return $table_list;
        }

        /**
         * Runs a statement; dies (printing the SQL) when it fails.
         *
         * @param string   $str_sql
         * @param resource $link
         * @return resource|bool Whatever mysql_query() returned.
         */
        public function query($str_sql,&$link){
                $result = mysql_query($str_sql,$link) or die("sql执行出错!SQL:".$str_sql);
                return $result;
        }

        /**
         * Fetches the next row as a numerically indexed array; dies on an empty result handle.
         *
         * @param resource $result
         * @return array|false
         */
        public function fetch_row($result){
                if(empty($result)){
                        die("fetch_row参数无效!");
                }
                $record = mysql_fetch_row($result);
                return $record;
        }

        /**
         * Fetches the next row; dies on an empty result handle.
         *
         * @param resource $result
         * @param int      $type   One of the MYSQL_* fetch-mode constants.
         * @return array|false
         */
        public function fetch_array($result,$type=MYSQL_ASSOC){
                if(empty($result)){
                        die("fetch_array参数无效!");
                }
                $record = mysql_fetch_array($result,$type);
                return $record;
        }

        /**
         * Frees a result set; dies when that fails.
         *
         * @param resource $result
         * @return bool True on success.
         */
        public function free_result($result){
                return mysql_free_result($result) or die("释放资源失败!");
        }

        /**
         * Returns the first row of a query, or 0 when there is none.
         *
         * Note: despite its name this returns the whole row (same as getRow()),
         * not a single column value.
         *
         * @param string   $sql
         * @param resource $link
         * @return array|int
         */
        public function getValue($sql,&$link)
        {
                $row = $this->getRow($sql,$link);
                if(!$row)
                {
                        return 0;
                }
                return $row;
        }

        /**
         * Returns the first row of a query as an associative array, or 0 when the
         * query produced no rows. Dies when the query fails.
         *
         * @param string   $sql
         * @param resource $link
         * @return array|int
         */
        public function getRow($sql,&$link)
        {
                $result = mysql_query($sql,$link) or die("执行sql出错!SQL:".$sql);
                // Statements that produce no result set make mysql_query() return true.
                if(!is_resource($result))
                {
                        return 0;
                }
                $row = mysql_fetch_array($result,MYSQL_ASSOC);
                mysql_free_result($result);
                return $row ? $row : 0;
        }

        /**
         * Returns all rows of a query as a list of associative arrays, or 0 when
         * the query produced no rows. Dies when the query fails.
         *
         * @param string   $sql
         * @param resource $link
         * @return array|int
         */
        public function getRows($sql,&$link)
        {
                $result = mysql_query($sql,$link) or die("执行sql出错!SQL:".$sql);
                // Statements that produce no result set make mysql_query() return true.
                if(!is_resource($result))
                {
                        return 0;
                }
                $rows = array();
                while($row = mysql_fetch_array($result,MYSQL_ASSOC))
                {
                        $rows[] = $row;
                }
                mysql_free_result($result);
                return $rows ? $rows : 0;
        }

        /**
         * Runs a ready-made INSERT statement; dies when it fails.
         *
         * @param string   $insert_sql
         * @param resource $link
         * @return bool True on success.
         */
        public function insertbystr_sql($insert_sql,&$link){
                return mysql_query($insert_sql,$link) or die("插入数据失败!SQL:".$insert_sql);
        }

        /**
         * Inserts one row given as column => value pairs.
         *
         * @param string   $tableName
         * @param array    $data      Column name => value; null values are written as SQL NULL.
         * @param resource $link
         * @return bool False when the table name or data is empty/invalid, true after
         *              the insert ran (a failing statement terminates the script).
         */
        public function insertbyarray($tableName,$data,&$link){
                if(!is_array($data) || !$data || !$tableName)
                {
                        return false;
                }
                $sql = $this->build_insert_sql($tableName,$data,$link);
                return $this->query($sql,$link) ? true : false;
        }

        /**
         * Inserts one row, or overwrites every listed column when the row already
         * exists (INSERT ... ON DUPLICATE KEY UPDATE).
         *
         * @param string   $tableName
         * @param array    $data      Column name => value; null values are written as SQL NULL.
         * @param resource $link
         * @return bool False when the table name or data is empty/invalid, true after
         *              the statement ran (a failing statement terminates the script).
         */
        public function insertupdate($tableName ,$data,&$link)
        {
                if(!is_array($data) || !$data || !$tableName)
                {
                        return false;
                }
                $assignments = array();
                foreach($data as $key=>$val)
                {
                        $assignments[] = $this->quote_identifier($key).'='.$this->quote_value($val,$link);
                }
                $sql = $this->build_insert_sql($tableName,$data,$link)
                        ." on duplicate key update ".implode(',',$assignments);
                return $this->query($sql,$link) ? true : false;
        }

        /**
         * Deletes the rows matching a condition.
         *
         * @param string   $tableName
         * @param string   $condition Raw SQL placed after WHERE; the caller must escape it.
         * @param resource $link
         * @return bool True after the delete ran (a failing statement terminates the script).
         */
        public function delete($tableName,$condition,&$link)
        {
                $sql = "delete from ".$this->quote_identifier($tableName)." where $condition";
                return $this->query($sql,$link) ? true : false;
        }

        /**
         * Returns the auto-increment id generated by the last INSERT on the link;
         * dies when there is none.
         *
         * @param resource $link
         * @return int
         */
        public function insert_id(&$link){
                // Kept as two steps: "return mysql_insert_id() or die()" evaluates to a
                // boolean, so the caller would receive true instead of the id.
                $id = mysql_insert_id($link);
                if(!$id){
                        die("insert id 无效");
                }
                return $id;
        }

        /**
         * Runs one or two statements against the target connection.
         *
         * @param string $insert_sql        Statement that is always executed.
         * @param string $insert_sql_thread Statement executed first when $type is true.
         * @param bool   $type              When true, also run $insert_sql_thread and return
         *                                  LAST_INSERT_ID() afterwards.
         * @return mixed LAST_INSERT_ID() value when $type is true, otherwise null.
         */
        public function target_operator($insert_sql,$insert_sql_thread="",$type=false){
                if($type){
                        $this->query($insert_sql_thread,$this->target_connect);
                }
                $this->query($insert_sql,$this->target_connect);
                if($type){
                        $insert_id = $this->fetch_row($this->query('select LAST_INSERT_ID()',$this->target_connect));
                        return $insert_id[0];
                }
        }

        /**
         * Walks every table of the source database, compares row count and maximum
         * key value with the target, prints a status line per table and copies the
         * rows above the target's maximum key when the gap exceeds $mig_num.
         *
         * Empty source tables are skipped without output.
         *
         * NOTE(review): when only the row counts differ (target max key >= source
         * max key) the copy step selects no rows, yet the success line is still
         * printed - rows missing from the middle of the target are not repaired.
         *
         * @param array $source Source config; only the 'database' key is read here.
         * @param array $target Unused; kept for interface compatibility.
         */
        public function database_rsync($source,$target){
                $source_table_list = $this->show_tables($source['database'],$this->source_connect);
                foreach($source_table_list as $table){
                        $source_total = $this->count_rows($table,$this->source_connect);
                        if(!$source_total){
                                continue;
                        }

                        $pkeyname = $this->primary_key_name($source['database'],$table);
                        $source_maxid = $this->max_key($table,$pkeyname,$this->source_connect);
                        $target_total = $this->count_rows($table,$this->target_connect);
                        $target_maxid = $target_total ? $this->max_key($table,$pkeyname,$this->target_connect) : 0;

                        if($source_maxid == $target_maxid && $source_total == $target_total){
                                echo $table." 正常!\n";
                                continue;
                        }

                        echo "table=>{$table} ,source_total=>{$source_total} , source_maxid=>{$source_maxid} |target_total=>{$target_total}  ,target_maxid=>{$target_maxid}";
                        echo "\n";

                        $key_gap = ($target_maxid < $source_maxid) && (abs($source_maxid-$target_maxid) > $this->mig_num);
                        $count_gap = abs($source_total-$target_total) > $this->mig_num;
                        if($key_gap || $count_gap){
                                $this->copy_rows_after($table,$pkeyname,$target_maxid);
                                echo $table." php脚本同步成功!\n";
                        }else{
                                echo $table." 基本正常!\n";
                        }
                }
        }

        /** Connects, selects the database and applies the charset for one config array. */
        private function open_database($config){
                $link = $this->connect($config['host'],$config['user'],$config['password']);
                $this->select_db($config['database'],$link);
                // Honour the configured charset; utf8 is the fallback when none is given.
                $charset = empty($config['charset']) ? 'utf8' : $config['charset'];
                // mysql_set_charset() (rather than a "set names" query) also tells the client
                // library about the charset, which mysql_real_escape_string() relies on.
                mysql_set_charset($charset,$link) or die("设置字符集失败!".mysql_error($link));
                return $link;
        }

        /** Wraps a table/column name in backticks unless it is already quoted or schema-qualified. */
        private function quote_identifier($name){
                $name = (string)$name;
                // Pass through names callers already quoted or qualified ("db.table") so
                // existing call sites keep producing the SQL they produced before.
                if(strpos($name,'`') !== false || strpos($name,'.') !== false){
                        return $name;
                }
                return '`'.$name.'`';
        }

        /** Renders a value as an SQL literal: NULL stays NULL, everything else is escaped and quoted. */
        private function quote_value($val,$link){
                if($val === null){
                        return 'NULL';
                }
                return "'".mysql_real_escape_string($val,$link)."'";
        }

        /** Builds "insert into <table> (<columns>) values (<literals>)" for one row. */
        private function build_insert_sql($tableName,$data,$link){
                $columns = array();
                $values = array();
                foreach($data as $key=>$val){
                        $columns[] = $this->quote_identifier($key);
                        $values[] = $this->quote_value($val,$link);
                }
                return "insert into ".$this->quote_identifier($tableName)
                        ." (".implode(',',$columns).") values (".implode(',',$values).")";
        }

        /** Returns count(*) of a table on the given link. */
        private function count_rows($table,$link){
                $result = $this->query("select count(*) as total from `{$table}`",$link);
                $row = mysql_fetch_array($result,MYSQL_ASSOC);
                mysql_free_result($result);
                return $row['total'];
        }

        /** Returns max(<key column>) of a table on the given link. */
        private function max_key($table,$pkeyname,$link){
                $result = $this->query("select max(`{$pkeyname}`) as maxid from `{$table}`",$link);
                $row = mysql_fetch_array($result,MYSQL_ASSOC);
                mysql_free_result($result);
                return $row['maxid'];
        }

        /** Returns the first column flagged primary_key on the source table, else its first column. */
        private function primary_key_name($database,$table){
                $fields = mysql_list_fields($database,$table,$this->source_connect) or die("获取字段列表失败!".mysql_error($this->source_connect));
                $name = mysql_field_name($fields,0);
                $field_total = mysql_num_fields($fields);
                for($i = 0; $i < $field_total; $i++){
                        if(strpos(mysql_field_flags($fields,$i),'primary_key') !== false){
                                $name = mysql_field_name($fields,$i);
                                break;
                        }
                }
                mysql_free_result($fields);
                return $name;
        }

        /** Copies, batch by batch, every source row whose key is greater than $after into the target. */
        private function copy_rows_after($table,$pkeyname,$after){
                $after = mysql_real_escape_string($after,$this->source_connect);
                $start = 0;
                do{
                        // ORDER BY makes the LIMIT windows stable; without it the server may
                        // return rows in a different order per query and skip or repeat rows.
                        $str_sql = "select * from `{$table}` where `{$pkeyname}`>'{$after}' order by `{$pkeyname}` limit {$start},{$this->limit}";
                        if($start > 0){
                                echo $str_sql."\n";
                                if($this->batch_pause > 0){
                                        sleep($this->batch_pause);
                                }
                        }
                        $result = $this->query($str_sql,$this->source_connect);
                        $count = 0;
                        while($row = mysql_fetch_array($result,MYSQL_ASSOC)){
                                $this->insertupdate($table,$row,$this->target_connect);
                                $count++;
                        }
                        mysql_free_result($result);
                        if($count > 0){
                                echo $count."\n";
                        }
                        $start += $this->limit;
                }while($count > 0);
        }

}

// Connection settings for the source (master) server.
// NOTE(review): user 'dab' differs from the target's 'dba' - possibly a typo; confirm.
$source = array(
        'host'     => '127.0.0.1',
        'user'     => 'dab',
        'password' => 'phpdba',
        'charset'  => 'utf8',
        'database' => 'chen-123',
);

// Connection settings for the target (slave) server.
$target = array(
        'host'     => '192.168.0.103:3308',
        'user'     => 'dba',
        'password' => 'phpdba',
        'charset'  => 'utf8',
        'database' => 'chen-123',
);

// Connect to both servers, sync every table, then release both links.
$data_mig = new data_migration($source, $target);
$data_mig->database_rsync($source, $target);

$data_mig->close_connect($data_mig->source_connect);
$data_mig->close_connect($data_mig->target_connect);

echo "所有数据导入完成!\n";
此条目发表在技术生涯分类目录。将固定链接加入收藏夹。

发表评论

邮箱地址不会被公开。 必填项已用*标注