#include "pch.h"
#include "PluginInstance.h"

#define REDIS_TIMER_ID (1)

/// Creates a disabled plugin instance; Start() performs the real initialisation.
CPluginInstance::CPluginInstance()
    : m_enable(false)
    , m_redis_error_notified(false)
    , m_debug_mode(0)
{
    // The destructor calls Stop(), which tests these pointers, so they must
    // never be indeterminate. (Harmless if the header already initialises them.)
    m_api = nullptr;
    m_config = nullptr;
}

/// Shuts the plugin down (unsubscribe, stop worker, close Redis) on destruction.
CPluginInstance::~CPluginInstance()
{
    Stop();
}

/// Destroys this instance; the object must not be used afterwards.
void CPluginInstance::Release()
{
    delete this;
}

/// Attaches the plugin to the server API, subscribes to plugin-configuration
/// and trade events and loads the plugin parameters.
///
/// @param server  Server API interface; must not be null.
/// @return MT_RET_OK on success, MT_RET_ERR_PARAMS for a null @p server,
///         MT_RET_ERR_MEM if the config object cannot be created, otherwise
///         the error code of the subscription or of LoadParam() that failed.
MTAPIRES CPluginInstance::Start(IMTServerAPI * server)
{
    if (!server)
        return MT_RET_ERR_PARAMS;

    m_api = server;

    if ((m_config = m_api->PluginCreate()) == nullptr)
        return MT_RET_ERR_MEM;

    // Failing to read the server description is not fatal, but it is a
    // failure and is logged as such rather than at the "OK" level.
    MTAPIRES ret = m_api->About(m_info);
    if (ret != MT_RET_OK)
        m_api->LoggerOut(MTLogAtt, L"Server info failed [%d]", ret);

    if ((ret = m_api->PluginSubscribe(this)) != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogAtt, L"Plugin subscribe failed [%d]", ret);
        return ret;
    }

    //if ((ret = m_api->OrderSubscribe(this)) != MT_RET_OK)
    //{
    //    m_api->LoggerOut(MTLogAtt, L"Order subscribe failed [%d]", ret);
    //    return ret;
    //}

    //if ((ret = m_api->DealSubscribe(this)) != MT_RET_OK)
    //{
    //    m_api->LoggerOut(MTLogAtt, L"Deal subscribe failed [%d]", ret);
    //    return ret;
    //}

    // The assignment must be parenthesised: without the parentheses `ret`
    // receives the result of the comparison (0/1) instead of the error code.
    if ((ret = m_api->TradeSubscribe(this)) != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogAtt, L"Trade subscribe failed [%d]", ret);
        return ret;
    }

    if ((ret = LoadParam()) != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
        return ret;
    }

    return MT_RET_OK;
}

/// Detaches the plugin from the server, closes the Redis connection and stops
/// the keep-alive worker. Safe to call repeatedly and on an instance that was
/// never started.
///
/// @return Always MT_RET_OK; unsubscribe failures are only logged.
MTAPIRES CPluginInstance::Stop()
{
    {
        std::lock_guard<decltype(m_lock)> lk(m_lock);

        m_followers.clear();

        if (m_api != nullptr)
        {
            if (m_config != nullptr)
            {
                m_config->Release();
                m_config = nullptr;
            }

            MTAPIRES ret = m_api->PluginUnsubscribe(this);
            if (ret != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
                m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe from plugin config updates [%s (%u)]",
                    SMTFormat::FormatError(ret), ret);

            //if ((ret = m_api->OrderUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
            //    m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe order sink [%s (%u)]",
            //        SMTFormat::FormatError(ret), ret);

            //if ((ret = m_api->DealUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
            //    m_api->LoggerOut(MTLogErr, L"failed to unsubscribe deal [%s (%u)]",
            //        SMTFormat::FormatError(ret), ret);

            ret = m_api->TradeUnsubscribe(this);
            if (ret != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
                m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe trade sink [%s (%u)]",
                    SMTFormat::FormatError(ret), ret);
        }

        // Always tell the worker to exit, even if Start() was never completed.
        m_enable = false;

        // Close Redis while m_api is still valid: stop_redis() reports its
        // errors through m_api.
        stop_redis();

        m_api = nullptr;
    }

    // Join only after m_lock has been released: keep_alive() takes the same
    // lock, so joining while holding it could block forever.
    if (m_work_thread.joinable())
        m_work_thread.join();

    return MT_RET_OK;
}

/// Server callback: the plugin configuration changed, reload the parameters.
///
/// @param plugin  Updated configuration (unused; LoadParam() re-reads the
///                current configuration from the server).
void CPluginInstance::OnPluginUpdate(const IMTConPlugin * plugin)
{
    (void)plugin;

    const MTAPIRES ret = LoadParam();
    // LoadParam() fails with MT_RET_ERR_PARAMS when m_api is null, so the
    // logger must not be dereferenced unconditionally.
    if (ret != MT_RET_OK && m_api != nullptr)
    {
        m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
    }
}

//void CPluginInstance::OnOrderAdd(const IMTOrder * order)
//{
//    if (order == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (order->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
//        order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
//}

//void CPluginInstance::OnOrderUpdate(const IMTOrder * order)
//{
//    if (order == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (order->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
//        order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
//}

//void CPluginInstance::OnOrderDelete(const IMTOrder * order)
//{
//    if (order == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (order->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
//        order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
//
//
if (order->Type() != IMTOrder::OP_BUY // && order->Type() != IMTOrder::OP_SELL) // { // // 不是buy或者sell,不跟,直接退出 // return; // } // // char order_buf[128]; // sprintf(order_buf, "%lld", order->PositionID()); // UINT64 position = order->PositionID(); // UINT64 order_id = order->Order(); // IMTAccount* account = m_api->UserCreateAccount(); // IMTOrder* new_order = m_api->OrderCreate(); // // ScopeGuard guard([account, new_order, this] // { // if (account) // account->Release(); // if (new_order) // new_order->Release(); // // m_redis_client->commit(); // }); // // // 订单在进入filled状态时,也会产生一个order delete回调 // if (order->Order() == order->PositionID()) // { // // 如果是新建订单,那么写入新纪录 // char login_buf[128] = { 0 }; // char req_buf[128] = { 0 }; // for (auto login : m_followers) // { // if (m_api->UserAccountGet(login, account) != MT_RET_OK) // { // // TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题 // continue; // } // sprintf(login_buf, "%lld", login); // // UINT level = (UINT)(account->Balance() / m_step); // UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100; // UINT volume_ext = round((double)level * order->VolumeCurrentExt() / 10000) * 100; // UINT init_volume = round((double)level * order->VolumeInitial() / 10000) * 100; // UINT init_volume_ext = round((double)level * order->VolumeInitialExt() / 10000) * 100; // UINT64 new_order_id = 0; // // m_api->LoggerOut(MTLogOK, L"add order, vol_init: %lld, vol_cur: %lld", init_volume, volume); // // char msg[1024] = { 0 }; // sprintf(msg, "orig_position=&lld&login=%lld&source_login=1005&symbol=%s&action=200&type=%d&volume=%lld&price_order=%lf", // order->PositionID(), login, ws2s(order->Symbol()).c_str(), order->Type(), init_volume, order->PriceOrder()); // m_redis_client->publish("dealer_send", msg, [this, position, login, order_id](cpp_redis::reply& r) // { // // 记录下login order等信息 // m_api->LoggerOut(MTLogErr, L"original position #%lld, original order #%lld, login %lld, failed to excute dealer_send", // ); // }); // 
// // dealer send之后需要另外一边记录request id // // // 尚未完成建仓,目前position id不填,仅完成订单后记录 // new_order->Clear(); // new_order->VolumeInitial(init_volume); // new_order->VolumeCurrent(volume); // new_order->Login(login); // new_order->Symbol(order->Symbol()); // new_order->Type(order->Type()); // new_order->Digits(order->Digits()); // new_order->DigitsCurrency(order->DigitsCurrency()); // new_order->ContractSize(order->ContractSize()); // new_order->PriceOrder(order->PriceOrder()); // new_order->PriceCurrent(order->PriceCurrent()); // new_order->StateSet(order->State()); // new_order->TimeSetup(order->TimeSetup()); // new_order->TimeSetupMsc(order->TimeSetupMsc()); // new_order->TimeDone(order->TimeDone()); // new_order->TimeDoneMsc(order->TimeDoneMsc()); // // -- // new_order->PriceSL(order->PriceSL()); // new_order->PriceTP(order->PriceTP()); // new_order->Comment(order->Comment()); // new_order->ActivationFlags(order->ActivationFlags()); // new_order->ActivationMode(order->ActivationMode()); // new_order->ActivationPrice(order->ActivationPrice()); // new_order->ActivationTime(order->ActivationTime()); // new_order->PriceTrigger(order->PriceTrigger()); // new_order->RateMargin(order->RateMargin()); // // new_order->ReasonSet(order->Reason()); // // new_order->TypeFill(order->TypeFill()); // // new_order->TypeTime(order->TypeTime()); // // // MTAPIRES ret = m_api->HistoryAdd(new_order); // if (ret != MT_RET_OK) // { // m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Login(), ret); // } // else // { // new_order_id = new_order->Order(); // } // // m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld [%d]", login, new_order_id, order->Order()); // // // TODO: 现在做法是,如果level为0,后面都不管 // // 如果跟单失败,那么position id也为0,所以后面也不应该处理 // position_context context; // context.level = level; // context.cur_ord = new_order_id; // context.position_id = new_order_id; // 建仓,order id = position id // int direction = 1; // if 
(order->Type() == IMTOrder::OP_SELL) // direction = -1; // context.volume = direction * order->VolumeInitial(); // // m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r) // { // if (r.ko()) // { // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str()); // } // }); // // // 可以在退出前提交 // // m_redis_client->commit(); // } // } // else // { // // 如果不是新建订单,先使用position id先检查记录是否存在 // char login_buf[128]; // for (auto login : m_followers) // { // sprintf(login_buf, "%lld", login); // auto fut = m_redis_client->hget(order_buf, login_buf); // m_redis_client->sync_commit(); // auto reply = fut.get(); // // m_api->LoggerOut(MTLogOK, L"request cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str()); // // // 如果不存在,忽略 // if (reply.ko()) continue; // if (reply.is_null()) continue; // // m_api->LoggerOut(MTLogOK, L"get context, login: %lld", login); // // // 获取context // position_context context; // memcpy(&context, reply.as_string().c_str(), sizeof(position_context)); // // // 按建仓时的叙述,如果level或者position为0,亦忽略该记录 // if (context.level == 0) continue; // if (context.position_id == 0) continue; // // m_api->LoggerOut(MTLogOK, L"get context, order: #%lld, position: #%lld", context.cur_ord, context.position_id); // // // 如果存在,则继续操作 // UINT volume = round((double)context.level * order->VolumeCurrent() / 10000) * 100; // UINT volume_ext = round((double)context.level * order->VolumeCurrentExt() / 10000) * 100; // UINT init_volume = round((double)context.level * order->VolumeInitial() / 10000) * 100; // UINT init_volume_ext = round((double)context.level * order->VolumeInitialExt() / 10000) * 100; // UINT64 new_order_id = 0; // // new_order->Clear(); // new_order->VolumeInitial(init_volume); // new_order->VolumeCurrent(volume); // 这里是状态START,如果是状态4的FILLED时,volume current为0 // //new_order->VolumeCurrent(init_volume); // new_order->Login(login); // new_order->Symbol(order->Symbol()); // 
new_order->Type(order->Type()); // new_order->Digits(order->Digits()); // new_order->DigitsCurrency(order->DigitsCurrency()); // new_order->ContractSize(order->ContractSize()); // new_order->PriceOrder(order->PriceOrder()); // new_order->PriceCurrent(order->PriceCurrent()); // new_order->StateSet(order->State()); // new_order->TimeSetup(order->TimeSetup()); // new_order->TimeSetupMsc(order->TimeSetupMsc()); // new_order->TimeDone(order->TimeDone()); // new_order->TimeDoneMsc(order->TimeDoneMsc()); // new_order->PositionID(context.position_id); // // new_order->PriceSL(order->PriceSL()); // new_order->PriceTP(order->PriceTP()); // new_order->Comment(order->Comment()); // new_order->ActivationFlags(order->ActivationFlags()); // new_order->ActivationMode(order->ActivationMode()); // new_order->ActivationPrice(order->ActivationPrice()); // new_order->ActivationTime(order->ActivationTime()); // new_order->PriceTrigger(order->PriceTrigger()); // new_order->RateMargin(order->RateMargin()); // // new_order->ReasonSet(order->Reason()); // // new_order->TypeFill(order->TypeFill()); // // new_order->TypeTime(order->TypeTime()); // // // MTAPIRES ret = m_api->HistoryAdd(new_order); // if (ret != MT_RET_OK) // { // m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Order(), ret); // // FIXME: 如果做单失败该怎么办 // continue; // } // // new_order_id = new_order->Order(); // m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld", login, new_order_id, order->Order()); // // // 完成之后,写入新纪录 // context.cur_ord = new_order_id; // int direction = 1; // if (order->Type() == IMTOrder::OP_SELL) // direction = -1; // context.volume += direction * order->VolumeInitial(); // // m_api->LoggerOut(MTLogOK, L"write cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str()); // // m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r) // { // if (r.ko()) // 
{ // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str()); // } // }); // // 下一次get会调用commit // } // } //} //void CPluginInstance::OnOrderClean(const UINT64 login) //{ // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (login != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnOrderClean, Login: %d", login); //} //void CPluginInstance::OnDealAdd(const IMTDeal * deal) //{ // if (deal == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (deal->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnDealAdd, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); //} //void CPluginInstance::OnDealUpdate(const IMTDeal * deal) //{ // if (deal == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (deal->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnDealUpdate, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); //} //void CPluginInstance::OnDealDelete(const IMTDeal * deal) //{ // if (deal == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (deal->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnDealDelete, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); //} //void CPluginInstance::OnDealClean(const UINT64 login) //{ // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (login != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OndealClean, Login: %d", login); //} //void CPluginInstance::OnDealPerform(const IMTDeal * deal, IMTAccount * account, IMTPosition * position) //{ // // position为nullptr时,说明该deal不是由交易本身触发 // // 
account是deal完成后的用户状况 // // position是交易完成后的持仓状况 // // 对于deal是关闭一个持仓的情况,该position的volume会是0 // if (position == nullptr // || deal == nullptr // || account == nullptr) // return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (deal->Login() != m_trader) return; // // //m_api->LoggerOut(MTLogOK, L"OnDealPerform, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); // // // FIXME: 需要检查现在的deal是否和之前的order对应。如果没有记录到,则没法根本无法正常跟单以及平仓 // char order_buf[128]; // sprintf(order_buf, "%lld", deal->PositionID()); // IMTDeal* new_deal = m_api->DealCreate(); // // std::vector* fields = nullptr; // // ScopeGuard guard([new_deal, &fields, this] // { // new_deal->Release(); // if (fields) // { // fields->clear(); // delete fields; // fields = nullptr; // } // // // 退出前调用commit // m_redis_client->commit(); // }); // // char login_buf[128]; // for (auto login : m_followers) // { // ScopeGuard g([position, &fields, this, login, login_buf]() // { // if (position->Volume() == 0) // { // if (fields == nullptr) // fields = new(std::vector); // // m_api->LoggerOut(MTLogOK, L"add field %lld to be delete", login); // fields->push_back(login_buf); // } // }); // // sprintf(login_buf, "%lld", login); // auto fut = m_redis_client->hget(order_buf, login_buf); // m_redis_client->sync_commit(); // auto reply = fut.get(); // // m_api->LoggerOut(MTLogOK, L"%lld add deal, original deal #%lld", login, deal->Deal()); // // // 如果不存在,忽略 // if (reply.ko()) continue; // if (reply.is_null()) continue; // // // 获取context // position_context context; // memcpy(&context, reply.as_string().c_str(), sizeof(position_context)); // // // 按建仓时的叙述,如果level或者position为0,亦忽略该记录 // if (context.level == 0) continue; // if (context.position_id == 0) continue; // // uint64_t volume = context.level * deal->Volume() / 100; // uint64_t volume_ext = context.level * deal->VolumeExt() / 100; 
// uint64_t volume_closed = context.level * deal->VolumeClosed() / 100; // uint64_t volume_closed_ext = context.level * deal->VolumeClosed() / 100; // // double prop = (double)volume / deal->Volume(); // double profit = deal->Profit() * prop; // double commission = deal->Commission() * prop; // double storage = deal->Storage() * prop; // double raw_profit = deal->ProfitRaw() * prop; // // m_api->LoggerOut(MTLogOK, L"add new deal, volume %lld, volume closed %lld, order #%lld, position #%lld", volume, volume_closed, context.cur_ord, context.position_id); // // new_deal->Clear(); // new_deal->Assign(deal); // // new_deal->Login(login); // new_deal->DealSet(0); // new_deal->Volume(volume); // new_deal->VolumeExt(volume_ext); // new_deal->VolumeClosed(volume_closed); // new_deal->VolumeClosedExt(volume_closed_ext); // new_deal->ProfitRaw(raw_profit); // new_deal->Profit(profit); // new_deal->Commission(commission); // new_deal->Storage(storage); // new_deal->Order(context.cur_ord); // new_deal->PositionID(context.position_id); // // MTAPIRES ret = m_api->DealAdd(new_deal); // if (ret != MT_RET_OK) // { // // TODO: 有没有更多的错误处理? 
// m_api->LoggerOut(MTLogErr, L"%lld cannot add deal [%d] to order #%lld, original deal: #%lld", login, ret, context.cur_ord, deal->Deal()); // continue; // } // // m_api->LoggerOut(MTLogOK, L"add deal #%lld, original deal #%lld", new_deal->Deal(), deal->Deal()); // // m_redis_client->publish("mt5_balance_fix", login_buf, [this](cpp_redis::reply r) // { // if (r.ko()) // { // m_api->LoggerOut(MTLogErr, L"redis publish: %s", r.error().c_str()); // } // }); // // commit 在退出前完成 // } // // m_api->LoggerOut(MTLogOK, L"deal #%lld, position #%lld, volume %lld", deal->Deal(), position->Position(), position->Volume()); // if (position->Volume() == 0) // { // // 如果平仓,则删除hash值 // if (position->Volume() == 0 && fields != nullptr) // { // m_redis_client->hdel(order_buf, *fields, [this](cpp_redis::reply& r) // { // if (r.ko()) // { // // TODO 错误处理 // try // { // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str()); // } // catch (...) // { // } // } // }); // } // // // TODO 当position中的volume为0时,持仓被彻底平调,被跟订单是否也该检查 // } //} void CPluginInstance::OnTradeRequestProcess(const IMTRequest* request, const IMTConfirm* confirm, const IMTConGroup* group, const IMTConSymbol* symbol, const IMTPosition* position, const IMTOrder* order, const IMTDeal* deal) { if (request == nullptr || group == nullptr || symbol == nullptr) { return; } UINT64 login = request->Login(); bool found = false; for (auto follower : m_followers) { if (login == follower) { found = true; break; } } // 未找到直接退出 if (!found) return; // 获取request cache并删除 request_cache cache; std::string cache_key = std::string("reqid_") + std::to_string(request->ID()); auto fut = m_redis_client->get(cache_key); m_redis_client->sync_commit(); auto reply = fut.get(); DebugOut(MTLogOK, L"request cache, key: %s", cache_key.c_str()); if (reply.ko()) return; if (reply.is_null()) return; memcpy(&cache, reply.as_string().c_str(), sizeof(request_cache)); std::vector keys = { cache_key }; m_redis_client->del(keys, [this](cpp_redis::reply& r) { 
if (r.ko()) { m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str()); } }); // 获取做单上下文 position_context context; char login_buf[128] = { 0 }; char orig_pos_buf[128] = { 0 }; sprintf(orig_pos_buf, "%lld", cache.orig_position); sprintf(login_buf, "%lld", cache.login); auto ctx_fut = m_redis_client->hget(orig_pos_buf, login_buf); m_redis_client->sync_commit(); auto ctx_reply = fut.get(); DebugOut(MTLogOK, L"request cache, key: %s", cache_key.c_str()); if (ctx_reply.ko()) return; if (ctx_reply.is_null()) return; memcpy(&context, ctx_reply.as_string().c_str(), sizeof(position_context)); // 修改跟单上下文 int direction = 1; if (order->Type() == IMTOrder::OP_SELL) direction = -1; context.volume += direction * order->VolumeCurrent(); context.position_id = position->Position(); DebugOut(MTLogOK, L"writeback context, key: %s, field: %s", orig_pos_buf, login_buf); // 写入 m_redis_client->hset(orig_pos_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r) { if (r.ko()) { m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str()); } }); m_redis_client->commit(); } MTAPIRES CPluginInstance::LoadParam() { MTAPIRES res = MT_RET_OK; IMTConParam* param = NULL; CMTStr128 tmp; if (!m_api || !m_config) return MT_RET_ERR_PARAMS; if ((res = m_api->PluginCurrent(m_config)) != MT_RET_OK) { m_api->LoggerOut(MTLogErr, L"failed to get current plugin configuration [%s (%u)]", SMTFormat::FormatError(res), res); return res; } if ((param = m_api->PluginParamCreate()) == NULL) { m_api->LoggerOut(MTLogErr, L"failed to create plugin parameter object"); return MT_RET_ERR_MEM; } ScopeGuard param_release([param]() { param->Release(); }); std::lock_guard lk(m_lock); // 首先一定要关闭debug mode,显示调用了debug mode才能生效 m_debug_mode = 0; //m_api->LoggerOut(MTLogOK, L"Load redis server params"); if ((res = m_config->ParameterGet(L"Redis Server", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) { return(MT_RET_ERR_PARAMS); } std::string redis_server = 
ws2s(param->ValueString()); if ((res = m_config->ParameterGet(L"Redis Port", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) { return(MT_RET_ERR_PARAMS); } int redis_port = param->ValueInt(); //if ((res = m_config->ParameterGet(L"Redis User", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) //{ // return(MT_RET_ERR_PARAMS); //} //std::string redis_user = ws2s(param->ValueString()); if ((res = m_config->ParameterGet(L"Redis Password", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) { return(MT_RET_ERR_PARAMS); } std::string redis_password = ws2s(param->ValueString()); if (m_redis_server != redis_server || m_redis_port != redis_port //|| m_redis_user != redis_user || m_redis_password != redis_password) { m_redis_server = redis_server; m_redis_port = redis_port; //m_redis_user = redis_user; m_redis_password = redis_password; stop_redis(); if (start_redis()) { if (!m_work_thread.joinable()) { m_work_thread = std::thread(&CPluginInstance::keep_alive, this); } } else { m_api->LoggerOut(MTLogErr, L"failed to connect to redis server"); return MT_RET_ERR_CONNECTION; } } //m_api->LoggerOut(MTLogOK, L"Load trade params"); if ((res = m_config->ParameterGet(L"Trader", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) { return(MT_RET_ERR_PARAMS); } m_trader = param->ValueInt(); //if ((res = m_config->ParameterGet(L"Step", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) //{ // return(MT_RET_ERR_PARAMS); //} //m_step = param->ValueInt(); //if ((res = m_config->ParameterGet(L"Tolerance", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) //{ // return(MT_RET_ERR_PARAMS); //} //m_tolerance = param->ValueInt(); if ((res = m_config->ParameterGet(L"Groups", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) { return(MT_RET_ERR_PARAMS); } wcsncpy(m_groups, param->ValueString(), 1024); //if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != 
IMTConParam::TYPE_STRING) //{ // return(MT_RET_ERR_PARAMS); //} //m_logins = param->ValueString(); if ((res = m_config->ParameterGet(L"DebugMode", param)) == MT_RET_OK && param->Type() == IMTConParam::TYPE_INT) { m_debug_mode = param->ValueInt(); } res = LoadLogins(); if (res != MT_RET_OK) { m_api->LoggerOut(MTLogErr, L"failed to load clients [%d]", res); return res; } //m_api->LoggerOut(MTLogOK, L"Load param success"); m_enable = true; return MT_RET_OK; } MTAPIRES CPluginInstance::LoadLogins() { m_followers.clear(); IMTConGroup* group = m_api->GroupCreate(); ScopeGuard group_guard([&]() { group->Release(); }); if (group == nullptr) return MT_RET_ERR_MEM; UINT total = m_api->GroupTotal(); MTAPIRES ret = MT_RET_OK; std::vector groups; for (int i = 0; i < total; i++) { ret = m_api->GroupNext(i, group); if (ret != MT_RET_OK) break; if (CheckGroup(m_groups, group->Group()) == FALSE) continue; DebugOut(MTLogOK, L"group %s matched config", group->Group()); groups.push_back(group->Group()); } for (auto& group : groups) { UINT64* logins = nullptr; UINT total_users = 0; ret = m_api->UserLogins(group.c_str(), logins, total_users); DebugOut(MTLogOK, L"add group %s, total users: %d", group.c_str(), total_users); if (ret == MT_RET_OK && logins != nullptr) { for (int i = 0; i < total_users; ++i) { DebugOut(MTLogOK, L"add %d to list", logins[i]); m_followers.push_back(logins[i]); } } if (logins) { m_api->Free(logins); } } return ret; } bool CPluginInstance::start_redis() { try { m_redis_error_notified = false; if (m_redis_client) { if (m_redis_client->is_connected()) return true; } m_redis_client.reset(new cpp_redis::client); m_redis_client->connect ( m_redis_server, m_redis_port, [this](const std::string& host, std::size_t port, cpp_redis::connect_state status) { if (status == cpp_redis::connect_state::ok) { m_api->LoggerOut(MTLogOK, L"redis server connected"); } }, 500, 10000000, 1000 ); if (m_redis_password != "") { auto fut = m_redis_client->auth(m_redis_password); 
m_redis_client->sync_commit(); auto reply = fut.get(); if (reply.is_error()) { m_api->LoggerOut(MTLogErr, L"connect: authentication failed"); return false; } } } catch (tacopie::tacopie_error& e) { m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what()); return false; } catch (std::exception& e) { m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what()); return false; } catch (...) { m_api->LoggerOut(MTLogErr, L"failed to connect to server"); return false; } return true; } void CPluginInstance::stop_redis() { try { if (m_redis_client) { // 使用sync commit保证操作全部提交 m_redis_client->sync_commit(); m_redis_client->disconnect(); m_redis_client.reset(); } } catch (tacopie::tacopie_error& e) { m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what()); } catch (std::exception& e) { m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what()); } catch (...) { m_api->LoggerOut(MTLogErr, L"stop redis: unknown error"); } } void CPluginInstance::keep_alive() { using namespace std::chrono_literals; int counter = 500; while (m_enable) { if (counter-- <= 0) { counter = 500; std::lock_guard lk(m_lock); if (m_redis_client) { if (m_redis_client->is_connected()) { m_redis_client->ping([this](cpp_redis::reply& r) { if (r.ko()) { m_redis_error_notified = true; if (!m_redis_error_notified) { m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str()); } } }); } else { m_redis_error_notified = true; if (!m_redis_error_notified) { m_api->LoggerOut(MTLogErr, L"redis server not connected"); } stop_redis(); start_redis(); } } } std::this_thread::sleep_for(16ms); } }