|
|
@@ -147,239 +147,353 @@ $workerTcp->onWorkerStart = function ($workerTcp) {
|
|
|
// 接口服务器向我发送的信息
|
|
|
$connection_to_tcp->onMessage = function ($connection_to_tcp, $data) {
|
|
|
|
|
|
- global $num;
|
|
|
+ tcpMessHandle($data);
|
|
|
|
|
|
- //这里显示的是从接口服务器返回的数据(需要根据数据的类似判断)
|
|
|
- global $db; //操作数据的全局变量
|
|
|
- global $recv_buffer; //存储不完整的数据
|
|
|
+ };
|
|
|
|
|
|
|
|
|
- global $index; //全局的变量
|
|
|
- global $send_data; //全局的数据
|
|
|
- global $send_order; //全局的订单号码
|
|
|
+ $connection_to_tcp->onClose = function($connection_to_tcp)
|
|
|
+ {
|
|
|
|
|
|
+ echo "connection closed\n";
|
|
|
+ log_file("to_tcp connection closed\n");
|
|
|
|
|
|
- global $rollback_data; //rollback数据
|
|
|
- global $rollback_index; //rollback 管理数据序号
|
|
|
+ // $connection_to_tcp->reConnect(); //执行连接
|
|
|
+ };
|
|
|
|
|
|
+ $connection_to_tcp->onError = function($connection_to_tcp, $code, $msg)
|
|
|
+ {
|
|
|
+ echo "Error code:$code msg:$msg\n";
|
|
|
+ };
|
|
|
|
|
|
- global $retry_data; //retry数据
|
|
|
- global $retry_index; //retry 管理数据的序号
|
|
|
|
|
|
- global $retry_rollback_data; //retry表中rollback数据
|
|
|
- global $retry_rollback_index; //retry表rollback管理数据的序号
|
|
|
- global $seqnum;
|
|
|
- $bufExplode = explode("\n", $data); //通过指定的分隔符,把字符串打散为数组
|
|
|
- foreach ($bufExplode as $key => $value) {
|
|
|
|
|
|
- if ($value) {
|
|
|
- $msg = json_decode($value, true); //对信息进行处理
|
|
|
- if (!$msg) {
|
|
|
+ // 向远处的服务器发送信息============================================================
|
|
|
|
|
|
- $recv_buffer .= $value; //把数据组装起来
|
|
|
- $msg1 = json_decode($recv_buffer, true);
|
|
|
- if ($msg1) {
|
|
|
- $msg = $msg1;
|
|
|
- } else {
|
|
|
- continue; //解析的不对就跳出循环
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- continue; //信息不存在就跳出循环
|
|
|
- }
|
|
|
+ $connection_to_tcp->connect(); //执行连接
|
|
|
+ //发送给 接口tcp服务器 心跳========================================================
|
|
|
|
|
|
- // 接口服务器返回的信息
|
|
|
- echo "接口返回数据";
|
|
|
- var_dump($msg);
|
|
|
- if ($msg['type'] == "get_order_record") { //查询订单状态
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
+ Timer::add(30, function () use ($connection_to_tcp) {
|
|
|
+
|
|
|
+ //定时发送心跳信息
|
|
|
+ $connection_to_tcp->send('{"type":"ping"}' . "\n");
|
|
|
+
|
|
|
+ });
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ // 发送给 webserver 服务器 心跳=====================================================
|
|
|
+ Timer::add(1, function () use ($con,$connection_to_tcp) {
|
|
|
+ // 定时发送心跳信息
|
|
|
+ global $connection_count;
|
|
|
+
|
|
|
+ if($connection_count){
|
|
|
+ if($connection_count>1){
|
|
|
+
|
|
|
+ $connection_to_tcp->connect(); //客户端上线重新连接接口
|
|
|
+ $connection_to_tcp->onConnect = function ($connection_to_tcp) {
|
|
|
+ // 发送心跳信息
|
|
|
+ $connection_to_tcp->send('{"type":"ping"}' . "\n");
|
|
|
+ };
|
|
|
+ // 接口服务器向我发送的信息
|
|
|
+ $connection_to_tcp->onMessage = function ($connection_to_tcp, $data) {
|
|
|
+
|
|
|
+ tcpMessHandle($data);
|
|
|
+ };
|
|
|
+
|
|
|
+ $connection_to_tcp->onClose = function($connection_to_tcp)
|
|
|
+ {
|
|
|
+
|
|
|
+ echo "connection closed\n";
|
|
|
+ log_file("to_tcp connection closed\n");
|
|
|
+
|
|
|
+ // $connection_to_tcp->reConnect(); //执行连接
|
|
|
+ };
|
|
|
+
|
|
|
+ $connection_to_tcp->onError = function($connection_to_tcp, $code, $msg)
|
|
|
+ {
|
|
|
+ echo "Error code:$code msg:$msg\n";
|
|
|
+ };
|
|
|
+
|
|
|
+ }else{
|
|
|
+ //客户端不在线
|
|
|
+ $connection_to_tcp->close();
|
|
|
}
|
|
|
- ack($msg['seqnum']);
|
|
|
- $seqnum++;
|
|
|
}
|
|
|
- if ($msg['type'] == "set_current_order") { //设置当前跟单订单
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- sendToWebServer($msg);
|
|
|
- ack($msg['seqnum']);
|
|
|
- $seqnum++;
|
|
|
+ $con->send(json_encode(['type' => 'heartbeat']). "\n");
|
|
|
+ });
|
|
|
|
|
|
- }
|
|
|
|
|
|
- if ($msg['type'] == "insert") { //Insert请求 (有数据库操作,插入)
|
|
|
- $recv_buffer = "";
|
|
|
- echo "接收到tcp" . curenttime();
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
+
|
|
|
+ // 心跳检测(TCP 所有客户端) 心跳===================================================
|
|
|
+ Timer::add(1, function () use ($workerTcp) {
|
|
|
+ $time_now = time(); //当前时间
|
|
|
+ foreach ($workerTcp->connections as $connection) {
|
|
|
+ // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
|
|
|
+ if (empty($connection->lastMessageTime)) {
|
|
|
+ $connection->lastMessageTime = $time_now;
|
|
|
+ continue;
|
|
|
}
|
|
|
- if ($seqnum != $msg["seqnum"]) {
|
|
|
- return false;
|
|
|
+ // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
|
|
|
+ if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
|
|
|
+
|
|
|
+ $connection->close();
|
|
|
}
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+};
|
|
|
+
|
|
|
+$workerTcp->onConnect = function ($connection) {
|
|
|
+ echo "connection onConnect\n";
|
|
|
+};
|
|
|
+
|
|
|
+$workerTcp->onError = function ($connection, $code, $msg) {
|
|
|
+ echo "Error code:$code msg:$msg\n";
|
|
|
+};
|
|
|
+
|
|
|
+$workerTcp->onClose = function ($connection) {
|
|
|
+ echo "connection closed\n";
|
|
|
+};
|
|
|
+
|
|
|
+$workerTcp->onMessage = function ($connection, $data) {
|
|
|
+ echo "connection onMessage\n";
|
|
|
+};
|
|
|
+//远程服务器返回消息处理
|
|
|
+function tcpMessHandle($data)
|
|
|
+{
|
|
|
+ global $num;
|
|
|
+
|
|
|
+ //这里显示的是从接口服务器返回的数据(需要根据数据的类似判断)
|
|
|
+ global $db; //操作数据的全局变量
|
|
|
+ global $recv_buffer; //存储不完整的数据
|
|
|
|
|
|
- $index++; //将数据加一
|
|
|
- $total = count($send_data); //总数据的长度
|
|
|
- $insert_data = $msg['dest'];
|
|
|
- ack($msg['seqnum']);
|
|
|
- $seqnum++;
|
|
|
- if ($index < $total) {
|
|
|
- send_insert($send_data, $index, $send_order);
|
|
|
+
|
|
|
+ global $index; //全局的变量
|
|
|
+ global $send_data; //全局的数据
|
|
|
+ global $send_order; //全局的订单号码
|
|
|
+
|
|
|
+
|
|
|
+ global $rollback_data; //rollback数据
|
|
|
+ global $rollback_index; //rollback 管理数据序号
|
|
|
+
|
|
|
+
|
|
|
+ global $retry_data; //retry数据
|
|
|
+ global $retry_index; //retry 管理数据的序号
|
|
|
+
|
|
|
+ global $retry_rollback_data; //retry表中rollback数据
|
|
|
+ global $retry_rollback_index; //retry表rollback管理数据的序号
|
|
|
+ global $seqnum;
|
|
|
+ $bufExplode = explode("\n", $data); //通过指定的分隔符,把字符串打散为数组
|
|
|
+ foreach ($bufExplode as $key => $value) {
|
|
|
+
|
|
|
+ if ($value) {
|
|
|
+ $msg = json_decode($value, true); //对信息进行处理
|
|
|
+ if (!$msg) {
|
|
|
+
|
|
|
+ $recv_buffer .= $value; //把数据组装起来
|
|
|
+ $msg1 = json_decode($recv_buffer, true);
|
|
|
+ if ($msg1) {
|
|
|
+ $msg = $msg1;
|
|
|
} else {
|
|
|
- $index = 0;
|
|
|
+ continue; //解析的不对就跳出循环
|
|
|
}
|
|
|
- sendToWebServer($msg); //发送给前端
|
|
|
- $arr = "";
|
|
|
- foreach ($insert_data as $key => $value) {
|
|
|
- if ($value['error_code'] == -2) {
|
|
|
- return false;
|
|
|
- } else {
|
|
|
- /* $msg_child['type'] = $msg['type'];
|
|
|
- $msg_child['orig_order'] = $msg['orig_order'];
|
|
|
- $msg_child['orig_login'] = $msg['orig_login'];
|
|
|
- $msg_child['dest_login'] = $value['dest_login'];
|
|
|
- $msg_child['dest_order'] = $value['dest_order'];
|
|
|
- $msg_child['percentage'] = $value['percentage'];
|
|
|
- $msg_child['profit'] = $value['profit'];
|
|
|
- $msg_child['error_code'] = $value['error_code'];
|
|
|
- $msg_child['addtime']=time(); //添加时间
|
|
|
- $result = $db->insert('order_progress')->cols($msg_child)->query(); //向order_progress插入数据
|
|
|
- $result1 = $db->insert('order_save')->cols($msg_child)->query(); //向order_save插入数据*/
|
|
|
- $time = time();
|
|
|
- $arr .= "(" . "'{$msg['type']}'" . "," . $msg['orig_order'] . "," . $msg['orig_login'] . "," . $value['dest_login'] . "," . $value['dest_order'] . "," . $value['percentage'] . "," . $value['profit'] . "," . $value['error_code'] . "," . $time . ")" . ",";
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ continue; //信息不存在就跳出循环
|
|
|
+ }
|
|
|
|
|
|
- }
|
|
|
- }
|
|
|
- $filed = "type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime";
|
|
|
- $arr = substr($arr, 0, -1);
|
|
|
- $sql = sprintf("INSERT INTO %s(%s) VALUES %s", "order_progress", $filed, $arr);
|
|
|
- $sql2 = sprintf("INSERT INTO %s(%s) VALUES %s", "order_save", $filed, $arr);
|
|
|
- $db->query($sql);
|
|
|
- $db->query($sql2);
|
|
|
+ // 接口服务器返回的信息
|
|
|
+ echo "接口返回数据";
|
|
|
+ var_dump($msg);
|
|
|
+ if ($msg['type'] == "get_order_record") { //查询订单状态
|
|
|
+ $recv_buffer = "";
|
|
|
+ if (!isset($msg['seqnum'])) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ ack($msg['seqnum']);
|
|
|
+ $seqnum++;
|
|
|
+ }
|
|
|
+ if ($msg['type'] == "set_current_order") { //设置当前跟单订单
|
|
|
+ $recv_buffer = "";
|
|
|
+ if (!isset($msg['seqnum'])) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ sendToWebServer($msg);
|
|
|
+ ack($msg['seqnum']);
|
|
|
+ $seqnum++;
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if ($msg['type'] == "insert") { //Insert请求 (有数据库操作,插入)
|
|
|
+ $recv_buffer = "";
|
|
|
+ if (!isset($msg['seqnum'])) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if ($seqnum != $msg["seqnum"]) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
|
|
|
+ $index++; //将数据加一
|
|
|
+ $total = count($send_data); //总数据的长度
|
|
|
+ $insert_data = $msg['dest'];
|
|
|
+ ack($msg['seqnum']);
|
|
|
+ $seqnum++;
|
|
|
+ if ($index < $total) {
|
|
|
+ send_insert($send_data, $index, $send_order);
|
|
|
+ } else {
|
|
|
+ $index = 0;
|
|
|
}
|
|
|
- if ($msg['type'] == "rollback") { //Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- if ($seqnum != $msg['seqnum']) {
|
|
|
+ sendToWebServer($msg); //发送给前端
|
|
|
+ $arr = "";
|
|
|
+ foreach ($insert_data as $key => $value) {
|
|
|
+ if ($value['error_code'] == -2) {
|
|
|
return false;
|
|
|
- }
|
|
|
- $rollback_index++; //将数据加一
|
|
|
- $total = count($rollback_data); //总数据的长度
|
|
|
- $rollbackdata = $msg['desc'];
|
|
|
- ack($msg["seqnum"]);
|
|
|
- $seqnum++;
|
|
|
- if ($rollback_index < $total) {
|
|
|
- send_rollback($rollback_data, $rollback_index);
|
|
|
+ } else {
|
|
|
+ /* $msg_child['type'] = $msg['type'];
|
|
|
+ $msg_child['orig_order'] = $msg['orig_order'];
|
|
|
+ $msg_child['orig_login'] = $msg['orig_login'];
|
|
|
+ $msg_child['dest_login'] = $value['dest_login'];
|
|
|
+ $msg_child['dest_order'] = $value['dest_order'];
|
|
|
+ $msg_child['percentage'] = $value['percentage'];
|
|
|
+ $msg_child['profit'] = $value['profit'];
|
|
|
+ $msg_child['error_code'] = $value['error_code'];
|
|
|
+ $msg_child['addtime']=time(); //添加时间
|
|
|
+ $result = $db->insert('order_progress')->cols($msg_child)->query(); //向order_progress插入数据
|
|
|
+ $result1 = $db->insert('order_save')->cols($msg_child)->query(); //向order_save插入数据*/
|
|
|
+ $time = time();
|
|
|
+ $arr .= "(" . "'{$msg['type']}'" . "," . $msg['orig_order'] . "," . $msg['orig_login'] . "," . $value['dest_login'] . "," . $value['dest_order'] . "," . $value['percentage'] . "," . $value['profit'] . "," . $value['error_code'] . "," . $time . ")" . ",";
|
|
|
|
|
|
- }else{
|
|
|
- $rollback_index = 0;
|
|
|
}
|
|
|
+ }
|
|
|
+ $filed = "type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime";
|
|
|
+ $arr = substr($arr, 0, -1);
|
|
|
+ $sql = sprintf("INSERT INTO %s(%s) VALUES %s", "order_progress", $filed, $arr);
|
|
|
+ $sql2 = sprintf("INSERT INTO %s(%s) VALUES %s", "order_save", $filed, $arr);
|
|
|
+ $db->query($sql);
|
|
|
+ $db->query($sql2);
|
|
|
|
|
|
- sendToWebServer($msg); //发送给前端
|
|
|
- $delete_sql = 'delete from order_progress where ';
|
|
|
- $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
|
|
|
+ }
|
|
|
+ if ($msg['type'] == "rollback") { //Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
|
|
|
+ $recv_buffer = "";
|
|
|
+ if (!isset($msg['seqnum'])) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if ($seqnum != $msg['seqnum']) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ $rollback_index++; //将数据加一
|
|
|
+ $total = count($rollback_data); //总数据的长度
|
|
|
+ $rollbackdata = $msg['desc'];
|
|
|
+ ack($msg["seqnum"]);
|
|
|
+ $seqnum++;
|
|
|
+ if ($rollback_index < $total) {
|
|
|
+ send_rollback($rollback_data, $rollback_index);
|
|
|
|
|
|
- $update = [];
|
|
|
- foreach ($rollbackdata as $key => $value) {
|
|
|
- if ($value['error_code'] == -2) {
|
|
|
- return false;
|
|
|
- } else {
|
|
|
+ }else{
|
|
|
+ $rollback_index = 0;
|
|
|
+ }
|
|
|
|
|
|
- $time = time();
|
|
|
- if ($value['error_code'] == 0) {
|
|
|
+ sendToWebServer($msg); //发送给前端
|
|
|
+ $delete_sql = 'delete from order_progress where ';
|
|
|
+ $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
|
|
|
|
|
|
- $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_order]) or ";
|
|
|
- $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
|
|
|
+ $update = [];
|
|
|
+ foreach ($rollbackdata as $key => $value) {
|
|
|
+ if ($value['error_code'] == -2) {
|
|
|
+ return false;
|
|
|
+ } else {
|
|
|
|
|
|
- } else {
|
|
|
- //不成功把参数组装数组以便批量修改
|
|
|
- $rollbackdata[$key]['type'] = $msg['type'];
|
|
|
- $rollbackdata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
- $rollbackdata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
- $rollbackdata[$key]['addtime'] = time();
|
|
|
- array_push($update, $value);
|
|
|
- }
|
|
|
+ $time = time();
|
|
|
+ if ($value['error_code'] == 0) {
|
|
|
|
|
|
+ $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_order]) or ";
|
|
|
+ $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
|
|
|
+
|
|
|
+ } else {
|
|
|
+ //不成功把参数组装数组以便批量修改
|
|
|
+ $rollbackdata[$key]['type'] = $msg['type'];
|
|
|
+ $rollbackdata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
+ $rollbackdata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
+ $rollbackdata[$key]['addtime'] = time();
|
|
|
+ array_push($update, $value);
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- //执行批量修改数据
|
|
|
- if (!empty($update)) {
|
|
|
- $sql = batchUpdate($rollbackdata, 'dest_login', ['orig_order' => $msg['orig_order']],"order_progress");
|
|
|
- $db->query($sql);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- //成功执行删除后增加数据
|
|
|
- if ($delete_sql != 'delete from order_progress where ') {
|
|
|
- $delete_sql = trim($delete_sql, 'or ');
|
|
|
- $insert_sql = substr($insert_sql, 0, -1);
|
|
|
- $db->query($delete_sql);
|
|
|
- $db->query($insert_sql);
|
|
|
- }
|
|
|
+ //执行批量修改数据
|
|
|
+ if (!empty($update)) {
|
|
|
+ $sql = batchUpdate($rollbackdata, 'dest_login', ['orig_order' => $msg['orig_order']],"order_progress");
|
|
|
+ $db->query($sql);
|
|
|
+ }
|
|
|
|
|
|
+ //成功执行删除后增加数据
|
|
|
+ if ($delete_sql != 'delete from order_progress where ') {
|
|
|
+ $delete_sql = trim($delete_sql, 'or ');
|
|
|
+ $insert_sql = substr($insert_sql, 0, -1);
|
|
|
+ $db->query($delete_sql);
|
|
|
+ $db->query($insert_sql);
|
|
|
}
|
|
|
|
|
|
- if ($msg['type'] == "except_rollback") { //retry表中的Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- if ($seqnum != $msg['seqnum']) {
|
|
|
+ }
|
|
|
+
|
|
|
+ if ($msg['type'] == "except_rollback") { //retry表中的Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
|
|
|
+ $recv_buffer = "";
|
|
|
+ if (!isset($msg['seqnum'])) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ if ($seqnum != $msg['seqnum']) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ ack($msg["seqnum"]);
|
|
|
+ $seqnum++;
|
|
|
+ $retry_rollback_index++; //将数据加一
|
|
|
+ $total = count($retry_rollback_data); //总数据的长度
|
|
|
+ if ($retry_rollback_index < $total) {
|
|
|
+ send_retry_rollback($retry_rollback_data, $retry_rollback_index); //继续发送数据
|
|
|
+ } else {
|
|
|
+ $retry_rollback_index = 0; //对序号重置为0
|
|
|
+ }
|
|
|
+
|
|
|
+ $exceptrollbackdata = $msg['desc'];
|
|
|
+ sendToWebServer($msg);
|
|
|
+ $update = [];
|
|
|
+ $delete_sql = 'delete from order_progress where ';
|
|
|
+ $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
|
|
|
+ foreach ($exceptrollbackdata as $key => $value) {
|
|
|
+ if ($value['error_code'] == -2) {
|
|
|
return false;
|
|
|
- }
|
|
|
- ack($msg["seqnum"]);
|
|
|
- $seqnum++;
|
|
|
- $retry_rollback_index++; //将数据加一
|
|
|
- $total = count($retry_rollback_data); //总数据的长度
|
|
|
- if ($retry_rollback_index < $total) {
|
|
|
- send_retry_rollback($retry_rollback_data, $retry_rollback_index); //继续发送数据
|
|
|
} else {
|
|
|
- $retry_rollback_index = 0; //对序号重置为0
|
|
|
- }
|
|
|
+ $time = time();
|
|
|
+ if ($value['error_code'] == 0) {
|
|
|
+ $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_order]) or ";
|
|
|
+ $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
|
|
|
|
|
|
- $exceptrollbackdata = $msg['desc'];
|
|
|
- sendToWebServer($msg);
|
|
|
- $update = [];
|
|
|
- $delete_sql = 'delete from order_progress where ';
|
|
|
- $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
|
|
|
- foreach ($exceptrollbackdata as $key => $value) {
|
|
|
- if ($value['error_code'] == -2) {
|
|
|
- return false;
|
|
|
} else {
|
|
|
- $time = time();
|
|
|
- if ($value['error_code'] == 0) {
|
|
|
- $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_order]) or ";
|
|
|
- $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
|
|
|
-
|
|
|
- } else {
|
|
|
- $exceptrollbackdata[$key]['type'] = $msg['type'];
|
|
|
- $exceptrollbackdata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
- $exceptrollbackdata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
- $exceptrollbackdata[$key]['addtime'] = time();
|
|
|
- array_push($update, $value);
|
|
|
-
|
|
|
- }
|
|
|
+ $exceptrollbackdata[$key]['type'] = $msg['type'];
|
|
|
+ $exceptrollbackdata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
+ $exceptrollbackdata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
+ $exceptrollbackdata[$key]['addtime'] = time();
|
|
|
+ array_push($update, $value);
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
|
|
|
- if (!empty($update)){
|
|
|
- $sql = batchUpdate($exceptrollbackdata, 'dest_login', ['orig_order' => $msg['orig_order']],"order_progress");
|
|
|
- $db->query($sql);
|
|
|
- }
|
|
|
+ if (!empty($update)){
|
|
|
+ $sql = batchUpdate($exceptrollbackdata, 'dest_login', ['orig_order' => $msg['orig_order']],"order_progress");
|
|
|
+ $db->query($sql);
|
|
|
+ }
|
|
|
|
|
|
- if ($delete_sql != 'delete from order_progress where ') {
|
|
|
- $delete_sql = trim($delete_sql, 'or ');
|
|
|
- $insert_sql = substr($insert_sql, 0, -1);
|
|
|
- $db->query($delete_sql);
|
|
|
- $db->query($insert_sql);
|
|
|
- }
|
|
|
+ if ($delete_sql != 'delete from order_progress where ') {
|
|
|
+ $delete_sql = trim($delete_sql, 'or ');
|
|
|
+ $insert_sql = substr($insert_sql, 0, -1);
|
|
|
+ $db->query($delete_sql);
|
|
|
+ $db->query($insert_sql);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
|
|
|
if ($msg['type'] == "retry") { //
|
|
|
@@ -408,7 +522,7 @@ $workerTcp->onWorkerStart = function ($workerTcp) {
|
|
|
return false;
|
|
|
} else {
|
|
|
|
|
|
- $time = time();
|
|
|
+ $time = time();
|
|
|
if ($value['error_code'] == 0) {
|
|
|
$retrydata[$key]['type'] = $msg['type'];
|
|
|
$retrydata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
@@ -444,425 +558,8 @@ $workerTcp->onWorkerStart = function ($workerTcp) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- };
|
|
|
-
|
|
|
-
|
|
|
- $connection_to_tcp->onClose = function($connection_to_tcp)
|
|
|
- {
|
|
|
-
|
|
|
- echo "connection closed\n";
|
|
|
- log_file("to_tcp connection closed\n");
|
|
|
-
|
|
|
- // $connection_to_tcp->reConnect(); //执行连接
|
|
|
- };
|
|
|
-
|
|
|
- $connection_to_tcp->onError = function($connection_to_tcp, $code, $msg)
|
|
|
- {
|
|
|
- echo "Error code:$code msg:$msg\n";
|
|
|
- };
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- // 向远处的服务器发送信息============================================================
|
|
|
-
|
|
|
- $connection_to_tcp->connect(); //执行连接
|
|
|
- //发送给 接口tcp服务器 心跳========================================================
|
|
|
-
|
|
|
- Timer::add(30, function () use ($connection_to_tcp) {
|
|
|
-
|
|
|
- //定时发送心跳信息
|
|
|
- $connection_to_tcp->send('{"type":"ping"}' . "\n");
|
|
|
-
|
|
|
- });
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- // 发送给 webserver 服务器 心跳=====================================================
|
|
|
- Timer::add(30, function () use ($con,$connection_to_tcp) {
|
|
|
- // 定时发送心跳信息
|
|
|
- global $connection_count;
|
|
|
- var_dump($connection_count);
|
|
|
- if($connection_count){
|
|
|
- if($connection_count>1){
|
|
|
-
|
|
|
- $connection_to_tcp->connect(); //执行连接
|
|
|
- $connection_to_tcp->onConnect = function ($connection_to_tcp) {
|
|
|
- // 发送心跳信息
|
|
|
- $connection_to_tcp->send('{"type":"ping"}' . "\n");
|
|
|
- };
|
|
|
-
|
|
|
-
|
|
|
- // 接口服务器向我发送的信息
|
|
|
- $connection_to_tcp->onMessage = function ($connection_to_tcp, $data) {
|
|
|
-
|
|
|
- global $num;
|
|
|
-
|
|
|
- //这里显示的是从接口服务器返回的数据(需要根据数据的类似判断)
|
|
|
- global $db; //操作数据的全局变量
|
|
|
- global $recv_buffer; //存储不完整的数据
|
|
|
-
|
|
|
-
|
|
|
- global $index; //全局的变量
|
|
|
- global $send_data; //全局的数据
|
|
|
- global $send_order; //全局的订单号码
|
|
|
-
|
|
|
-
|
|
|
- global $rollback_data; //rollback数据
|
|
|
- global $rollback_index; //rollback 管理数据序号
|
|
|
-
|
|
|
-
|
|
|
- global $retry_data; //retry数据
|
|
|
- global $retry_index; //retry 管理数据的序号
|
|
|
-
|
|
|
- global $retry_rollback_data; //retry表中rollback数据
|
|
|
- global $retry_rollback_index; //retry表rollback管理数据的序号
|
|
|
- global $seqnum;
|
|
|
- $bufExplode = explode("\n", $data); //通过指定的分隔符,把字符串打散为数组
|
|
|
- foreach ($bufExplode as $key => $value) {
|
|
|
-
|
|
|
- if ($value) {
|
|
|
- $msg = json_decode($value, true); //对信息进行处理
|
|
|
- if (!$msg) {
|
|
|
-
|
|
|
- $recv_buffer .= $value; //把数据组装起来
|
|
|
- $msg1 = json_decode($recv_buffer, true);
|
|
|
- if ($msg1) {
|
|
|
- $msg = $msg1;
|
|
|
- } else {
|
|
|
- continue; //解析的不对就跳出循环
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- continue; //信息不存在就跳出循环
|
|
|
- }
|
|
|
-
|
|
|
- // 接口服务器返回的信息
|
|
|
- echo "接口返回数据";
|
|
|
- var_dump($msg);
|
|
|
- if ($msg['type'] == "get_order_record") { //查询订单状态
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- ack($msg['seqnum']);
|
|
|
- $seqnum++;
|
|
|
- }
|
|
|
- if ($msg['type'] == "set_current_order") { //设置当前跟单订单
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- sendToWebServer($msg);
|
|
|
- ack($msg['seqnum']);
|
|
|
- $seqnum++;
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- if ($msg['type'] == "insert") { //Insert请求 (有数据库操作,插入)
|
|
|
- $recv_buffer = "";
|
|
|
- echo "接收到tcp" . curenttime();
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- if ($seqnum != $msg["seqnum"]) {
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- $index++; //将数据加一
|
|
|
- $total = count($send_data); //总数据的长度
|
|
|
- $insert_data = $msg['dest'];
|
|
|
- ack($msg['seqnum']);
|
|
|
- $seqnum++;
|
|
|
- if ($index < $total) {
|
|
|
- send_insert($send_data, $index, $send_order);
|
|
|
- } else {
|
|
|
- $index = 0;
|
|
|
- }
|
|
|
- sendToWebServer($msg); //发送给前端
|
|
|
- $arr = "";
|
|
|
- foreach ($insert_data as $key => $value) {
|
|
|
- if ($value['error_code'] == -2) {
|
|
|
- return false;
|
|
|
- } else {
|
|
|
- /* $msg_child['type'] = $msg['type'];
|
|
|
- $msg_child['orig_order'] = $msg['orig_order'];
|
|
|
- $msg_child['orig_login'] = $msg['orig_login'];
|
|
|
- $msg_child['dest_login'] = $value['dest_login'];
|
|
|
- $msg_child['dest_order'] = $value['dest_order'];
|
|
|
- $msg_child['percentage'] = $value['percentage'];
|
|
|
- $msg_child['profit'] = $value['profit'];
|
|
|
- $msg_child['error_code'] = $value['error_code'];
|
|
|
- $msg_child['addtime']=time(); //添加时间
|
|
|
- $result = $db->insert('order_progress')->cols($msg_child)->query(); //向order_progress插入数据
|
|
|
- $result1 = $db->insert('order_save')->cols($msg_child)->query(); //向order_save插入数据*/
|
|
|
- $time = time();
|
|
|
- $arr .= "(" . "'{$msg['type']}'" . "," . $msg['orig_order'] . "," . $msg['orig_login'] . "," . $value['dest_login'] . "," . $value['dest_order'] . "," . $value['percentage'] . "," . $value['profit'] . "," . $value['error_code'] . "," . $time . ")" . ",";
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
- $filed = "type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime";
|
|
|
- $arr = substr($arr, 0, -1);
|
|
|
- $sql = sprintf("INSERT INTO %s(%s) VALUES %s", "order_progress", $filed, $arr);
|
|
|
- $sql2 = sprintf("INSERT INTO %s(%s) VALUES %s", "order_save", $filed, $arr);
|
|
|
- $db->query($sql);
|
|
|
- $db->query($sql2);
|
|
|
-
|
|
|
- }
|
|
|
- if ($msg['type'] == "rollback") { //Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- if ($seqnum != $msg['seqnum']) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- $rollback_index++; //将数据加一
|
|
|
- $total = count($rollback_data); //总数据的长度
|
|
|
- $rollbackdata = $msg['desc'];
|
|
|
- ack($msg["seqnum"]);
|
|
|
- $seqnum++;
|
|
|
- if ($rollback_index < $total) {
|
|
|
- send_rollback($rollback_data, $rollback_index);
|
|
|
-
|
|
|
- }else{
|
|
|
- $rollback_index = 0;
|
|
|
- }
|
|
|
-
|
|
|
- sendToWebServer($msg); //发送给前端
|
|
|
- $delete_sql = 'delete from order_progress where ';
|
|
|
- $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
|
|
|
-
|
|
|
- $update = [];
|
|
|
- foreach ($rollbackdata as $key => $value) {
|
|
|
- if ($value['error_code'] == -2) {
|
|
|
- return false;
|
|
|
- } else {
|
|
|
-
|
|
|
- $time = time();
|
|
|
- if ($value['error_code'] == 0) {
|
|
|
-
|
|
|
- $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_order]) or ";
|
|
|
- $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
|
|
|
-
|
|
|
- } else {
|
|
|
- //不成功把参数组装数组以便批量修改
|
|
|
- $rollbackdata[$key]['type'] = $msg['type'];
|
|
|
- $rollbackdata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
- $rollbackdata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
- $rollbackdata[$key]['addtime'] = time();
|
|
|
- array_push($update, $value);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- //执行批量修改数据
|
|
|
- if (!empty($update)) {
|
|
|
- $sql = batchUpdate($rollbackdata, 'dest_login', ['orig_order' => $msg['orig_order']],"order_progress");
|
|
|
- $db->query($sql);
|
|
|
- }
|
|
|
-
|
|
|
- //成功执行删除后增加数据
|
|
|
- if ($delete_sql != 'delete from order_progress where ') {
|
|
|
- $delete_sql = trim($delete_sql, 'or ');
|
|
|
- $insert_sql = substr($insert_sql, 0, -1);
|
|
|
- $db->query($delete_sql);
|
|
|
- $db->query($insert_sql);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- if ($msg['type'] == "except_rollback") { //retry表中的Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
|
|
|
- $recv_buffer = "";
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- if ($seqnum != $msg['seqnum']) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- ack($msg["seqnum"]);
|
|
|
- $seqnum++;
|
|
|
- $retry_rollback_index++; //将数据加一
|
|
|
- $total = count($retry_rollback_data); //总数据的长度
|
|
|
- if ($retry_rollback_index < $total) {
|
|
|
- send_retry_rollback($retry_rollback_data, $retry_rollback_index); //继续发送数据
|
|
|
- } else {
|
|
|
- $retry_rollback_index = 0; //对序号重置为0
|
|
|
- }
|
|
|
-
|
|
|
- $exceptrollbackdata = $msg['desc'];
|
|
|
- sendToWebServer($msg);
|
|
|
- $update = [];
|
|
|
- $delete_sql = 'delete from order_progress where ';
|
|
|
- $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
|
|
|
- foreach ($exceptrollbackdata as $key => $value) {
|
|
|
- if ($value['error_code'] == -2) {
|
|
|
- return false;
|
|
|
- } else {
|
|
|
- $time = time();
|
|
|
- if ($value['error_code'] == 0) {
|
|
|
- $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_order]) or ";
|
|
|
- $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
|
|
|
-
|
|
|
- } else {
|
|
|
- $exceptrollbackdata[$key]['type'] = $msg['type'];
|
|
|
- $exceptrollbackdata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
- $exceptrollbackdata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
- $exceptrollbackdata[$key]['addtime'] = time();
|
|
|
- array_push($update, $value);
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- if (!empty($update)){
|
|
|
- $sql = batchUpdate($exceptrollbackdata, 'dest_login', ['orig_order' => $msg['orig_order']],"order_progress");
|
|
|
- $db->query($sql);
|
|
|
- }
|
|
|
-
|
|
|
- if ($delete_sql != 'delete from order_progress where ') {
|
|
|
- $delete_sql = trim($delete_sql, 'or ');
|
|
|
- $insert_sql = substr($insert_sql, 0, -1);
|
|
|
- $db->query($delete_sql);
|
|
|
- $db->query($insert_sql);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- if ($msg['type'] == "retry") { //
|
|
|
- if (!isset($msg['seqnum'])) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- if ($seqnum != $msg['seqnum']) {
|
|
|
- return false;
|
|
|
- }
|
|
|
- ack($msg["seqnum"]);
|
|
|
- $seqnum++;
|
|
|
- $retry_index++; //将数据加一
|
|
|
- $total = count($retry_data); //总数据的长度
|
|
|
- if ($retry_index < $total) {
|
|
|
- send_retry($retry_data, $retry_index);
|
|
|
- } else {
|
|
|
- $retry_index = 0;
|
|
|
- }
|
|
|
- $retrydata = $msg['desc'];
|
|
|
- sendToWebServer($msg);
|
|
|
- $succeUpdate = [];
|
|
|
- $update = [];
|
|
|
- $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
|
|
|
- foreach ($retrydata as $key => $value) {
|
|
|
- if ($value['error_code'] == -2) {
|
|
|
- return false;
|
|
|
- } else {
|
|
|
-
|
|
|
- $time = time();
|
|
|
- if ($value['error_code'] == 0) {
|
|
|
- $retrydata[$key]['type'] = $msg['type'];
|
|
|
- $retrydata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
- $retrydata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
- $retrydata[$key]['addtime'] = time();
|
|
|
- array_push($succeUpdate, $value);
|
|
|
- $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
|
|
|
-
|
|
|
- } else {
|
|
|
-
|
|
|
- $retrydata[$key]['type'] = $msg['type'];
|
|
|
- $retrydata[$key]['orig_order'] = $msg['orig_order'];
|
|
|
- $retrydata[$key]['orig_login'] = $msg['orig_login'];
|
|
|
- $retrydata[$key]['addtime'] = time();
|
|
|
- array_push($update, $value);
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (!empty($update)) {
|
|
|
- $sql = batchUpdate($retrydata, 'dest_login', ['orig_order' => $msg['orig_order']], "order_progress");
|
|
|
- $db->query($sql);
|
|
|
- }
|
|
|
-
|
|
|
- if (!empty($succeUpdate)) {
|
|
|
- $sql = batchUpdate($retrydata, 'dest_login', ['orig_order' => $msg['orig_order']], "order_progress");
|
|
|
- $db->query($sql);
|
|
|
- $insert_sql = substr($insert_sql, 0, -1);
|
|
|
- $db->query($insert_sql);
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- };
|
|
|
-
|
|
|
-
|
|
|
- $connection_to_tcp->onClose = function($connection_to_tcp)
|
|
|
- {
|
|
|
-
|
|
|
- echo "connection closed\n";
|
|
|
- log_file("to_tcp connection closed\n");
|
|
|
-
|
|
|
- // $connection_to_tcp->reConnect(); //执行连接
|
|
|
- };
|
|
|
-
|
|
|
- $connection_to_tcp->onError = function($connection_to_tcp, $code, $msg)
|
|
|
- {
|
|
|
- echo "Error code:$code msg:$msg\n";
|
|
|
- };
|
|
|
-
|
|
|
- }else{
|
|
|
- $connection_to_tcp->close();
|
|
|
- }
|
|
|
- }
|
|
|
- $con->send(json_encode(['type' => 'heartbeat']). "\n");
|
|
|
- });
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- // 心跳检测(TCP 所有客户端) 心跳===================================================
|
|
|
- Timer::add(1, function () use ($workerTcp) {
|
|
|
- $time_now = time(); //当前时间
|
|
|
- foreach ($workerTcp->connections as $connection) {
|
|
|
- // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
|
|
|
- if (empty($connection->lastMessageTime)) {
|
|
|
- $connection->lastMessageTime = $time_now;
|
|
|
- continue;
|
|
|
- }
|
|
|
- // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
|
|
|
- if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
|
|
|
-
|
|
|
- $connection->close();
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
-
|
|
|
-};
|
|
|
-
|
|
|
-$workerTcp->onConnect = function ($connection) {
|
|
|
- echo "connection onConnect\n";
|
|
|
-};
|
|
|
-
|
|
|
-$workerTcp->onError = function ($connection, $code, $msg) {
|
|
|
- echo "Error code:$code msg:$msg\n";
|
|
|
-};
|
|
|
-
|
|
|
-$workerTcp->onClose = function ($connection) {
|
|
|
- echo "connection closed\n";
|
|
|
-};
|
|
|
-
|
|
|
-$workerTcp->onMessage = function ($connection, $data) {
|
|
|
- echo "connection onMessage\n";
|
|
|
-};
|
|
|
-
|
|
|
+}
|
|
|
+//发送ack给远程服务器
|
|
|
function ack($seqnum){
|
|
|
global $connection_to_tcp;
|
|
|
$data = ["type"=>"ack","seqnum"=>$seqnum];
|
|
|
@@ -1012,13 +709,14 @@ function parseParams($params)
|
|
|
|
|
|
return $where ? " AND " .implode('AND',$where) : '';
|
|
|
}
|
|
|
-function curenttime()
|
|
|
+//程序执行时间秒速
|
|
|
+/*function curenttime()
|
|
|
{
|
|
|
$t = microtime(true);
|
|
|
$micro = sprintf("%06d",($t - floor($t)) * 1000000);
|
|
|
$d = new DateTime( date('Y-m-d H:i:s.'.$micro, $t) );
|
|
|
print $d->format("Y-m-d H:i:s.u"); // note at point on "u"
|
|
|
-}
|
|
|
+}*/
|
|
|
|
|
|
|
|
|
|