PluginInstance.cpp 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993
  1. #include "pch.h"
  2. #include "PluginInstance.h"
  3. #define REDIS_TIMER_ID (1)
  4. CPluginInstance::CPluginInstance()
  5. : m_enable(false)
  6. , m_redis_error_notified(false)
  7. , m_debug_mode(0)
  8. {
  9. }
  10. CPluginInstance::~CPluginInstance()
  11. {
  12. Stop();
  13. }
  14. void CPluginInstance::Release()
  15. {
  16. delete this;
  17. }
  18. MTAPIRES CPluginInstance::Start(IMTServerAPI * server)
  19. {
  20. MTAPIRES ret = MT_RET_OK;
  21. if (!server) return MT_RET_ERR_PARAMS;
  22. m_api = server;
  23. if ((m_config = m_api->PluginCreate()) == nullptr)
  24. return MT_RET_ERR_MEM;
  25. ret = m_api->About(m_info);
  26. if (ret != MT_RET_OK)
  27. m_api->LoggerOut(MTLogOK, L"Server info failed [%d]", ret);
  28. if ((ret = m_api->PluginSubscribe(this)) != MT_RET_OK)
  29. {
  30. m_api->LoggerOut(MTLogAtt, L"Plugin subscribe failed [%d]", ret);
  31. return ret;
  32. }
  33. //if (ret = m_api->OrderSubscribe(this) != MT_RET_OK)
  34. //{
  35. // m_api->LoggerOut(MTLogAtt, L"Order subscribe failed [%d]", ret);
  36. // return ret;
  37. //}
  38. //if (ret = m_api->DealSubscribe(this) != MT_RET_OK)
  39. //{
  40. // m_api->LoggerOut(MTLogAtt, L"Deal subscribe failed [%d]", ret);
  41. // return ret;
  42. //}
  43. if (ret = m_api->TradeSubscribe(this) != MT_RET_OK)
  44. {
  45. m_api->LoggerOut(MTLogAtt, L"Trade subscribe failed [%d]", ret);
  46. return ret;
  47. }
  48. if ((ret = LoadParam()) != MT_RET_OK)
  49. {
  50. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  51. return ret;
  52. }
  53. return MT_RET_OK;
  54. }
  55. MTAPIRES CPluginInstance::Stop()
  56. {
  57. MTAPIRES ret = MT_RET_OK;
  58. std::lock_guard<decltype(m_lock)> lk(m_lock);
  59. m_followers.clear();
  60. if (m_api == nullptr)
  61. return MT_RET_OK;
  62. if (m_config != nullptr)
  63. {
  64. m_config->Release();
  65. m_config = nullptr;
  66. }
  67. if ((ret = m_api->PluginUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  68. m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe from plugin config updates [%s (%u)]",
  69. SMTFormat::FormatError(ret), ret);
  70. //if ((ret = m_api->OrderUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  71. // m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe order sink [%s (%u)]",
  72. // SMTFormat::FormatError(ret), ret);
  73. //if ((ret = m_api->DealUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  74. // m_api->LoggerOut(MTLogErr, L"failed to unsubscribe deal [%s (%u)]",
  75. // SMTFormat::FormatError(ret), ret);
  76. if ((ret = m_api->TradeUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  77. m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe trade sink [%s (%u)]",
  78. SMTFormat::FormatError(ret), ret);
  79. m_api = nullptr;
  80. m_enable = false;
  81. stop_redis();
  82. if (m_work_thread.joinable())
  83. m_work_thread.join();
  84. return MT_RET_OK;
  85. }
  86. void CPluginInstance::OnPluginUpdate(const IMTConPlugin * plugin)
  87. {
  88. int ret = MT_RET_OK;
  89. if ((ret = LoadParam()) != MT_RET_OK)
  90. {
  91. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  92. }
  93. }
  94. //void CPluginInstance::OnOrderAdd(const IMTOrder * order)
  95. //{
  96. // if (order == nullptr) return;
  97. //
  98. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  99. // if (!m_enable) return;
  100. // if (order->Login() != m_trader) return;
  101. //
  102. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  103. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  104. //}
  105. //void CPluginInstance::OnOrderUpdate(const IMTOrder * order)
  106. //{
  107. // if (order == nullptr) return;
  108. //
  109. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  110. // if (!m_enable) return;
  111. // if (order->Login() != m_trader) return;
  112. //
  113. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  114. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  115. //}
  116. //void CPluginInstance::OnOrderDelete(const IMTOrder * order)
  117. //{
  118. // if (order == nullptr) return;
  119. //
  120. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  121. // if (!m_enable) return;
  122. // if (order->Login() != m_trader) return;
  123. //
  124. // m_api->LoggerOut(MTLogOK, L"OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
  125. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  126. //
  127. // if (order->Type() != IMTOrder::OP_BUY
  128. // && order->Type() != IMTOrder::OP_SELL)
  129. // {
  130. // // 不是buy或者sell,不跟,直接退出
  131. // return;
  132. // }
  133. //
  134. // char order_buf[128];
  135. // sprintf(order_buf, "%lld", order->PositionID());
  136. // UINT64 position = order->PositionID();
  137. // UINT64 order_id = order->Order();
  138. // IMTAccount* account = m_api->UserCreateAccount();
  139. // IMTOrder* new_order = m_api->OrderCreate();
  140. //
  141. // ScopeGuard guard([account, new_order, this]
  142. // {
  143. // if (account)
  144. // account->Release();
  145. // if (new_order)
  146. // new_order->Release();
  147. //
  148. // m_redis_client->commit();
  149. // });
  150. //
  151. // // 订单在进入filled状态时,也会产生一个order delete回调
  152. // if (order->Order() == order->PositionID())
  153. // {
  154. // // 如果是新建订单,那么写入新纪录
  155. // char login_buf[128] = { 0 };
  156. // char req_buf[128] = { 0 };
  157. // for (auto login : m_followers)
  158. // {
  159. // if (m_api->UserAccountGet(login, account) != MT_RET_OK)
  160. // {
  161. // // TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
  162. // continue;
  163. // }
  164. // sprintf(login_buf, "%lld", login);
  165. //
  166. // UINT level = (UINT)(account->Balance() / m_step);
  167. // UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
  168. // UINT volume_ext = round((double)level * order->VolumeCurrentExt() / 10000) * 100;
  169. // UINT init_volume = round((double)level * order->VolumeInitial() / 10000) * 100;
  170. // UINT init_volume_ext = round((double)level * order->VolumeInitialExt() / 10000) * 100;
  171. // UINT64 new_order_id = 0;
  172. //
  173. // m_api->LoggerOut(MTLogOK, L"add order, vol_init: %lld, vol_cur: %lld", init_volume, volume);
  174. //
  175. // char msg[1024] = { 0 };
  176. // sprintf(msg, "orig_position=&lld&login=%lld&source_login=1005&symbol=%s&action=200&type=%d&volume=%lld&price_order=%lf",
  177. // order->PositionID(), login, ws2s(order->Symbol()).c_str(), order->Type(), init_volume, order->PriceOrder());
  178. // m_redis_client->publish("dealer_send", msg, [this, position, login, order_id](cpp_redis::reply& r)
  179. // {
  180. // // 记录下login order等信息
  181. // m_api->LoggerOut(MTLogErr, L"original position #%lld, original order #%lld, login %lld, failed to excute dealer_send",
  182. // );
  183. // });
  184. //
  185. // // dealer send之后需要另外一边记录request id
  186. //
  187. // // 尚未完成建仓,目前position id不填,仅完成订单后记录
  188. // new_order->Clear();
  189. // new_order->VolumeInitial(init_volume);
  190. // new_order->VolumeCurrent(volume);
  191. // new_order->Login(login);
  192. // new_order->Symbol(order->Symbol());
  193. // new_order->Type(order->Type());
  194. // new_order->Digits(order->Digits());
  195. // new_order->DigitsCurrency(order->DigitsCurrency());
  196. // new_order->ContractSize(order->ContractSize());
  197. // new_order->PriceOrder(order->PriceOrder());
  198. // new_order->PriceCurrent(order->PriceCurrent());
  199. // new_order->StateSet(order->State());
  200. // new_order->TimeSetup(order->TimeSetup());
  201. // new_order->TimeSetupMsc(order->TimeSetupMsc());
  202. // new_order->TimeDone(order->TimeDone());
  203. // new_order->TimeDoneMsc(order->TimeDoneMsc());
  204. // // --
  205. // new_order->PriceSL(order->PriceSL());
  206. // new_order->PriceTP(order->PriceTP());
  207. // new_order->Comment(order->Comment());
  208. // new_order->ActivationFlags(order->ActivationFlags());
  209. // new_order->ActivationMode(order->ActivationMode());
  210. // new_order->ActivationPrice(order->ActivationPrice());
  211. // new_order->ActivationTime(order->ActivationTime());
  212. // new_order->PriceTrigger(order->PriceTrigger());
  213. // new_order->RateMargin(order->RateMargin()); //
  214. // new_order->ReasonSet(order->Reason()); //
  215. // new_order->TypeFill(order->TypeFill()); //
  216. // new_order->TypeTime(order->TypeTime()); //
  217. //
  218. // MTAPIRES ret = m_api->HistoryAdd(new_order);
  219. // if (ret != MT_RET_OK)
  220. // {
  221. // m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Login(), ret);
  222. // }
  223. // else
  224. // {
  225. // new_order_id = new_order->Order();
  226. // }
  227. //
  228. // m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld [%d]", login, new_order_id, order->Order());
  229. //
  230. // // TODO: 现在做法是,如果level为0,后面都不管
  231. // // 如果跟单失败,那么position id也为0,所以后面也不应该处理
  232. // position_context context;
  233. // context.level = level;
  234. // context.cur_ord = new_order_id;
  235. // context.position_id = new_order_id; // 建仓,order id = position id
  236. // int direction = 1;
  237. // if (order->Type() == IMTOrder::OP_SELL)
  238. // direction = -1;
  239. // context.volume = direction * order->VolumeInitial();
  240. //
  241. // m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  242. // {
  243. // if (r.ko())
  244. // {
  245. // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  246. // }
  247. // });
  248. //
  249. // // 可以在退出前提交
  250. // // m_redis_client->commit();
  251. // }
  252. // }
  253. // else
  254. // {
  255. // // 如果不是新建订单,先使用position id先检查记录是否存在
  256. // char login_buf[128];
  257. // for (auto login : m_followers)
  258. // {
  259. // sprintf(login_buf, "%lld", login);
  260. // auto fut = m_redis_client->hget(order_buf, login_buf);
  261. // m_redis_client->sync_commit();
  262. // auto reply = fut.get();
  263. //
  264. // m_api->LoggerOut(MTLogOK, L"request cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str());
  265. //
  266. // // 如果不存在,忽略
  267. // if (reply.ko()) continue;
  268. // if (reply.is_null()) continue;
  269. //
  270. // m_api->LoggerOut(MTLogOK, L"get context, login: %lld", login);
  271. //
  272. // // 获取context
  273. // position_context context;
  274. // memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  275. //
  276. // // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  277. // if (context.level == 0) continue;
  278. // if (context.position_id == 0) continue;
  279. //
  280. // m_api->LoggerOut(MTLogOK, L"get context, order: #%lld, position: #%lld", context.cur_ord, context.position_id);
  281. //
  282. // // 如果存在,则继续操作
  283. // UINT volume = round((double)context.level * order->VolumeCurrent() / 10000) * 100;
  284. // UINT volume_ext = round((double)context.level * order->VolumeCurrentExt() / 10000) * 100;
  285. // UINT init_volume = round((double)context.level * order->VolumeInitial() / 10000) * 100;
  286. // UINT init_volume_ext = round((double)context.level * order->VolumeInitialExt() / 10000) * 100;
  287. // UINT64 new_order_id = 0;
  288. //
  289. // new_order->Clear();
  290. // new_order->VolumeInitial(init_volume);
  291. // new_order->VolumeCurrent(volume); // 这里是状态START,如果是状态4的FILLED时,volume current为0
  292. // //new_order->VolumeCurrent(init_volume);
  293. // new_order->Login(login);
  294. // new_order->Symbol(order->Symbol());
  295. // new_order->Type(order->Type());
  296. // new_order->Digits(order->Digits());
  297. // new_order->DigitsCurrency(order->DigitsCurrency());
  298. // new_order->ContractSize(order->ContractSize());
  299. // new_order->PriceOrder(order->PriceOrder());
  300. // new_order->PriceCurrent(order->PriceCurrent());
  301. // new_order->StateSet(order->State());
  302. // new_order->TimeSetup(order->TimeSetup());
  303. // new_order->TimeSetupMsc(order->TimeSetupMsc());
  304. // new_order->TimeDone(order->TimeDone());
  305. // new_order->TimeDoneMsc(order->TimeDoneMsc());
  306. // new_order->PositionID(context.position_id);
  307. //
  308. // new_order->PriceSL(order->PriceSL());
  309. // new_order->PriceTP(order->PriceTP());
  310. // new_order->Comment(order->Comment());
  311. // new_order->ActivationFlags(order->ActivationFlags());
  312. // new_order->ActivationMode(order->ActivationMode());
  313. // new_order->ActivationPrice(order->ActivationPrice());
  314. // new_order->ActivationTime(order->ActivationTime());
  315. // new_order->PriceTrigger(order->PriceTrigger());
  316. // new_order->RateMargin(order->RateMargin()); //
  317. // new_order->ReasonSet(order->Reason()); //
  318. // new_order->TypeFill(order->TypeFill()); //
  319. // new_order->TypeTime(order->TypeTime()); //
  320. //
  321. // MTAPIRES ret = m_api->HistoryAdd(new_order);
  322. // if (ret != MT_RET_OK)
  323. // {
  324. // m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Order(), ret);
  325. // // FIXME: 如果做单失败该怎么办
  326. // continue;
  327. // }
  328. //
  329. // new_order_id = new_order->Order();
  330. // m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld", login, new_order_id, order->Order());
  331. //
  332. // // 完成之后,写入新纪录
  333. // context.cur_ord = new_order_id;
  334. // int direction = 1;
  335. // if (order->Type() == IMTOrder::OP_SELL)
  336. // direction = -1;
  337. // context.volume += direction * order->VolumeInitial();
  338. //
  339. // m_api->LoggerOut(MTLogOK, L"write cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str());
  340. //
  341. // m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  342. // {
  343. // if (r.ko())
  344. // {
  345. // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  346. // }
  347. // });
  348. // // 下一次get会调用commit
  349. // }
  350. // }
  351. //}
  352. //void CPluginInstance::OnOrderClean(const UINT64 login)
  353. //{
  354. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  355. // if (!m_enable) return;
  356. // if (login != m_trader) return;
  357. //
  358. // m_api->LoggerOut(MTLogOK, L"OnOrderClean, Login: %d", login);
  359. //}
  360. //void CPluginInstance::OnDealAdd(const IMTDeal * deal)
  361. //{
  362. // if (deal == nullptr) return;
  363. //
  364. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  365. // if (!m_enable) return;
  366. // if (deal->Login() != m_trader) return;
  367. //
  368. // m_api->LoggerOut(MTLogOK, L"OnDealAdd, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  369. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  370. //}
  371. //void CPluginInstance::OnDealUpdate(const IMTDeal * deal)
  372. //{
  373. // if (deal == nullptr) return;
  374. //
  375. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  376. // if (!m_enable) return;
  377. // if (deal->Login() != m_trader) return;
  378. //
  379. // m_api->LoggerOut(MTLogOK, L"OnDealUpdate, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  380. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  381. //}
  382. //void CPluginInstance::OnDealDelete(const IMTDeal * deal)
  383. //{
  384. // if (deal == nullptr) return;
  385. //
  386. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  387. // if (!m_enable) return;
  388. // if (deal->Login() != m_trader) return;
  389. //
  390. // m_api->LoggerOut(MTLogOK, L"OnDealDelete, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  391. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  392. //}
  393. //void CPluginInstance::OnDealClean(const UINT64 login)
  394. //{
  395. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  396. // if (!m_enable) return;
  397. // if (login != m_trader) return;
  398. //
  399. // m_api->LoggerOut(MTLogOK, L"OndealClean, Login: %d", login);
  400. //}
  401. //void CPluginInstance::OnDealPerform(const IMTDeal * deal, IMTAccount * account, IMTPosition * position)
  402. //{
  403. // // position为nullptr时,说明该deal不是由交易本身触发
  404. // // account是deal完成后的用户状况
  405. // // position是交易完成后的持仓状况
  406. // // 对于deal是关闭一个持仓的情况,该position的volume会是0
  407. // if (position == nullptr
  408. // || deal == nullptr
  409. // || account == nullptr)
  410. // return;
  411. //
  412. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  413. // if (!m_enable) return;
  414. // if (deal->Login() != m_trader) return;
  415. //
  416. // //m_api->LoggerOut(MTLogOK, L"OnDealPerform, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  417. // // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  418. //
  419. // // FIXME: 需要检查现在的deal是否和之前的order对应。如果没有记录到,则没法根本无法正常跟单以及平仓
  420. // char order_buf[128];
  421. // sprintf(order_buf, "%lld", deal->PositionID());
  422. // IMTDeal* new_deal = m_api->DealCreate();
  423. //
  424. // std::vector<std::string>* fields = nullptr;
  425. //
  426. // ScopeGuard guard([new_deal, &fields, this]
  427. // {
  428. // new_deal->Release();
  429. // if (fields)
  430. // {
  431. // fields->clear();
  432. // delete fields;
  433. // fields = nullptr;
  434. // }
  435. //
  436. // // 退出前调用commit
  437. // m_redis_client->commit();
  438. // });
  439. //
  440. // char login_buf[128];
  441. // for (auto login : m_followers)
  442. // {
  443. // ScopeGuard g([position, &fields, this, login, login_buf]()
  444. // {
  445. // if (position->Volume() == 0)
  446. // {
  447. // if (fields == nullptr)
  448. // fields = new(std::vector<std::string>);
  449. //
  450. // m_api->LoggerOut(MTLogOK, L"add field %lld to be delete", login);
  451. // fields->push_back(login_buf);
  452. // }
  453. // });
  454. //
  455. // sprintf(login_buf, "%lld", login);
  456. // auto fut = m_redis_client->hget(order_buf, login_buf);
  457. // m_redis_client->sync_commit();
  458. // auto reply = fut.get();
  459. //
  460. // m_api->LoggerOut(MTLogOK, L"%lld add deal, original deal #%lld", login, deal->Deal());
  461. //
  462. // // 如果不存在,忽略
  463. // if (reply.ko()) continue;
  464. // if (reply.is_null()) continue;
  465. //
  466. // // 获取context
  467. // position_context context;
  468. // memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  469. //
  470. // // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  471. // if (context.level == 0) continue;
  472. // if (context.position_id == 0) continue;
  473. //
  474. // uint64_t volume = context.level * deal->Volume() / 100;
  475. // uint64_t volume_ext = context.level * deal->VolumeExt() / 100;
  476. // uint64_t volume_closed = context.level * deal->VolumeClosed() / 100;
  477. // uint64_t volume_closed_ext = context.level * deal->VolumeClosed() / 100;
  478. //
  479. // double prop = (double)volume / deal->Volume();
  480. // double profit = deal->Profit() * prop;
  481. // double commission = deal->Commission() * prop;
  482. // double storage = deal->Storage() * prop;
  483. // double raw_profit = deal->ProfitRaw() * prop;
  484. //
  485. // m_api->LoggerOut(MTLogOK, L"add new deal, volume %lld, volume closed %lld, order #%lld, position #%lld", volume, volume_closed, context.cur_ord, context.position_id);
  486. //
  487. // new_deal->Clear();
  488. // new_deal->Assign(deal);
  489. //
  490. // new_deal->Login(login);
  491. // new_deal->DealSet(0);
  492. // new_deal->Volume(volume);
  493. // new_deal->VolumeExt(volume_ext);
  494. // new_deal->VolumeClosed(volume_closed);
  495. // new_deal->VolumeClosedExt(volume_closed_ext);
  496. // new_deal->ProfitRaw(raw_profit);
  497. // new_deal->Profit(profit);
  498. // new_deal->Commission(commission);
  499. // new_deal->Storage(storage);
  500. // new_deal->Order(context.cur_ord);
  501. // new_deal->PositionID(context.position_id);
  502. //
  503. // MTAPIRES ret = m_api->DealAdd(new_deal);
  504. // if (ret != MT_RET_OK)
  505. // {
  506. // // TODO: 有没有更多的错误处理?
  507. // m_api->LoggerOut(MTLogErr, L"%lld cannot add deal [%d] to order #%lld, original deal: #%lld", login, ret, context.cur_ord, deal->Deal());
  508. // continue;
  509. // }
  510. //
  511. // m_api->LoggerOut(MTLogOK, L"add deal #%lld, original deal #%lld", new_deal->Deal(), deal->Deal());
  512. //
  513. // m_redis_client->publish("mt5_balance_fix", login_buf, [this](cpp_redis::reply r)
  514. // {
  515. // if (r.ko())
  516. // {
  517. // m_api->LoggerOut(MTLogErr, L"redis publish: %s", r.error().c_str());
  518. // }
  519. // });
  520. // // commit 在退出前完成
  521. // }
  522. //
  523. // m_api->LoggerOut(MTLogOK, L"deal #%lld, position #%lld, volume %lld", deal->Deal(), position->Position(), position->Volume());
  524. // if (position->Volume() == 0)
  525. // {
  526. // // 如果平仓,则删除hash值
  527. // if (position->Volume() == 0 && fields != nullptr)
  528. // {
  529. // m_redis_client->hdel(order_buf, *fields, [this](cpp_redis::reply& r)
  530. // {
  531. // if (r.ko())
  532. // {
  533. // // TODO 错误处理
  534. // try
  535. // {
  536. // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  537. // }
  538. // catch (...)
  539. // {
  540. // }
  541. // }
  542. // });
  543. // }
  544. //
  545. // // TODO 当position中的volume为0时,持仓被彻底平调,被跟订单是否也该检查
  546. // }
  547. //}
  548. void CPluginInstance::OnTradeRequestProcess(const IMTRequest* request, const IMTConfirm* confirm, const IMTConGroup* group,
  549. const IMTConSymbol* symbol, const IMTPosition* position, const IMTOrder* order, const IMTDeal* deal)
  550. {
  551. if (request == nullptr
  552. || group == nullptr
  553. || symbol == nullptr)
  554. {
  555. return;
  556. }
  557. UINT64 login = request->Login();
  558. bool found = false;
  559. for (auto follower : m_followers)
  560. {
  561. if (login == follower)
  562. {
  563. found = true;
  564. break;
  565. }
  566. }
  567. // 未找到直接退出
  568. if (!found) return;
  569. if (request->SourceLogin() != m_dealer)
  570. {
  571. // 不是指定dealer做的,跳过
  572. // 这样可以减少redis的内容
  573. DebugOut(MTLogOK, L"dealer not matched: %lld", request->SourceLogin());
  574. return;
  575. }
  576. // 获取request cache并删除
  577. request_cache cache;
  578. std::string cache_key = std::string("reqid_") + std::to_string(request->IDClient()) + "_" + std::to_string(request->Login());
  579. auto fut = m_redis_client->get(cache_key);
  580. m_redis_client->sync_commit();
  581. auto reply = fut.get();
  582. DebugOut(MTLogOK, L"request cache, key: %s", s2ws(cache_key).c_str());
  583. if (reply.ko()) return;
  584. if (reply.is_null()) return;
  585. memcpy(&cache, reply.as_string().c_str(), sizeof(request_cache));
  586. DebugOut(MTLogOK, L"delete cache, key: %s", s2ws(cache_key).c_str());
  587. std::vector<std::string> keys;
  588. keys.push_back(cache_key);
  589. m_redis_client->del(keys, [this](cpp_redis::reply& r)
  590. {
  591. if (r.ko())
  592. {
  593. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  594. }
  595. });
  596. m_redis_client->commit();
  597. // 获取做单上下文
  598. position_context context;
  599. char login_buf[128] = { 0 };
  600. char orig_pos_buf[128] = { 0 };
  601. sprintf(orig_pos_buf, "%lld", cache.orig_position);
  602. sprintf(login_buf, "%lld", cache.login);
  603. DebugOut(MTLogOK, L"request context, key: %s, field: %s", s2ws(orig_pos_buf).c_str(), s2ws(login_buf).c_str());
  604. auto ctx_fut = m_redis_client->hget(orig_pos_buf, login_buf);
  605. m_redis_client->sync_commit();
  606. auto ctx_reply = ctx_fut.get();
  607. if (ctx_reply.ko()) return;
  608. if (ctx_reply.is_null()) return;
  609. //DebugOut(MTLogOK, L"copy context, key: %s, field: %s", s2ws(orig_pos_buf).c_str(), s2ws(login_buf).c_str());
  610. memcpy(&context, ctx_reply.as_string().c_str(), sizeof(position_context));
  611. // 修改跟单上下文
  612. int direction = 1;
  613. if (order->Type() == IMTOrder::OP_SELL) direction = -1;
  614. context.volume += direction * order->VolumeInitial();
  615. context.position_id = position->Position();
  616. DebugOut(MTLogOK, L"writeback context, key: %s, field: %s, volume: %lld, context.position_id: %lld",
  617. s2ws(orig_pos_buf).c_str(), s2ws(login_buf).c_str(), context.volume, context.position_id);
  618. // 写入
  619. m_redis_client->hset(orig_pos_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  620. {
  621. if (r.ko())
  622. {
  623. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  624. }
  625. });
  626. m_redis_client->commit();
  627. }
  628. MTAPIRES CPluginInstance::LoadParam()
  629. {
  630. MTAPIRES res = MT_RET_OK;
  631. IMTConParam* param = NULL;
  632. CMTStr128 tmp;
  633. if (!m_api || !m_config) return MT_RET_ERR_PARAMS;
  634. if ((res = m_api->PluginCurrent(m_config)) != MT_RET_OK)
  635. {
  636. m_api->LoggerOut(MTLogErr, L"failed to get current plugin configuration [%s (%u)]",
  637. SMTFormat::FormatError(res), res);
  638. return res;
  639. }
  640. if ((param = m_api->PluginParamCreate()) == NULL)
  641. {
  642. m_api->LoggerOut(MTLogErr, L"failed to create plugin parameter object");
  643. return MT_RET_ERR_MEM;
  644. }
  645. ScopeGuard param_release([param]()
  646. {
  647. param->Release();
  648. });
  649. std::lock_guard<decltype(m_lock)> lk(m_lock);
  650. // 首先一定要关闭debug mode,显示调用了debug mode才能生效
  651. m_debug_mode = 0;
  652. //m_api->LoggerOut(MTLogOK, L"Load redis server params");
  653. if ((res = m_config->ParameterGet(L"Redis Server", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  654. {
  655. return(MT_RET_ERR_PARAMS);
  656. }
  657. std::string redis_server = ws2s(param->ValueString());
  658. if ((res = m_config->ParameterGet(L"Redis Port", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  659. {
  660. return(MT_RET_ERR_PARAMS);
  661. }
  662. int redis_port = param->ValueInt();
  663. //if ((res = m_config->ParameterGet(L"Redis User", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  664. //{
  665. // return(MT_RET_ERR_PARAMS);
  666. //}
  667. //std::string redis_user = ws2s(param->ValueString());
  668. if ((res = m_config->ParameterGet(L"Redis Password", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  669. {
  670. return(MT_RET_ERR_PARAMS);
  671. }
  672. std::string redis_password = ws2s(param->ValueString());
  673. if (m_redis_server != redis_server
  674. || m_redis_port != redis_port
  675. //|| m_redis_user != redis_user
  676. || m_redis_password != redis_password)
  677. {
  678. m_redis_server = redis_server;
  679. m_redis_port = redis_port;
  680. //m_redis_user = redis_user;
  681. m_redis_password = redis_password;
  682. stop_redis();
  683. if (start_redis())
  684. {
  685. if (!m_work_thread.joinable())
  686. {
  687. m_work_thread = std::thread(&CPluginInstance::keep_alive, this);
  688. }
  689. }
  690. else
  691. {
  692. m_api->LoggerOut(MTLogErr, L"failed to connect to redis server");
  693. return MT_RET_ERR_CONNECTION;
  694. }
  695. }
  696. //m_api->LoggerOut(MTLogOK, L"Load trade params");
  697. if ((res = m_config->ParameterGet(L"Trader", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  698. {
  699. return(MT_RET_ERR_PARAMS);
  700. }
  701. m_trader = param->ValueInt();
  702. //if ((res = m_config->ParameterGet(L"Step", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  703. //{
  704. // return(MT_RET_ERR_PARAMS);
  705. //}
  706. //m_step = param->ValueInt();
  707. //if ((res = m_config->ParameterGet(L"Tolerance", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  708. //{
  709. // return(MT_RET_ERR_PARAMS);
  710. //}
  711. //m_tolerance = param->ValueInt();
  712. if ((res = m_config->ParameterGet(L"Groups", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  713. {
  714. return(MT_RET_ERR_PARAMS);
  715. }
  716. wcsncpy(m_groups, param->ValueString(), 1024);
  717. if ((res = m_config->ParameterGet(L"DealerID", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  718. {
  719. return(MT_RET_ERR_PARAMS);
  720. }
  721. m_dealer = param->ValueInt();
  722. //if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  723. //{
  724. // return(MT_RET_ERR_PARAMS);
  725. //}
  726. //m_logins = param->ValueString();
  727. if ((res = m_config->ParameterGet(L"DebugMode", param)) == MT_RET_OK && param->Type() == IMTConParam::TYPE_INT)
  728. {
  729. m_debug_mode = param->ValueInt();
  730. if (m_debug_mode != 0)
  731. {
  732. m_api->LoggerOut(MTLogOK, L"Debug mode enabled");
  733. }
  734. }
  735. res = LoadLogins();
  736. if (res != MT_RET_OK)
  737. {
  738. m_api->LoggerOut(MTLogErr, L"failed to load clients [%d]", res);
  739. return res;
  740. }
  741. //m_api->LoggerOut(MTLogOK, L"Load param success");
  742. m_enable = true;
  743. return MT_RET_OK;
  744. }
  745. MTAPIRES CPluginInstance::LoadLogins()
  746. {
  747. m_followers.clear();
  748. IMTConGroup* group = m_api->GroupCreate();
  749. ScopeGuard group_guard([&]()
  750. {
  751. group->Release();
  752. });
  753. if (group == nullptr) return MT_RET_ERR_MEM;
  754. UINT total = m_api->GroupTotal();
  755. MTAPIRES ret = MT_RET_OK;
  756. std::vector<std::wstring> groups;
  757. for (int i = 0; i < total; i++)
  758. {
  759. ret = m_api->GroupNext(i, group);
  760. if (ret != MT_RET_OK) break;
  761. if (CheckGroup(m_groups, group->Group()) == FALSE) continue;
  762. DebugOut(MTLogOK, L"group %s matched config", group->Group());
  763. groups.push_back(group->Group());
  764. }
  765. for (auto& group : groups)
  766. {
  767. UINT64* logins = nullptr;
  768. UINT total_users = 0;
  769. ret = m_api->UserLogins(group.c_str(), logins, total_users);
  770. DebugOut(MTLogOK, L"add group %s, total users: %d", group.c_str(), total_users);
  771. if (ret == MT_RET_OK && logins != nullptr)
  772. {
  773. for (int i = 0; i < total_users; ++i)
  774. {
  775. DebugOut(MTLogOK, L"add %d to list", logins[i]);
  776. m_followers.push_back(logins[i]);
  777. }
  778. }
  779. if (logins)
  780. {
  781. m_api->Free(logins);
  782. }
  783. }
  784. return ret;
  785. }
  786. bool CPluginInstance::start_redis()
  787. {
  788. try
  789. {
  790. m_redis_error_notified = false;
  791. if (m_redis_client)
  792. {
  793. if (m_redis_client->is_connected())
  794. return true;
  795. }
  796. m_redis_client.reset(new cpp_redis::client);
  797. m_redis_client->connect
  798. (
  799. m_redis_server,
  800. m_redis_port,
  801. [this](const std::string& host, std::size_t port, cpp_redis::connect_state status)
  802. {
  803. if (status == cpp_redis::connect_state::ok)
  804. {
  805. m_api->LoggerOut(MTLogOK, L"redis server connected");
  806. }
  807. }
  808. );
  809. if (m_redis_password != "")
  810. {
  811. auto fut = m_redis_client->auth(m_redis_password);
  812. m_redis_client->sync_commit();
  813. auto reply = fut.get();
  814. if (reply.is_error())
  815. {
  816. m_api->LoggerOut(MTLogErr, L"connect: authentication failed");
  817. return false;
  818. }
  819. }
  820. }
  821. catch (tacopie::tacopie_error& e)
  822. {
  823. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  824. return false;
  825. }
  826. catch (std::exception& e)
  827. {
  828. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  829. return false;
  830. }
  831. catch (...)
  832. {
  833. m_api->LoggerOut(MTLogErr, L"failed to connect to server");
  834. return false;
  835. }
  836. return true;
  837. }
  838. void CPluginInstance::stop_redis()
  839. {
  840. try
  841. {
  842. if (m_redis_client)
  843. {
  844. // 使用sync commit保证操作全部提交
  845. m_redis_client->sync_commit();
  846. m_redis_client->disconnect();
  847. m_redis_client.reset();
  848. }
  849. }
  850. catch (tacopie::tacopie_error& e)
  851. {
  852. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  853. }
  854. catch (std::exception& e)
  855. {
  856. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  857. }
  858. catch (...)
  859. {
  860. m_api->LoggerOut(MTLogErr, L"stop redis: unknown error");
  861. }
  862. }
  863. void CPluginInstance::keep_alive()
  864. {
  865. using namespace std::chrono_literals;
  866. int counter = 500;
  867. while (m_enable)
  868. {
  869. if (counter-- <= 0)
  870. {
  871. counter = 500;
  872. std::lock_guard<decltype(m_lock)> lk(m_lock);
  873. if (m_redis_client)
  874. {
  875. if (m_redis_client->is_connected())
  876. {
  877. m_redis_client->ping([this](cpp_redis::reply& r)
  878. {
  879. if (r.ko())
  880. {
  881. m_redis_error_notified = true;
  882. if (!m_redis_error_notified)
  883. {
  884. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  885. }
  886. }
  887. });
  888. }
  889. else
  890. {
  891. m_redis_error_notified = true;
  892. if (!m_redis_error_notified)
  893. {
  894. m_api->LoggerOut(MTLogErr, L"redis server not connected");
  895. }
  896. stop_redis();
  897. start_redis();
  898. }
  899. }
  900. }
  901. std::this_thread::sleep_for(16ms);
  902. }
  903. }