tcpServer.php 33 KB


  1. <?php
  2. require dirname(dirname(__FILE__)).'/vendor/autoload.php';
  3. require_once dirname(dirname(__FILE__)) . '/vendor/workerman/Autoloader.php';
  4. require_once dirname(dirname(__FILE__)) . '/vendor/mysql/src/Connection.php';
  5. require_once __DIR__ . '/config.php';
  6. use Workerman\Worker; //worker容器类
  7. use Workerman\Connection\AsyncTcpConnection; //连接异步客户端
  8. use Workerman\MySQL\Connection; //连接数据库的类
  9. use Workerman\Lib\Timer; //定时器函数
  10. use Monolog\Logger;
  11. use Monolog\Handler\StreamHandler;
  12. use Monolog\Handler\ErrorLogHandler;
  13. date_default_timezone_set("PRC");
  14. //初始化 (创建了一个tcp服务)
  15. $workerTcp = new Worker("tcp://0.0.0.0:12347");
  16. $workerTcp->name = "TCP";
  17. $workerTcp::$logFile = __DIR__.'/workerman.log';
  18. $path = "displaylog/".date('Ym', time());
  19. createdir($path);
  20. $workerTcp::$stdoutFile = __DIR__."/".$path."/".date("Ymd",time()).'.log';
  21. $num = 1;
  22. $workerTcp->onWorkerStart = function ($workerTcp) {
  23. global $db; //数据库
  24. global $con; //访问本地创建的webserver服务器
  25. global $connection_to_tcp; //访问接口服务器(远程的)
  26. global $connection_count;
  27. global $send_data; //存储insert数据
  28. global $index; //insert管理数据序号
  29. global $send_order; //insert全局的订单号
  30. global $seqnum;
  31. global $message;
  32. global $rollback_data; //rollback数据
  33. global $rollback_index; //rollback 管理数据序号
  34. global $retry_data; //retry数据
  35. global $retry_index; //retry 管理数据的序号
  36. global $retry_rollback_data; //retry表中rollback数据
  37. global $retry_rollback_index; //retry表rollback管理数据的序号
  38. global $token; //储存token
  39. $message = 0;
  40. $seqnum = 1;
  41. getToken();
  42. $db = new Connection(DB_HOST, DB_PORT, DB_USERNAME, DB_PASSWORD, DB_NAME); //本地数据库的配置
  43. //访问本地的websocket服务器=============================================
  44. $con = new AsyncTcpConnection("ws://127.0.0.1:12382");
  45. $con->onConnect = function ($on) {
  46. global $index;
  47. global $rollback_index;
  48. global $retry_index;
  49. global $retry_rollback_index;
  50. $index = 0;
  51. $rollback_index = 0;
  52. $retry_index = 0;
  53. $retry_rollback_index = 0;
  54. sendToWebServer([
  55. 'type' => 'login',
  56. 'uid' => 'tcp'
  57. ]);
  58. };
  59. // websocke服务器发送信息过来触发的函数
  60. $con->onMessage = function ($con, $data) {
  61. global $send_data;
  62. global $connection_count;
  63. $msg = json_decode($data, true);
  64. if ($msg['type'] == "heartbeat") {
  65. $connection_count = $msg["connection_count"];
  66. }
  67. if ($msg['type'] == "get_order_record") { //查询订单状态
  68. sendTo_tcp_Server($msg);
  69. }
  70. if ($msg['type'] == "set_current_order") { //设置当前跟单订单
  71. sendTo_tcp_Server($msg);
  72. }
  73. if($msg['type'] == 'set_current_position'){
  74. sendTo_tcp_Server($msg);
  75. }
  76. if ($msg['type'] == "insert") { //Insert请求
  77. global $index; //传递的index
  78. global $send_data; //全局的数据
  79. global $send_order; //全局的订单号码
  80. global $message;
  81. if(isset($msg['message'])){
  82. $message = 1;
  83. unset($msg['message']);
  84. }else{
  85. $message = 0;
  86. }
  87. $insert_data = $msg['data'];
  88. $insert_data = array_chunk($insert_data, 12); //每十个是一个数组
  89. $send_data = $insert_data; //传递的数据
  90. $order = trim($msg['orderid']);
  91. $send_order = (int)$order;
  92. send_insert($send_data, $index, $send_order);
  93. }
  94. if ($msg['type'] == "rollback") { //Rollback请求
  95. global $rollback_index; //传递index
  96. global $rollback_data; //全局的数据
  97. $rollback_data = array_chunk($msg['data'], 8); //每几个是一个数组
  98. send_rollback($rollback_data, $rollback_index);
  99. }
  100. if ($msg['type'] == "except_rollback") { //retry表中的 Rollback请求
  101. global $retry_rollback_data; //retry表中rollback数据
  102. global $retry_rollback_index; //retry表rollback管理数据的序号
  103. $retry_rollback_data = array_chunk($msg['data'], 5);
  104. send_retry_rollback($retry_rollback_data, $retry_rollback_index);
  105. }
  106. if ($msg['type'] == "retry") { //Retry请求
  107. global $retry_data; //retry数据
  108. global $retry_index; //retry 管理数据的序号
  109. $retry_data = array_chunk($msg['data'], 5);
  110. send_retry($retry_data, $retry_index);
  111. }
  112. };
  113. $con->onClose = function ($con) {
  114. echo "connection closed\n";
  115. //$con->reConnect(1);
  116. };
  117. $con->onError = function ($con, $code, $msg) {
  118. echo "Error code:$code msg:$msg\n";
  119. };
  120. $con->connect();
  121. // 访问本地的websocket服务器=======================================================
  122. // 向远处的服务器发送信息===========================================================
  123. //$connection_to_tcp = new AsyncTcpConnection('tcp://119.23.51.113:10008');
  124. //$connection_to_tcp = new AsyncTcpConnection('tcp://127.0.0.1:1235');
  125. //$connection_to_tcp = new AsyncTcpConnection('tcp://192.168.5.111:10009');
  126. $connection_to_tcp = new AsyncTcpConnection('tcp://103.230.218.164:10008');
  127. $connection_to_tcp->onConnect = function ($connection_to_tcp) {
  128. // 发送心跳信息
  129. $connection_to_tcp->send('{"type":"ping"}' . "\n");
  130. };
  131. // 接口服务器向我发送的信息
  132. $connection_to_tcp->onMessage = function ($connection_to_tcp, $data) {
  133. tcpMessHandle($data);
  134. };
  135. $connection_to_tcp->onClose = function($connection_to_tcp)
  136. {
  137. echo "connection closed\n";
  138. log_file("to_tcp connection closed\n");
  139. $connection_to_tcp->reConnect(); //执行连接
  140. };
  141. $connection_to_tcp->onError = function($connection_to_tcp, $code, $msg)
  142. {
  143. echo "Error code:$code msg:$msg\n";
  144. };
  145. // 向远处的服务器发送信息============================================================
  146. $connection_to_tcp->connect(); //执行连接
  147. //发送给 接口tcp服务器 心跳========================================================
  148. Timer::add(30, function () use ($connection_to_tcp) {
  149. //定时发送心跳信息
  150. $connection_to_tcp->send('{"type":"ping"}' . "\n");
  151. });
  152. // 发送给 webserver 服务器 心跳=====================================================
  153. Timer::add(30, function () use ($con,$connection_to_tcp) {
  154. // 定时发送心跳信息
  155. global $connection_count;
  156. /*if($connection_count){
  157. if($connection_count>1){
  158. $connection_to_tcp->onConnect = function ($connection_to_tcp) {
  159. // 发送心跳信息
  160. $connection_to_tcp->send('{"type":"ping"}' . "\n");
  161. };
  162. // 接口服务器向我发送的信息
  163. $connection_to_tcp->onMessage = function ($connection_to_tcp, $data) {
  164. tcpMessHandle($data);
  165. };
  166. $connection_to_tcp->onClose = function($connection_to_tcp)
  167. {
  168. echo "connection closed\n";
  169. log_file("to_tcp connection closed\n");
  170. // $connection_to_tcp->reConnect(); //执行连接
  171. };
  172. $connection_to_tcp->onError = function($connection_to_tcp, $code, $msg)
  173. {
  174. echo "Error code:$code msg:$msg\n";
  175. };
  176. $connection_to_tcp->connect(); //客户端上线重新连接接口
  177. }else{
  178. //客户端不在线
  179. $connection_to_tcp->close();
  180. }
  181. }*/
  182. $con->send(json_encode(['type' => 'heartbeat']). "\n");
  183. });
  184. Timer::add(1800,function(){
  185. getToken();
  186. });
  187. // 心跳检测(TCP 所有客户端) 心跳===================================================
  188. Timer::add(1, function () use ($workerTcp) {
  189. $time_now = time(); //当前时间
  190. foreach ($workerTcp->connections as $connection) {
  191. // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
  192. if (empty($connection->lastMessageTime)) {
  193. $connection->lastMessageTime = $time_now;
  194. continue;
  195. }
  196. // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
  197. if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
  198. $connection->close();
  199. }
  200. }
  201. });
  202. //避免数据库长时间未操作连接超时
  203. Timer::add(25200,function(){
  204. global $db;
  205. $db->query("select username from admin limit 1");
  206. });
  207. };
  208. $workerTcp->onConnect = function ($connection) {
  209. echo "connection onConnect\n";
  210. };
  211. $workerTcp->onError = function ($connection, $code, $msg) {
  212. echo "Error code:$code msg:$msg\n";
  213. };
  214. $workerTcp->onClose = function ($connection) {
  215. echo "connection closed\n";
  216. };
  217. $workerTcp->onMessage = function ($connection, $data) {
  218. echo "connection onMessage\n";
  219. };
  220. //远程服务器返回消息处理
  221. function tcpMessHandle($data)
  222. {
  223. global $num;
  224. //这里显示的是从接口服务器返回的数据(需要根据数据的类似判断)
  225. global $db; //操作数据的全局变量
  226. global $recv_buffer; //存储不完整的数据
  227. global $index; //全局的变量
  228. global $send_data; //全局的数据
  229. global $send_order; //全局的订单号码
  230. global $rollback_data; //rollback数据
  231. global $rollback_index; //rollback 管理数据序号
  232. global $retry_data; //retry数据
  233. global $retry_index; //retry 管理数据的序号
  234. global $retry_rollback_data; //retry表中rollback数据
  235. global $retry_rollback_index; //retry表rollback管理数据的序号
  236. global $seqnum;
  237. global $message;
  238. $bufExplode = explode("\n", $data); //通过指定的分隔符,把字符串打散为数组
  239. foreach ($bufExplode as $key => $value) {
  240. if ($value) {
  241. $msg = json_decode($value, true); //对信息进行处理
  242. if (!$msg) {
  243. $recv_buffer .= $value; //把数据组装起来
  244. $msg1 = json_decode($recv_buffer, true);
  245. if ($msg1) {
  246. $msg = $msg1;
  247. } else {
  248. continue; //解析的不对就跳出循环
  249. }
  250. }
  251. } else {
  252. continue; //信息不存在就跳出循环
  253. }
  254. // 接口服务器返回的信息
  255. if ($msg['type'] == "get_order_record") { //查询订单状态
  256. $recv_buffer = "";
  257. /* if (!isset($msg['seqnum'])) {
  258. return false;
  259. }
  260. ack($msg['seqnum']);
  261. $seqnum++;*/
  262. }
  263. if ($msg['type'] == "set_current_order") { //设置当前跟单订单
  264. $recv_buffer = "";
  265. /*if (!isset($msg['seqnum'])) {
  266. return false;
  267. }*/
  268. sendToWebServer($msg);
  269. /* ack($msg['seqnum']);
  270. $seqnum++;*/
  271. }
  272. if ($msg['type'] == "insert") { //Insert请求 (有数据库操作,插入)
  273. $recv_buffer = "";
  274. echo '接口返回数据';
  275. curenttime();
  276. var_dump($msg);
  277. /* if (!isset($msg['seqnum'])) {
  278. return false;
  279. }
  280. if ($seqnum != $msg["seqnum"]) {
  281. return false;
  282. }*/
  283. global $message;
  284. global $token;
  285. $index++; //将数据加一
  286. $total = count($send_data); //总数据的长度
  287. $insert_data = $msg['dest'];
  288. /* ack($msg['seqnum']);
  289. $seqnum++;*/
  290. if ($index < $total) {
  291. send_insert($send_data, $index, $send_order);
  292. } else {
  293. $index = 0;
  294. }
  295. $login = '';
  296. $arr = "";
  297. $time = time();
  298. $delete_sql = 'delete from order_progress where ';
  299. foreach($insert_data as $key=>$value){
  300. $insert_data[$key]['dest_orders'] =json_encode($value['dest_orders'],JSON_FORCE_OBJECT);
  301. $insert_data[$key]['dest_deals'] = json_encode($value['dest_deals'],JSON_FORCE_OBJECT);
  302. }
  303. foreach ($insert_data as $key => $value) {
  304. if ($value['error_code'] == -2) {
  305. return false;
  306. } else {
  307. if($message == 1){
  308. if($value['error_code'] == 0){
  309. $arr .= "(" . "'{$msg['type']}'" . "," . $msg['orig_position'] . "," . $msg['orig_login'] . "," . $value['dest_login'] . "," . $value['dest_position'] . "," . $value['percentage'] . "," . $value['profit'] . "," . $value['error_code'] . "," . $time .",". "'".$value['dest_orders']."'".","."'".$value['dest_deals']."'".")" . ",";
  310. $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_position] and error_code != 0) or ";
  311. $login .= $value['dest_login'].",";
  312. }
  313. }else{
  314. $arr .= "(" . "'{$msg['type']}'" . "," . $msg['orig_position'] . "," . $msg['orig_login'] . "," . $value['dest_login'] . "," . $value['dest_position'] . "," . $value['percentage'] . "," . $value['profit'] . "," . $value['error_code'] . "," . $time .","."'".$value['dest_orders']."'".","."'".$value['dest_deals']."'". ")" . ",";
  315. if($value['error_code'] == 0){
  316. $login .= $value['dest_login'].",";
  317. }
  318. }
  319. }
  320. }
  321. $filed = "type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime,dest_orders,dest_deals";
  322. $arr = substr($arr, 0, -1);
  323. $sql = sprintf("INSERT INTO %s(%s) VALUES %s ", "order_progress", $filed, $arr);
  324. //$sql2 = sprintf("INSERT INTO %s(%s) VALUES %s", "order_save", $filed, $arr);
  325. if($message == 1){
  326. if ($delete_sql != 'delete from order_progress where ') {
  327. $delete_sql = trim($delete_sql, 'or ');
  328. $db->query($delete_sql);
  329. $db->query($sql);
  330. //$db->query($sql2);
  331. }
  332. }else{
  333. $data = $db->query($sql);
  334. // $db->query($sql2);
  335. }
  336. if($login){
  337. $logins = substr($login,0,-1);
  338. balancefix($logins);
  339. }
  340. if($message == 1){
  341. $msg['type'] = "ErrorInsert";
  342. }else{
  343. $msg['type'] = "insert";
  344. }
  345. sendToWebServer($msg); //发送给前端
  346. }
  347. if ($msg['type'] == "rollback") { //Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
  348. echo '接口返回数据'.curenttime();
  349. var_dump($msg);
  350. $recv_buffer = "";
  351. /* if (!isset($msg['seqnum'])) {
  352. return false;
  353. }
  354. if ($seqnum != $msg['seqnum']) {
  355. return false;
  356. }*/
  357. $rollback_index++; //将数据加一
  358. $total = count($rollback_data); //总数据的长度
  359. $rollbackdata = $msg['desc'];
  360. /* ack($msg["seqnum"]);
  361. $seqnum++;*/
  362. if ($rollback_index < $total) {
  363. send_rollback($rollback_data, $rollback_index);
  364. }else{
  365. $rollback_index = 0;
  366. }
  367. sendToWebServer($msg); //发送给前端
  368. $delete_sql = 'delete from order_progress where ';
  369. /* $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_orders,dest_deals,percentage,error_code,addtime) values";*/
  370. $update = [];
  371. $login = '';
  372. /* foreach($rollbackdata as $key=>$value){
  373. $rollbackdata[$key]['dest_orders'] = json_encode($rollbackdata[$key]['dest_orders'],JSON_FORCE_OBJECT);
  374. $rollbackdata[$key]['dest_deals'] = json_encode($rollbackdata[$key]['dest_orders'],JSON_FORCE_OBJECT);
  375. }*/
  376. foreach ($rollbackdata as $key => $value) {
  377. if ($value['error_code'] == -2) {
  378. return false;
  379. } else {
  380. $time = time();
  381. if ($value['error_code'] == 0) {
  382. $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_position]) or ";
  383. /*$insert_sql .= "('$msg[type]',$msg[orig_position],$msg[orig_login],'$value[dest_login]','$value[dest_orders]',$value[dest_deals],$value[percentage],$value[error_code],$time),";*/
  384. $login .= $value['dest_login'].",";
  385. } else {
  386. //不成功把参数组装数组以便批量修改
  387. $rollbackdata[$key]['type'] = $msg['type'];
  388. $rollbackdata[$key]['orig_order'] = $msg['orig_position'];
  389. $rollbackdata[$key]['orig_login'] = $msg['orig_login'];
  390. $rollbackdata[$key]['addtime'] = time();
  391. array_push($update, $value);
  392. }
  393. }
  394. }
  395. //执行批量修改数据
  396. if (!empty($update)) {
  397. $sql = batchUpdate($rollbackdata, 'dest_login', ['orig_order' => $msg['orig_position']],"order_progress");
  398. $db->query($sql);
  399. }
  400. //成功执行删除后增加数据
  401. if ($delete_sql != 'delete from order_progress where ') {
  402. $delete_sql = trim($delete_sql, 'or ');
  403. //$insert_sql = substr($insert_sql, 0, -1);
  404. $db->query($delete_sql);
  405. // $db->query($insert_sql);
  406. if($login){
  407. $logins = substr($login,0,-1);
  408. balancefix($logins);
  409. }
  410. }
  411. }
  412. if ($msg['type'] == "except_rollback") { //retry表中的Rollback请求 (有数据库操作,插入并看看是否需要直接返回给前端)
  413. $recv_buffer = "";
  414. /* if (!isset($msg['seqnum'])) {
  415. return false;
  416. }
  417. if ($seqnum != $msg['seqnum']) {
  418. return false;
  419. }*/
  420. /* ack($msg["seqnum"]);
  421. $seqnum++;*/
  422. $retry_rollback_index++; //将数据加一
  423. $total = count($retry_rollback_data); //总数据的长度
  424. if ($retry_rollback_index < $total) {
  425. send_retry_rollback($retry_rollback_data, $retry_rollback_index); //继续发送数据
  426. } else {
  427. $retry_rollback_index = 0; //对序号重置为0
  428. }
  429. $exceptrollbackdata = $msg['desc'];
  430. sendToWebServer($msg);
  431. $update = [];
  432. $delete_sql = 'delete from order_progress where ';
  433. $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
  434. foreach ($exceptrollbackdata as $key => $value) {
  435. if ($value['error_code'] == -2) {
  436. return false;
  437. } else {
  438. $time = time();
  439. if ($value['error_code'] == 0) {
  440. $delete_sql .= "(dest_login = $value[dest_login] and orig_order = $msg[orig_order]) or ";
  441. $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
  442. } else {
  443. $exceptrollbackdata[$key]['type'] = $msg['type'];
  444. $exceptrollbackdata[$key]['orig_order'] = $msg['orig_order'];
  445. $exceptrollbackdata[$key]['orig_login'] = $msg['orig_login'];
  446. $exceptrollbackdata[$key]['addtime'] = time();
  447. array_push($update, $value);
  448. }
  449. }
  450. }
  451. if (!empty($update)){
  452. $sql = batchUpdate($exceptrollbackdata, 'dest_login', ['orig_order' => $msg['orig_order']],"order_progress");
  453. $db->query($sql);
  454. }
  455. if ($delete_sql != 'delete from order_progress where ') {
  456. $delete_sql = trim($delete_sql, 'or ');
  457. $insert_sql = substr($insert_sql, 0, -1);
  458. $db->query($delete_sql);
  459. $db->query($insert_sql);
  460. }
  461. }
  462. if ($msg['type'] == "retry") { //
  463. /* if (!isset($msg['seqnum'])) {
  464. return false;
  465. }
  466. if ($seqnum != $msg['seqnum']) {
  467. return false;
  468. }*/
  469. /* ack($msg["seqnum"]);
  470. $seqnum++;*/
  471. $retry_index++; //将数据加一
  472. $total = count($retry_data); //总数据的长度
  473. if ($retry_index < $total) {
  474. send_retry($retry_data, $retry_index);
  475. } else {
  476. $retry_index = 0;
  477. }
  478. $retrydata = $msg['desc'];
  479. sendToWebServer($msg);
  480. $succeUpdate = [];
  481. $update = [];
  482. $insert_sql = "insert into order_save(type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime) values";
  483. foreach ($retrydata as $key => $value) {
  484. if ($value['error_code'] == -2) {
  485. return false;
  486. } else {
  487. $time = time();
  488. if ($value['error_code'] == 0) {
  489. $retrydata[$key]['type'] = $msg['type'];
  490. $retrydata[$key]['orig_order'] = $msg['orig_order'];
  491. $retrydata[$key]['orig_login'] = $msg['orig_login'];
  492. $retrydata[$key]['addtime'] = time();
  493. array_push($succeUpdate, $value);
  494. $insert_sql .= "('$msg[type]',$msg[orig_order],$msg[orig_login],$value[dest_login],$value[dest_order],$value[percentage],$value[profit],$value[error_code],$time),";
  495. } else {
  496. $retrydata[$key]['type'] = $msg['type'];
  497. $retrydata[$key]['orig_order'] = $msg['orig_order'];
  498. $retrydata[$key]['orig_login'] = $msg['orig_login'];
  499. $retrydata[$key]['addtime'] = time();
  500. array_push($update, $value);
  501. }
  502. }
  503. }
  504. if (!empty($update)) {
  505. $sql = batchUpdate($retrydata, 'dest_login', ['orig_order' => $msg['orig_order']], "order_progress");
  506. $db->query($sql);
  507. }
  508. if (!empty($succeUpdate)) {
  509. $sql = batchUpdate($retrydata, 'dest_login', ['orig_order' => $msg['orig_order']], "order_progress");
  510. $db->query($sql);
  511. $insert_sql = substr($insert_sql, 0, -1);
  512. $db->query($insert_sql);
  513. }
  514. }
  515. }
  516. }
  517. //发送ack给远程服务器
  518. function ack($seqnum){
  519. global $connection_to_tcp;
  520. $data = ["type"=>"ack","seqnum"=>$seqnum];
  521. $connection_to_tcp->send(json_encode($data) . "\n");
  522. }
  523. // 发送信息到webserver服务器
  524. function sendToWebServer($data)
  525. {
  526. global $con;
  527. log_file($data);
  528. $con->send(json_encode($data) . "\n");
  529. }
  530. // 发送信息到接口服务器
  531. function sendTo_tcp_Server($data){
  532. global $connection_to_tcp;
  533. global $seqnum;
  534. // $data['seqnum'] = $seqnum;
  535. log_file($data);
  536. echo "发送给接口的信息".curenttime();
  537. var_dump($data);
  538. $connection_to_tcp->send(json_encode($data) . "\n");
  539. }
  540. // insert 数据
  541. function send_insert($data,$index=0,$order){
  542. $send['type'] = "insert_some";
  543. $send['orig_position'] = (int)$order; //订单号码
  544. $send['dest'] =[];
  545. $send_data = $data[$index]; //将分割的数据交给全局变量
  546. foreach ($send_data as $key => $value) {
  547. $send_child['dest_login'] = (int)$value['LOGIN']; //登录的账号
  548. $send_child['percentage'] = (int)$value['ladder']; //梯度
  549. array_push($send['dest'],$send_child);
  550. }
  551. sendTo_tcp_Server($send);
  552. }
  553. // rollback 数据
  554. function send_rollback($data,$index=0){
  555. global $db;
  556. $send['type'] = "rollback_some";
  557. $send['orig_position'] = (int)$data[$index][0]['orig_order']; //订单号码
  558. $send['orig_login'] = (int)$data[$index][0]['orig_login']; //登录者账号
  559. $send['desc'] =[];
  560. $send_child =[];
  561. $send_data = $data[$index]; //将分割的数据交给全局变量
  562. foreach ($send_data as $key => $value) {
  563. //$order = $db->query("SELECT `Order`,`Deal` FROM mt5_deals WHERE PositionID = {$value['dest_order']}");
  564. $send_child['dest_login'] = (int)$value['dest_login']; //登录的账号
  565. $send_child['dest_orders'] = json_decode($value['dest_orders'],true); //订单号
  566. $send_child['dest_deals'] = json_decode($value['dest_deals'],true);
  567. $send_child['percentage'] = (int)$value['percentage'];
  568. array_push($send['desc'],$send_child);
  569. }
  570. sendTo_tcp_Server($send);
  571. }
  572. // retry表中的rollback操作
  573. function send_retry_rollback($data,$index=0){
  574. $send['type'] = "except_rollback_some";
  575. $send['orig_order'] = (int)$data[$index][0]['orig_order']; //订单号码
  576. $send['orig_login'] = (int)$data[$index][0]['orig_login']; //登录者账号
  577. $send['desc'] =[];
  578. $send_child =[];
  579. $send_data = $data[$index]; //将分割的数据交给全局变量
  580. foreach ($send_data as $key => $value) {
  581. $send_child['dest_login'] = (int)$value['dest_login']; //登录的账号
  582. $send_child['dest_order'] = (int)$value['dest_order']; //订单号
  583. $send_child['percentage'] = (int)$value['percentage']; //手数
  584. $send_child['profit'] = (float)$value['profit']; //利润点
  585. $send_child['error_code'] = (int)$value['error_code']; //错误的号码
  586. array_push($send['desc'],$send_child);
  587. }
  588. echo "发送的数据";
  589. var_dump($send);
  590. sendTo_tcp_Server($send);
  591. }
  592. // retry表中的retry操作
  593. function send_retry($data,$index=0){
  594. $send['type'] = "retry_some";
  595. $send['orig_order'] = (int)$data[$index][0]['orig_order']; //订单号码
  596. $send['orig_login'] = (int)$data[$index][0]['orig_login']; //登录者账号
  597. $send['desc'] =[];
  598. $send_child =[];
  599. $send_data = $data[$index]; //将分割的数据交给全局变量
  600. foreach ($send_data as $key => $value) {
  601. $send_child['dest_login'] = (int)$value['dest_login']; //登录的账号
  602. $send_child['dest_order'] = (int)$value['dest_order']; //订单号
  603. $send_child['percentage'] = (int)$value['percentage']; //手数
  604. $send_child['profit'] = (float)$value['profit']; //利润点
  605. $send_child['error_code'] = (int)$value['error_code']; //错误的号码
  606. array_push($send['desc'],$send_child);
  607. }
  608. echo "发送的数据";
  609. var_dump($send);
  610. sendTo_tcp_Server($send);
  611. }
  612. //获取批量修改sql
  613. function batchUpdate($data,$field,$params=[],$tableName)
  614. {
  615. if (!is_array($data) || !$field || !is_array($params)) {
  616. return false;
  617. }
  618. $updates = parseUpdate($data, $field);
  619. $where = parseParams($params);
  620. $fields = array_column($data, $field);
  621. $fields = implode(',', array_map(function($value) {
  622. return "'".$value."'";
  623. }, $fields));
  624. $sql = sprintf("UPDATE `%s` SET %s WHERE `%s` IN (%s) %s", "$tableName", $updates, $field, $fields, $where);
  625. return $sql;
  626. }
  627. function parseUpdate($data,$field)
  628. {
  629. $sql = "";
  630. $keys = array_keys(current($data));
  631. foreach ($keys as $column){
  632. $sql .= sprintf(" `%s` = CASE `%s` \n",$column,$field);
  633. foreach ($data as $line){
  634. $sql .= sprintf("WHEN '%s' THEN '%s' \n",$line[$field],$line[$column]);
  635. }
  636. $sql .= "END,";
  637. }
  638. return rtrim($sql,",");
  639. }
  640. function parseParams($params)
  641. {
  642. $where = [];
  643. foreach ($params as $key=>$value){
  644. $where[] = sprintf(" `%s` = '%s' ",$key,$value);
  645. }
  646. return $where ? " AND " .implode('AND',$where) : '';
  647. }
  648. function getToken()
  649. {
  650. global $token;
  651. $requestApi = "/api/request_token";
  652. $params = ['login'=>LOGIN,'password'=>PASSWORD];
  653. $data = http(BALANCE_URL.$requestApi,$params,"POST");
  654. $data = json_decode($data,true);
  655. var_dump($requestApi,$data);
  656. $token = $data['token'];
  657. }
  658. function refreshToken()
  659. {
  660. global $token;
  661. $sendToken = $token;
  662. $params = ['request'=>'token_renewal','token'=>$sendToken];
  663. $data = http(BALANCE_URL,$params);
  664. $data = json_decode($data,true);
  665. $token = $data['token'];
  666. }
  667. function balancefix($logins)
  668. {
  669. global $token;
  670. $requestApi = "/api/balance_fix";
  671. $params = ['logins'=>$logins,'token'=>$token];
  672. $data = http(BALANCE_URL.$requestApi,$params,"POST");
  673. $data = json_decode($data,true);
  674. echo 'balancefix';
  675. curenttime();
  676. var_dump($data);
  677. if(!$data){
  678. log_file("send balancefix Invalid\n");
  679. getToken();
  680. balancefix($logins);
  681. }
  682. }
  683. //发送curl
  684. function http($url, $params, $method = 'GET', $header = array(), $timeout = 5)
  685. {
  686. // POST 提交方式的传入 $set_params 必须是字符串形式
  687. $opts = array(
  688. CURLOPT_TIMEOUT => $timeout,
  689. CURLOPT_RETURNTRANSFER => 1,
  690. CURLOPT_SSL_VERIFYPEER => false,
  691. CURLOPT_SSL_VERIFYHOST => false,
  692. CURLOPT_HTTPHEADER => $header
  693. );
  694. /* 根据请求类型设置特定参数 */
  695. switch (strtoupper($method)) {
  696. case 'GET':
  697. $opts[CURLOPT_URL] = $url . '?' . http_build_query($params);
  698. break;
  699. case 'POST':
  700. $params = http_build_query($params);
  701. $opts[CURLOPT_URL] = $url;
  702. $opts[CURLOPT_POST] = 1;
  703. $opts[CURLOPT_POSTFIELDS] = $params;
  704. break;
  705. case 'DELETE':
  706. $opts[CURLOPT_URL] = $url;
  707. $opts[CURLOPT_HTTPHEADER] = array("X-HTTP-Method-Override: DELETE");
  708. $opts[CURLOPT_CUSTOMREQUEST] = 'DELETE';
  709. $opts[CURLOPT_POSTFIELDS] = $params;
  710. break;
  711. case 'PUT':
  712. $opts[CURLOPT_URL] = $url;
  713. $opts[CURLOPT_POST] = 0;
  714. $opts[CURLOPT_CUSTOMREQUEST] = 'PUT';
  715. $opts[CURLOPT_POSTFIELDS] = $params;
  716. break;
  717. default:
  718. throw new Exception('不支持的请求方式!');
  719. }
  720. /* 初始化并执行curl请求 */
  721. $ch = curl_init();
  722. curl_setopt_array($ch, $opts);
  723. $data = curl_exec($ch);
  724. $error = curl_error($ch);
  725. return $data;
  726. }
  727. //程序执行时间秒速
  728. function curenttime()
  729. {
  730. $t = microtime(true);
  731. $micro = sprintf("%06d",($t - floor($t)) * 1000000);
  732. $d = new DateTime( date('Y-m-d H:i:s.'.$micro, $t) );
  733. print $d->format("Y-m-d H:i:s.u"); // note at point on "u"
  734. }
  735. //自定义日志类
  736. function log_file($data){
  737. $data = print_r($data,true);
  738. error_log(date("Y-m-d H:i:s", time()).' ===== info: '.$data."\n",3,__DIR__ . '/log/' . date("Ymd", time()) . '.log');
  739. }
  740. function createdir($path) {
  741. // 判断传过来的$path是否已是目录,若是,则直接返回true
  742. if(is_dir($path)) {
  743. return true;
  744. }
  745. // 走到这步,说明传过来的$path不是目录
  746. // 判断其上级是否为目录,是,则直接创建$path目录
  747. if(is_dir(dirname($path))) {
  748. return mkdir($path);
  749. }
  750. // 走到这说明其上级目录也不是目录,则继续判断其上上...级目录
  751. createdir(dirname($path));
  752. // 走到这步,说明上级目录已创建成功,则直接接着创建当前目录,并把创建的结果返回
  753. return mkdir($path);
  754. }
  755. Worker::runAll(); // 执行函数