tcpServer.php 45 KB


  1. <?php
  2. require dirname(dirname(__FILE__)).'/vendor/autoload.php';
  3. require_once dirname(dirname(__FILE__)) . '/vendor/workerman/Autoloader.php';
  4. require_once dirname(dirname(__FILE__)) . '/vendor/mysql/src/Connection.php';
  5. require_once __DIR__ . '/config.php';
  6. use Workerman\Worker; //worker容器类
  7. use Workerman\Connection\AsyncTcpConnection; //连接异步客户端
  8. use Workerman\MySQL\Connection; //连接数据库的类
  9. use Workerman\Lib\Timer; //定时器函数
  10. use Monolog\Logger;
  11. use Monolog\Handler\StreamHandler;
  12. use Monolog\Handler\ErrorLogHandler;
  // All timestamps produced by this process use the "PRC" time zone.
  date_default_timezone_set("PRC");

  // Counter declared at file scope; the message handlers import it via `global $num`.
  $num = 1;

  // Log file shared by every Worker instance (static property on the Worker class).
  Worker::$logFile = __DIR__ . '/workerman.log';

  // TCP worker listening on the loopback interface, port 12345.
  $workerTcp = new Worker("tcp://127.0.0.1:12345");
  $workerTcp->name = "TCP";
  /**
   * Per-process bootstrap for the "TCP" worker.
   *
   * Sets up, in order:
   *  1. the MySQL connection ($db) and the request sequence counter ($seqnum);
   *  2. a websocket client ($con) to the local web server at ws://127.0.0.1:12380,
   *     which delivers insert / rollback / except_rollback / retry jobs;
   *  3. a TCP client ($connection_to_tcp) to the interface server, which answers
   *     each chunk of a job with newline-delimited JSON;
   *  4. three timers: a ping to the interface server, a heartbeat to the web
   *     server (which also opens or closes the interface connection depending on
   *     $connection_count), and an idle sweep over this worker's own clients.
   *
   * The interface-server handlers are defined once and re-bound before every
   * reconnect attempt, so the initial connection and later reconnects always run
   * the same code.
   *
   * @param Worker $workerTcp the worker instance being started
   */
  $workerTcp->onWorkerStart = function ($workerTcp) {
      global $db;                                          // MySQL connection
      global $con;                                         // client to the local web server
      global $connection_to_tcp;                           // client to the (remote) interface server
      global $connection_count;                            // last count reported by the web server heartbeat
      global $send_data, $index, $send_order;              // insert job: chunks, current chunk, order id
      global $seqnum;                                      // sequence number expected in the next reply
      global $rollback_data, $rollback_index;              // rollback job: chunks, current chunk
      global $retry_data, $retry_index;                    // retry job: chunks, current chunk
      global $retry_rollback_data, $retry_rollback_index;  // except_rollback job: chunks, current chunk

      $seqnum = 1;
      $db = new Connection(DB_HOST, DB_PORT, DB_USERNAME, DB_PASSWORD, DB_NAME);

      // ---------------------------------------------------------------------
      // Local web server (websocket)
      // ---------------------------------------------------------------------
      $con = new AsyncTcpConnection("ws://127.0.0.1:12380");

      $con->onConnect = function ($on) {
          global $index, $rollback_index, $retry_index, $retry_rollback_index;
          // A fresh session always starts every job at its first chunk.
          $index = 0;
          $rollback_index = 0;
          $retry_index = 0;
          $retry_rollback_index = 0;
          sendToWebServer([
              'type' => 'login',
              'uid' => 'tcp'
          ]);
      };

      // Jobs and heartbeats pushed by the web server.
      $con->onMessage = function ($con, $data) {
          global $connection_count;
          global $index, $send_data, $send_order;
          global $rollback_data, $rollback_index;
          global $retry_rollback_data, $retry_rollback_index;
          global $retry_data, $retry_index;

          $msg = json_decode($data, true);
          if (!is_array($msg) || !isset($msg['type'])) {
              return; // not a message this bridge understands
          }

          switch ($msg['type']) {
              case "heartbeat":
                  $connection_count = isset($msg["connection_count"]) ? $msg["connection_count"] : null;
                  break;

              case "get_order_record":   // query order status
              case "set_current_order":  // set the order currently being followed
                  sendTo_tcp_Server($msg);
                  break;

              case "insert":
                  // Accounts are forwarded 12 per request.
                  $send_data = array_chunk($msg['data'], 12);
                  $send_order = (int)$msg['orderid'];
                  send_insert($send_data, $index, $send_order);
                  break;

              case "rollback":
                  // Rows are forwarded 5 per request.
                  $rollback_data = array_chunk($msg['data'], 5);
                  send_rollback($rollback_data, $rollback_index);
                  break;

              case "except_rollback":    // rollback of rows taken from the retry table
                  $retry_rollback_data = array_chunk($msg['data'], 5);
                  send_retry_rollback($retry_rollback_data, $retry_rollback_index);
                  break;

              case "retry":
                  $retry_data = array_chunk($msg['data'], 5);
                  send_retry($retry_data, $retry_index);
                  break;
          }
      };

      $con->onClose = function ($con) {
          echo "connection closed\n";
          $con->reConnect(1);
      };

      $con->onError = function ($con, $code, $msg) {
          echo "Error code:$code msg:$msg\n";
      };

      $con->connect();

      // ---------------------------------------------------------------------
      // Persistence helpers for replies from the interface server
      // ---------------------------------------------------------------------
      $saveColumns = "type,orig_order,orig_login,dest_login,dest_order,percentage,profit,error_code,addtime";

      // Reads $source[$key] as a number; anything non-numeric becomes 0 so that
      // remote data can never inject SQL through an unquoted numeric position.
      $sqlNum = function ($source, $key) {
          return (is_array($source) && isset($source[$key]) && is_numeric($source[$key]))
              ? $source[$key] + 0
              : 0;
      };

      // Number of chunks in a job, tolerating a job that was never initialised.
      $chunkCount = function ($chunks) {
          return is_array($chunks) ? count($chunks) : 0;
      };

      // One "(...)" VALUES tuple in $saveColumns order. $msg['type'] has already
      // been matched against a fixed literal by the caller, so quoting it is safe.
      $saveTuple = function ($msg, $row, $now) use ($sqlNum) {
          return sprintf(
              "('%s',%s,%s,%s,%s,%s,%s,%s,%s)",
              $msg['type'],
              $sqlNum($msg, 'orig_order'),
              $sqlNum($msg, 'orig_login'),
              $sqlNum($row, 'dest_login'),
              $sqlNum($row, 'dest_order'),
              $sqlNum($row, 'percentage'),
              $sqlNum($row, 'profit'),
              $sqlNum($row, 'error_code'),
              $now
          );
      };

      // Row handed to batchUpdate(). Only known columns are copied, because
      // batchUpdate() turns array keys into column names.
      $progressRow = function ($msg, $row, $now) use ($sqlNum) {
          $clean = [];
          foreach (['dest_login', 'dest_order', 'percentage', 'profit', 'error_code'] as $column) {
              if (is_array($row) && isset($row[$column])) {
                  $clean[$column] = $sqlNum($row, $column);
              }
          }
          $clean['type'] = $msg['type'];
          $clean['orig_order'] = $sqlNum($msg, 'orig_order');
          $clean['orig_login'] = $sqlNum($msg, 'orig_login');
          $clean['addtime'] = $now;
          return $clean;
      };

      // error_code of a reply row, or null when the row does not carry one.
      $errorCode = function ($row) {
          return (is_array($row) && isset($row['error_code'])) ? $row['error_code'] : null;
      };

      // "insert" reply: record every row in order_progress and order_save.
      // Returns false (nothing written) as soon as a row reports error_code -2.
      $persistInsert = function ($msg) use ($saveTuple, $errorCode, $saveColumns) {
          global $db;
          $rows = (isset($msg['dest']) && is_array($msg['dest'])) ? $msg['dest'] : [];
          $tuples = [];
          foreach ($rows as $row) {
              if ($errorCode($row) == -2) {
                  return false;
              }
              $tuples[] = $saveTuple($msg, $row, time());
          }
          if (!$tuples) {
              return true; // nothing to store; avoids a malformed "VALUES " statement
          }
          $values = implode(',', $tuples);
          $db->query(sprintf("INSERT INTO %s(%s) VALUES %s", "order_progress", $saveColumns, $values));
          $db->query(sprintf("INSERT INTO %s(%s) VALUES %s", "order_save", $saveColumns, $values));
          return true;
      };

      // "rollback" / "except_rollback" reply: rows with error_code 0 are removed
      // from order_progress and archived in order_save; all other rows are
      // updated in order_progress. Returns false (nothing written) on error_code -2.
      $persistRollback = function ($msg) use ($sqlNum, $saveTuple, $progressRow, $errorCode, $saveColumns) {
          global $db;
          $rows = (isset($msg['desc']) && is_array($msg['desc'])) ? $msg['desc'] : [];
          $origOrder = $sqlNum($msg, 'orig_order');
          $deleteConditions = [];
          $archivedTuples = [];
          $failedRows = [];
          foreach ($rows as $row) {
              $code = $errorCode($row);
              if ($code == -2) {
                  return false;
              }
              $now = time();
              if ($code == 0) {
                  $deleteConditions[] = "(dest_login = " . $sqlNum($row, 'dest_login') . " and orig_order = " . $origOrder . ")";
                  $archivedTuples[] = $saveTuple($msg, $row, $now);
              } else {
                  $failedRows[] = $progressRow($msg, $row, $now);
              }
          }
          // Only the failed rows are updated; the successful ones are deleted below.
          if ($failedRows) {
              $db->query(batchUpdate($failedRows, 'dest_login', ['orig_order' => $origOrder], "order_progress"));
          }
          if ($deleteConditions) {
              $db->query('delete from order_progress where ' . implode(' or ', $deleteConditions));
              $db->query("insert into order_save(" . $saveColumns . ") values" . implode(',', $archivedTuples));
          }
          return true;
      };

      // "retry" reply: every row is updated in order_progress (one statement);
      // rows with error_code 0 are additionally archived in order_save.
      // Returns false (nothing written) on error_code -2.
      $persistRetry = function ($msg) use ($sqlNum, $saveTuple, $progressRow, $errorCode, $saveColumns) {
          global $db;
          $rows = (isset($msg['desc']) && is_array($msg['desc'])) ? $msg['desc'] : [];
          $progressRows = [];
          $archivedTuples = [];
          foreach ($rows as $row) {
              $code = $errorCode($row);
              if ($code == -2) {
                  return false;
              }
              $now = time();
              $progressRows[] = $progressRow($msg, $row, $now);
              if ($code == 0) {
                  $archivedTuples[] = $saveTuple($msg, $row, $now);
              }
          }
          if ($progressRows) {
              $db->query(batchUpdate($progressRows, 'dest_login', ['orig_order' => $sqlNum($msg, 'orig_order')], "order_progress"));
          }
          if ($archivedTuples) {
              $db->query("insert into order_save(" . $saveColumns . ") values" . implode(',', $archivedTuples));
          }
          return true;
      };

      // ---------------------------------------------------------------------
      // Interface server (TCP) handlers
      // ---------------------------------------------------------------------
      $onTcpConnect = function ($connection_to_tcp) {
          // Announce ourselves with a ping as soon as the socket is up.
          $connection_to_tcp->send('{"type":"ping"}' . "\n");
      };

      /**
       * Handles newline-delimited JSON replies from the interface server.
       *
       * A reply to a chunked job is accepted only when its seqnum equals the
       * local $seqnum; it is then acknowledged, the next chunk (if any) is sent,
       * the reply is forwarded to the web server and finally persisted.
       * Returning false abandons the rest of the current read, as before.
       */
      $onTcpMessage = function ($connection_to_tcp, $data) use ($persistInsert, $persistRollback, $persistRetry, $chunkCount) {
          global $recv_buffer;                                 // holds an incomplete message between reads
          global $seqnum;
          global $index, $send_data, $send_order;
          global $rollback_data, $rollback_index;
          global $retry_data, $retry_index;
          global $retry_rollback_data, $retry_rollback_index;

          foreach (explode("\n", $data) as $line) {
              if ($line === '') {
                  continue;
              }

              // A buffered fragment must be completed by the bytes that follow it,
              // so try buffer+line first. Fall back to the line alone so a stale
              // buffer cannot block later, complete messages.
              $candidates = ((string)$recv_buffer === '') ? [$line] : [$recv_buffer . $line, $line];
              $msg = null;
              foreach ($candidates as $candidate) {
                  $decoded = json_decode($candidate, true);
                  if (is_array($decoded)) {
                      $msg = $decoded;
                      break;
                  }
              }
              if ($msg === null) {
                  $recv_buffer .= $line; // still incomplete: keep collecting
                  continue;
              }
              $recv_buffer = "";        // a full message was consumed

              // Debug output of the reply received from the interface server.
              echo "接口返回数据";
              var_dump($msg);

              if (!isset($msg['type']) || !is_string($msg['type'])) {
                  continue;
              }
              $hasSeq = isset($msg['seqnum']);

              switch ($msg['type']) {
                  case "get_order_record":   // order status query
                      // NOTE(review): unlike set_current_order, this reply is not
                      // forwarded to the web server - confirm that is intended.
                      if (!$hasSeq) {
                          return false;
                      }
                      ack($msg['seqnum']);
                      $seqnum++;
                      break;

                  case "set_current_order":
                      if (!$hasSeq) {
                          return false;
                      }
                      sendToWebServer($msg);
                      ack($msg['seqnum']);
                      $seqnum++;
                      break;

                  case "insert":
                      echo "接收到tcp" . curenttime();
                      if (!$hasSeq || $seqnum != $msg['seqnum']) {
                          return false;
                      }
                      $index++;
                      ack($msg['seqnum']);
                      $seqnum++;
                      if ($index < $chunkCount($send_data)) {
                          send_insert($send_data, $index, $send_order); // next chunk
                      } else {
                          $index = 0;                                   // job finished
                      }
                      sendToWebServer($msg);
                      if (!$persistInsert($msg)) {
                          return false;
                      }
                      break;

                  case "rollback":
                      if (!$hasSeq || $seqnum != $msg['seqnum']) {
                          return false;
                      }
                      $rollback_index++;
                      ack($msg['seqnum']);
                      $seqnum++;
                      if ($rollback_index < $chunkCount($rollback_data)) {
                          send_rollback($rollback_data, $rollback_index);
                      } else {
                          $rollback_index = 0;
                      }
                      sendToWebServer($msg);
                      if (!$persistRollback($msg)) {
                          return false;
                      }
                      break;

                  case "except_rollback":
                      if (!$hasSeq || $seqnum != $msg['seqnum']) {
                          return false;
                      }
                      ack($msg['seqnum']);
                      $seqnum++;
                      $retry_rollback_index++;
                      if ($retry_rollback_index < $chunkCount($retry_rollback_data)) {
                          send_retry_rollback($retry_rollback_data, $retry_rollback_index);
                      } else {
                          $retry_rollback_index = 0;
                      }
                      sendToWebServer($msg);
                      if (!$persistRollback($msg)) {
                          return false;
                      }
                      break;

                  case "retry":
                      if (!$hasSeq || $seqnum != $msg['seqnum']) {
                          return false;
                      }
                      ack($msg['seqnum']);
                      $seqnum++;
                      $retry_index++;
                      if ($retry_index < $chunkCount($retry_data)) {
                          send_retry($retry_data, $retry_index);
                      } else {
                          $retry_index = 0;
                      }
                      sendToWebServer($msg);
                      if (!$persistRetry($msg)) {
                          return false;
                      }
                      break;
              }
          }
      };

      $onTcpClose = function ($connection_to_tcp) {
          echo "connection closed\n";
          log_file("to_tcp connection closed\n");
          // No automatic reconnect here; the heartbeat timer below decides.
      };

      $onTcpError = function ($connection_to_tcp, $code, $msg) {
          echo "Error code:$code msg:$msg\n";
      };

      // (Re)attaches all four handlers. Called before every connect attempt,
      // mirroring the re-assignment the reconnect path has always performed
      // (presumably because callbacks do not survive a closed connection - TODO confirm).
      $bindTcpHandlers = function ($connection) use ($onTcpConnect, $onTcpMessage, $onTcpClose, $onTcpError) {
          $connection->onConnect = $onTcpConnect;
          $connection->onMessage = $onTcpMessage;
          $connection->onClose = $onTcpClose;
          $connection->onError = $onTcpError;
      };

      // Alternative endpoints used previously:
      //   tcp://47.254.202.24:10009
      //   tcp://127.0.0.1:1235
      $connection_to_tcp = new AsyncTcpConnection('tcp://192.168.5.111:10009');
      $bindTcpHandlers($connection_to_tcp);
      $connection_to_tcp->connect();

      // ---------------------------------------------------------------------
      // Timers
      // ---------------------------------------------------------------------
      // Ping the interface server every 30 seconds.
      Timer::add(30, function () use ($connection_to_tcp) {
          $connection_to_tcp->send('{"type":"ping"}' . "\n");
      });

      // Every 30 seconds: open the interface connection while the web server
      // reports more than one connection, close it when it reports exactly one,
      // then send our own heartbeat to the web server.
      Timer::add(30, function () use ($con, $connection_to_tcp, $bindTcpHandlers) {
          global $connection_count;
          var_dump($connection_count);
          if ($connection_count) {
              if ($connection_count > 1) {
                  $bindTcpHandlers($connection_to_tcp);
                  $connection_to_tcp->connect();
              } else {
                  $connection_to_tcp->close();
              }
          }
          $con->send(json_encode(['type' => 'heartbeat']) . "\n");
      });

      // Every second: close clients of this worker that have been silent for
      // longer than HEARTBEAT_TIME seconds.
      Timer::add(1, function () use ($workerTcp) {
          $time_now = time();
          foreach ($workerTcp->connections as $connection) {
              // A connection without a recorded timestamp starts its idle period now.
              if (empty($connection->lastMessageTime)) {
                  $connection->lastMessageTime = $time_now;
                  continue;
              }
              if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
                  $connection->close();
              }
          }
      });
  };
  // Callbacks for clients that connect to this worker's own TCP port.

  $workerTcp->onConnect = function ($connection) {
      echo "connection onConnect\n";
  };

  $workerTcp->onError = function ($connection, $code, $msg) {
      echo "Error code:$code msg:$msg\n";
  };

  $workerTcp->onClose = function ($connection) {
      echo "connection closed\n";
  };

  $workerTcp->onMessage = function ($connection, $data) {
      // Record activity so the idle sweep (which compares lastMessageTime with
      // HEARTBEAT_TIME) only closes clients that have really gone silent.
      $connection->lastMessageTime = time();
      echo "connection onMessage\n";
  };
  /**
   * Acknowledges a reply from the interface server.
   *
   * Sends {"type":"ack","seqnum":<seqnum>} followed by a newline over the
   * global $connection_to_tcp connection.
   *
   * @param mixed $seqnum sequence number of the reply being acknowledged
   */
  function ack($seqnum)
  {
      global $connection_to_tcp;

      $payload = json_encode(["type" => "ack", "seqnum" => $seqnum]);
      $connection_to_tcp->send($payload . "\n");
  }
  /**
   * Sends a message to the local web server.
   *
   * The payload is written to the daily log first, then JSON-encoded and sent
   * with a trailing newline over the global $con connection.
   *
   * @param mixed $data payload to forward
   */
  function sendToWebServer($data)
  {
      global $con;

      log_file($data);

      $encoded = json_encode($data);
      $con->send($encoded . "\n");
  }
  /**
   * Sends a request to the interface server.
   *
   * Stamps the request with the current global $seqnum, logs it, then sends it
   * as one JSON line over the global $connection_to_tcp connection.
   *
   * @param array $data request body; its 'seqnum' entry is overwritten
   */
  function sendTo_tcp_Server($data)
  {
      global $connection_to_tcp, $seqnum;

      $data['seqnum'] = $seqnum;
      log_file($data);

      $line = json_encode($data) . "\n";
      $connection_to_tcp->send($line);
  }
  /**
   * Sends one chunk of an insert job to the interface server as "insert_some".
   *
   * The former `$index = 0` default has been dropped: it sat before the required
   * $order parameter, so it could never be used, and PHP 8 reports such
   * signatures as deprecated. Every caller already passes all three arguments.
   *
   * @param array $data  job split into chunks; each chunk is a list of rows
   *                     carrying 'LOGIN' and 'ladder'
   * @param int   $index position of the chunk to send
   * @param mixed $order originating order id (cast to int)
   */
  function send_insert($data, $index, $order)
  {
      $send = [
          'type' => "insert_some",
          'orig_order' => (int)$order,
          'dest' => [],
      ];

      foreach ($data[$index] as $row) {
          $send['dest'][] = [
              'dest_login' => $row['LOGIN'],   // destination account
              'percentage' => $row['ladder'],  // value of the row's 'ladder' field
          ];
      }

      sendTo_tcp_Server($send);
  }
  /**
   * Sends one chunk of a rollback job to the interface server as "rollback_some".
   *
   * orig_order and orig_login are taken from the first row of the chunk; every
   * row contributes one entry to 'desc'.
   *
   * @param array $data  job split into chunks of rows
   * @param int   $index position of the chunk to send
   */
  function send_rollback($data, $index = 0)
  {
      $chunk = $data[$index];
      $head = $chunk[0];

      $request = [
          'type' => "rollback_some",
          'orig_order' => (int)$head['orig_order'],
          'orig_login' => (int)$head['orig_login'],
          'desc' => [],
      ];

      foreach ($chunk as $row) {
          $request['desc'][] = [
              'dest_login' => (int)$row['dest_login'],
              'dest_order' => (int)$row['dest_order'],
              'percentage' => (int)$row['percentage'],
              'profit' => (float)$row['profit'],
              'error_code' => (int)$row['error_code'],
          ];
      }

      sendTo_tcp_Server($request);
  }
  /**
   * Sends one chunk of a retry-table rollback job to the interface server as
   * "except_rollback_some".
   *
   * orig_order and orig_login are taken from the first row of the chunk. The
   * request is dumped to stdout before it is sent.
   *
   * @param array $data  job split into chunks of rows
   * @param int   $index position of the chunk to send
   */
  function send_retry_rollback($data, $index = 0)
  {
      $chunk = $data[$index];
      $head = $chunk[0];

      $request = [
          'type' => "except_rollback_some",
          'orig_order' => (int)$head['orig_order'],
          'orig_login' => (int)$head['orig_login'],
          'desc' => [],
      ];

      foreach ($chunk as $row) {
          $request['desc'][] = [
              'dest_login' => (int)$row['dest_login'],
              'dest_order' => (int)$row['dest_order'],
              'percentage' => (int)$row['percentage'],
              'profit' => (float)$row['profit'],
              'error_code' => (int)$row['error_code'],
          ];
      }

      // Debug output of the outgoing request.
      echo "发送的数据";
      var_dump($request);

      sendTo_tcp_Server($request);
  }
  /**
   * Sends one chunk of a retry job to the interface server as "retry_some".
   *
   * orig_order and orig_login are taken from the first row of the chunk. The
   * request is dumped to stdout before it is sent.
   *
   * @param array $data  job split into chunks of rows
   * @param int   $index position of the chunk to send
   */
  function send_retry($data, $index = 0)
  {
      $chunk = $data[$index];
      $head = $chunk[0];

      $request = [
          'type' => "retry_some",
          'orig_order' => (int)$head['orig_order'],
          'orig_login' => (int)$head['orig_login'],
          'desc' => [],
      ];

      foreach ($chunk as $row) {
          $request['desc'][] = [
              'dest_login' => (int)$row['dest_login'],
              'dest_order' => (int)$row['dest_order'],
              'percentage' => (int)$row['percentage'],
              'profit' => (float)$row['profit'],
              'error_code' => (int)$row['error_code'],
          ];
      }

      // Debug output of the outgoing request.
      echo "发送的数据";
      var_dump($request);

      sendTo_tcp_Server($request);
  }
  /**
   * Builds a single UPDATE statement that changes many rows at once.
   *
   * Result shape:
   *   UPDATE `table` SET <CASE expressions> WHERE `field` IN (<values>) AND <params>
   *
   * The former `$params = []` default has been dropped: it sat before the
   * required $tableName parameter, so it could never be used, and PHP 8 reports
   * such signatures as deprecated.
   *
   * @param array  $data      rows to write; each row is column => value and must contain $field
   * @param string $field     column that identifies a row (used in CASE and in the IN list)
   * @param array  $params    extra column => value equality conditions (may be empty)
   * @param string $tableName target table
   * @return string|false the SQL text, or false when the input is unusable
   *                      (including an empty $data, which cannot form a valid statement)
   */
  function batchUpdate($data, $field, $params, $tableName)
  {
      if (!is_array($data) || !$data || !$field || !is_array($params)) {
          return false;
      }

      $updates = parseUpdate($data, $field);
      $where = parseParams($params);

      // Quote and escape every key value for the IN (...) list.
      $quoted = array_map(function ($value) {
          return "'" . addslashes((string)$value) . "'";
      }, array_column($data, $field));
      $inList = implode(',', $quoted);

      return sprintf("UPDATE `%s` SET %s WHERE `%s` IN (%s) %s", $tableName, $updates, $field, $inList, $where);
  }
  /**
   * Builds the SET part of a batch UPDATE: one CASE expression per column.
   *
   * The column list is taken from the first row. For each column the result
   * contains
   *   `col` = CASE `field` WHEN '<key>' THEN '<value>' ... END
   * with one WHEN per row. A row that lacks a column contributes an empty
   * string for it.
   *
   * Values are escaped with addslashes() because no database handle is
   * available here; this is not charset-aware, so callers should still pass
   * validated data.
   *
   * @param array  $data  rows as column => value arrays
   * @param string $field column whose value identifies each row
   * @return string comma-separated CASE expressions, or '' when $data has no rows
   */
  function parseUpdate($data, $field)
  {
      $first = is_array($data) ? reset($data) : false;
      if (!is_array($first)) {
          return ""; // no rows: nothing to set
      }

      $quoteName = function ($name) {
          return str_replace('`', '``', (string)$name);
      };

      $sql = "";
      foreach (array_keys($first) as $column) {
          $sql .= sprintf(" `%s` = CASE `%s` \n", $quoteName($column), $quoteName($field));
          foreach ($data as $line) {
              $key = isset($line[$field]) ? $line[$field] : '';
              $value = isset($line[$column]) ? $line[$column] : '';
              $sql .= sprintf("WHEN '%s' THEN '%s' \n", addslashes((string)$key), addslashes((string)$value));
          }
          $sql .= "END,";
      }

      return rtrim($sql, ",");
  }
  /**
   * Builds the extra equality conditions of a batch UPDATE.
   *
   * Each entry becomes ` `column` = 'value' `; the entries are joined with AND
   * and the whole string is prefixed with " AND " so that it can be appended
   * to an existing WHERE clause.
   *
   * Values are escaped with addslashes() because no database handle is
   * available here; this is not charset-aware, so callers should still pass
   * validated data.
   *
   * @param array $params column => value conditions
   * @return string the condition text, or '' when there are no conditions
   */
  function parseParams($params)
  {
      $where = [];
      foreach ($params as $key => $value) {
          $column = str_replace('`', '``', (string)$key);
          $where[] = sprintf(" `%s` = '%s' ", $column, addslashes((string)$value));
      }

      return $where ? " AND " . implode('AND', $where) : '';
  }
  /**
   * Returns the current local time with microsecond precision.
   *
   * The value is returned rather than printed, so a call site such as
   * `echo "label" . curenttime();` emits the label first and the time after it.
   *
   * @return string timestamp formatted as "Y-m-d H:i:s.uuuuuu"
   */
  function curenttime()
  {
      $now = microtime(true);
      $seconds = (int)floor($now);

      // Fractional part of the current second, zero-padded to six digits.
      $micro = sprintf("%06d", ($now - $seconds) * 1000000);

      return date('Y-m-d H:i:s', $seconds) . '.' . $micro;
  }
  /**
   * Appends one entry to the daily log file log/<Ymd>.log next to this script.
   *
   * The value is rendered with print_r() and prefixed with the current time.
   * The log directory is created on first use so that logging does not fail on
   * a fresh checkout.
   *
   * @param mixed $data value to log
   */
  function log_file($data)
  {
      $directory = __DIR__ . '/log';
      if (!is_dir($directory)) {
          // If creation fails, error_log() below still reports the problem.
          mkdir($directory, 0755, true);
      }

      $text = print_r($data, true);
      $entry = date("Y-m-d H:i:s") . ' ===== info: ' . $text . "\n";

      // Message type 3: append to the given file.
      error_log($entry, 3, $directory . '/' . date("Ymd") . '.log');
  }
  838. Worker::runAll(); // start the worker(s) configured above