| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- <?php
- require_once dirname(dirname(__FILE__)) . '/vendor/workerman/Autoloader.php'; //自动加载类
- require_once __DIR__ . '/config.php'; //配置文件
- use Workerman\Worker; //worker类
- use Workerman\Lib\Timer; //定时器函数
- //初始化 创建 websocket 服务器
- $workerWs = new Worker("websocket://127.0.0.1:12380");
- $workerWs->name = "WebSocket"; //命名为WebSocket
- $connection_count = 0;
- // 心跳检测的实现
- $workerWs->onWorkerStart = function ($workerWs) {
- global $connection_count;
- Timer::add(30, function () use ($workerWs) {
- $time_now = time();
- foreach ($workerWs->connections as $connection) {
- // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
- if (empty($connection->lastMessageTime)) {
- $connection->lastMessageTime = $time_now;
- continue;
- }
- //上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
- if ($time_now - $connection->lastMessageTime > HEARTBEAT_TIME) {
- $connection->close();
- }
- }
- });
- };
- // 服务器收到信息后执行
- /**
- * @param $connection
- * @param $data
- */
- $workerWs->onMessage = function ($connection, $data) {
- $connection->lastMessageTime = time(); //最后的信息时间
- global $connection_count;
- global $workerWs;
- $connection_count = count($workerWs->connections);
- $bufExplode = explode("\n", $data); //通过指定的分隔符,把字符串打散为数组
- foreach ($bufExplode as $key => $value) {
-
- $msg = json_decode($value, true); //解析数据
- if ($msg) {
- switch ($msg['type']) {
- case "heartbeat": //心跳检测(前端的心跳检测)
- $results['type'] = 'heartbeat';
- $results['data'] = 'I heard';
- // sendToWeb($msg);
- $results['connection_count'] = $connection_count;
- $connection->send(json_encode($results) . "\n");
- break;
- case "login": //异步客户端的登录(标识身份)
- var_dump($msg['uid']);
- $connection->uid = $msg['uid'];
- break;
-
- // 从前端传递过来的数据 开始位置======================================================================
- case "vue_get_order_record": //查询当前跟单订单
- sendToTcp([
- 'type' => 'get_order_record',
- 'order' => $msg['orderid']
- ]);
- break;
- case "vue_set_current_order": //设置当前跟单订单信息
- sendToTcp([
- 'type' => 'set_current_order',
- 'order' => (int)$msg['orderid']
- ]);
- break;
- case "vue_insert": //插入发送内容
- sendToTcp([
- 'type' => 'insert',
- 'orderid' => $msg['orderid'],
- 'data'=> $msg['data'],
- ]);
- break;
- case "vue_rollback": //Rollback请求
- sendToTcp([
- 'type' => 'rollback',
- 'data'=> $msg['data'],
- ]);
- break;
- case "vue_except_rollback": //Rollback请求
- sendToTcp([
- 'type' => 'except_rollback',
- 'data'=> $msg['data'],
- ]);
- break;
-
- case "vue_retry": //Retry请求
- sendToTcp([
- 'type' => 'retry',
- 'data'=> $msg['data'],
- ]);
- break;
- //从前端传递过来的数据 结束位置==================================================================
-
-
- //从接口过来的数据 开始位置================(这里的数据是一条一条返回的)==============================================
- case "get_order_record": //查询当前跟单订单
- sendToWeb([
- 'type' => $msg['type'],
- 'err_code' => $msg['err_code'],
- 'login' => $msg['login'],
- 'order' => $msg['order'],
- 'symbol' => $msg['symbol'],
- 'digit' => $msg['digit'],
- 'volume' => $msg['volume'],
- 'open_time' => $msg['open_time'],
- 'open_price' => $msg['open_price'],
- 'close_time' => $msg['close_time'],
- 'close_price'=> $msg['close_price'],
- 'sl' => $msg['sl'],
- 'tp' => $msg['tp'],
- 'profit' => $msg['profit'],
- 'commission' => $msg['commission'],
- 'swap' => $msg['swap'],
- 'comment' => $msg['comment'],
- ]);
- break;
- case "set_current_order": //设置当前跟单订单信息
- sendToWeb([
- 'type' => $msg['type'],
- 'error_code' => $msg['error_code']
- ]);
- break;
- case "insert": //插入发送内容
- sendToWeb([
- 'type' => $msg['type'],
- 'orig_order' => $msg['orig_order'],
- 'orig_login' => $msg['orig_login'],
- 'dest' => $msg['dest']
- ]);
- break;
- case "rollback": //Rollback请求
- sendToWeb([
- 'type' => $msg['type'],
- 'orig_order' => $msg['orig_order'],
- 'orig_login' => $msg['orig_login'],
- 'desc' => $msg['desc']
- ]);
- break;
- case "except_rollback": //retry表中 Rollback请求
- sendToWeb([
- 'type' => $msg['type'],
- 'orig_order' => $msg['orig_order'],
- 'orig_login' => $msg['orig_login'],
- 'desc' => $msg['desc']
- ]);
- break;
-
- case "retry": //Retry请求
- sendToWeb([
- 'type' => $msg['type'],
- 'orig_order' => $msg['orig_order'], //最初的订单
- 'orig_login' => $msg['orig_login'], //最初的login
- 'desc' => $msg['desc']
- ]);
- break;
-
- //从接口过来的数据 结束位置========================(这里的数据是一条一条返回的)==============================================
-
- default:
- $results['type'] = 'info';
- $results['data'] = '未知类型';
- $connection->send(json_encode($results) . "\n");
- break;
- }
- } /*else {
- // 没有信息的时候
- $results['type'] = 'info';
- $results['data'] = 'json格式错误';
- $connection->send(json_encode($results) . "\n");
-
- }*/
- }
- };
- // 向所有的web用户发送 ( vue前端的 客户端 这里的客户端是没有uid的)
- function sendToWeb($msg)
- {
- global $workerWs;
- foreach ($workerWs->connections as $connection) {
- if (!isset($connection->uid)) {
- $connection->send(json_encode($msg) . "\n");
- }
- }
- }
- // 向本机的tcp服务器发送请求(tcp上有一个异步请求的客户端 他登陆是有uid字段的赋值的 )
- function sendToTcp($msg)
- {
- global $workerWs;
- foreach ($workerWs->connections as $connection) {
- if (isset($connection->uid) && $connection->uid == "tcp") { //只针对特定的用户发起请求
- $connection->send(json_encode($msg) . "\n");
- }
- }
- }
- Worker::runAll(); //执行函数
|