PluginInstance.cpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963
  1. #include "pch.h"
  2. #include "PluginInstance.h"
  3. #define REDIS_TIMER_ID (1)
  4. CPluginInstance::CPluginInstance()
  5. : m_enable(false)
  6. , m_redis_error_notified(false)
  7. {
  8. }
  9. CPluginInstance::~CPluginInstance()
  10. {
  11. Stop();
  12. }
  13. void CPluginInstance::Release()
  14. {
  15. delete this;
  16. }
  17. MTAPIRES CPluginInstance::Start(IMTServerAPI * server)
  18. {
  19. MTAPIRES ret = MT_RET_OK;
  20. if (!server) return MT_RET_ERR_PARAMS;
  21. m_api = server;
  22. if ((m_config = m_api->PluginCreate()) == nullptr)
  23. return MT_RET_ERR_MEM;
  24. ret = m_api->About(m_info);
  25. if (ret != MT_RET_OK)
  26. m_api->LoggerOut(MTLogOK, L"Server info failed [%d]", ret);
  27. if ((ret = m_api->PluginSubscribe(this)) != MT_RET_OK)
  28. {
  29. m_api->LoggerOut(MTLogAtt, L"Plugin subscribe failed [%d]", ret);
  30. return ret;
  31. }
  32. //if (ret = m_api->OrderSubscribe(this) != MT_RET_OK)
  33. //{
  34. // m_api->LoggerOut(MTLogAtt, L"Order subscribe failed [%d]", ret);
  35. // return ret;
  36. //}
  37. //if (ret = m_api->DealSubscribe(this) != MT_RET_OK)
  38. //{
  39. // m_api->LoggerOut(MTLogAtt, L"Deal subscribe failed [%d]", ret);
  40. // return ret;
  41. //}
  42. if (ret = m_api->TradeSubscribe(this) != MT_RET_OK)
  43. {
  44. m_api->LoggerOut(MTLogAtt, L"Trade subscribe failed [%d]", ret);
  45. return ret;
  46. }
  47. if ((ret = LoadParam()) != MT_RET_OK)
  48. {
  49. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  50. return ret;
  51. }
  52. return MT_RET_OK;
  53. }
  54. MTAPIRES CPluginInstance::Stop()
  55. {
  56. MTAPIRES ret = MT_RET_OK;
  57. std::lock_guard<decltype(m_lock)> lk(m_lock);
  58. m_followers.clear();
  59. if (m_api == nullptr)
  60. return MT_RET_OK;
  61. if (m_config != nullptr)
  62. {
  63. m_config->Release();
  64. m_config = nullptr;
  65. }
  66. if ((ret = m_api->PluginUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  67. m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe from plugin config updates [%s (%u)]",
  68. SMTFormat::FormatError(ret), ret);
  69. //if ((ret = m_api->OrderUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  70. // m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe order sink [%s (%u)]",
  71. // SMTFormat::FormatError(ret), ret);
  72. //if ((ret = m_api->DealUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  73. // m_api->LoggerOut(MTLogErr, L"failed to unsubscribe deal [%s (%u)]",
  74. // SMTFormat::FormatError(ret), ret);
  75. if ((ret = m_api->TradeUnsubscribe(this)) != MT_RET_OK && ret != MT_RET_ERR_NOTFOUND)
  76. m_api->LoggerOut(MTLogErr, L"Failed to unsubscribe trade sink [%s (%u)]",
  77. SMTFormat::FormatError(ret), ret);
  78. m_api = nullptr;
  79. m_enable = false;
  80. stop_redis();
  81. if (m_work_thread.joinable())
  82. m_work_thread.join();
  83. return MT_RET_OK;
  84. }
  85. void CPluginInstance::OnPluginUpdate(const IMTConPlugin * plugin)
  86. {
  87. int ret = MT_RET_OK;
  88. if ((ret = LoadParam()) != MT_RET_OK)
  89. {
  90. m_api->LoggerOut(MTLogAtt, L"Load param failed [%d]", ret);
  91. }
  92. }
  93. //void CPluginInstance::OnOrderAdd(const IMTOrder * order)
  94. //{
  95. // if (order == nullptr) return;
  96. //
  97. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  98. // if (!m_enable) return;
  99. // if (order->Login() != m_trader) return;
  100. //
  101. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  102. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  103. //}
  104. //void CPluginInstance::OnOrderUpdate(const IMTOrder * order)
  105. //{
  106. // if (order == nullptr) return;
  107. //
  108. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  109. // if (!m_enable) return;
  110. // if (order->Login() != m_trader) return;
  111. //
  112. // m_api->LoggerOut(MTLogOK, L"OnOrderAdd, login: %lld, ord: %lld, pos: %lld, state: %lld, vol_init: %lld, vol_cur: %lld",
  113. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  114. //}
  115. //void CPluginInstance::OnOrderDelete(const IMTOrder * order)
  116. //{
  117. // if (order == nullptr) return;
  118. //
  119. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  120. // if (!m_enable) return;
  121. // if (order->Login() != m_trader) return;
  122. //
  123. // m_api->LoggerOut(MTLogOK, L"OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
  124. // order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
  125. //
  126. // if (order->Type() != IMTOrder::OP_BUY
  127. // && order->Type() != IMTOrder::OP_SELL)
  128. // {
  129. // // 不是buy或者sell,不跟,直接退出
  130. // return;
  131. // }
  132. //
  133. // char order_buf[128];
  134. // sprintf(order_buf, "%lld", order->PositionID());
  135. // UINT64 position = order->PositionID();
  136. // UINT64 order_id = order->Order();
  137. // IMTAccount* account = m_api->UserCreateAccount();
  138. // IMTOrder* new_order = m_api->OrderCreate();
  139. //
  140. // ScopeGuard guard([account, new_order, this]
  141. // {
  142. // if (account)
  143. // account->Release();
  144. // if (new_order)
  145. // new_order->Release();
  146. //
  147. // m_redis_client->commit();
  148. // });
  149. //
  150. // // 订单在进入filled状态时,也会产生一个order delete回调
  151. // if (order->Order() == order->PositionID())
  152. // {
  153. // // 如果是新建订单,那么写入新纪录
  154. // char login_buf[128] = { 0 };
  155. // char req_buf[128] = { 0 };
  156. // for (auto login : m_followers)
  157. // {
  158. // if (m_api->UserAccountGet(login, account) != MT_RET_OK)
  159. // {
  160. // // TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
  161. // continue;
  162. // }
  163. // sprintf(login_buf, "%lld", login);
  164. //
  165. // UINT level = (UINT)(account->Balance() / m_step);
  166. // UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
  167. // UINT volume_ext = round((double)level * order->VolumeCurrentExt() / 10000) * 100;
  168. // UINT init_volume = round((double)level * order->VolumeInitial() / 10000) * 100;
  169. // UINT init_volume_ext = round((double)level * order->VolumeInitialExt() / 10000) * 100;
  170. // UINT64 new_order_id = 0;
  171. //
  172. // m_api->LoggerOut(MTLogOK, L"add order, vol_init: %lld, vol_cur: %lld", init_volume, volume);
  173. //
  174. // char msg[1024] = { 0 };
  175. // sprintf(msg, "orig_position=&lld&login=%lld&source_login=1005&symbol=%s&action=200&type=%d&volume=%lld&price_order=%lf",
  176. // order->PositionID(), login, ws2s(order->Symbol()).c_str(), order->Type(), init_volume, order->PriceOrder());
  177. // m_redis_client->publish("dealer_send", msg, [this, position, login, order_id](cpp_redis::reply& r)
  178. // {
  179. // // 记录下login order等信息
  180. // m_api->LoggerOut(MTLogErr, L"original position #%lld, original order #%lld, login %lld, failed to excute dealer_send",
  181. // );
  182. // });
  183. //
  184. // // dealer send之后需要另外一边记录request id
  185. //
  186. // // 尚未完成建仓,目前position id不填,仅完成订单后记录
  187. // new_order->Clear();
  188. // new_order->VolumeInitial(init_volume);
  189. // new_order->VolumeCurrent(volume);
  190. // new_order->Login(login);
  191. // new_order->Symbol(order->Symbol());
  192. // new_order->Type(order->Type());
  193. // new_order->Digits(order->Digits());
  194. // new_order->DigitsCurrency(order->DigitsCurrency());
  195. // new_order->ContractSize(order->ContractSize());
  196. // new_order->PriceOrder(order->PriceOrder());
  197. // new_order->PriceCurrent(order->PriceCurrent());
  198. // new_order->StateSet(order->State());
  199. // new_order->TimeSetup(order->TimeSetup());
  200. // new_order->TimeSetupMsc(order->TimeSetupMsc());
  201. // new_order->TimeDone(order->TimeDone());
  202. // new_order->TimeDoneMsc(order->TimeDoneMsc());
  203. // // --
  204. // new_order->PriceSL(order->PriceSL());
  205. // new_order->PriceTP(order->PriceTP());
  206. // new_order->Comment(order->Comment());
  207. // new_order->ActivationFlags(order->ActivationFlags());
  208. // new_order->ActivationMode(order->ActivationMode());
  209. // new_order->ActivationPrice(order->ActivationPrice());
  210. // new_order->ActivationTime(order->ActivationTime());
  211. // new_order->PriceTrigger(order->PriceTrigger());
  212. // new_order->RateMargin(order->RateMargin()); //
  213. // new_order->ReasonSet(order->Reason()); //
  214. // new_order->TypeFill(order->TypeFill()); //
  215. // new_order->TypeTime(order->TypeTime()); //
  216. //
  217. // MTAPIRES ret = m_api->HistoryAdd(new_order);
  218. // if (ret != MT_RET_OK)
  219. // {
  220. // m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Login(), ret);
  221. // }
  222. // else
  223. // {
  224. // new_order_id = new_order->Order();
  225. // }
  226. //
  227. // m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld [%d]", login, new_order_id, order->Order());
  228. //
  229. // // TODO: 现在做法是,如果level为0,后面都不管
  230. // // 如果跟单失败,那么position id也为0,所以后面也不应该处理
  231. // position_context context;
  232. // context.level = level;
  233. // context.cur_ord = new_order_id;
  234. // context.position_id = new_order_id; // 建仓,order id = position id
  235. // int direction = 1;
  236. // if (order->Type() == IMTOrder::OP_SELL)
  237. // direction = -1;
  238. // context.volume = direction * order->VolumeInitial();
  239. //
  240. // m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  241. // {
  242. // if (r.ko())
  243. // {
  244. // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  245. // }
  246. // });
  247. //
  248. // // 可以在退出前提交
  249. // // m_redis_client->commit();
  250. // }
  251. // }
  252. // else
  253. // {
  254. // // 如果不是新建订单,先使用position id先检查记录是否存在
  255. // char login_buf[128];
  256. // for (auto login : m_followers)
  257. // {
  258. // sprintf(login_buf, "%lld", login);
  259. // auto fut = m_redis_client->hget(order_buf, login_buf);
  260. // m_redis_client->sync_commit();
  261. // auto reply = fut.get();
  262. //
  263. // m_api->LoggerOut(MTLogOK, L"request cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str());
  264. //
  265. // // 如果不存在,忽略
  266. // if (reply.ko()) continue;
  267. // if (reply.is_null()) continue;
  268. //
  269. // m_api->LoggerOut(MTLogOK, L"get context, login: %lld", login);
  270. //
  271. // // 获取context
  272. // position_context context;
  273. // memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  274. //
  275. // // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  276. // if (context.level == 0) continue;
  277. // if (context.position_id == 0) continue;
  278. //
  279. // m_api->LoggerOut(MTLogOK, L"get context, order: #%lld, position: #%lld", context.cur_ord, context.position_id);
  280. //
  281. // // 如果存在,则继续操作
  282. // UINT volume = round((double)context.level * order->VolumeCurrent() / 10000) * 100;
  283. // UINT volume_ext = round((double)context.level * order->VolumeCurrentExt() / 10000) * 100;
  284. // UINT init_volume = round((double)context.level * order->VolumeInitial() / 10000) * 100;
  285. // UINT init_volume_ext = round((double)context.level * order->VolumeInitialExt() / 10000) * 100;
  286. // UINT64 new_order_id = 0;
  287. //
  288. // new_order->Clear();
  289. // new_order->VolumeInitial(init_volume);
  290. // new_order->VolumeCurrent(volume); // 这里是状态START,如果是状态4的FILLED时,volume current为0
  291. // //new_order->VolumeCurrent(init_volume);
  292. // new_order->Login(login);
  293. // new_order->Symbol(order->Symbol());
  294. // new_order->Type(order->Type());
  295. // new_order->Digits(order->Digits());
  296. // new_order->DigitsCurrency(order->DigitsCurrency());
  297. // new_order->ContractSize(order->ContractSize());
  298. // new_order->PriceOrder(order->PriceOrder());
  299. // new_order->PriceCurrent(order->PriceCurrent());
  300. // new_order->StateSet(order->State());
  301. // new_order->TimeSetup(order->TimeSetup());
  302. // new_order->TimeSetupMsc(order->TimeSetupMsc());
  303. // new_order->TimeDone(order->TimeDone());
  304. // new_order->TimeDoneMsc(order->TimeDoneMsc());
  305. // new_order->PositionID(context.position_id);
  306. //
  307. // new_order->PriceSL(order->PriceSL());
  308. // new_order->PriceTP(order->PriceTP());
  309. // new_order->Comment(order->Comment());
  310. // new_order->ActivationFlags(order->ActivationFlags());
  311. // new_order->ActivationMode(order->ActivationMode());
  312. // new_order->ActivationPrice(order->ActivationPrice());
  313. // new_order->ActivationTime(order->ActivationTime());
  314. // new_order->PriceTrigger(order->PriceTrigger());
  315. // new_order->RateMargin(order->RateMargin()); //
  316. // new_order->ReasonSet(order->Reason()); //
  317. // new_order->TypeFill(order->TypeFill()); //
  318. // new_order->TypeTime(order->TypeTime()); //
  319. //
  320. // MTAPIRES ret = m_api->HistoryAdd(new_order);
  321. // if (ret != MT_RET_OK)
  322. // {
  323. // m_api->LoggerOut(MTLogErr, L"%lld failed to add order, original order #%lld [%d]", login, order->Order(), ret);
  324. // // FIXME: 如果做单失败该怎么办
  325. // continue;
  326. // }
  327. //
  328. // new_order_id = new_order->Order();
  329. // m_api->LoggerOut(MTLogOK, L"%lld add order #%lld, original order #%lld", login, new_order_id, order->Order());
  330. //
  331. // // 完成之后,写入新纪录
  332. // context.cur_ord = new_order_id;
  333. // int direction = 1;
  334. // if (order->Type() == IMTOrder::OP_SELL)
  335. // direction = -1;
  336. // context.volume += direction * order->VolumeInitial();
  337. //
  338. // m_api->LoggerOut(MTLogOK, L"write cache, key %s, field %s", s2ws(order_buf).c_str(), s2ws(login_buf).c_str());
  339. //
  340. // m_redis_client->hset(order_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  341. // {
  342. // if (r.ko())
  343. // {
  344. // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  345. // }
  346. // });
  347. // // 下一次get会调用commit
  348. // }
  349. // }
  350. //}
  351. //void CPluginInstance::OnOrderClean(const UINT64 login)
  352. //{
  353. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  354. // if (!m_enable) return;
  355. // if (login != m_trader) return;
  356. //
  357. // m_api->LoggerOut(MTLogOK, L"OnOrderClean, Login: %d", login);
  358. //}
  359. //void CPluginInstance::OnDealAdd(const IMTDeal * deal)
  360. //{
  361. // if (deal == nullptr) return;
  362. //
  363. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  364. // if (!m_enable) return;
  365. // if (deal->Login() != m_trader) return;
  366. //
  367. // m_api->LoggerOut(MTLogOK, L"OnDealAdd, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  368. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  369. //}
  370. //void CPluginInstance::OnDealUpdate(const IMTDeal * deal)
  371. //{
  372. // if (deal == nullptr) return;
  373. //
  374. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  375. // if (!m_enable) return;
  376. // if (deal->Login() != m_trader) return;
  377. //
  378. // m_api->LoggerOut(MTLogOK, L"OnDealUpdate, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  379. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  380. //}
  381. //void CPluginInstance::OnDealDelete(const IMTDeal * deal)
  382. //{
  383. // if (deal == nullptr) return;
  384. //
  385. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  386. // if (!m_enable) return;
  387. // if (deal->Login() != m_trader) return;
  388. //
  389. // m_api->LoggerOut(MTLogOK, L"OnDealDelete, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  390. // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  391. //}
  392. //void CPluginInstance::OnDealClean(const UINT64 login)
  393. //{
  394. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  395. // if (!m_enable) return;
  396. // if (login != m_trader) return;
  397. //
  398. // m_api->LoggerOut(MTLogOK, L"OndealClean, Login: %d", login);
  399. //}
  400. //void CPluginInstance::OnDealPerform(const IMTDeal * deal, IMTAccount * account, IMTPosition * position)
  401. //{
  402. // // position为nullptr时,说明该deal不是由交易本身触发
  403. // // account是deal完成后的用户状况
  404. // // position是交易完成后的持仓状况
  405. // // 对于deal是关闭一个持仓的情况,该position的volume会是0
  406. // if (position == nullptr
  407. // || deal == nullptr
  408. // || account == nullptr)
  409. // return;
  410. //
  411. // std::lock_guard<decltype(m_lock)> lk(m_lock);
  412. // if (!m_enable) return;
  413. // if (deal->Login() != m_trader) return;
  414. //
  415. // //m_api->LoggerOut(MTLogOK, L"OnDealPerform, login: %lld, deal: %lld, ord: %lld, pos: %lld, vol: %lld, volc: %lld",
  416. // // deal->Login(), deal->Deal(), deal->Order(), deal->PositionID(), deal->Volume(), deal->VolumeClosed());
  417. //
  418. // // FIXME: 需要检查现在的deal是否和之前的order对应。如果没有记录到,则没法根本无法正常跟单以及平仓
  419. // char order_buf[128];
  420. // sprintf(order_buf, "%lld", deal->PositionID());
  421. // IMTDeal* new_deal = m_api->DealCreate();
  422. //
  423. // std::vector<std::string>* fields = nullptr;
  424. //
  425. // ScopeGuard guard([new_deal, &fields, this]
  426. // {
  427. // new_deal->Release();
  428. // if (fields)
  429. // {
  430. // fields->clear();
  431. // delete fields;
  432. // fields = nullptr;
  433. // }
  434. //
  435. // // 退出前调用commit
  436. // m_redis_client->commit();
  437. // });
  438. //
  439. // char login_buf[128];
  440. // for (auto login : m_followers)
  441. // {
  442. // ScopeGuard g([position, &fields, this, login, login_buf]()
  443. // {
  444. // if (position->Volume() == 0)
  445. // {
  446. // if (fields == nullptr)
  447. // fields = new(std::vector<std::string>);
  448. //
  449. // m_api->LoggerOut(MTLogOK, L"add field %lld to be delete", login);
  450. // fields->push_back(login_buf);
  451. // }
  452. // });
  453. //
  454. // sprintf(login_buf, "%lld", login);
  455. // auto fut = m_redis_client->hget(order_buf, login_buf);
  456. // m_redis_client->sync_commit();
  457. // auto reply = fut.get();
  458. //
  459. // m_api->LoggerOut(MTLogOK, L"%lld add deal, original deal #%lld", login, deal->Deal());
  460. //
  461. // // 如果不存在,忽略
  462. // if (reply.ko()) continue;
  463. // if (reply.is_null()) continue;
  464. //
  465. // // 获取context
  466. // position_context context;
  467. // memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
  468. //
  469. // // 按建仓时的叙述,如果level或者position为0,亦忽略该记录
  470. // if (context.level == 0) continue;
  471. // if (context.position_id == 0) continue;
  472. //
  473. // uint64_t volume = context.level * deal->Volume() / 100;
  474. // uint64_t volume_ext = context.level * deal->VolumeExt() / 100;
  475. // uint64_t volume_closed = context.level * deal->VolumeClosed() / 100;
  476. // uint64_t volume_closed_ext = context.level * deal->VolumeClosed() / 100;
  477. //
  478. // double prop = (double)volume / deal->Volume();
  479. // double profit = deal->Profit() * prop;
  480. // double commission = deal->Commission() * prop;
  481. // double storage = deal->Storage() * prop;
  482. // double raw_profit = deal->ProfitRaw() * prop;
  483. //
  484. // m_api->LoggerOut(MTLogOK, L"add new deal, volume %lld, volume closed %lld, order #%lld, position #%lld", volume, volume_closed, context.cur_ord, context.position_id);
  485. //
  486. // new_deal->Clear();
  487. // new_deal->Assign(deal);
  488. //
  489. // new_deal->Login(login);
  490. // new_deal->DealSet(0);
  491. // new_deal->Volume(volume);
  492. // new_deal->VolumeExt(volume_ext);
  493. // new_deal->VolumeClosed(volume_closed);
  494. // new_deal->VolumeClosedExt(volume_closed_ext);
  495. // new_deal->ProfitRaw(raw_profit);
  496. // new_deal->Profit(profit);
  497. // new_deal->Commission(commission);
  498. // new_deal->Storage(storage);
  499. // new_deal->Order(context.cur_ord);
  500. // new_deal->PositionID(context.position_id);
  501. //
  502. // MTAPIRES ret = m_api->DealAdd(new_deal);
  503. // if (ret != MT_RET_OK)
  504. // {
  505. // // TODO: 有没有更多的错误处理?
  506. // m_api->LoggerOut(MTLogErr, L"%lld cannot add deal [%d] to order #%lld, original deal: #%lld", login, ret, context.cur_ord, deal->Deal());
  507. // continue;
  508. // }
  509. //
  510. // m_api->LoggerOut(MTLogOK, L"add deal #%lld, original deal #%lld", new_deal->Deal(), deal->Deal());
  511. //
  512. // m_redis_client->publish("mt5_balance_fix", login_buf, [this](cpp_redis::reply r)
  513. // {
  514. // if (r.ko())
  515. // {
  516. // m_api->LoggerOut(MTLogErr, L"redis publish: %s", r.error().c_str());
  517. // }
  518. // });
  519. // // commit 在退出前完成
  520. // }
  521. //
  522. // m_api->LoggerOut(MTLogOK, L"deal #%lld, position #%lld, volume %lld", deal->Deal(), position->Position(), position->Volume());
  523. // if (position->Volume() == 0)
  524. // {
  525. // // 如果平仓,则删除hash值
  526. // if (position->Volume() == 0 && fields != nullptr)
  527. // {
  528. // m_redis_client->hdel(order_buf, *fields, [this](cpp_redis::reply& r)
  529. // {
  530. // if (r.ko())
  531. // {
  532. // // TODO 错误处理
  533. // try
  534. // {
  535. // m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  536. // }
  537. // catch (...)
  538. // {
  539. // }
  540. // }
  541. // });
  542. // }
  543. //
  544. // // TODO 当position中的volume为0时,持仓被彻底平调,被跟订单是否也该检查
  545. // }
  546. //}
  547. void CPluginInstance::OnTradeRequestProcess(const IMTRequest* request, const IMTConfirm* confirm, const IMTConGroup* group,
  548. const IMTConSymbol* symbol, const IMTPosition* position, const IMTOrder* order, const IMTDeal* deal)
  549. {
  550. if (request == nullptr
  551. || group == nullptr
  552. || symbol == nullptr)
  553. {
  554. return;
  555. }
  556. UINT64 login = request->Login();
  557. bool found = false;
  558. for (auto follower : m_followers)
  559. {
  560. if (login == follower)
  561. {
  562. found = true;
  563. break;
  564. }
  565. }
  566. // 未找到直接退出
  567. if (!found) return;
  568. // 获取request cache并删除
  569. request_cache cache;
  570. std::string cache_key = std::string("reqid_") + std::to_string(request->ID());
  571. auto fut = m_redis_client->get(cache_key);
  572. m_redis_client->sync_commit();
  573. auto reply = fut.get();
  574. m_api->LoggerOut(MTLogOK, L"request cache, key: %s", cache_key.c_str());
  575. if (reply.ko()) return;
  576. if (reply.is_null()) return;
  577. memcpy(&cache, reply.as_string().c_str(), sizeof(request_cache));
  578. std::vector<std::string> keys = { cache_key };
  579. m_redis_client->del(keys, [this](cpp_redis::reply& r)
  580. {
  581. if (r.ko())
  582. {
  583. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  584. }
  585. });
  586. // 获取做单上下文
  587. position_context context;
  588. char login_buf[128] = { 0 };
  589. char orig_pos_buf[128] = { 0 };
  590. sprintf(orig_pos_buf, "%lld", cache.orig_position);
  591. sprintf(login_buf, "%lld", cache.login);
  592. auto ctx_fut = m_redis_client->hget(orig_pos_buf, login_buf);
  593. m_redis_client->sync_commit();
  594. auto ctx_reply = fut.get();
  595. m_api->LoggerOut(MTLogOK, L"request cache, key: %s", cache_key.c_str());
  596. if (ctx_reply.ko()) return;
  597. if (ctx_reply.is_null()) return;
  598. memcpy(&context, ctx_reply.as_string().c_str(), sizeof(position_context));
  599. // 修改跟单上下文
  600. int direction = 1;
  601. if (order->Type() == IMTOrder::OP_SELL) direction = -1;
  602. context.volume += direction * order->VolumeCurrent();
  603. context.position_id = position->Position();
  604. m_api->LoggerOut(MTLogOK, L"writeback context, key: %s, field: %s", orig_pos_buf, login_buf);
  605. // 写入
  606. m_redis_client->hset(orig_pos_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
  607. {
  608. if (r.ko())
  609. {
  610. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  611. }
  612. });
  613. m_redis_client->commit();
  614. }
  615. MTAPIRES CPluginInstance::LoadParam()
  616. {
  617. MTAPIRES res = MT_RET_OK;
  618. IMTConParam* param = NULL;
  619. CMTStr128 tmp;
  620. if (!m_api || !m_config) return MT_RET_ERR_PARAMS;
  621. if ((res = m_api->PluginCurrent(m_config)) != MT_RET_OK)
  622. {
  623. m_api->LoggerOut(MTLogErr, L"failed to get current plugin configuration [%s (%u)]",
  624. SMTFormat::FormatError(res), res);
  625. return res;
  626. }
  627. if ((param = m_api->PluginParamCreate()) == NULL)
  628. {
  629. m_api->LoggerOut(MTLogErr, L"failed to create plugin parameter object");
  630. return MT_RET_ERR_MEM;
  631. }
  632. ScopeGuard param_release([param]()
  633. {
  634. param->Release();
  635. });
  636. std::lock_guard<decltype(m_lock)> lk(m_lock);
  637. //m_api->LoggerOut(MTLogOK, L"Load redis server params");
  638. if ((res = m_config->ParameterGet(L"Redis Server", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  639. {
  640. return(MT_RET_ERR_PARAMS);
  641. }
  642. std::string redis_server = ws2s(param->ValueString());
  643. if ((res = m_config->ParameterGet(L"Redis Port", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  644. {
  645. return(MT_RET_ERR_PARAMS);
  646. }
  647. int redis_port = param->ValueInt();
  648. //if ((res = m_config->ParameterGet(L"Redis User", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  649. //{
  650. // return(MT_RET_ERR_PARAMS);
  651. //}
  652. //std::string redis_user = ws2s(param->ValueString());
  653. if ((res = m_config->ParameterGet(L"Redis Password", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  654. {
  655. return(MT_RET_ERR_PARAMS);
  656. }
  657. std::string redis_password = ws2s(param->ValueString());
  658. if (m_redis_server != redis_server
  659. || m_redis_port != redis_port
  660. //|| m_redis_user != redis_user
  661. || m_redis_password != redis_password)
  662. {
  663. m_redis_server = redis_server;
  664. m_redis_port = redis_port;
  665. //m_redis_user = redis_user;
  666. m_redis_password = redis_password;
  667. stop_redis();
  668. if (start_redis())
  669. {
  670. if (!m_work_thread.joinable())
  671. {
  672. m_work_thread = std::thread(&CPluginInstance::keep_alive, this);
  673. }
  674. }
  675. else
  676. {
  677. m_api->LoggerOut(MTLogErr, L"failed to connect to redis server");
  678. return MT_RET_ERR_CONNECTION;
  679. }
  680. }
  681. //m_api->LoggerOut(MTLogOK, L"Load trade params");
  682. if ((res = m_config->ParameterGet(L"Trader", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  683. {
  684. return(MT_RET_ERR_PARAMS);
  685. }
  686. m_trader = param->ValueInt();
  687. if ((res = m_config->ParameterGet(L"Step", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  688. {
  689. return(MT_RET_ERR_PARAMS);
  690. }
  691. m_step = param->ValueInt();
  692. if ((res = m_config->ParameterGet(L"Tolerance", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
  693. {
  694. return(MT_RET_ERR_PARAMS);
  695. }
  696. m_tolerance = param->ValueInt();
  697. if ((res = m_config->ParameterGet(L"Groups", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  698. {
  699. return(MT_RET_ERR_PARAMS);
  700. }
  701. wcsncpy(m_groups, param->ValueString(), 1024);
  702. //if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
  703. //{
  704. // return(MT_RET_ERR_PARAMS);
  705. //}
  706. //m_logins = param->ValueString();
  707. res = LoadLogins();
  708. if (res != MT_RET_OK)
  709. {
  710. m_api->LoggerOut(MTLogErr, L"failed to load clients [%d]", res);
  711. return res;
  712. }
  713. //m_api->LoggerOut(MTLogOK, L"Load param success");
  714. m_enable = true;
  715. return MT_RET_OK;
  716. }
  717. MTAPIRES CPluginInstance::LoadLogins()
  718. {
  719. m_followers.clear();
  720. IMTConGroup* group = m_api->GroupCreate();
  721. ScopeGuard group_guard([&]()
  722. {
  723. group->Release();
  724. });
  725. if (group == nullptr) return MT_RET_ERR_MEM;
  726. UINT total = m_api->GroupTotal();
  727. MTAPIRES ret = MT_RET_OK;
  728. std::vector<std::wstring> groups;
  729. for (int i = 0; i < total; i++)
  730. {
  731. ret = m_api->GroupNext(i, group);
  732. if (ret != MT_RET_OK) break;
  733. if (CheckGroup(m_groups, group->Group()) == FALSE) continue;
  734. m_api->LoggerOut(MTLogOK, L"group %s matched config", group->Group());
  735. groups.push_back(group->Group());
  736. }
  737. for (auto& group : groups)
  738. {
  739. UINT64* logins = nullptr;
  740. UINT total_users = 0;
  741. ret = m_api->UserLogins(group.c_str(), logins, total_users);
  742. m_api->LoggerOut(MTLogOK, L"add group %s, total users: %d", group.c_str(), total_users);
  743. if (ret == MT_RET_OK && logins != nullptr)
  744. {
  745. for (int i = 0; i < total_users; ++i)
  746. {
  747. m_api->LoggerOut(MTLogOK, L"add %d to list", logins[i]);
  748. m_followers.push_back(logins[i]);
  749. }
  750. }
  751. if (logins)
  752. {
  753. m_api->Free(logins);
  754. }
  755. }
  756. return ret;
  757. }
  758. bool CPluginInstance::start_redis()
  759. {
  760. try
  761. {
  762. m_redis_error_notified = false;
  763. if (m_redis_client)
  764. {
  765. if (m_redis_client->is_connected())
  766. return true;
  767. }
  768. m_redis_client.reset(new cpp_redis::client);
  769. m_redis_client->connect
  770. (
  771. m_redis_server,
  772. m_redis_port,
  773. [this](const std::string& host, std::size_t port, cpp_redis::connect_state status)
  774. {
  775. if (status == cpp_redis::connect_state::ok)
  776. {
  777. m_api->LoggerOut(MTLogOK, L"redis server connected");
  778. }
  779. },
  780. 500,
  781. 10000000,
  782. 1000
  783. );
  784. if (m_redis_password != "")
  785. {
  786. auto fut = m_redis_client->auth(m_redis_password);
  787. m_redis_client->sync_commit();
  788. auto reply = fut.get();
  789. if (reply.is_error())
  790. {
  791. m_api->LoggerOut(MTLogErr, L"connect: authentication failed");
  792. return false;
  793. }
  794. }
  795. }
  796. catch (tacopie::tacopie_error& e)
  797. {
  798. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  799. return false;
  800. }
  801. catch (std::exception& e)
  802. {
  803. m_api->LoggerOut(MTLogErr, L"redis conn: %s", e.what());
  804. return false;
  805. }
  806. catch (...)
  807. {
  808. m_api->LoggerOut(MTLogErr, L"failed to connect to server");
  809. return false;
  810. }
  811. return true;
  812. }
  813. void CPluginInstance::stop_redis()
  814. {
  815. try
  816. {
  817. if (m_redis_client)
  818. {
  819. // 使用sync commit保证操作全部提交
  820. m_redis_client->sync_commit();
  821. m_redis_client->disconnect();
  822. m_redis_client.reset();
  823. }
  824. }
  825. catch (tacopie::tacopie_error& e)
  826. {
  827. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  828. }
  829. catch (std::exception& e)
  830. {
  831. m_api->LoggerOut(MTLogErr, L"stop redis: %s", e.what());
  832. }
  833. catch (...)
  834. {
  835. m_api->LoggerOut(MTLogErr, L"stop redis: unknown error");
  836. }
  837. }
  838. void CPluginInstance::keep_alive()
  839. {
  840. using namespace std::chrono_literals;
  841. int counter = 500;
  842. while (m_enable)
  843. {
  844. if (counter-- <= 0)
  845. {
  846. counter = 500;
  847. std::lock_guard<decltype(m_lock)> lk(m_lock);
  848. if (m_redis_client)
  849. {
  850. if (m_redis_client->is_connected())
  851. {
  852. m_redis_client->ping([this](cpp_redis::reply& r)
  853. {
  854. if (r.ko())
  855. {
  856. m_redis_error_notified = true;
  857. if (!m_redis_error_notified)
  858. {
  859. m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
  860. }
  861. }
  862. });
  863. }
  864. else
  865. {
  866. m_redis_error_notified = true;
  867. if (!m_redis_error_notified)
  868. {
  869. m_api->LoggerOut(MTLogErr, L"redis server not connected");
  870. }
  871. stop_redis();
  872. start_redis();
  873. }
  874. }
  875. }
  876. std::this_thread::sleep_for(16ms);
  877. }
  878. }