PluginInstance.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855
  1. #include "pch.h"
  2. #include "PluginInstance.h"
  3. #define REDIS_TIMER_ID (1)
  4. CPluginInstance::CPluginInstance()
  5. : m_enable(false)
  6. , m_redis_error_notified(false)
  7. {
  8. }
  9. CPluginInstance::~CPluginInstance()
  10. {
  11. Stop();
  12. }
  13. void CPluginInstance::Release()
  14. {
  15. delete this;
  16. }
  17. MTAPIRES CPluginInstance::Start(IMTServerAPI * server)
  18. {
  19. MTAPIRES ret = MT_RET_OK;
  20. if (!server) return MT_RET_ERR_PARAMS;
  21. m_api = server;
  22. if ((m_config = m_api->PluginCreate()) == nullptr)
  23. return MT_RET_ERR_MEM;
  24. ret = m_api->About(m_info);
  25. if (ret != MT_RET_OK)
  26. m_api->LoggerOut(MTLogOK, L"Server info failed [%d]", ret);
  27. if ((ret = m_api->PluginSubscribe(this)) != MT_RET_OK)
  28. {
  29. m_api->LoggerOut(MTLogAtt, L"Plugin subscribe failed [%d]", ret);
  30. return ret;
  31. }
  32. if (ret = m_api->OrderSubscribe(this) != MT_RET_OK)
  33. {
  34. m_api->LoggerOut(MTLogAtt, L"Order subscribe failed [%d]", ret);
  35. return ret;
  36. }
  37. if (ret = m_api->DealSubscribe(this) != MT_RET_OK)
  38. {
  39. m_api->LoggerOut(MTLogAtt, L"Deal subscribe failed [%d]", ret);
  40. return ret;
  41. }
  42. if ((ret = LoadParam()) != MT_RET_OK)
  43. {
  44. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  45. return ret;
  46. }
  47. return MT_RET_OK;
  48. }
  49. MTAPIRES CPluginInstance::Stop()
  50. {
  51. MTAPIRES ret = MT_RET_OK;
  52. std::lock_guard<decltype(m_lock)> lk(m_lock);
  53. m_followers.clear();
  54. if (m_api == nullptr)
  55. return MT_RET_OK;
  56. if (m_config != nullptr)
  57. {
  58. m_config->Release();
  59. m_config = nullptr;
  60. }
  61. if ((ret = m_api->PluginUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  62. m_api->LoggerOut(MTLogErr, L"failed to unsubscribe from plugin config updates [%s (%u)]",
  63. SMTFormat::FormatError(ret), ret);
  64. if ((ret = m_api->OrderUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  65. m_api->LoggerOut(MTLogErr, L"failed to unsubscribe order [%s (%u)]",
  66. SMTFormat::FormatError(ret), ret);
  67. if ((ret = m_api->DealUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  68. m_api->LoggerOut(MTLogErr, L"failed to unsubscribe deal [%s (%u)]",
  69. SMTFormat::FormatError(ret), ret);
  70. m_api = nullptr;
  71. m_enable = false;
  72. stop_redis();
  73. if (m_work_thread.joinable())
  74. m_work_thread.join();
  75. return MT_RET_OK;
  76. }
  77. void CPluginInstance::OnPluginUpdate(const IMTConPlugin * plugin)
  78. {
  79. int ret = MT_RET_OK;
  80. if ((ret = LoadParam()) != MT_RET_OK)
  81. {
  82. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  83. }
  84. }
  85. //void CPluginInstance::OnOrderAdd(const IMTOrder * order)
  86. //{
  87. // if (order == nullptr) return;
  88. //
  89. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  90. // if (!m_enable) return;
  91. // if (order->Login() != m_trader) return;
  92. //
  93. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  94. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  95. //}
  96. //void CPluginInstance::OnOrderUpdate(const IMTOrder * order)
  97. //{
  98. // if (order == nullptr) return;
  99. //
  100. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  101. // if (!m_enable) return;
  102. // if (order->Login() != m_trader) return;
  103. //
  104. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  105. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  106. //}
  107. void CPluginInstance::OnOrderDelete(const IMTOrder * order)
  108. {
  109. if (order == nullptr) return;
  110. std::lock_guard<decltype(m_lock)> lk(m_lock);
  111. if (!m_enable) return;
  112. if (order->Login() != m_trader) return;
  113. m_api->LoggerOut(MTLogOK, L"OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
  114. order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  115. if (order->Type() != IMTOrder::OP_BUY
  116. && order->Type() != IMTOrder::OP_SELL)
  117. {
  118. // 不是buy或者sell,不跟,直接退出
  119. return;
  120. }
  121. char order_buf[128];
  122. sprintf(order_buf, "%lld", order->PositionID());
  123. IMTAccount* account = m_api->UserCreateAccount();
  124. IMTOrder* new_order = m_api->OrderCreate();
  125. ScopeGuard guard([account, new_order, this]
  126. {
  127. if (account)
  128. account->Release();
  129. if (new_order)
  130. new_order->Release();
  131. m_redis_client->commit();
  132. });
  133. // 订单在进入filled状态时,也会产生一个order delete回调
  134. if (order->Order() == order->PositionID())
  135. {
  136. // 如果是新建订单,那么写入新纪录
  137. char login_buf[128];
  138. for (auto login : m_followers)
  139. {
  140. if (m_api->UserAccountGet(login, account) != MT_RET_OK)
  141. {
  142. // TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
  143. continue;
  144. }
  145. sprintf(login_buf, "%lld", login);
  146. UINT level = (UINT)(account->Balance() / m_step);
  147. UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
  148. UINT volume_ext = round((double)level * order->VolumeCurrentExt() / 10000) * 100;
  149. UINT init_volume = round((double)level * order->VolumeInitial() / 10000) * 100;
  150. UINT init_volume_ext = round((double)level * order->VolumeInitialExt() / 10000) * 100;
  151. UINT64 new_order_id = 0;
  152. m_api->LoggerOut(MTLogOK, L"add order, vol_init: %lld, vol_cur: %lld", init_volume, volume);
  153. // 尚未完成建仓,目前position id不填,仅完成订单后记录
  154. new_order->Clear();
  155. new_order->VolumeInitial(init_volume);
  156. new_order->VolumeCurrent(volume);
  157. new_order->Login(login);
  158. new_order->Symbol(order->Symbol());
  159. new_order->Type(order->Type());
  160. new_order->Digits(order->Digits());
  161. new_order->DigitsCurrency(order->DigitsCurrency());
  162. new_order->ContractSize(order->ContractSize());
  163. new_order->PriceOrder(order->PriceOrder());
  164. new_order->PriceCurrent(order->PriceCurrent());
  165. new_order->StateSet(order->State());
  166. new_order->TimeSetup(order->TimeSetup());
  167. new_order->TimeSetupMsc(order->TimeSetupMsc());
  168. new_order->TimeDone(order->TimeDone());
  169. new_order->TimeDoneMsc(order->TimeDoneMsc());
  170. // --
  171. new_order->PriceSL(order->PriceSL());
  172. new_order->PriceTP(order->PriceTP());
  173. new_order->Comment(order->Comment());
  174. new_order->ActivationFlags(order->ActivationFlags());
  175. new_order->ActivationMode(order->ActivationMode());
  176. new_order->ActivationPrice(order->ActivationPrice());
  177. new_order->ActivationTime(order->ActivationTime());
  178. new_order->PriceTrigger(order->PriceTrigger());
  179. new_order->RateMargin(order->RateMargin()); //
  180. new_order->ReasonSet(order->Reason()); //
  181. new_order->TypeFill(order->TypeFill()); //
  182. new_order->TypeTime(order->TypeTime()); //
  183. MTAPIRES ret = m_api->HistoryAdd(new_order);
  184. if (ret != MT_RET_OK)
  185. {
  186. m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Login(), ret);
  187. }
  188. else
  189. {
  190. new_order_id = new_order->Order();
  191. }
  192. m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld [%d]", login, new_order_id, order->Order());
  193. // TODO: 现在做法是,如果level为0,后面都不管
  194. // 如果跟单失败,那么position id也为0,所以后面也不应该处理
  195. position_context context;
  196. context.level = level;
  197. context.cur_ord = new_order_id;
  198. context.position_id = new_order_id; // 建仓,order id = position id
  199. int direction = 1;
  200. if (order->Type() == IMTOrder::OP_SELL)
  201. direction = -1;
  202. context.volume = direction * order->VolumeInitial();
  203. m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  204. {
  205. if (r.ko())
  206. {
  207. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  208. }
  209. });
  210. // 可以在退出前提交
  211. // m_redis_client->commit();
  212. }
  213. }
  214. else
  215. {
  216. // 如果不是新建订单,先使用position id先检查记录是否存在
  217. char login_buf[128];
  218. for (auto login : m_followers)
  219. {
  220. sprintf(login_buf, "%lld", login);
  221. auto fut = m_redis_client->hget(order_buf, login_buf);
  222. m_redis_client->sync_commit();
  223. auto reply = fut.get();
  224. m_api->LoggerOut(MTLogOK, L"request cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str());
  225. // 如果不存在,忽略
  226. if (reply.ko()) continue;
  227. if (reply.is_null()) continue;
  228. m_api->LoggerOut(MTLogOK, L"get context, login: %lld", login);
  229. // 获取context
  230. position_context context;
  231. memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  232. // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  233. if (context.level == 0) continue;
  234. if (context.position_id == 0) continue;
  235. m_api->LoggerOut(MTLogOK, L"get context, order: #%lld, position: #%lld", context.cur_ord, context.position_id);
  236. // 如果存在,则继续操作
  237. UINT volume = round((double)context.level * order->VolumeCurrent() / 10000) * 100;
  238. UINT volume_ext = round((double)context.level * order->VolumeCurrentExt() / 10000) * 100;
  239. UINT init_volume = round((double)context.level * order->VolumeInitial() / 10000) * 100;
  240. UINT init_volume_ext = round((double)context.level * order->VolumeInitialExt() / 10000) * 100;
  241. UINT64 new_order_id = 0;
  242. new_order->Clear();
  243. new_order->VolumeInitial(init_volume);
  244. new_order->VolumeCurrent(volume); // 这里是状态START,如果是状态4的FILLED时,volume current为0
  245. //new_order->VolumeCurrent(init_volume);
  246. new_order->Login(login);
  247. new_order->Symbol(order->Symbol());
  248. new_order->Type(order->Type());
  249. new_order->Digits(order->Digits());
  250. new_order->DigitsCurrency(order->DigitsCurrency());
  251. new_order->ContractSize(order->ContractSize());
  252. new_order->PriceOrder(order->PriceOrder());
  253. new_order->PriceCurrent(order->PriceCurrent());
  254. new_order->StateSet(order->State());
  255. new_order->TimeSetup(order->TimeSetup());
  256. new_order->TimeSetupMsc(order->TimeSetupMsc());
  257. new_order->TimeDone(order->TimeDone());
  258. new_order->TimeDoneMsc(order->TimeDoneMsc());
  259. new_order->PositionID(context.position_id);
  260. new_order->PriceSL(order->PriceSL());
  261. new_order->PriceTP(order->PriceTP());
  262. new_order->Comment(order->Comment());
  263. new_order->ActivationFlags(order->ActivationFlags());
  264. new_order->ActivationMode(order->ActivationMode());
  265. new_order->ActivationPrice(order->ActivationPrice());
  266. new_order->ActivationTime(order->ActivationTime());
  267. new_order->PriceTrigger(order->PriceTrigger());
  268. new_order->RateMargin(order->RateMargin()); //
  269. new_order->ReasonSet(order->Reason()); //
  270. new_order->TypeFill(order->TypeFill()); //
  271. new_order->TypeTime(order->TypeTime()); //
  272. MTAPIRES ret = m_api->HistoryAdd(new_order);
  273. if (ret != MT_RET_OK)
  274. {
  275. m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Order(), ret);
  276. // FIXME: 如果做单失败该怎么办
  277. continue;
  278. }
  279. new_order_id = new_order->Order();
  280. m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld", login, new_order_id, order->Order());
  281. // 完成之后,写入新纪录
  282. context.cur_ord = new_order_id;
  283. int direction = 1;
  284. if (order->Type() == IMTOrder::OP_SELL)
  285. direction = -1;
  286. context.volume += direction * order->VolumeInitial();
  287. m_api->LoggerOut(MTLogOK, L"write cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str());
  288. m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  289. {
  290. if (r.ko())
  291. {
  292. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  293. }
  294. });
  295. // 下一次get会调用commit
  296. }
  297. }
  298. }
  299. //void CPluginInstance::OnOrderClean(const UINT64 login)
  300. //{
  301. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  302. // if (!m_enable) return;
  303. // if (login != m_trader) return;
  304. //
  305. // m_api->LoggerOut(MTLogOK, L"OnOrderClean, Login: %d", login);
  306. //}
  307. //void CPluginInstance::OnDealAdd(const IMTDeal * deal)
  308. //{
  309. // if (deal == nullptr) return;
  310. //
  311. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  312. // if (!m_enable) return;
  313. // if (deal->Login() != m_trader) return;
  314. //
  315. // m_api->LoggerOut(MTLogOK, L"OnDealAdd, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  316. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  317. //}
  318. //void CPluginInstance::OnDealUpdate(const IMTDeal * deal)
  319. //{
  320. // if (deal == nullptr) return;
  321. //
  322. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  323. // if (!m_enable) return;
  324. // if (deal->Login() != m_trader) return;
  325. //
  326. // m_api->LoggerOut(MTLogOK, L"OnDealUpdate, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  327. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  328. //}
  329. //void CPluginInstance::OnDealDelete(const IMTDeal * deal)
  330. //{
  331. // if (deal == nullptr) return;
  332. //
  333. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  334. // if (!m_enable) return;
  335. // if (deal->Login() != m_trader) return;
  336. //
  337. // m_api->LoggerOut(MTLogOK, L"OnDealDelete, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  338. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  339. //}
  340. //void CPluginInstance::OnDealClean(const UINT64 login)
  341. //{
  342. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  343. // if (!m_enable) return;
  344. // if (login != m_trader) return;
  345. //
  346. // m_api->LoggerOut(MTLogOK, L"OndealClean, Login: %d", login);
  347. //}
  348. void CPluginInstance::OnDealPerform(const IMTDeal * deal, IMTAccount * account, IMTPosition * position)
  349. {
  350. // position为nullptr时,说明该deal不是由交易本身触发
  351. // account是deal完成后的用户状况
  352. // position是交易完成后的持仓状况
  353. // 对于deal是关闭一个持仓的情况,该position的volume会是0
  354. if (position == nullptr
  355. || deal == nullptr
  356. || account == nullptr)
  357. return;
  358. std::lock_guard<decltype(m_lock)> lk(m_lock);
  359. if (!m_enable) return;
  360. if (deal->Login() != m_trader) return;
  361. //m_api->LoggerOut(MTLogOK, L"OnDealPerform, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  362. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  363. // FIXME: 需要检查现在的deal是否和之前的order对应。如果没有记录到,则没法根本无法正常跟单以及平仓
  364. char order_buf[128];
  365. sprintf(order_buf, "%lld", deal->PositionID());
  366. IMTDeal* new_deal = m_api->DealCreate();
  367. std::vector<std::string>* fields = nullptr;
  368. ScopeGuard guard([new_deal, &fields, this]
  369. {
  370. new_deal->Release();
  371. if (fields)
  372. {
  373. fields->clear();
  374. delete fields;
  375. fields = nullptr;
  376. }
  377. // 退出前调用commit
  378. m_redis_client->commit();
  379. });
  380. char login_buf[128];
  381. for (auto login : m_followers)
  382. {
  383. ScopeGuard g([position, &fields, this, login, login_buf]()
  384. {
  385. if (position->Volume() == 0)
  386. {
  387. if (fields == nullptr)
  388. fields = new(std::vector<std::string>);
  389. m_api->LoggerOut(MTLogOK, L"add field %lld to be delete", login);
  390. fields->push_back(login_buf);
  391. }
  392. });
  393. sprintf(login_buf, "%lld", login);
  394. auto fut = m_redis_client->hget(order_buf, login_buf);
  395. m_redis_client->sync_commit();
  396. auto reply = fut.get();
  397. m_api->LoggerOut(MTLogOK, L"%lld add deal, original deal #%lld", login, deal->Deal());
  398. // 如果不存在,忽略
  399. if (reply.ko()) continue;
  400. if (reply.is_null()) continue;
  401. // 获取context
  402. position_context context;
  403. memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  404. // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  405. if (context.level == 0) continue;
  406. if (context.position_id == 0) continue;
  407. uint64_t volume = context.level * deal->Volume() / 100;
  408. uint64_t volume_ext = context.level * deal->VolumeExt() / 100;
  409. uint64_t volume_closed = context.level * deal->VolumeClosed() / 100;
  410. uint64_t volume_closed_ext = context.level * deal->VolumeClosed() / 100;
  411. double prop = (double)volume / deal->Volume();
  412. double profit = deal->Profit() * prop;
  413. double commission = deal->Commission() * prop;
  414. double storage = deal->Storage() * prop;
  415. double raw_profit = deal->ProfitRaw() * prop;
  416. m_api->LoggerOut(MTLogOK, L"add new deal, volume %lld, volume closed %lld, order #%lld, position #%lld", volume, volume_closed, context.cur_ord, context.position_id);
  417. new_deal->Clear();
  418. new_deal->Assign(deal);
  419. new_deal->Login(login);
  420. new_deal->DealSet(0);
  421. new_deal->Volume(volume);
  422. new_deal->VolumeExt(volume_ext);
  423. new_deal->VolumeClosed(volume_closed);
  424. new_deal->VolumeClosedExt(volume_closed_ext);
  425. new_deal->ProfitRaw(raw_profit);
  426. new_deal->Profit(profit);
  427. new_deal->Commission(commission);
  428. new_deal->Storage(storage);
  429. new_deal->Order(context.cur_ord);
  430. new_deal->PositionID(context.position_id);
  431. MTAPIRES ret = m_api->DealAdd(new_deal);
  432. if (ret != MT_RET_OK)
  433. {
  434. // TODO: 有没有更多的错误处理?
  435. m_api->LoggerOut(MTLogErr, L"%lld cannot add deal [%d] to order #%lld, original deal: #%lld", login, ret, context.cur_ord, deal->Deal());
  436. continue;
  437. }
  438. m_api->LoggerOut(MTLogOK, L"add deal #%lld, original deal #%lld", new_deal->Deal(), deal->Deal());
  439. m_redis_client->publish("mt5_balance_fix", login_buf, [this](cpp_redis::reply r)
  440. {
  441. if (r.ko())
  442. {
  443. m_api->LoggerOut(MTLogErr, L"redis publish: %s", r.error().c_str());
  444. }
  445. });
  446. // commit 在退出前完成
  447. }
  448. m_api->LoggerOut(MTLogOK, L"deal #%lld, position #%lld, volume %lld", deal->Deal(), position->Position(), position->Volume());
  449. if (position->Volume() == 0)
  450. {
  451. // 如果平仓,则删除hash值
  452. if (position->Volume() == 0 && fields != nullptr)
  453. {
  454. m_redis_client->hdel(order_buf, *fields, [this](cpp_redis::reply& r)
  455. {
  456. if (r.ko())
  457. {
  458. // TODO 错误处理
  459. try
  460. {
  461. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  462. }
  463. catch (...)
  464. {
  465. }
  466. }
  467. });
  468. }
  469. // TODO 当position中的volume为0时,持仓被彻底平调,被跟订单是否也该检查
  470. }
  471. }
  472. MTAPIRES CPluginInstance::LoadParam()
  473. {
  474. MTAPIRES res = MT_RET_OK;
  475. IMTConParam* param = NULL;
  476. CMTStr128 tmp;
  477. if (!m_api || !m_config) return MT_RET_ERR_PARAMS;
  478. if ((res = m_api->PluginCurrent(m_config)) != MT_RET_OK)
  479. {
  480. m_api->LoggerOut(MTLogErr, L"failed to get current plugin configuration [%s (%u)]",
  481. SMTFormat::FormatError(res), res);
  482. return res;
  483. }
  484. if ((param = m_api->PluginParamCreate()) == NULL)
  485. {
  486. m_api->LoggerOut(MTLogErr, L"failed to create plugin parameter object");
  487. return MT_RET_ERR_MEM;
  488. }
  489. ScopeGuard param_release([param]()
  490. {
  491. param->Release();
  492. });
  493. std::lock_guard<decltype(m_lock)> lk(m_lock);
  494. //m_api->LoggerOut(MTLogOK, L"Load redis server params");
  495. if ((res = m_config->ParameterGet(L"Redis Server", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  496. {
  497. return(MT_RET_ERR_PARAMS);
  498. }
  499. std::string redis_server = ws2s(param->ValueString());
  500. if ((res = m_config->ParameterGet(L"Redis Port", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  501. {
  502. return(MT_RET_ERR_PARAMS);
  503. }
  504. int redis_port = param->ValueInt();
  505. //if ((res = m_config->ParameterGet(L"Redis User", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  506. //{
  507. // return(MT_RET_ERR_PARAMS);
  508. //}
  509. //std::string redis_user = ws2s(param->ValueString());
  510. if ((res = m_config->ParameterGet(L"Redis Password", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  511. {
  512. return(MT_RET_ERR_PARAMS);
  513. }
  514. std::string redis_password = ws2s(param->ValueString());
  515. if (m_redis_server != redis_server
  516. || m_redis_port != redis_port
  517. //|| m_redis_user != redis_user
  518. || m_redis_password != redis_password)
  519. {
  520. m_redis_server = redis_server;
  521. m_redis_port = redis_port;
  522. //m_redis_user = redis_user;
  523. m_redis_password = redis_password;
  524. stop_redis();
  525. if (start_redis())
  526. {
  527. if (!m_work_thread.joinable())
  528. {
  529. m_work_thread = std::thread(&CPluginInstance::keep_alive, this);
  530. }
  531. }
  532. else
  533. {
  534. m_api->LoggerOut(MTLogErr, L"failed to connect to redis server");
  535. return MT_RET_ERR_CONNECTION;
  536. }
  537. }
  538. //m_api->LoggerOut(MTLogOK, L"Load trade params");
  539. if ((res = m_config->ParameterGet(L"Trader", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  540. {
  541. return(MT_RET_ERR_PARAMS);
  542. }
  543. m_trader = param->ValueInt();
  544. if ((res = m_config->ParameterGet(L"Step", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  545. {
  546. return(MT_RET_ERR_PARAMS);
  547. }
  548. m_step = param->ValueInt();
  549. if ((res = m_config->ParameterGet(L"Tolerance", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  550. {
  551. return(MT_RET_ERR_PARAMS);
  552. }
  553. m_tolerance = param->ValueInt();
  554. if ((res = m_config->ParameterGet(L"Groups", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  555. {
  556. return(MT_RET_ERR_PARAMS);
  557. }
  558. wcsncpy(m_groups, param->ValueString(), 1024);
  559. //if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  560. //{
  561. // return(MT_RET_ERR_PARAMS);
  562. //}
  563. //m_logins = param->ValueString();
  564. res = LoadLogins();
  565. if (res != MT_RET_OK)
  566. {
  567. m_api->LoggerOut(MTLogErr, L"failed to load clients [%d]", res);
  568. return res;
  569. }
  570. //m_api->LoggerOut(MTLogOK, L"Load param success");
  571. m_enable = true;
  572. return MT_RET_OK;
  573. }
  574. MTAPIRES CPluginInstance::LoadLogins()
  575. {
  576. m_followers.clear();
  577. IMTConGroup* group = m_api->GroupCreate();
  578. ScopeGuard group_guard([&]()
  579. {
  580. group->Release();
  581. });
  582. if (group == nullptr) return MT_RET_ERR_MEM;
  583. UINT total = m_api->GroupTotal();
  584. MTAPIRES ret = MT_RET_OK;
  585. std::vector<std::wstring> groups;
  586. for (int i = 0; i < total; i++)
  587. {
  588. ret = m_api->GroupNext(i, group);
  589. if (ret != MT_RET_OK) break;
  590. if (CheckGroup(m_groups, group->Group()) == FALSE) continue;
  591. m_api->LoggerOut(MTLogOK, L"group %s matched config", group->Group());
  592. groups.push_back(group->Group());
  593. }
  594. for (auto& group : groups)
  595. {
  596. UINT64* logins = nullptr;
  597. UINT total_users = 0;
  598. ret = m_api->UserLogins(group.c_str(), logins, total_users);
  599. m_api->LoggerOut(MTLogOK, L"add group %s, total users: %d", group.c_str(), total_users);
  600. if (ret == MT_RET_OK && logins != nullptr)
  601. {
  602. for (int i = 0; i < total_users; ++i)
  603. {
  604. m_api->LoggerOut(MTLogOK, L"add %d to list", logins[i]);
  605. m_followers.push_back(logins[i]);
  606. }
  607. }
  608. if (logins)
  609. {
  610. m_api->Free(logins);
  611. }
  612. }
  613. return ret;
  614. }
  615. bool CPluginInstance::start_redis()
  616. {
  617. try
  618. {
  619. m_redis_error_notified = false;
  620. if (m_redis_client)
  621. {
  622. if (m_redis_client->is_connected())
  623. return true;
  624. }
  625. m_redis_client.reset(new cpp_redis::client);
  626. m_redis_client->connect
  627. (
  628. m_redis_server,
  629. m_redis_port,
  630. [this](const std::string& host, std::size_t port, cpp_redis::connect_state status)
  631. {
  632. if (status == cpp_redis::connect_state::ok)
  633. {
  634. m_api->LoggerOut(MTLogOK, L"redis server connected");
  635. }
  636. },
  637. 500,
  638. 10000000,
  639. 1000
  640. );
  641. if (m_redis_password != "")
  642. {
  643. auto fut = m_redis_client->auth(m_redis_password);
  644. m_redis_client->sync_commit();
  645. auto reply = fut.get();
  646. if (reply.is_error())
  647. {
  648. m_api->LoggerOut(MTLogErr, L"connect: authentication failed");
  649. return false;
  650. }
  651. }
  652. }
  653. catch (tacopie::tacopie_error& e)
  654. {
  655. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  656. return false;
  657. }
  658. catch (std::exception& e)
  659. {
  660. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  661. return false;
  662. }
  663. catch (...)
  664. {
  665. m_api->LoggerOut(MTLogErr, L"failed to connect to server");
  666. return false;
  667. }
  668. return true;
  669. }
  670. void CPluginInstance::stop_redis()
  671. {
  672. try
  673. {
  674. if (m_redis_client)
  675. {
  676. // 使用sync commit保证操作全部提交
  677. m_redis_client->sync_commit();
  678. m_redis_client->disconnect();
  679. m_redis_client.reset();
  680. }
  681. }
  682. catch (tacopie::tacopie_error& e)
  683. {
  684. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  685. }
  686. catch (std::exception& e)
  687. {
  688. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  689. }
  690. catch (...)
  691. {
  692. m_api->LoggerOut(MTLogErr, L"stop redis: unknown error");
  693. }
  694. }
  695. void CPluginInstance::keep_alive()
  696. {
  697. using namespace std::chrono_literals;
  698. int counter = 500;
  699. while (m_enable)
  700. {
  701. if (counter-- <= 0)
  702. {
  703. counter = 500;
  704. std::lock_guard<decltype(m_lock)> lk(m_lock);
  705. if (m_redis_client)
  706. {
  707. if (m_redis_client->is_connected())
  708. {
  709. m_redis_client->ping([this](cpp_redis::reply& r)
  710. {
  711. if (r.ko())
  712. {
  713. m_redis_error_notified = true;
  714. if (!m_redis_error_notified)
  715. {
  716. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  717. }
  718. }
  719. });
  720. }
  721. else
  722. {
  723. m_redis_error_notified = true;
  724. if (!m_redis_error_notified)
  725. {
  726. m_api->LoggerOut(MTLogErr, L"redis server not connected");
  727. }
  728. stop_redis();
  729. start_redis();
  730. }
  731. }
  732. }
  733. std::this_thread::sleep_for(16ms);
  734. }
  735. }