KarsusNeko пре 6 година
родитељ
комит
d1b5c62ba8
3 измењених фајлова са 272 додато и 9 уклоњено
  1. 234 0
      MT5MonkMAM/ManagerExtension.cpp
  2. 36 9
      MT5MonkMAM/ManagerExtension.h
  3. 2 0
      MT5MonkMAM/pch.h

+ 234 - 0
MT5MonkMAM/ManagerExtension.cpp

@@ -70,6 +70,30 @@ bool CManagerExtension::Initialize()
 	}
 	//m_admin->Subscribe(this);
 	//--- ok
+
+	// 连接redis
+	stop_redis();
+	if (start_redis())
+	{
+		m_run = true;
+		if (!m_work_thread.joinable())
+		{
+			m_work_thread = std::thread(&CManagerExtension::keep_alive, this);
+		}
+	}
+	else
+	{
+		LogOut(tools::logger::LVFATAL, "failed to connect to redis server");
+		return false;
+	}
+
+	// 根据组配置加载login
+	retcode = LoadLogins();
+	if (retcode != MT_RET_OK)
+	{
+		return false;
+	}
+
 	return true;
 }
 
@@ -197,6 +221,126 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 	if (order->Login() != m_trader)	return;
 
 	LogOut(tools::logger::LVTRACK, "OnOrderDelete");
+
+	IMTAccount* account = m_manager->UserCreateAccount();
+	IMTRequest* request = m_manager->RequestCreate();
+	ScopeGuard guard([request, account]()
+	{
+		if (request)
+			request->Release();
+		if (account)
+			account->Release();
+	});
+
+	UINT64 orig_position = order->PositionID();
+	char order_buf[128];
+	sprintf(order_buf, "%lld", order->PositionID());
+
+	char request_buf[64] = { 0 };
+	char login_buf[128] = { 0 };
+	if (order->Order() == order->PositionID())
+	{
+
+		for (auto login : m_logins)
+		{
+			MTAPIRES res = MT_RET_OK;
+
+			if (m_manager->UserAccountGet(login, account) != MT_RET_OK)
+			{
+				// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
+				continue;
+			}
+
+			UINT level = (UINT)(account->Balance() / m_step);
+			UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
+
+			request->Clear();
+			request->Login(login);
+			request->SourceLogin(1005);
+			request->Action(IMTRequest::TA_DEALER_POS_EXECUTE);
+			request->Type(order->Type());
+			request->Symbol(order->Symbol());
+			request->PriceOrder(order->PriceOrder());
+			request->Volume(volume);
+
+			UINT request_id = 0;
+			res = m_manager->DealerSend(request, this, request_id);
+
+			// 存储下这个请求的cache,给插件使用
+			// TODO:这里有可能插件的OnTradeRequestAdd回调比写入缓存更早调用
+			request_cache cache;
+			cache.request = request_id;
+			cache.orig_position = orig_position;
+			cache.login = login;
+			int direction = 1;
+			if (order->Type() == IMTOrder::OP_SELL)
+				direction = -1;
+
+			sprintf(request_buf, "reqid_%d", request_id);
+			m_redis_client->hset(request_buf, login_buf, std::string((char*)&cache, sizeof(position_context)), [this](cpp_redis::reply& r)
+			{
+				if (r.ko())
+				{
+					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+				}
+			});
+		}
+	}
+	else
+	{
+		for (auto login : m_logins)
+		{
+			sprintf(login_buf, "%lld", login);
+			auto fut = m_redis_client->hget(order_buf, login_buf);
+			m_redis_client->sync_commit();
+			auto reply = fut.get();
+
+			LogOut(tools::logger::LVTRACK, "request cache, key %s, field %lld", order_buf, login_buf);
+
+			// 如果不存在,忽略
+			if (reply.ko())	continue;
+			if (reply.is_null())	continue;
+			MTAPIRES res = MT_RET_OK;
+
+			if (m_manager->UserAccountGet(login, account) != MT_RET_OK)
+			{
+				// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
+				continue;
+			}
+
+			UINT level = (UINT)(account->Balance() / m_step);
+			UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
+
+			request->Clear();
+			request->Login(login);
+			request->SourceLogin(1005);
+			request->Action(IMTRequest::TA_DEALER_POS_EXECUTE);
+			request->Type(order->Type());
+			request->Symbol(order->Symbol());
+			request->PriceOrder(order->PriceOrder());
+			request->Volume(volume);
+
+			UINT request_id = 0;
+			res = m_manager->DealerSend(request, this, request_id);
+
+			request_cache cache;
+			cache.request = request_id;
+			cache.orig_position = orig_position;
+			cache.login = login;
+			int direction = 1;
+			if (order->Type() == IMTOrder::OP_SELL)
+				direction = -1;
+
+			sprintf(request_buf, "reqid_%d", request_id);
+			m_redis_client->hset(request_buf, login_buf, std::string((char*)&cache, sizeof(position_context)), [this](cpp_redis::reply& r)
+			{
+				if (r.ko())
+				{
+					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+				}
+			});
+		}
+	}
 }
 
 void CManagerExtension::OnConnect()
@@ -209,6 +353,44 @@ void CManagerExtension::OnDisconnect()
 	LogOut(tools::logger::LVLOG, "Server lost connection");
 }
 
+void CManagerExtension::OnDealPerform(const IMTDeal * deal, IMTAccount * account, IMTPosition * position)
+{
+	// deal perform会在order delete之后调用
+	// 也许把order delete中的内容移动到这里来更好?
+	// 如果持仓被平,则遍历记录,删除所有记录
+	if (deal == nullptr
+		|| account == nullptr
+		|| position == nullptr)
+		return;
+
+	if (account->Login() != m_trader)	return;
+	if (position->Volume() != 0)		return;
+
+	char position_buf[128] = { 0 };
+	sprintf(position_buf, "%lld", position->Position());
+
+	std::vector<std::string> fields;
+	for (auto login : m_logins)
+	{
+		fields.push_back(std::to_string(login));
+	}
+
+	m_redis_client->hdel(position_buf, fields, [this](cpp_redis::reply& r)
+	{
+		if (r.ko())
+		{
+			// TODO 错误处理
+			try
+			{
+				LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+			}
+			catch (...)
+			{
+			}
+		}
+	});
+}
+
 void CManagerExtension::SetTrader(const UINT64 trader)
 {
 	m_trader = trader;
@@ -381,3 +563,55 @@ void CManagerExtension::keep_alive()
 		std::this_thread::sleep_for(16ms);
 	}
 }
+
+MTAPIRES CManagerExtension::LoadLogins()
+{
+	m_logins.clear();
+
+	IMTConGroup* group = m_manager->GroupCreate();
+	ScopeGuard group_guard([&]()
+	{
+		group->Release();
+	});
+
+	if (group == nullptr)	return MT_RET_ERR_MEM;
+	UINT total = m_manager->GroupTotal();
+
+	MTAPIRES ret = MT_RET_OK;
+	std::vector<std::wstring> groups;
+	for (int i = 0; i < total; i++)
+	{
+		ret = m_manager->GroupNext(i, group);
+		if (ret != MT_RET_OK)	break;
+		char groups_buf[256] = { 0 };
+		sprintf(groups_buf, "%s", m_groups.c_str());
+		if (CheckGroup(groups_buf, ws2s(group->Group()).c_str()) == FALSE)	continue;
+
+		LogOut(tools::logger::LVTRACK, "group %s matched config", group->Group());
+		groups.push_back(group->Group());
+	}
+
+	for (auto& group : groups)
+	{
+		UINT64* logins = nullptr;
+		UINT total_users = 0;
+
+		ret = m_manager->UserLogins(group.c_str(), logins, total_users);
+		LogOut(tools::logger::LVTRACK, "add group %s, total users: %d", group.c_str(), total_users);
+		if (ret == MT_RET_OK && logins != nullptr)
+		{
+			for (int i = 0; i < total_users; ++i)
+			{
+				LogOut(tools::logger::LVTRACK, "add %d to list", logins[i]);
+				m_logins.push_back(logins[i]);
+			}
+		}
+
+		if (logins)
+		{
+			m_manager->Free(logins);
+		}
+	}
+
+	return ret;
+}

+ 36 - 9
MT5MonkMAM/ManagerExtension.h

@@ -2,6 +2,9 @@
 
 #include <memory>
 #include <mutex>
+#include <thread>
+
+#include <ScopeGuard.hpp>
 
 #include <cpp_redis/cpp_redis>
 #include <WinSock2.h>
@@ -14,9 +17,28 @@
 
 #include "Misc.h"
 
+struct position_context
+{
+	UINT64	level;
+	UINT64	position_id;
+	UINT64	request_id;
+	UINT64	cur_ord;
+	UINT64	volume; // Õý±íʾbuy£¬¸º±íʾsell
+};
+
+struct request_cache
+{
+	UINT	request;
+	UINT64	orig_position;
+	UINT64	dest_position;
+	UINT64	login;
+};
+
 class CManagerExtension
 	: public IMTManagerSink
 	, public IMTOrderSink
+	, public IMTDealerSink
+	, public IMTDealSink
 {
 	//--- constants     
 	enum
@@ -34,17 +56,19 @@ public:
 	CManagerExtension();
 	~CManagerExtension();
 	//--- init and shutdown
-	bool              Initialize();
-	void              Shutdown();
+	bool            Initialize();
+	void            Shutdown();
 	//--- connect, disconnect
-	bool              Connect(const CMTStr &server, const UINT64 login, const CMTStr &password);
-	void              Disconnect();
+	bool            Connect(const CMTStr &server, const UINT64 login, const CMTStr &password);
+	void            Disconnect();
 
-	virtual void      OnOrderAdd(const IMTOrder* order);
-	virtual void      OnOrderDelete(const IMTOrder* order);
+	//virtual void      OnOrderAdd(const IMTOrder* order);
+	virtual void    OnOrderDelete(const IMTOrder* order);
 
-	virtual void		 OnConnect();
-	virtual void		 OnDisconnect();
+	virtual void	OnConnect();
+	virtual void	OnDisconnect();
+
+	virtual void	OnDealPerform(const IMTDeal* deal, IMTAccount* account, IMTPosition* position);
 
 	void			SetTrader(const UINT64 trader);
 	void			SetGroups(const std::string& groups);
@@ -52,7 +76,7 @@ public:
 	void			SetTolerance(const int tolerance);
 
 	void			SetLogger(const std::shared_ptr<tools::logger>& logger);
-	void			  LogOut(const tools::logger::log_level level, const char* log, ...);
+	void			LogOut(const tools::logger::log_level level, const char* log, ...);
 private:
 	std::shared_ptr<tools::logger>	m_logger;
 	std::mutex m_lock;
@@ -75,4 +99,7 @@ private:
 	UINT64			m_trader;
 	int				m_step;
 	int				m_tolerance;
+
+	std::vector<UINT64>	m_logins;
+	MTAPIRES		LoadLogins();
 };

+ 2 - 0
MT5MonkMAM/pch.h

@@ -16,4 +16,6 @@
 #include <MT5APIManager.h>
 #include "ManagerExtension.h"
 
+#include "Misc.h"
+
 #endif //PCH_H