outLog("Job start " . date('Y-m-d H:i:s')); if ($this->redis->get("aike_service:CommissionJobRun") != false) { $this->outLog("Job is already running, exit."); return; } if ($this->redis->set("aike_service:CommissionJobRun", "1", "NX") == false) { $this->outLog("Job start failed, lock exist, exit."); return; } $now = time(); $lastRunTime = $this->redis->get("aike_service:CommissionJobRunTime"); if ($lastRunTime == false) { $lastRunTime = $now; } $this->redis->set("aike_service:CommissionJobRunTime", $now); $config = Config::find()->asArray()->limit(1)->one(); $commissionDealTime = strtotime($config['commission_deal_time']); // 为了防止数据同步慢,故实际查找返佣的订单往前推24小时 $closeTime = max($lastRunTime - 86400, $commissionDealTime); // 获取所有存在上级代理的MT4 USER $userMembers = UserMember::find()->where(['is_commission_run' => 1])->asArray()->all(); $logins = []; foreach ($userMembers as $userMember) { $logins[] = $userMember['login']; } // forex $forexSymbols = preg_split('/\s*,\s*/', $config['forex_symbols'], -1, PREG_SPLIT_NO_EMPTY); // cfd $cfdSymbols = preg_split('/\s*,\s*/', $config['cfd_symbols'], -1, PREG_SPLIT_NO_EMPTY); // silver $xagSymbols = preg_split('/\s*,\s*/', $config['xag_symbols'], -1, PREG_SPLIT_NO_EMPTY); // gold $xauSymbols = preg_split('/\s*,\s*/', $config['xau_symbols'], -1, PREG_SPLIT_NO_EMPTY); // metal $metalSymbols = preg_split('/\s*,\s*/', $config['metal_symbols'], -1, PREG_SPLIT_NO_EMPTY); // stock $stockSymbols = preg_split('/\s*,\s*/', $config['stock_symbols'], -1, PREG_SPLIT_NO_EMPTY); $this->outLog("Auto commission, CLOSE_TIME >= " . date('Y-m-d H:i:s', $closeTime)); $maxPk = null; $limit = 1000; $memberCache = []; $parentMemberCache = []; $commissionCache = []; while (true) { // 获取未返佣的订单 $query = Mt4Trades::find(); if ($maxPk != null) { $query->andWhere(['>', 'TICKET', $maxPk]); $this->outLog("Auto commission, CLOSE_TIME >= " . date('Y-m-d H:i:s', $closeTime) . ", TICKET > {$maxPk}"); } $allTrades = $query->andWhere('CMD=0 or CMD=1')->andWhere(['LOGIN' => $logins]) ->andWhere(['>=', 'CLOSE_TIME', date('Y-m-d H:i:s', $closeTime)]) ->orderBy('TICKET asc') ->limit($limit) ->asArray()->all(); if (empty($allTrades)) { break; } foreach ($allTrades as $trade) { $maxPk = $trade['TICKET']; $isExist = (new Query())->from('crm_commission_ticket')->where(['ticket' => $trade['TICKET']])->exists($this->getDb()); if ($isExist) { $this->outLog("Commission ticket({$trade['TICKET']}) exist, skipped."); continue; } $volume = $trade['VOLUME'] / 100; // 获取上级代理 $ibMember = null; foreach ($userMembers as $userMember) { if ($userMember['login'] == $trade['LOGIN']) { if (!array_key_exists($userMember['member_id'], $memberCache)) { $memberCache[$userMember['member_id']] = Member::find()->where(['id' => $userMember['member_id']])->asArray()->limit(1)->one(); } $ibMember = $memberCache[$userMember['member_id']]; if ($ibMember == null) { $this->outLog("IB member(id: {$userMember['member_id']}) not found, skipped."); } break; } } if ($ibMember == null || $ibMember['is_enable'] != 1) { $this->outLog("IB member(id: {$ibMember['id']}) is disabled, skipped."); continue; } if (!array_key_exists($ibMember['id'], $parentMemberCache)) { $parentMemberCache[$ibMember['id']] = Member::findParents($ibMember['id']); } $parentMembers = $parentMemberCache[$ibMember['id']]; // 返佣记录 crm_commission_record $records = []; foreach ((array)$parentMembers as $key => $member) { $loginArr = explode(',', $member['logins']); $ibLogin = isset($loginArr[0]) ? intval($loginArr[0]) : null; if ($ibLogin == null) { $this->outLog("IB member login({$ibLogin}) is null, skipped."); continue; } // 获取返佣规则 if (!array_key_exists($member['id'] . '_' . $trade['LOGIN'], $commissionCache)) { $commissionCache[$member['id'] . '_' . $trade['LOGIN']] = Commission::find()->where(['login' => $trade['LOGIN'], 'member_id' => $member['id']])->asArray()->limit(1)->one(); } $commission = $commissionCache[$member['id'] . '_' . $trade['LOGIN']]; if ($commission == null) { $this->outLog("Commission rule(login: {$trade['LOGIN']}, member_id: {$member['id']}) not found, skipped."); continue; } $comm = 0; // 返佣 $commRule = ''; // 返佣规则 $commWy = 0; // 外佣 $commWyRule = ''; // 外佣规则 if (in_array($trade['SYMBOL'], $forexSymbols)) { // FOREX $tradeType = 0; if ($commission['forex'] > 0) { $comm = $volume * $commission['forex']; $commRule = "N" . sprintf('%.1f', $commission['forex']); } } elseif (in_array($trade['SYMBOL'], $cfdSymbols)) { // CFD $tradeType = 1; if ($commission['cfd'] > 0) { $comm = $volume * $commission['cfd']; $commRule = "N" . sprintf('%.1f', $commission['cfd']); } } elseif (in_array($trade['SYMBOL'], $xauSymbols)) { // 黄金 $tradeType = 2; if ($commission['gold'] > 0) { $comm = $volume * $commission['gold']; $commRule = "N" . sprintf('%.1f', $commission['gold']); } elseif ($commission['metal'] > 0) { $comm = $volume * $commission['metal']; $commRule = "N" . sprintf('%.1f', $commission['metal']); } } elseif (in_array($trade['SYMBOL'], $xagSymbols)) { // 白银 $tradeType = 3; if ($commission['silver'] > 0) { $comm = $volume * $commission['silver']; $commRule = "N" . sprintf('%.1f', $commission['silver']); } elseif ($commission['metal'] > 0) { $comm = $volume * $commission['metal']; $commRule = "N" . sprintf('%.1f', $commission['metal']); } } elseif (in_array($trade['SYMBOL'], $stockSymbols)) { // 股指 $tradeType = 4; if ($commission['stock'] > 0) { $comm = $volume * $commission['stock']; $commRule = "N" . sprintf('%.1f', $commission['stock']); } } else { // 未知 $tradeType = 9; } // CFD产品没外佣 if ($tradeType == 0 || $tradeType == 2 || $tradeType == 3) { // 外佣 if ($commission['wy'] > 0) { $commWy = $volume * $commission['wy']; $commWyRule = "W" . sprintf('%.1f', $commission['wy']); } } if ($comm > 0) { $record = [ 'ib_id' => $member['id'], 'ib_login' => $ibLogin, 'user_login' => $trade['LOGIN'], 'trade_ticket' => $trade['TICKET'], 'trade_type' => $tradeType, 'trade_volume' => $volume, 'commission_rule' => $commRule, 'commission' => $comm, 'in_time' => intval(microtime(true) * 1000), ]; $records[] = $record; } if ($commWy > 0) { $record = [ 'ib_id' => $member['id'], 'ib_login' => $ibLogin, 'user_login' => $trade['LOGIN'], 'trade_ticket' => $trade['TICKET'], 'trade_type' => $tradeType, 'trade_volume' => $volume, 'commission_rule' => $commWyRule, 'commission' => $commWy, 'in_time' => intval(microtime(true) * 1000), ]; $records[] = $record; } } if ($records) { $transaction = $this->getDb()->beginTransaction(); try { $command1 = $this->getDb()->createCommand(); $ticket = [ 'ticket' => $trade['TICKET'], ]; $affectRows = $command1->insert('crm_commission_ticket', $ticket)->execute(); if ($affectRows == 0) { throw new \Exception("Ticket insert failed. {$command1->getRawSql()}"); } foreach ($records as $record) { $command2 = $this->getDb()->createCommand(); $affectRows = $command2->insert('crm_commission_record', $record)->execute(); if ($affectRows == 0) { throw new \Exception("Record insert failed. {$command2->getRawSql()}"); } $command3 = $this->getDb()->createCommand(); $deposit = [ 'login' => $record['ib_login'], 'amount' => round($record['commission'], 2), 'comment' => "Auto Commission", 'is_sync' => 0, 'record_id' => $this->getDb()->getLastInsertID(), 'in_time' => intval(microtime(true) * 1000), ]; $affectRows = $command3->insert('crm_sync_desposit', $deposit)->execute(); if ($affectRows == 0) { throw new \Exception("SyncDeposit insert failed. {$command3->getRawSql()}"); } } $transaction->commit(); } catch (\Exception $e) { $this->outLog($e->getMessage()); $transaction->rollBack(); } } } if (count($allTrades) != $limit) { break; } } $this->outLog("Job end, " . date('Y-m-d H:i:s')); $this->redis->del("aike_service:CommissionJobRun"); } /** * @return null|Connection */ protected function getDb() { return Yii::$app->get('dbXcrm'); } }