#include "pch.h"
#include "PluginInstance.h"

#define REDIS_TIMER_ID (1)

// Implementation of the copy-trading plugin instance: subscribes to order/deal
// events of the MT5 server, loads its settings from the plugin configuration
// and keeps a redis connection alive from a background worker thread.
//
// NOTE(review): m_enable and m_redis_error_notified are touched from several
// threads (server callbacks, the keep-alive worker, redis callbacks). Their
// declarations are not visible here -- confirm they are std::atomic or always
// accessed under m_lock.

/// Creates a disabled instance; nothing is subscribed until Start() is called.
CPluginInstance::CPluginInstance()
    : m_enable(false)
    , m_redis_error_notified(false)
{
    // Stop() (called from the destructor) inspects these pointers, so make
    // sure they are null even if the header has no in-class initializers.
    m_api = nullptr;
    m_config = nullptr;
}

/// Unsubscribes and stops the worker thread before the object goes away.
CPluginInstance::~CPluginInstance()
{
    Stop();
}

/// Destroys this instance.
void CPluginInstance::Release()
{
    delete this;
}

/// Attaches the plugin to the server API, subscribes to plugin/order/deal
/// events and loads the configuration.
/// @param server  server API interface; must not be null.
/// @return MT_RET_OK on success, otherwise the code of the first failing step.
MTAPIRES CPluginInstance::Start(IMTServerAPI* server)
{
    if (!server)
        return MT_RET_ERR_PARAMS;

    m_api = server;

    if ((m_config = m_api->PluginCreate()) == nullptr)
        return MT_RET_ERR_MEM;

    // Failing to read server info is reported but is not fatal.
    MTAPIRES ret = m_api->About(m_info);
    if (ret != MT_RET_OK)
        m_api->LoggerOut(MTLogAtt, L"Server info failed [%d]", ret);

    if ((ret = m_api->PluginSubscribe(this)) != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogAtt, L"Plugin subscribe failed [%d]", ret);
        return ret;
    }

    // The assignment must be parenthesised: `ret = a != b` stores the boolean
    // comparison result in ret instead of the real error code.
    if ((ret = m_api->OrderSubscribe(this)) != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogAtt, L"Order subscribe failed [%d]", ret);
        return ret;
    }

    if ((ret = m_api->DealSubscribe(this)) != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogAtt, L"Deal subscribe failed [%d]", ret);
        return ret;
    }

    if ((ret = LoadParam()) != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
        return ret;
    }

    return MT_RET_OK;
}

/// Stops the keep-alive worker, closes redis, releases the configuration and
/// unsubscribes from all server events. Safe to call more than once.
/// @return always MT_RET_OK; unsubscribe failures are only logged.
MTAPIRES CPluginInstance::Stop()
{
    // Signal the worker and take ownership of its handle under the lock, but
    // join it only after the lock is released: keep_alive() acquires m_lock,
    // so joining while holding it can deadlock.
    std::thread worker;
    {
        std::lock_guard<decltype(m_lock)> lk(m_lock);
        m_enable = false;
        worker = std::move(m_work_thread);
    }
    if (worker.joinable())
        worker.join();

    std::lock_guard<decltype(m_lock)> lk(m_lock);

    m_followers.clear();

    if (m_api == nullptr)
        return MT_RET_OK;

    // Redis callbacks log through m_api, so the client has to be shut down
    // before m_api is cleared below.
    stop_redis();

    if (m_config != nullptr)
    {
        m_config->Release();
        m_config = nullptr;
    }

    MTAPIRES ret = MT_RET_OK;

    if ((ret = m_api->PluginUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
        m_api->LoggerOut(MTLogErr, L"failed to unsubscribe from plugin config updates [%s (%u)]",
            SMTFormat::FormatError(ret), ret);

    if ((ret = m_api->OrderUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
        m_api->LoggerOut(MTLogErr, L"failed to unsubscribe order [%s (%u)]",
            SMTFormat::FormatError(ret), ret);

    if ((ret = m_api->DealUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
        m_api->LoggerOut(MTLogErr, L"failed to unsubscribe deal [%s (%u)]",
            SMTFormat::FormatError(ret), ret);

    m_api = nullptr;

    return MT_RET_OK;
}

/// Plugin configuration changed: reload all parameters.
/// @param plugin  updated configuration (unused; the current one is re-read).
void CPluginInstance::OnPluginUpdate(const IMTConPlugin* plugin)
{
    const MTAPIRES ret = LoadParam();
    if (ret != MT_RET_OK && m_api != nullptr)
    {
        m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
    }
}

//void CPluginInstance::OnOrderAdd(const IMTOrder * order)
//{
//    if (order == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (order->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
//        order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
//}

//void CPluginInstance::OnOrderUpdate(const IMTOrder * order)
//{
//    if (order == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (order->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
//        order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
//}

/// Order removed from the open-orders list. Only orders of the configured
/// trader are considered; the per-follower handling is not implemented yet.
/// @param order  deleted order; ignored when null.
void CPluginInstance::OnOrderDelete(const IMTOrder* order)
{
    if (order == nullptr) return;

    std::lock_guard<decltype(m_lock)> lk(m_lock);
    if (!m_enable) return;
    if (order->Login() != m_trader) return;

    //m_api->LoggerOut(MTLogOK, L"OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
    //    order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());

    // An order-delete callback is also raised when an order enters the filled state.
    for (auto login : m_followers)
    {
        if (order->Order() == order->PositionID())
        {
            // Newly created order: write a new record.
        }
        else
        {
            // Not a new order: first look the record up by position id.
            // If it exists, continue processing;
            // if it does not exist, ignore.
        }
    }
}

//void CPluginInstance::OnOrderClean(const UINT64 login)
//{
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (login != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnOrderClean, Login: %d", login);
//}

//void CPluginInstance::OnDealAdd(const IMTDeal * deal)
//{
//    if (deal == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (deal->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnDealAdd, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
//        deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
//}

//void CPluginInstance::OnDealUpdate(const IMTDeal * deal)
//{
//    if (deal == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (deal->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnDealUpdate, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
//        deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
//}

//void CPluginInstance::OnDealDelete(const IMTDeal * deal)
//{
//    if (deal == nullptr) return;
//
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (deal->Login() != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OnDealDelete, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
//        deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
//}

//void CPluginInstance::OnDealClean(const UINT64 login)
//{
//    std::lock_guard lk(m_lock);
//    if (!m_enable) return;
//    if (login != m_trader) return;
//
//    m_api->LoggerOut(MTLogOK, L"OndealClean, Login: %d", login);
//}

/// Deal executed. Only deals of the configured trader are considered; the
/// actual copy logic is not implemented yet (see FIXME below).
/// @param deal      executed deal.
/// @param account   the user's account state after the deal completed.
/// @param position  the position state after the deal completed.
void CPluginInstance::OnDealPerform(const IMTDeal* deal, IMTAccount* account, IMTPosition* position)
{
    // When position is nullptr the deal was not triggered by trading itself.
    // account is the user's state after the deal completed.
    // position is the position state after the trade completed.
    // When the deal closes a position, that position's volume will be 0.
    if (position == nullptr || deal == nullptr || account == nullptr) return;

    std::lock_guard<decltype(m_lock)> lk(m_lock);
    if (!m_enable) return;
    if (deal->Login() != m_trader) return;

    //m_api->LoggerOut(MTLogOK, L"OnDealPerform, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
    //    deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());

    // FIXME: need to check that the current deal corresponds to a previously
    // recorded order. If no record exists, copying and closing cannot work.
}

/// Reads the plugin parameters, (re)connects to redis when the connection
/// settings changed or the client is not connected, reloads the follower
/// logins and finally enables the plugin and its keep-alive worker.
/// @return MT_RET_OK on success; MT_RET_ERR_PARAMS for a missing/mistyped
///         parameter; MT_RET_ERR_CONNECTION when redis cannot be reached;
///         MT_RET_ERR_MEM or the API error code otherwise.
MTAPIRES CPluginInstance::LoadParam()
{
    if (!m_api || !m_config)
        return MT_RET_ERR_PARAMS;

    MTAPIRES res = m_api->PluginCurrent(m_config);
    if (res != MT_RET_OK)
    {
        m_api->LoggerOut(MTLogErr, L"failed to get current plugin configuration [%s (%u)]",
            SMTFormat::FormatError(res), res);
        return res;
    }

    IMTConParam* param = m_api->PluginParamCreate();
    if (param == nullptr)
    {
        m_api->LoggerOut(MTLogErr, L"failed to create plugin parameter object");
        return MT_RET_ERR_MEM;
    }
    ScopeGuard param_release([param]() { param->Release(); });

    std::lock_guard<decltype(m_lock)> lk(m_lock);

    // Fetches a named parameter into `param` and verifies its type.
    const auto fetch = [this, param](const wchar_t* name, const UINT expected_type) -> bool
    {
        return m_config->ParameterGet(name, param) == MT_RET_OK
            && param->Type() == expected_type;
    };

    if (!fetch(L"Redis Server", IMTConParam::TYPE_STRING))
        return MT_RET_ERR_PARAMS;
    const std::string redis_server = ws2s(param->ValueString());

    if (!fetch(L"Redis Port", IMTConParam::TYPE_INT))
        return MT_RET_ERR_PARAMS;
    const int redis_port = param->ValueInt();

    if (!fetch(L"Redis User", IMTConParam::TYPE_STRING))
        return MT_RET_ERR_PARAMS;
    const std::string redis_user = ws2s(param->ValueString());

    if (!fetch(L"Redis Password", IMTConParam::TYPE_STRING))
        return MT_RET_ERR_PARAMS;
    const std::string redis_password = ws2s(param->ValueString());

    const bool settings_changed =
        m_redis_server != redis_server ||
        m_redis_port != redis_port ||
        m_redis_user != redis_user ||
        m_redis_password != redis_password;

    // Also reconnect when there is no live connection (e.g. after Stop() or
    // after an earlier failed attempt with the same settings).
    if (settings_changed || !m_redis_client || !m_redis_client->is_connected())
    {
        m_redis_server = redis_server;
        m_redis_port = redis_port;
        m_redis_user = redis_user;
        m_redis_password = redis_password;

        stop_redis();
        if (!start_redis())
        {
            m_api->LoggerOut(MTLogErr, L"failed to connect to redis server");
            return MT_RET_ERR_CONNECTION;
        }
    }

    if (!fetch(L"Trader", IMTConParam::TYPE_INT))
        return MT_RET_ERR_PARAMS;
    m_trader = param->ValueInt();

    if (!fetch(L"Step", IMTConParam::TYPE_INT))
        return MT_RET_ERR_PARAMS;
    m_step = param->ValueInt();

    if (!fetch(L"Tolerance", IMTConParam::TYPE_INT))
        return MT_RET_ERR_PARAMS;
    m_tolerance = param->ValueInt();

    if (!fetch(L"Groups", IMTConParam::TYPE_STRING))
        return MT_RET_ERR_PARAMS;
    m_groups = param->ValueString();

    //if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
    //{
    //    return(MT_RET_ERR_PARAMS);
    //}
    //m_logins = param->ValueString();

    res = LoadLogins();
    if (res != MT_RET_OK)
        return res;

    m_enable = true;

    // Start the worker only after m_enable is set: keep_alive() loops on
    // `while (m_enable)` and would exit immediately if started earlier.
    if (!m_work_thread.joinable())
    {
        m_work_thread = std::thread(&CPluginInstance::keep_alive, this);
    }

    return MT_RET_OK;
}

/// Rebuilds m_followers from the logins of every group known to the server.
/// Groups whose logins cannot be read are skipped (best effort).
/// NOTE(review): the "Groups" parameter (m_groups) is loaded but not used as a
/// filter here -- confirm whether all groups are really intended.
/// @return MT_RET_ERR_MEM if the group object cannot be created, else MT_RET_OK.
MTAPIRES CPluginInstance::LoadLogins()
{
    m_followers.clear();

    IMTConGroup* group = m_api->GroupCreate();
    // Check before arming the guard so Release() is never called on null.
    if (group == nullptr)
        return MT_RET_ERR_MEM;
    ScopeGuard group_guard([group]() { group->Release(); });

    // Collect the group names first, then query the logins of each one.
    const UINT total = m_api->GroupTotal();
    std::vector<std::wstring> groups;
    for (UINT i = 0; i < total; ++i)
    {
        if (m_api->GroupNext(i, group) != MT_RET_OK)
            break;
        groups.push_back(group->Group());
    }

    for (const auto& group_name : groups)
    {
        UINT64* logins = nullptr;
        UINT total_users = 0;

        const MTAPIRES ret = m_api->UserLogins(group_name.c_str(), logins, total_users);
        if (ret == MT_RET_OK && logins != nullptr)
        {
            for (UINT i = 0; i < total_users; ++i)
            {
                m_followers.push_back(logins[i]);
            }
        }

        // The array is allocated by the server API and must be freed by it.
        if (logins)
        {
            m_api->Free(logins);
        }
    }

    return MT_RET_OK;
}

/// Connects to the configured redis server unless already connected.
/// Exceptions from the redis client are logged and reported as failure.
/// @return true when a connection exists or was established.
bool CPluginInstance::start_redis()
{
    try
    {
        if (m_redis_client && m_redis_client->is_connected())
        {
            m_redis_error_notified = false;
            return true;
        }

        m_redis_client.reset(new cpp_redis::client);
        m_redis_client->connect
        (
            m_redis_server,
            m_redis_port,
            [this](const std::string& host, std::size_t port, cpp_redis::connect_state status)
            {
                if (status == cpp_redis::connect_state::ok)
                {
                    m_api->LoggerOut(MTLogOK, L"redis server connected");
                }
            },
            500,
            10000000,
            1000
        );

        // Re-arm error reporting only once a connection attempt succeeded, so
        // a failing reconnect loop does not log the same error repeatedly.
        m_redis_error_notified = false;
    }
    // LoggerOut takes a wide format string; what() is a narrow string, hence %hs.
    catch (const tacopie::tacopie_error& e)
    {
        m_api->LoggerOut(MTLogErr, L"redis conn: %hs", e.what());
        return false;
    }
    catch (const std::exception& e)
    {
        m_api->LoggerOut(MTLogErr, L"redis conn: %hs", e.what());
        return false;
    }
    catch (...)
    {
        m_api->LoggerOut(MTLogErr, L"failed to connect to server");
        return false;
    }

    return true;
}

/// Flushes pending commands, disconnects and destroys the redis client.
/// Exceptions are logged and swallowed.
void CPluginInstance::stop_redis()
{
    try
    {
        if (m_redis_client)
        {
            m_redis_client->sync_commit();
            m_redis_client->disconnect();
            m_redis_client.reset();
        }
    }
    // what() is narrow while the format string is wide, hence %hs.
    catch (const tacopie::tacopie_error& e)
    {
        m_api->LoggerOut(MTLogErr, L"stop redis: %hs", e.what());
    }
    catch (const std::exception& e)
    {
        m_api->LoggerOut(MTLogErr, L"stop redis: %hs", e.what());
    }
    catch (...)
    {
        m_api->LoggerOut(MTLogErr, L"stop redis: unknown error");
    }
}

/// Worker thread body: while the plugin is enabled, pings redis roughly every
/// 500 ticks of 16 ms and reconnects when the client reports it is not
/// connected. The short sleep keeps shutdown latency low.
void CPluginInstance::keep_alive()
{
    using namespace std::chrono_literals;

    const int ticks_per_check = 500;
    int counter = ticks_per_check;

    while (m_enable)
    {
        if (counter-- <= 0)
        {
            counter = ticks_per_check;

            std::lock_guard<decltype(m_lock)> lk(m_lock);
            if (m_redis_client)
            {
                if (m_redis_client->is_connected())
                {
                    // Never let an exception escape the thread entry point.
                    try
                    {
                        m_redis_client->ping([this](cpp_redis::reply& r)
                        {
                            if (r.ko() && !m_redis_error_notified)
                            {
                                // Set the flag after the check so the first
                                // error is actually reported.
                                m_redis_error_notified = true;
                                m_api->LoggerOut(MTLogErr, L"redis: %hs", r.error().c_str());
                            }
                        });
                        // Commands are only queued until committed.
                        m_redis_client->commit();
                    }
                    catch (const std::exception& e)
                    {
                        m_api->LoggerOut(MTLogErr, L"redis: %hs", e.what());
                    }
                }
                else
                {
                    if (!m_redis_error_notified)
                    {
                        m_redis_error_notified = true;
                        m_api->LoggerOut(MTLogErr, L"redis server not connected");
                    }
                    stop_redis();
                    start_redis();
                }
            }
        }

        std::this_thread::sleep_for(16ms);
    }
}