webserver.php 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. <?php
  2. require_once dirname(dirname(__FILE__)) . '/vendor/workerman/Autoloader.php'; //自动加载类
  3. require_once __DIR__ . '/config.php'; //配置文件
  4. use Workerman\Worker; //worker类
  5. use Workerman\Lib\Timer; //定时器函数
  6. //初始化 创建 websocket 服务器
  7. $workerWs = new Worker("websocket://0.0.0.0:12380");
  8. $workerWs->name = "WebSocket"; //命名为WebSocket
  9. // 心跳检测的实现
  10. $workerWs->onWorkerStart = function ($workerWs) {
  11. Timer::add(1, function () use ($workerWs) {
  12. $time_now = time();
  13. foreach ($workerWs->connections as $connection) {
  14. // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
  15. if (empty($connection->lastMessageTime)) {
  16. $connection->lastMessageTime = $time_now;
  17. continue;
  18. }
  19. //上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
  20. if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
  21. $connection->close();
  22. }
  23. }
  24. });
  25. };
  26. // 服务器收到信息后执行
  27. /**
  28. * @param $connection
  29. * @param $data
  30. */
  31. $workerWs->onMessage = function ($connection, $data) {
  32. $connection->lastMessageTime = time(); //最后的信息时间
  33. global $workerWs;
  34. $bufExplode = explode("\n", $data); //通过指定的分隔符,把字符串打散为数组
  35. foreach ($bufExplode as $key => $value) {
  36. $msg = json_decode($value, true); //解析数据
  37. if ($msg) {
  38. switch ($msg['type']) {
  39. case "heartbeat": //心跳检测(前端的心跳检测)
  40. $results['type'] = 'heartbeat';
  41. $results['data'] = 'I heard';
  42. $connection->send(json_encode($results) . "\n");
  43. break;
  44. case "login": //异步客户端的登录(标识身份)
  45. $connection->uid = $msg['uid'];
  46. break;
  47. case "vue_set_current_position":
  48. $data = $msg['send'];
  49. $data['type'] = "set_current_position";
  50. $data['position'] = (int) $msg['orderid'];
  51. sendToTcp($data);
  52. break;
  53. case "vue_insert": //插入发送内容
  54. if(isset($msg['message'])){
  55. sendToTcp([
  56. 'type' => 'insert',
  57. 'message' => 'ErrorInsert',
  58. 'orderid' => $msg['orderid'],
  59. 'data'=> $msg['data'],
  60. ]);
  61. }else{
  62. sendToTcp([
  63. 'type' => 'insert',
  64. 'orderid' => $msg['orderid'],
  65. 'data'=> $msg['data'],
  66. ]);
  67. }
  68. break;
  69. case "vue_rollback": //Rollback请求
  70. sendToTcp([
  71. 'type' => 'rollback',
  72. 'data'=> $msg['data'],
  73. ]);
  74. break;
  75. //从前端传递过来的数据 结束位置==================================================================
  76. case "set_current_order": //设置当前跟单订单信息
  77. sendToWeb([
  78. 'type' => $msg['type'],
  79. 'error_code' => $msg['error_code']
  80. ]);
  81. break;
  82. case "insert": //插入发送内容
  83. sendToWeb([
  84. 'type' => $msg['type'],
  85. 'orig_order' => $msg['orig_position'],
  86. 'orig_login' => $msg['orig_login'],
  87. 'dest' => $msg['dest']
  88. ]);
  89. break;
  90. case "ErrorInsert": //插入发送内容
  91. sendToWeb([
  92. 'type' => $msg['type'],
  93. 'orig_order' => $msg['orig_position'],
  94. 'orig_login' => $msg['orig_login'],
  95. 'dest' => $msg['dest']
  96. ]);
  97. break;
  98. case "rollback": //Rollback请求
  99. sendToWeb([
  100. 'type' => $msg['type'],
  101. 'orig_order' => $msg['orig_position'],
  102. 'orig_login' => $msg['orig_login'],
  103. 'desc' => $msg['desc']
  104. ]);
  105. break;
  106. //从接口过来的数据 结束位置========================(这里的数据是一条一条返回的)==============================================
  107. default:
  108. $results['type'] = 'info';
  109. $results['data'] = '未知类型';
  110. $connection->send(json_encode($results) . "\n");
  111. break;
  112. }
  113. } /*else {
  114. // 没有信息的时候
  115. $results['type'] = 'info';
  116. $results['data'] = 'json格式错误';
  117. $connection->send(json_encode($results) . "\n");
  118. }*/
  119. }
  120. };
  121. // 向所有的web用户发送 ( vue前端的 客户端 这里的客户端是没有uid的)
  122. function sendToWeb($msg)
  123. {
  124. global $workerWs;
  125. foreach ($workerWs->connections as $connection) {
  126. if (!isset($connection->uid)) {
  127. $connection->send(json_encode($msg) . "\n");
  128. }
  129. }
  130. }
  131. // 向本机的tcp服务器发送请求(tcp上有一个异步请求的客户端 他登陆是有uid字段的赋值的 )
  132. function sendToTcp($msg)
  133. {
  134. global $workerWs;
  135. foreach ($workerWs->connections as $connection) {
  136. if (isset($connection->uid) && $connection->uid == "tcp") { //只针对特定的用户发起请求
  137. $connection->send(json_encode($msg) . "\n");
  138. }
  139. }
  140. }
  141. Worker::runAll(); //执行函数