#include "pch.h" #include "PluginInstance.h" #define REDIS_TIMER_ID (1) CPluginInstance::CPluginInstance() : m_enable(false) , m_redis_error_notified(false) { } CPluginInstance::~CPluginInstance() { Stop(); } void CPluginInstance::Release() { delete this; } MTAPIRES CPluginInstance::Start(IMTServerAPI * server) { MTAPIRES ret = MT_RET_OK; if (!server) return MT_RET_ERR_PARAMS; m_api = server; if ((m_config = m_api->PluginCreate()) == nullptr) return MT_RET_ERR_MEM; ret = m_api->About(m_info); if (ret != MT_RET_OK) m_api->LoggerOut(MTLogOK, L"Server info failed [%d]", ret); if ((ret = m_api->PluginSubscribe(this)) != MT_RET_OK) { m_api->LoggerOut(MTLogAtt, L"Plugin subscribe failed [%d]", ret); return ret; } if (ret = m_api->OrderSubscribe(this) != MT_RET_OK) { m_api->LoggerOut(MTLogAtt, L"Order subscribe failed [%d]", ret); return ret; } if (ret = m_api->DealSubscribe(this) != MT_RET_OK) { m_api->LoggerOut(MTLogAtt, L"Deal subscribe failed [%d]", ret); return ret; } if ((ret = LoadParam()) != MT_RET_OK) { m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret); return ret; } return MT_RET_OK; } MTAPIRES CPluginInstance::Stop() { MTAPIRES ret = MT_RET_OK; std::lock_guard lk(m_lock); m_followers.clear(); if (m_api == nullptr) return MT_RET_OK; if (m_config != nullptr) { m_config->Release(); m_config = nullptr; } if ((ret = m_api->PluginUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND) m_api->LoggerOut(MTLogErr, L"failed to unsubscribe from plugin config updates [%s (%u)]", SMTFormat::FormatError(ret), ret); if ((ret = m_api->OrderUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND) m_api->LoggerOut(MTLogErr, L"failed to unsubscribe order [%s (%u)]", SMTFormat::FormatError(ret), ret); if ((ret = m_api->DealUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND) m_api->LoggerOut(MTLogErr, L"failed to unsubscribe deal [%s (%u)]", SMTFormat::FormatError(ret), ret); m_api = nullptr; m_enable = false; stop_redis(); if (m_work_thread.joinable()) m_work_thread.join(); return MT_RET_OK; } void CPluginInstance::OnPluginUpdate(const IMTConPlugin * plugin) { int ret = MT_RET_OK; if ((ret = LoadParam()) != MT_RET_OK) { m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret); } } //void CPluginInstance::OnOrderAdd(const IMTOrder * order) //{ // if (order == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (order->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld", // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent()); //} //void CPluginInstance::OnOrderUpdate(const IMTOrder * order) //{ // if (order == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (order->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld", // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent()); //} void CPluginInstance::OnOrderDelete(const IMTOrder * order) { if (order == nullptr) return; std::lock_guard lk(m_lock); if (!m_enable) return; if (order->Login() != m_trader) return; m_api->LoggerOut(MTLogOK, L"OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld", order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent()); if (order->Type() != IMTOrder::OP_BUY && order->Type() != IMTOrder::OP_SELL) { // 不是buy或者sell,不跟,直接退出 return; } char order_buf[128]; sprintf(order_buf, "%lld", order->Order()); IMTAccount* account = m_api->UserCreateAccount(); IMTOrder* new_order = m_api->OrderCreate(); ScopeGuard guard([account, new_order] { account->Release(); new_order->Release(); }); // 订单在进入filled状态时,也会产生一个order delete回调 if (order->Order() == order->PositionID()) { // 如果是新建订单,那么写入新纪录 char login_buf[128]; for (auto login : m_followers) { if (m_api->UserAccountGet(login, account) != MT_RET_OK) { // TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题 continue; } sprintf(login_buf, "%lld", login); UINT level = (UINT)(account->Balance() / m_step); UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100; UINT volume_ext = round((double)level * order->VolumeCurrentExt() / 10000) * 100; UINT init_volume = round((double)level * order->VolumeInitial() / 10000) * 100; UINT init_volume_ext = round((double)level * order->VolumeInitialExt() / 10000) * 100; UINT64 new_order_id = 0; // 尚未完成建仓,目前position id不填,仅完成订单后记录 new_order->Clear(); new_order->VolumeInitial(init_volume); new_order->VolumeCurrent(volume); new_order->Login(login); new_order->Symbol(order->Symbol()); new_order->Type(order->Type()); new_order->Digits(order->Digits()); new_order->DigitsCurrency(order->DigitsCurrency()); //new_order->ContractSize(order->ContractSize()); new_order->PriceOrder(order->PriceOrder()); //new_order->PriceCurrent(order->PriceCurrent()); // State不能填PARTIAL FILLED REJECTED EXPIRED new_order->StateSet(IMTOrder::ORDER_STATE_STARTED); new_order->TimeSetup(m_api->TimeCurrent()); new_order->TimeSetupMsc(m_api->TimeCurrentMsc()); // 一定不能填入time done //new_order->TimeDone(order->TimeDone()); //new_order->TimeDoneMsc(order->TimeDoneMsc()); // -- //new_order->PriceSL(order->PriceSL()); //new_order->PriceTP(order->PriceTP()); //new_order->Comment(order->Comment()); //new_order->ActivationFlags(order->ActivationFlags()); //new_order->ActivationMode(order->ActivationMode()); //new_order->ActivationPrice(order->ActivationPrice()); //new_order->ActivationTime(order->ActivationTime()); //new_order->PriceTrigger(order->PriceTrigger()); //new_order->RateMargin(order->RateMargin()); // //new_order->ReasonSet(order->Reason()); // //new_order->TypeFill(order->TypeFill()); // //new_order->TypeTime(order->TypeTime()); // MTAPIRES ret = m_api->OrderAdd(new_order); if (ret != MT_RET_OK) { m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Login(), ret); } else { new_order_id = new_order->Order(); } // TODO: 现在做法是,如果level为0,后面都不管 // 如果跟单失败,那么position id也为0,所以后面也不应该处理 position_context context; context.level = level; context.cur_ord = new_order_id; context.position_id = new_order_id; // 建仓,order id = position id int direction = 1; if (order->Type() == IMTOrder::OP_SELL) direction = -1; context.volume = direction * order->VolumeInitial(); auto fut = m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(context))); m_redis_client->sync_commit(); auto rep = fut.get(); if (rep.ko()) { // FIXME: 错误处理 } } } else { // 如果不是新建订单,先使用position id先检查记录是否存在 char login_buf[128]; for (auto login : m_followers) { sprintf(login_buf, "%lld", login); auto fut = m_redis_client->hget(order_buf, login_buf); m_redis_client->sync_commit(); auto reply = fut.get(); // 如果不存在,忽略 if (reply.ko()) continue; if (reply.is_null()) continue; // 获取context position_context context; memcpy(&context, reply.as_string().c_str(), sizeof(position_context)); // 按建仓时的叙述,如果level或者position为0,亦忽略该记录 if (context.level == 0) continue; if (context.position_id == 0) continue; // 如果存在,则继续操作 UINT volume = round((double)context.level * order->VolumeCurrent() / 10000) * 100; UINT volume_ext = round((double)context.level * order->VolumeCurrentExt() / 10000) * 100; UINT init_volume = round((double)context.level * order->VolumeInitial() / 10000) * 100; UINT init_volume_ext = round((double)context.level * order->VolumeInitialExt() / 10000) * 100; UINT64 new_order_id = 0; new_order->Clear(); new_order->VolumeInitial(init_volume); new_order->VolumeCurrent(volume); new_order->Login(login); new_order->Symbol(order->Symbol()); new_order->Type(order->Type()); new_order->Digits(order->Digits()); new_order->DigitsCurrency(order->DigitsCurrency()); //new_order->ContractSize(order->ContractSize()); new_order->PriceOrder(order->PriceOrder()); //new_order->PriceCurrent(order->PriceCurrent()); // State不能填PARTIAL FILLED REJECTED EXPIRED new_order->StateSet(IMTOrder::ORDER_STATE_STARTED); new_order->TimeSetup(m_api->TimeCurrent()); new_order->TimeSetupMsc(m_api->TimeCurrentMsc()); new_order->PositionID(context.position_id); MTAPIRES ret = m_api->OrderAdd(new_order); if (ret != MT_RET_OK) { m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Order(), ret); // FIXME: 如果做单失败该怎么办 continue; } new_order_id = new_order->Order(); m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld [%d]", login, new_order_id, order->Order()); // 完成之后,写入新纪录 context.cur_ord = new_order_id; int direction = 1; if (order->Type() == IMTOrder::OP_SELL) direction = -1; context.volume += direction * order->VolumeInitial(); auto wfut = m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(context))); m_redis_client->sync_commit(); auto wrep = fut.get(); if (wrep.ko()) { // FIXME: 错误处理 } } } } //void CPluginInstance::OnOrderClean(const UINT64 login) //{ // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (login != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnOrderClean, Login: %d", login); //} //void CPluginInstance::OnDealAdd(const IMTDeal * deal) //{ // if (deal == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (deal->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnDealAdd, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); //} //void CPluginInstance::OnDealUpdate(const IMTDeal * deal) //{ // if (deal == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (deal->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnDealUpdate, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); //} //void CPluginInstance::OnDealDelete(const IMTDeal * deal) //{ // if (deal == nullptr) return; // // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (deal->Login() != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OnDealDelete, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); //} //void CPluginInstance::OnDealClean(const UINT64 login) //{ // std::lock_guard lk(m_lock); // if (!m_enable) return; // if (login != m_trader) return; // // m_api->LoggerOut(MTLogOK, L"OndealClean, Login: %d", login); //} void CPluginInstance::OnDealPerform(const IMTDeal * deal, IMTAccount * account, IMTPosition * position) { // position为nullptr时,说明该deal不是由交易本身触发 // account是deal完成后的用户状况 // position是交易完成后的持仓状况 // 对于deal是关闭一个持仓的情况,该position的volume会是0 if (position == nullptr || deal == nullptr || account == nullptr) return; std::lock_guard lk(m_lock); if (!m_enable) return; if (deal->Login() != m_trader) return; //m_api->LoggerOut(MTLogOK, L"OnDealPerform, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld", // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed()); // FIXME: 需要检查现在的deal是否和之前的order对应。如果没有记录到,则没法根本无法正常跟单以及平仓 char order_buf[128]; sprintf(order_buf, "%lld", deal->Order()); IMTDeal* new_deal = m_api->DealCreate(); std::vector* fields = nullptr; ScopeGuard guard([new_deal, fields] { new_deal->Release(); if (fields) { fields->clear(); delete fields; } }); char login_buf[128]; for (auto login : m_followers) { sprintf(login_buf, "%lld", login); auto fut = m_redis_client->hget(order_buf, login_buf); m_redis_client->sync_commit(); auto reply = fut.get(); m_api->LoggerOut(MTLogOK, L"%lld add deal, original deal #%lld", login, deal->Deal()); // 如果不存在,忽略 if (reply.ko()) continue; if (reply.is_null()) continue; // 获取context position_context context; memcpy(&context, reply.as_string().c_str(), sizeof(position_context)); // 按建仓时的叙述,如果level或者position为0,亦忽略该记录 if (context.level == 0) continue; if (context.position_id == 0) continue; uint64_t volume = context.level * deal->Volume() / 100; uint64_t volume_ext = context.level * deal->VolumeExt() / 100; uint64_t volume_closed = context.level * deal->VolumeClosed() / 100; uint64_t volume_closed_ext = context.level * deal->VolumeClosed() / 100; double prop = (double)volume / deal->Volume(); double profit = deal->Profit() * prop; double commission = deal->Commission() * prop; double storage = deal->Storage() * prop; double raw_profit = deal->ProfitRaw() * prop; new_deal->Clear(); new_deal->DealSet(0); new_deal->Volume(volume); new_deal->VolumeExt(volume_ext); new_deal->VolumeClosed(volume_closed); new_deal->VolumeClosedExt(); new_deal->ProfitRaw(raw_profit); new_deal->Profit(profit); new_deal->Commission(commission); new_deal->Storage(storage); new_deal->Order(context.cur_ord); new_deal->PositionID(context.position_id); MTAPIRES ret = m_api->DealAdd(new_deal); if (ret != MT_RET_OK) { // TODO: 有没有更多的错误处理? m_api->LoggerOut(MTLogErr, L"%lld cannot add deal [%d], original deal: #%lld", login, ret, deal->Deal()); continue; } m_api->LoggerOut(MTLogOK, L"add deal #%lld, original deal #%lld [%d]", new_deal->Deal(), deal->Deal()); if (position->Volume() == 0) { if (fields == nullptr) fields = new(std::vector); fields->push_back(login_buf); } } if (position->Volume() == 0) { // 如果平仓,则删除hash值 if (position->Volume() == 0) { auto fut = m_redis_client->hdel(order_buf, *fields); m_redis_client->sync_commit(); auto rep = fut.get(); if (rep.ko()) { // TODO 错误处理 } } // TODO 当position中的volume为0时,持仓被彻底平调,被跟订单是否也该检查 } } MTAPIRES CPluginInstance::LoadParam() { MTAPIRES res = MT_RET_OK; IMTConParam* param = NULL; CMTStr128 tmp; if (!m_api || !m_config) return MT_RET_ERR_PARAMS; if ((res = m_api->PluginCurrent(m_config)) != MT_RET_OK) { m_api->LoggerOut(MTLogErr, L"failed to get current plugin configuration [%s (%u)]", SMTFormat::FormatError(res), res); return res; } if ((param = m_api->PluginParamCreate()) == NULL) { m_api->LoggerOut(MTLogErr, L"failed to create plugin parameter object"); return MT_RET_ERR_MEM; } ScopeGuard param_release([param]() { param->Release(); }); std::lock_guard lk(m_lock); //m_api->LoggerOut(MTLogOK, L"Load redis server params"); if ((res = m_config->ParameterGet(L"Redis Server", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) { return(MT_RET_ERR_PARAMS); } std::string redis_server = ws2s(param->ValueString()); if ((res = m_config->ParameterGet(L"Redis Port", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) { return(MT_RET_ERR_PARAMS); } int redis_port = param->ValueInt(); //if ((res = m_config->ParameterGet(L"Redis User", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) //{ // return(MT_RET_ERR_PARAMS); //} //std::string redis_user = ws2s(param->ValueString()); if ((res = m_config->ParameterGet(L"Redis Password", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) { return(MT_RET_ERR_PARAMS); } std::string redis_password = ws2s(param->ValueString()); if (m_redis_server != redis_server || m_redis_port != redis_port //|| m_redis_user != redis_user || m_redis_password != redis_password) { m_redis_server = redis_server; m_redis_port = redis_port; //m_redis_user = redis_user; m_redis_password = redis_password; stop_redis(); if (start_redis()) { if (!m_work_thread.joinable()) { m_work_thread = std::thread(&CPluginInstance::keep_alive, this); } } else { m_api->LoggerOut(MTLogErr, L"failed to connect to redis server"); return MT_RET_ERR_CONNECTION; } } //m_api->LoggerOut(MTLogOK, L"Load trade params"); if ((res = m_config->ParameterGet(L"Trader", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) { return(MT_RET_ERR_PARAMS); } m_trader = param->ValueInt(); if ((res = m_config->ParameterGet(L"Step", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) { return(MT_RET_ERR_PARAMS); } m_step = param->ValueInt(); if ((res = m_config->ParameterGet(L"Tolerance", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT) { return(MT_RET_ERR_PARAMS); } m_tolerance = param->ValueInt(); if ((res = m_config->ParameterGet(L"Groups", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) { return(MT_RET_ERR_PARAMS); } wcsncpy(m_groups, param->ValueString(), 1024); //if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING) //{ // return(MT_RET_ERR_PARAMS); //} //m_logins = param->ValueString(); res = LoadLogins(); if (res != MT_RET_OK) { m_api->LoggerOut(MTLogErr, L"failed to load clients [%d]", res); return res; } //m_api->LoggerOut(MTLogOK, L"Load param success"); m_enable = true; return MT_RET_OK; } MTAPIRES CPluginInstance::LoadLogins() { m_followers.clear(); IMTConGroup* group = m_api->GroupCreate(); ScopeGuard group_guard([&]() { group->Release(); }); if (group == nullptr) return MT_RET_ERR_MEM; UINT total = m_api->GroupTotal(); MTAPIRES ret = MT_RET_OK; std::vector groups; for (int i = 0; i < total; i++) { ret = m_api->GroupNext(i, group); if (ret != MT_RET_OK) break; if (CheckGroup(m_groups, group->Group()) == FALSE) continue; m_api->LoggerOut(MTLogOK, L"group %s matched config", group->Group()); groups.push_back(group->Group()); } for (auto& group : groups) { UINT64* logins = nullptr; UINT total_users = 0; ret = m_api->UserLogins(group.c_str(), logins, total_users); m_api->LoggerOut(MTLogOK, L"add group %s, total users: %d", group.c_str(), total_users); if (ret == MT_RET_OK && logins != nullptr) { for (int i = 0; i < total_users; ++i) { m_api->LoggerOut(MTLogOK, L"add %d to list", logins[i]); m_followers.push_back(logins[i]); } } if (logins) { m_api->Free(logins); } } return ret; } bool CPluginInstance::start_redis() { try { m_redis_error_notified = false; if (m_redis_client) { if (m_redis_client->is_connected()) return true; } m_redis_client.reset(new cpp_redis::client); m_redis_client->connect ( m_redis_server, m_redis_port, [this](const std::string& host, std::size_t port, cpp_redis::connect_state status) { if (status == cpp_redis::connect_state::ok) { m_api->LoggerOut(MTLogOK, L"redis server connected"); } }, 500, 10000000, 1000 ); if (m_redis_password != "") { auto fut = m_redis_client->auth(m_redis_password); m_redis_client->sync_commit(); auto reply = fut.get(); if (reply.is_error()) { m_api->LoggerOut(MTLogErr, L"connect: authentication failed"); return false; } } } catch (tacopie::tacopie_error& e) { m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what()); return false; } catch (std::exception& e) { m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what()); return false; } catch (...) { m_api->LoggerOut(MTLogErr, L"failed to connect to server"); return false; } return true; } void CPluginInstance::stop_redis() { try { if (m_redis_client) { m_redis_client->sync_commit(); m_redis_client->disconnect(); m_redis_client.reset(); } } catch (tacopie::tacopie_error& e) { m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what()); } catch (std::exception& e) { m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what()); } catch (...) { m_api->LoggerOut(MTLogErr, L"stop redis: unknown error"); } } void CPluginInstance::keep_alive() { using namespace std::chrono_literals; int counter = 500; while (m_enable) { if (counter-- <= 0) { counter = 500; std::lock_guard lk(m_lock); if (m_redis_client) { if (m_redis_client->is_connected()) { m_redis_client->ping([this](cpp_redis::reply& r) { if (r.ko()) { m_redis_error_notified = true; if (!m_redis_error_notified) { m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str()); } } }); } else { m_redis_error_notified = true; if (!m_redis_error_notified) { m_api->LoggerOut(MTLogErr, L"redis server not connected"); } stop_redis(); start_redis(); } } } std::this_thread::sleep_for(16ms); } }