webserver.php 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. <?php
  2. require_once dirname(dirname(__FILE__)) . '/vendor/workerman/Autoloader.php'; //自动加载类
  3. require_once __DIR__ . '/config.php'; //配置文件
  4. use Workerman\Worker; //worker类
  5. use Workerman\Lib\Timer; //定时器函数
  6. //初始化 创建 websocket 服务器
  7. $workerWs = new Worker("websocket://127.0.0.1:12380");
  8. $workerWs->name = "WebSocket"; //命名为WebSocket
  9. $connection_count = 0;
  10. // 心跳检测的实现
  11. $workerWs->onWorkerStart = function ($workerWs) {
  12. global $connection_count;
  13. Timer::add(30, function () use ($workerWs) {
  14. $time_now = time();
  15. foreach ($workerWs->connections as $connection) {
  16. // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
  17. if (empty($connection->lastMessageTime)) {
  18. $connection->lastMessageTime = $time_now;
  19. continue;
  20. }
  21. //上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
  22. if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
  23. $connection->close();
  24. }
  25. }
  26. });
  27. };
  28. // 服务器收到信息后执行
  29. /**
  30. * @param $connection
  31. * @param $data
  32. */
  33. $workerWs->onMessage = function ($connection, $data) {
  34. $connection->lastMessageTime = time(); //最后的信息时间
  35. global $connection_count;
  36. global $workerWs;
  37. $connection_count = count($workerWs->connections);
  38. $bufExplode = explode("\n", $data); //通过指定的分隔符,把字符串打散为数组
  39. foreach ($bufExplode as $key => $value) {
  40. $msg = json_decode($value, true); //解析数据
  41. if ($msg) {
  42. switch ($msg['type']) {
  43. case "heartbeat": //心跳检测(前端的心跳检测)
  44. $results['type'] = 'heartbeat';
  45. $results['data'] = 'I heard';
  46. // sendToWeb($msg);
  47. $results['connection_count'] = $connection_count;
  48. $connection->send(json_encode($results) . "\n");
  49. break;
  50. case "login": //异步客户端的登录(标识身份)
  51. $connection->uid = $msg['uid'];
  52. break;
  53. // 从前端传递过来的数据 开始位置======================================================================
  54. case "vue_get_order_record": //查询当前跟单订单
  55. sendToTcp([
  56. 'type' => 'get_order_record',
  57. 'order' => $msg['orderid']
  58. ]);
  59. break;
  60. case "vue_set_current_position":
  61. $data = $msg['send'];
  62. $data['type'] = "set_current_position";
  63. $data['position'] = (int) $msg['orderid'];
  64. sendToTcp($data);
  65. break;
  66. case "vue_set_current_order": //设置当前跟单订单信息
  67. sendToTcp([
  68. 'type' => 'set_current_order',
  69. 'order' => (int)$msg['orderid']
  70. ]);
  71. break;
  72. case "vue_insert": //插入发送内容
  73. if(isset($msg['message'])){
  74. sendToTcp([
  75. 'type' => 'insert',
  76. 'message' => 'ErrorInsert',
  77. 'orderid' => $msg['orderid'],
  78. 'data'=> $msg['data'],
  79. ]);
  80. }else{
  81. sendToTcp([
  82. 'type' => 'insert',
  83. 'orderid' => $msg['orderid'],
  84. 'data'=> $msg['data'],
  85. ]);
  86. }
  87. break;
  88. case "vue_rollback": //Rollback请求
  89. sendToTcp([
  90. 'type' => 'rollback',
  91. 'data'=> $msg['data'],
  92. ]);
  93. break;
  94. case "vue_except_rollback": //Rollback请求
  95. sendToTcp([
  96. 'type' => 'except_rollback',
  97. 'data'=> $msg['data'],
  98. ]);
  99. break;
  100. case "vue_retry": //Retry请求
  101. sendToTcp([
  102. 'type' => 'retry',
  103. 'data'=> $msg['data'],
  104. ]);
  105. break;
  106. //从前端传递过来的数据 结束位置==================================================================
  107. //从接口过来的数据 开始位置================(这里的数据是一条一条返回的)==============================================
  108. case "get_order_record": //查询当前跟单订单
  109. sendToWeb([
  110. 'type' => $msg['type'],
  111. 'err_code' => $msg['err_code'],
  112. 'login' => $msg['login'],
  113. 'order' => $msg['order'],
  114. 'symbol' => $msg['symbol'],
  115. 'digit' => $msg['digit'],
  116. 'volume' => $msg['volume'],
  117. 'open_time' => $msg['open_time'],
  118. 'open_price' => $msg['open_price'],
  119. 'close_time' => $msg['close_time'],
  120. 'close_price'=> $msg['close_price'],
  121. 'sl' => $msg['sl'],
  122. 'tp' => $msg['tp'],
  123. 'profit' => $msg['profit'],
  124. 'commission' => $msg['commission'],
  125. 'swap' => $msg['swap'],
  126. 'comment' => $msg['comment'],
  127. ]);
  128. break;
  129. case "set_current_order": //设置当前跟单订单信息
  130. sendToWeb([
  131. 'type' => $msg['type'],
  132. 'error_code' => $msg['error_code']
  133. ]);
  134. break;
  135. case "insert": //插入发送内容
  136. sendToWeb([
  137. 'type' => $msg['type'],
  138. 'orig_order' => $msg['orig_order'],
  139. 'orig_login' => $msg['orig_login'],
  140. 'dest' => $msg['dest']
  141. ]);
  142. break;
  143. case "ErrorInsert": //插入发送内容
  144. sendToWeb([
  145. 'type' => $msg['type'],
  146. 'orig_order' => $msg['orig_order'],
  147. 'orig_login' => $msg['orig_login'],
  148. 'dest' => $msg['dest']
  149. ]);
  150. break;
  151. case "rollback": //Rollback请求
  152. sendToWeb([
  153. 'type' => $msg['type'],
  154. 'orig_order' => $msg['orig_order'],
  155. 'orig_login' => $msg['orig_login'],
  156. 'desc' => $msg['desc']
  157. ]);
  158. break;
  159. case "except_rollback": //retry表中 Rollback请求
  160. sendToWeb([
  161. 'type' => $msg['type'],
  162. 'orig_order' => $msg['orig_order'],
  163. 'orig_login' => $msg['orig_login'],
  164. 'desc' => $msg['desc']
  165. ]);
  166. break;
  167. case "retry": //Retry请求
  168. sendToWeb([
  169. 'type' => $msg['type'],
  170. 'orig_order' => $msg['orig_order'], //最初的订单
  171. 'orig_login' => $msg['orig_login'], //最初的login
  172. 'desc' => $msg['desc']
  173. ]);
  174. break;
  175. //从接口过来的数据 结束位置========================(这里的数据是一条一条返回的)==============================================
  176. default:
  177. $results['type'] = 'info';
  178. $results['data'] = '未知类型';
  179. $connection->send(json_encode($results) . "\n");
  180. break;
  181. }
  182. } /*else {
  183. // 没有信息的时候
  184. $results['type'] = 'info';
  185. $results['data'] = 'json格式错误';
  186. $connection->send(json_encode($results) . "\n");
  187. }*/
  188. }
  189. };
  190. // 向所有的web用户发送 ( vue前端的 客户端 这里的客户端是没有uid的)
  191. function sendToWeb($msg)
  192. {
  193. global $workerWs;
  194. foreach ($workerWs->connections as $connection) {
  195. if (!isset($connection->uid)) {
  196. $connection->send(json_encode($msg) . "\n");
  197. }
  198. }
  199. }
  200. // 向本机的tcp服务器发送请求(tcp上有一个异步请求的客户端 他登陆是有uid字段的赋值的 )
  201. function sendToTcp($msg)
  202. {
  203. global $workerWs;
  204. foreach ($workerWs->connections as $connection) {
  205. if (isset($connection->uid) && $connection->uid == "tcp") { //只针对特定的用户发起请求
  206. $connection->send(json_encode($msg) . "\n");
  207. }
  208. }
  209. }
  210. Worker::runAll(); //执行函数