PluginInstance.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768
  1. #include "pch.h"
  2. #include "PluginInstance.h"
  3. #define REDIS_TIMER_ID (1)
  4. CPluginInstance::CPluginInstance()
  5. : m_enable(false)
  6. , m_redis_error_notified(false)
  7. {
  8. }
  9. CPluginInstance::~CPluginInstance()
  10. {
  11. Stop();
  12. }
  13. void CPluginInstance::Release()
  14. {
  15. delete this;
  16. }
  17. MTAPIRES CPluginInstance::Start(IMTServerAPI * server)
  18. {
  19. MTAPIRES ret = MT_RET_OK;
  20. if (!server) return MT_RET_ERR_PARAMS;
  21. m_api = server;
  22. if ((m_config = m_api->PluginCreate()) == nullptr)
  23. return MT_RET_ERR_MEM;
  24. ret = m_api->About(m_info);
  25. if (ret != MT_RET_OK)
  26. m_api->LoggerOut(MTLogOK, L"Server info failed [%d]", ret);
  27. if ((ret = m_api->PluginSubscribe(this)) != MT_RET_OK)
  28. {
  29. m_api->LoggerOut(MTLogAtt, L"Plugin subscribe failed [%d]", ret);
  30. return ret;
  31. }
  32. if (ret = m_api->OrderSubscribe(this) != MT_RET_OK)
  33. {
  34. m_api->LoggerOut(MTLogAtt, L"Order subscribe failed [%d]", ret);
  35. return ret;
  36. }
  37. if (ret = m_api->DealSubscribe(this) != MT_RET_OK)
  38. {
  39. m_api->LoggerOut(MTLogAtt, L"Deal subscribe failed [%d]", ret);
  40. return ret;
  41. }
  42. if ((ret = LoadParam()) != MT_RET_OK)
  43. {
  44. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  45. return ret;
  46. }
  47. return MT_RET_OK;
  48. }
  49. MTAPIRES CPluginInstance::Stop()
  50. {
  51. MTAPIRES ret = MT_RET_OK;
  52. std::lock_guard<decltype(m_lock)> lk(m_lock);
  53. m_followers.clear();
  54. if (m_api == nullptr)
  55. return MT_RET_OK;
  56. if (m_config != nullptr)
  57. {
  58. m_config->Release();
  59. m_config = nullptr;
  60. }
  61. if ((ret = m_api->PluginUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  62. m_api->LoggerOut(MTLogErr, L"failed to unsubscribe from plugin config updates [%s (%u)]",
  63. SMTFormat::FormatError(ret), ret);
  64. if ((ret = m_api->OrderUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  65. m_api->LoggerOut(MTLogErr, L"failed to unsubscribe order [%s (%u)]",
  66. SMTFormat::FormatError(ret), ret);
  67. if ((ret = m_api->DealUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  68. m_api->LoggerOut(MTLogErr, L"failed to unsubscribe deal [%s (%u)]",
  69. SMTFormat::FormatError(ret), ret);
  70. m_api = nullptr;
  71. m_enable = false;
  72. if (m_work_thread.joinable())
  73. m_work_thread.join();
  74. return MT_RET_OK;
  75. }
  76. void CPluginInstance::OnPluginUpdate(const IMTConPlugin * plugin)
  77. {
  78. int ret = MT_RET_OK;
  79. if ((ret = LoadParam()) != MT_RET_OK)
  80. {
  81. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  82. }
  83. }
  84. //void CPluginInstance::OnOrderAdd(const IMTOrder * order)
  85. //{
  86. // if (order == nullptr) return;
  87. //
  88. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  89. // if (!m_enable) return;
  90. // if (order->Login() != m_trader) return;
  91. //
  92. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  93. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  94. //}
  95. //void CPluginInstance::OnOrderUpdate(const IMTOrder * order)
  96. //{
  97. // if (order == nullptr) return;
  98. //
  99. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  100. // if (!m_enable) return;
  101. // if (order->Login() != m_trader) return;
  102. //
  103. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  104. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  105. //}
  106. void CPluginInstance::OnOrderDelete(const IMTOrder * order)
  107. {
  108. if (order == nullptr) return;
  109. std::lock_guard<decltype(m_lock)> lk(m_lock);
  110. if (!m_enable) return;
  111. if (order->Login() != m_trader) return;
  112. //m_api->LoggerOut(MTLogOK, L"OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
  113. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  114. if (order->Type() != IMTOrder::OP_BUY
  115. && order->Type() != IMTOrder::OP_SELL)
  116. {
  117. // 不是buy或者sell,不跟,直接退出
  118. return;
  119. }
  120. char order_buf[128];
  121. sprintf(order_buf, "%lld", order->Order());
  122. IMTAccount* account = m_api->UserCreateAccount();
  123. IMTOrder* new_order = m_api->OrderCreate();
  124. ScopeGuard guard([account, new_order]
  125. {
  126. account->Release();
  127. new_order->Release();
  128. });
  129. // 订单在进入filled状态时,也会产生一个order delete回调
  130. if (order->Order() == order->PositionID())
  131. {
  132. // 如果是新建订单,那么写入新纪录
  133. char login_buf[128];
  134. for (auto login : m_followers)
  135. {
  136. if (m_api->UserAccountGet(login, account) != MT_RET_OK)
  137. {
  138. // TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
  139. continue;
  140. }
  141. sprintf(login_buf, "%lld", login);
  142. UINT level = (UINT)(account->Balance() / m_step);
  143. UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
  144. UINT volume_ext = round((double)level * order->VolumeCurrentExt() / 10000) * 100;
  145. UINT init_volume = round((double)level * order->VolumeInitial() / 10000) * 100;
  146. UINT init_volume_ext = round((double)level * order->VolumeInitialExt() / 10000) * 100;
  147. UINT64 new_order_id = 0;
  148. // 尚未完成建仓,目前position id不填,仅完成订单后记录
  149. new_order->Clear();
  150. new_order->VolumeInitial(init_volume);
  151. new_order->VolumeCurrent(volume);
  152. new_order->Login(login);
  153. new_order->Symbol(order->Symbol());
  154. new_order->Type(order->Type());
  155. new_order->Digits(order->Digits());
  156. new_order->DigitsCurrency(order->DigitsCurrency());
  157. //new_order->ContractSize(order->ContractSize());
  158. new_order->PriceOrder(order->PriceOrder());
  159. //new_order->PriceCurrent(order->PriceCurrent());
  160. // State不能填PARTIAL FILLED REJECTED EXPIRED
  161. new_order->StateSet(IMTOrder::ORDER_STATE_STARTED);
  162. new_order->TimeSetup(m_api->TimeCurrent());
  163. new_order->TimeSetupMsc(m_api->TimeCurrentMsc());
  164. // 一定不能填入time done
  165. //new_order->TimeDone(order->TimeDone());
  166. //new_order->TimeDoneMsc(order->TimeDoneMsc());
  167. // --
  168. //new_order->PriceSL(order->PriceSL());
  169. //new_order->PriceTP(order->PriceTP());
  170. //new_order->Comment(order->Comment());
  171. //new_order->ActivationFlags(order->ActivationFlags());
  172. //new_order->ActivationMode(order->ActivationMode());
  173. //new_order->ActivationPrice(order->ActivationPrice());
  174. //new_order->ActivationTime(order->ActivationTime());
  175. //new_order->PriceTrigger(order->PriceTrigger());
  176. //new_order->RateMargin(order->RateMargin()); //
  177. //new_order->ReasonSet(order->Reason()); //
  178. //new_order->TypeFill(order->TypeFill()); //
  179. //new_order->TypeTime(order->TypeTime()); //
  180. MTAPIRES ret = m_api->OrderAdd(new_order);
  181. if (ret != MT_RET_OK)
  182. {
  183. m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Login(), ret);
  184. }
  185. else
  186. {
  187. new_order_id = new_order->Order();
  188. }
  189. // TODO: 现在做法是,如果level为0,后面都不管
  190. // 如果跟单失败,那么position id也为0,所以后面也不应该处理
  191. position_context context;
  192. context.level = level;
  193. context.cur_ord = new_order_id;
  194. context.position_id = new_order_id; // 建仓,order id = position id
  195. int direction = 1;
  196. if (order->Type() == IMTOrder::OP_SELL)
  197. direction = -1;
  198. context.volume = direction * order->VolumeInitial();
  199. auto fut = m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(context)));
  200. m_redis_client->sync_commit();
  201. auto rep = fut.get();
  202. if (rep.ko())
  203. {
  204. // FIXME: 错误处理
  205. }
  206. }
  207. }
  208. else
  209. {
  210. // 如果不是新建订单,先使用position id先检查记录是否存在
  211. char login_buf[128];
  212. for (auto login : m_followers)
  213. {
  214. sprintf(login_buf, "%lld", login);
  215. auto fut = m_redis_client->hget(order_buf, login_buf);
  216. m_redis_client->sync_commit();
  217. auto reply = fut.get();
  218. // 如果不存在,忽略
  219. if (reply.ko()) continue;
  220. if (reply.is_null()) continue;
  221. // 获取context
  222. position_context context;
  223. memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  224. // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  225. if (context.level == 0) continue;
  226. if (context.position_id == 0) continue;
  227. // 如果存在,则继续操作
  228. UINT volume = round((double)context.level * order->VolumeCurrent() / 10000) * 100;
  229. UINT volume_ext = round((double)context.level * order->VolumeCurrentExt() / 10000) * 100;
  230. UINT init_volume = round((double)context.level * order->VolumeInitial() / 10000) * 100;
  231. UINT init_volume_ext = round((double)context.level * order->VolumeInitialExt() / 10000) * 100;
  232. UINT64 new_order_id = 0;
  233. new_order->Clear();
  234. new_order->VolumeInitial(init_volume);
  235. new_order->VolumeCurrent(volume);
  236. new_order->Login(login);
  237. new_order->Symbol(order->Symbol());
  238. new_order->Type(order->Type());
  239. new_order->Digits(order->Digits());
  240. new_order->DigitsCurrency(order->DigitsCurrency());
  241. //new_order->ContractSize(order->ContractSize());
  242. new_order->PriceOrder(order->PriceOrder());
  243. //new_order->PriceCurrent(order->PriceCurrent());
  244. // State不能填PARTIAL FILLED REJECTED EXPIRED
  245. new_order->StateSet(IMTOrder::ORDER_STATE_STARTED);
  246. new_order->TimeSetup(m_api->TimeCurrent());
  247. new_order->TimeSetupMsc(m_api->TimeCurrentMsc());
  248. new_order->PositionID(context.position_id);
  249. MTAPIRES ret = m_api->OrderAdd(new_order);
  250. if (ret != MT_RET_OK)
  251. {
  252. m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Login(), ret);
  253. // FIXME: 如果做单失败该怎么办
  254. continue;
  255. }
  256. else
  257. {
  258. new_order_id = new_order->Order();
  259. }
  260. // 完成之后,写入新纪录
  261. context.cur_ord = new_order_id;
  262. int direction = 1;
  263. if (order->Type() == IMTOrder::OP_SELL)
  264. direction = -1;
  265. context.volume += direction * order->VolumeInitial();
  266. auto wfut = m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(context)));
  267. m_redis_client->sync_commit();
  268. auto wrep = fut.get();
  269. if (wrep.ko())
  270. {
  271. // FIXME: 错误处理
  272. }
  273. }
  274. }
  275. }
  276. //void CPluginInstance::OnOrderClean(const UINT64 login)
  277. //{
  278. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  279. // if (!m_enable) return;
  280. // if (login != m_trader) return;
  281. //
  282. // m_api->LoggerOut(MTLogOK, L"OnOrderClean, Login: %d", login);
  283. //}
  284. //void CPluginInstance::OnDealAdd(const IMTDeal * deal)
  285. //{
  286. // if (deal == nullptr) return;
  287. //
  288. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  289. // if (!m_enable) return;
  290. // if (deal->Login() != m_trader) return;
  291. //
  292. // m_api->LoggerOut(MTLogOK, L"OnDealAdd, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  293. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  294. //}
  295. //void CPluginInstance::OnDealUpdate(const IMTDeal * deal)
  296. //{
  297. // if (deal == nullptr) return;
  298. //
  299. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  300. // if (!m_enable) return;
  301. // if (deal->Login() != m_trader) return;
  302. //
  303. // m_api->LoggerOut(MTLogOK, L"OnDealUpdate, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  304. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  305. //}
  306. //void CPluginInstance::OnDealDelete(const IMTDeal * deal)
  307. //{
  308. // if (deal == nullptr) return;
  309. //
  310. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  311. // if (!m_enable) return;
  312. // if (deal->Login() != m_trader) return;
  313. //
  314. // m_api->LoggerOut(MTLogOK, L"OnDealDelete, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  315. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  316. //}
  317. //void CPluginInstance::OnDealClean(const UINT64 login)
  318. //{
  319. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  320. // if (!m_enable) return;
  321. // if (login != m_trader) return;
  322. //
  323. // m_api->LoggerOut(MTLogOK, L"OndealClean, Login: %d", login);
  324. //}
  325. void CPluginInstance::OnDealPerform(const IMTDeal * deal, IMTAccount * account, IMTPosition * position)
  326. {
  327. // position为nullptr时,说明该deal不是由交易本身触发
  328. // account是deal完成后的用户状况
  329. // position是交易完成后的持仓状况
  330. // 对于deal是关闭一个持仓的情况,该position的volume会是0
  331. if (position == nullptr
  332. || deal == nullptr
  333. || account == nullptr)
  334. return;
  335. std::lock_guard<decltype(m_lock)> lk(m_lock);
  336. if (!m_enable) return;
  337. if (deal->Login() != m_trader) return;
  338. //m_api->LoggerOut(MTLogOK, L"OnDealPerform, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  339. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  340. // FIXME: 需要检查现在的deal是否和之前的order对应。如果没有记录到,则没法根本无法正常跟单以及平仓
  341. char order_buf[128];
  342. sprintf(order_buf, "%lld", deal->Order());
  343. IMTDeal* new_deal = m_api->DealCreate();
  344. std::vector<std::string>* fields = nullptr;
  345. ScopeGuard guard([new_deal, fields]
  346. {
  347. new_deal->Release();
  348. if (fields)
  349. {
  350. fields->clear();
  351. delete fields;
  352. }
  353. });
  354. char login_buf[128];
  355. for (auto login : m_followers)
  356. {
  357. sprintf(login_buf, "%lld", login);
  358. auto fut = m_redis_client->hget(order_buf, login_buf);
  359. m_redis_client->sync_commit();
  360. auto reply = fut.get();
  361. // 如果不存在,忽略
  362. if (reply.ko()) continue;
  363. if (reply.is_null()) continue;
  364. // 获取context
  365. position_context context;
  366. memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  367. // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  368. if (context.level == 0) continue;
  369. if (context.position_id == 0) continue;
  370. uint64_t volume = context.level * deal->Volume() / 100;
  371. uint64_t volume_ext = context.level * deal->VolumeExt() / 100;
  372. uint64_t volume_closed = context.level * deal->VolumeClosed() / 100;
  373. uint64_t volume_closed_ext = context.level * deal->VolumeClosed() / 100;
  374. double prop = (double)volume / deal->Volume();
  375. double profit = deal->Profit() * prop;
  376. double commission = deal->Commission() * prop;
  377. double storage = deal->Storage() * prop;
  378. double raw_profit = deal->ProfitRaw() * prop;
  379. new_deal->Clear();
  380. new_deal->DealSet(0);
  381. new_deal->Volume(volume);
  382. new_deal->VolumeExt(volume_ext);
  383. new_deal->VolumeClosed(volume_closed);
  384. new_deal->VolumeClosedExt();
  385. new_deal->ProfitRaw(raw_profit);
  386. new_deal->Profit(profit);
  387. new_deal->Commission(commission);
  388. new_deal->Storage(storage);
  389. new_deal->Order(context.cur_ord);
  390. new_deal->PositionID(context.position_id);
  391. MTAPIRES ret = m_api->DealAdd(new_deal);
  392. if (ret != MT_RET_OK)
  393. {
  394. // TODO: 有没有更多的错误处理?
  395. m_api->LoggerOut(MTLogErr, L"%lld cannot add deal [%d], original deal: #%lld", login, ret, deal->Deal());
  396. continue;
  397. }
  398. if (position->Volume() == 0)
  399. {
  400. if (fields == nullptr)
  401. fields = new(std::vector<std::string>);
  402. fields->push_back(login_buf);
  403. }
  404. }
  405. if (position->Volume() == 0)
  406. {
  407. // 如果平仓,则删除hash值
  408. if (position->Volume() == 0)
  409. {
  410. auto fut = m_redis_client->hdel(order_buf, *fields);
  411. m_redis_client->sync_commit();
  412. auto rep = fut.get();
  413. if (rep.ko())
  414. {
  415. // TODO 错误处理
  416. }
  417. }
  418. // TODO 当position中的volume为0时,持仓被彻底平调,被跟订单是否也该检查
  419. }
  420. }
  421. MTAPIRES CPluginInstance::LoadParam()
  422. {
  423. MTAPIRES res = MT_RET_OK;
  424. IMTConParam* param = NULL;
  425. CMTStr128 tmp;
  426. if (!m_api || !m_config) return MT_RET_ERR_PARAMS;
  427. if ((res = m_api->PluginCurrent(m_config)) != MT_RET_OK)
  428. {
  429. m_api->LoggerOut(MTLogErr, L"failed to get current plugin configuration [%s (%u)]",
  430. SMTFormat::FormatError(res), res);
  431. return res;
  432. }
  433. if ((param = m_api->PluginParamCreate()) == NULL)
  434. {
  435. m_api->LoggerOut(MTLogErr, L"failed to create plugin parameter object");
  436. return MT_RET_ERR_MEM;
  437. }
  438. ScopeGuard param_release([param]()
  439. {
  440. param->Release();
  441. });
  442. std::lock_guard<decltype(m_lock)> lk(m_lock);
  443. if ((res = m_config->ParameterGet(L"Redis Server", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  444. {
  445. return(MT_RET_ERR_PARAMS);
  446. }
  447. std::string redis_server = ws2s(param->ValueString());
  448. if ((res = m_config->ParameterGet(L"Redis Port", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  449. {
  450. return(MT_RET_ERR_PARAMS);
  451. }
  452. int redis_port = param->ValueInt();
  453. //if ((res = m_config->ParameterGet(L"Redis User", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  454. //{
  455. // return(MT_RET_ERR_PARAMS);
  456. //}
  457. //std::string redis_user = ws2s(param->ValueString());
  458. if ((res = m_config->ParameterGet(L"Redis Password", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  459. {
  460. return(MT_RET_ERR_PARAMS);
  461. }
  462. std::string redis_password = ws2s(param->ValueString());
  463. if (m_redis_server != redis_server
  464. || m_redis_port != redis_port
  465. //|| m_redis_user != redis_user
  466. || m_redis_password != redis_password)
  467. {
  468. m_redis_server = redis_server;
  469. m_redis_port = redis_port;
  470. //m_redis_user = redis_user;
  471. m_redis_password = redis_password;
  472. stop_redis();
  473. if (start_redis())
  474. {
  475. if (!m_work_thread.joinable())
  476. {
  477. m_work_thread = std::thread(&CPluginInstance::keep_alive, this);
  478. }
  479. }
  480. else
  481. {
  482. m_api->LoggerOut(MTLogErr, L"failed to connect to redis server");
  483. return MT_RET_ERR_CONNECTION;
  484. }
  485. }
  486. if ((res = m_config->ParameterGet(L"Trader", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  487. {
  488. return(MT_RET_ERR_PARAMS);
  489. }
  490. m_trader = param->ValueInt();
  491. if ((res = m_config->ParameterGet(L"Step", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  492. {
  493. return(MT_RET_ERR_PARAMS);
  494. }
  495. m_step = param->ValueInt();
  496. if ((res = m_config->ParameterGet(L"Tolerance", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  497. {
  498. return(MT_RET_ERR_PARAMS);
  499. }
  500. m_tolerance = param->ValueInt();
  501. if ((res = m_config->ParameterGet(L"Groups", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  502. {
  503. return(MT_RET_ERR_PARAMS);
  504. }
  505. m_groups = param->ValueString();
  506. //if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  507. //{
  508. // return(MT_RET_ERR_PARAMS);
  509. //}
  510. //m_logins = param->ValueString();
  511. res = LoadLogins();
  512. if (res != MT_RET_OK)
  513. return res;
  514. m_enable = true;
  515. return MT_RET_OK;
  516. }
  517. MTAPIRES CPluginInstance::LoadLogins()
  518. {
  519. m_followers.clear();
  520. IMTConGroup* group = m_api->GroupCreate();
  521. ScopeGuard group_guard([&]()
  522. {
  523. group->Release();
  524. });
  525. if (group == nullptr) return MT_RET_ERR_MEM;
  526. UINT total = m_api->GroupTotal();
  527. MTAPIRES ret = MT_RET_OK;
  528. std::vector<std::wstring> groups;
  529. for (int i = 0; i < total; i++)
  530. {
  531. ret = m_api->GroupNext(i, group);
  532. if (ret != MT_RET_OK) break;
  533. groups.push_back(group->Group());
  534. }
  535. for (auto& group : groups)
  536. {
  537. UINT64* logins = nullptr;
  538. UINT total_users = 0;
  539. ret = m_api->UserLogins(group.c_str(), logins, total_users);
  540. if (ret = MT_RET_OK)
  541. {
  542. for (int i = 0; i < total_users; ++i)
  543. {
  544. m_followers.push_back(logins[i]);
  545. }
  546. }
  547. if (logins)
  548. {
  549. m_api->Free(logins);
  550. }
  551. }
  552. }
  553. bool CPluginInstance::start_redis()
  554. {
  555. try
  556. {
  557. m_redis_error_notified = false;
  558. if (m_redis_client)
  559. {
  560. if (m_redis_client->is_connected())
  561. return true;
  562. }
  563. m_redis_client.reset(new cpp_redis::client);
  564. m_redis_client->connect
  565. (
  566. m_redis_server,
  567. m_redis_port,
  568. [this](const std::string& host, std::size_t port, cpp_redis::connect_state status)
  569. {
  570. if (status == cpp_redis::connect_state::ok)
  571. {
  572. m_api->LoggerOut(MTLogErr, L"redis server connected");
  573. }
  574. },
  575. 500,
  576. 10000000,
  577. 1000
  578. );
  579. if (m_redis_password != "")
  580. {
  581. auto fut = m_redis_client->auth(m_redis_password);
  582. m_redis_client->sync_commit();
  583. auto reply = fut.get();
  584. if (reply.is_error())
  585. {
  586. m_api->LoggerOut(MTLogErr, L"connect: authentication failed");
  587. return false;
  588. }
  589. }
  590. }
  591. catch (tacopie::tacopie_error& e)
  592. {
  593. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  594. return false;
  595. }
  596. catch (std::exception& e)
  597. {
  598. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  599. return false;
  600. }
  601. catch (...)
  602. {
  603. m_api->LoggerOut(MTLogErr, L"failed to connect to server");
  604. return false;
  605. }
  606. return true;
  607. }
  608. void CPluginInstance::stop_redis()
  609. {
  610. try
  611. {
  612. if (m_redis_client)
  613. {
  614. m_redis_client->sync_commit();
  615. m_redis_client->disconnect();
  616. m_redis_client.reset();
  617. }
  618. }
  619. catch (tacopie::tacopie_error& e)
  620. {
  621. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  622. }
  623. catch (std::exception& e)
  624. {
  625. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  626. }
  627. catch (...)
  628. {
  629. m_api->LoggerOut(MTLogErr, L"stop redis: unknown error");
  630. }
  631. }
  632. void CPluginInstance::keep_alive()
  633. {
  634. using namespace std::chrono_literals;
  635. int counter = 500;
  636. while (m_enable)
  637. {
  638. if (counter-- <= 0)
  639. {
  640. counter = 500;
  641. std::lock_guard<decltype(m_lock)> lk(m_lock);
  642. if (m_redis_client)
  643. {
  644. if (m_redis_client->is_connected())
  645. {
  646. m_redis_client->ping([this](cpp_redis::reply& r)
  647. {
  648. if (r.ko())
  649. {
  650. m_redis_error_notified = true;
  651. if (!m_redis_error_notified)
  652. {
  653. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  654. }
  655. }
  656. });
  657. }
  658. else
  659. {
  660. m_redis_error_notified = true;
  661. if (!m_redis_error_notified)
  662. {
  663. m_api->LoggerOut(MTLogErr, L"redis server not connected");
  664. }
  665. stop_redis();
  666. start_redis();
  667. }
  668. }
  669. }
  670. std::this_thread::sleep_for(16ms);
  671. }
  672. }