Переглянути джерело

修复bug,大致完成功能

KarsusNeko 6 роки тому
батько
коміт
72f1553741

+ 85 - 39
MT5MonkMAM/ManagerExtension.cpp

@@ -87,13 +87,6 @@ bool CManagerExtension::Initialize()
 		return false;
 	}
 
-	// 根据组配置加载login
-	retcode = LoadLogins();
-	if (retcode != MT_RET_OK)
-	{
-		return false;
-	}
-
 	return true;
 }
 
@@ -153,6 +146,10 @@ bool CManagerExtension::Connect(const CMTStr & server, const UINT64 login, const
 		LogOut(tools::logger::LVFATAL, "Connection failed [%u]\n", retcode);
 		return false;
 	}
+
+	// 根据组配置加载login
+	// 在回调可能产生前调用此函数
+
 	if (!m_manager_pumping)
 		return false;
 	if ((retcode = m_manager_pumping->Connect
@@ -161,7 +158,10 @@ bool CManagerExtension::Connect(const CMTStr & server, const UINT64 login, const
 			login,
 			password.Str(),
 			L"",
-			IMTManagerAPI::PUMP_MODE_ORDERS,
+			IMTManagerAPI::PUMP_MODE_ORDERS
+			| IMTManagerAPI::PUMP_MODE_GROUPS
+			| IMTManagerAPI::PUMP_MODE_USERS
+			| IMTManagerAPI::PUMP_MODE_POSITIONS,
 			CONNECTION_TIMEOUT
 		)) != MT_RET_OK)
 	{
@@ -188,6 +188,13 @@ bool CManagerExtension::Connect(const CMTStr & server, const UINT64 login, const
 		m_work_thread = std::thread(&CManagerExtension::keep_alive, this);
 
 	m_dealer_login = login;
+
+	retcode = LoadLogins();
+	if (retcode != MT_RET_OK)
+	{
+		return false;
+	}
+
 	//--- ok
 	return true;
 }
@@ -222,7 +229,9 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 	if (order == nullptr)	return;
 	if (order->Login() != m_trader)	return;
 
-	LogOut(tools::logger::LVTRACK, "OnOrderDelete");
+	//LogOut(tools::logger::LVTRACK, "OnOrderDelete");
+	LogOut(tools::logger::LVTRACK, "OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
+		order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
 
 	IMTAccount* account = m_manager->UserCreateAccount();
 	IMTRequest* request = m_manager->RequestCreate();
@@ -235,8 +244,8 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 	});
 
 	UINT64 orig_position = order->PositionID();
-	char order_buf[128];
-	sprintf(order_buf, "%lld", order->PositionID());
+	char position_buf[128];
+	sprintf(position_buf, "%lld", order->PositionID());
 
 	char request_buf[64] = { 0 };
 	char login_buf[128] = { 0 };
@@ -247,14 +256,18 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 		{
 			MTAPIRES res = MT_RET_OK;
 
-			if (m_manager->UserAccountGet(login, account) != MT_RET_OK)
+			if (m_manager_pumping->UserAccountGet(login, account) != MT_RET_OK)
 			{
 				// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
+				LogOut(tools::logger::LVTRACK, "failed to get account: %lld", login);
 				continue;
 			}
 
 			UINT level = (UINT)(account->Balance() / m_step);
-			UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
+			UINT64 volume = round((double)level * order->VolumeInitial() / 10000) * 100;
+
+			LogOut(tools::logger::LVTRACK, "start position, login: %lld, level: %d, vol: %lld", login, level, volume);
+			if (volume <= 0)	continue;
 
 			request->Clear();
 			request->Login(login);
@@ -268,24 +281,43 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 			UINT request_id = 0;
 			res = m_manager->DealerSend(request, this, request_id);
 
+			LogOut(tools::logger::LVTRACK, "start position, login: %lld, req id: %d", login, request_id);
+
 			// 存储下这个请求的cache,给插件使用
 			// TODO:这里有可能插件的OnTradeRequestAdd回调比写入缓存更早调用
 			request_cache cache;
 			cache.request = request_id;
 			cache.orig_position = orig_position;
 			cache.login = login;
-			int direction = 1;
-			if (order->Type() == IMTOrder::OP_SELL)
-				direction = -1;
 
-			sprintf(request_buf, "reqid_%d", request_id);
-			m_redis_client->hset(request_buf, login_buf, std::string((char*)&cache, sizeof(position_context)), [this](cpp_redis::reply& r)
+			sprintf(request_buf, "reqid_%d_%lld", request_id, login);
+			m_redis_client->set(request_buf, std::string((char*)&cache, sizeof(request_cache)), [this](cpp_redis::reply& r)
 			{
 				if (r.ko())
 				{
 					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
 				}
 			});
+
+			m_redis_client->commit();
+
+			position_context context;
+			context.request_id = request_id;
+			context.level = level;
+			//int direction = 1;
+			//if (order->Type() == IMTOrder::OP_SELL)
+			//	direction = -1;
+
+			sprintf(login_buf, "%lld", login);
+			m_redis_client->hset(position_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
+			{
+				if (r.ko())
+				{
+					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+				}
+			});
+
+			m_redis_client->commit();
 		}
 	}
 	else
@@ -294,11 +326,11 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 		{
 			// 获取跟单持仓的上下文
 			sprintf(login_buf, "%lld", login);
-			auto fut = m_redis_client->hget(order_buf, login_buf);
+			auto fut = m_redis_client->hget(position_buf, login_buf);
 			m_redis_client->sync_commit();
 			auto reply = fut.get();
 
-			LogOut(tools::logger::LVTRACK, "request cache, key %s, field %lld", order_buf, login_buf);
+			LogOut(tools::logger::LVTRACK, "request context, key %s, field %s", position_buf, login_buf);
 
 			// 如果不存在,忽略
 			if (reply.ko())	continue;
@@ -309,14 +341,20 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 			position_context context;
 			memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
 
-			if (m_manager->UserAccountGet(login, account) != MT_RET_OK)
+			LogOut(tools::logger::LVTRACK, "orig position: %s, login: %s, volume: %lld, dest position: %lld", position_buf, login_buf, context.volume, context.position_id);
+
+			if (m_manager_pumping->UserAccountGet(login, account) != MT_RET_OK)
 			{
 				// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
+				LogOut(tools::logger::LVTRACK, "failed to get account: %lld", login);
 				continue;
 			}
 
-			UINT level = (UINT)(account->Balance() / m_step);
-			UINT volume = round((double)level * order->VolumeCurrent() / 10000) * 100;
+			UINT level = context.level;
+			UINT volume = round((double)level * order->VolumeInitial() / 10000) * 100;
+
+			LogOut(tools::logger::LVTRACK, "add order, login: %lld, level: %d, vol: %lld", login, level, volume);
+			if (volume <= 0)	continue;
 
 			request->Clear();
 			request->Login(login);
@@ -332,6 +370,8 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 			UINT request_id = 0;
 			res = m_manager->DealerSend(request, this, request_id);
 
+			LogOut(tools::logger::LVTRACK, "add order, login: %lld, req id: %d", login, request_id);
+
 			request_cache cache;
 			cache.request = request_id;
 			cache.orig_position = orig_position;
@@ -349,10 +389,10 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
 				}
 			});
+
+			m_redis_client->commit();
 		}
 	}
-
-	m_redis_client->commit();
 }
 
 void CManagerExtension::OnConnect()
@@ -376,6 +416,9 @@ void CManagerExtension::OnDealPerform(const IMTDeal * deal, IMTAccount * account
 		return;
 
 	if (account->Login() != m_trader)	return;
+
+	LogOut(tools::logger::LVTRACK, "on deal perform, volume: %lld", position->Volume());
+
 	if (position->Volume() != 0)		return;
 
 	char position_buf[128] = { 0 };
@@ -484,10 +527,7 @@ bool CManagerExtension::start_redis()
 				{
 					LogOut(tools::logger::LVTRACK, "redis server connected");
 				}
-			},
-			500,
-			10000000,
-			1000
+			}
 		);
 
 		if (m_redis_password != "")
@@ -597,26 +637,32 @@ MTAPIRES CManagerExtension::LoadLogins()
 {
 	m_logins.clear();
 
-	IMTConGroup* group = m_manager->GroupCreate();
+	IMTConGroup* group = m_manager_pumping->GroupCreate();
 	ScopeGuard group_guard([&]()
 	{
 		group->Release();
 	});
 
 	if (group == nullptr)	return MT_RET_ERR_MEM;
-	UINT total = m_manager->GroupTotal();
+	UINT total = m_manager_pumping->GroupTotal();
+
+	LogOut(tools::logger::LVTRACK, "total group(s): %d", total);
 
 	MTAPIRES ret = MT_RET_OK;
 	std::vector<std::wstring> groups;
+
+	char groups_buf[256] = { 0 };
+	sprintf(groups_buf, "%s", m_groups);
 	for (int i = 0; i < total; i++)
 	{
-		ret = m_manager->GroupNext(i, group);
-		if (ret != MT_RET_OK)	break;
-		char groups_buf[256] = { 0 };
-		sprintf(groups_buf, "%s", m_groups.c_str());
+		group->Clear();
+		ret = m_manager_pumping->GroupNext(i, group);
+
+		if (ret != MT_RET_OK)	continue;
+
 		if (CheckGroup(groups_buf, ws2s(group->Group()).c_str()) == FALSE)	continue;
 
-		LogOut(tools::logger::LVTRACK, "group %s matched config", group->Group());
+		LogOut(tools::logger::LVTRACK, "group %s matched config", ws2s(group->Group()).c_str());
 		groups.push_back(group->Group());
 	}
 
@@ -625,8 +671,8 @@ MTAPIRES CManagerExtension::LoadLogins()
 		UINT64* logins = nullptr;
 		UINT total_users = 0;
 
-		ret = m_manager->UserLogins(group.c_str(), logins, total_users);
-		LogOut(tools::logger::LVTRACK, "add group %s, total users: %d", group.c_str(), total_users);
+		ret = m_manager_pumping->UserLogins(group.c_str(), logins, total_users);
+		LogOut(tools::logger::LVTRACK, "add group %s, total users: %d", ws2s(group).c_str(), total_users);
 		if (ret == MT_RET_OK && logins != nullptr)
 		{
 			for (int i = 0; i < total_users; ++i)
@@ -638,7 +684,7 @@ MTAPIRES CManagerExtension::LoadLogins()
 
 		if (logins)
 		{
-			m_manager->Free(logins);
+			m_manager_pumping->Free(logins);
 		}
 	}
 

+ 1 - 1
MT5MonkMAM/ManagerExtension.h

@@ -28,7 +28,7 @@ struct position_context
 
 struct request_cache
 {
-	UINT	request;
+	UINT64	request;
 	UINT64	orig_position;
 	//UINT64	dest_position;
 	UINT64	login;

+ 3 - 1
MT5MonkPAMM/MT5MonkMAM.cpp

@@ -21,7 +21,9 @@ MTPluginParam ExtPluginDefaults[] =
 	{ MTPluginParam::TYPE_INT, L"Trader", L"-" },
 	//{ MTPluginParam::TYPE_INT, L"Step", L"-" },
 	//{ MTPluginParam::TYPE_INT, L"Tolerance", L"-" },
-	{ MTPluginParam::TYPE_STRING, L"Groups", L"-" }
+	{ MTPluginParam::TYPE_STRING, L"Groups", L"-" },
+	{ MTPluginParam::TYPE_INT, L"DealerID", L"0" },
+	{ MTPluginParam::TYPE_INT, L"DebugMode", L"0" }
 };
 
 BOOL APIENTRY DllMain(HMODULE hModule, DWORD reason, LPVOID lpReserved)

+ 34 - 12
MT5MonkPAMM/PluginInstance.cpp

@@ -600,21 +600,32 @@ void CPluginInstance::OnTradeRequestProcess(const IMTRequest* request, const IMT
 	// 未找到直接退出
 	if (!found)	return;
 
+	if (request->SourceLogin() != m_dealer)
+	{
+		// 不是指定dealer做的,跳过
+		// 这样可以减少redis的内容
+		DebugOut(MTLogOK, L"dealer not matched: %lld", request->SourceLogin());
+		return;
+	}
+
 	// 获取request cache并删除
 	request_cache cache;
-	std::string cache_key = std::string("reqid_") + std::to_string(request->ID());
+	std::string cache_key = std::string("reqid_") + std::to_string(request->IDClient()) + "_" + std::to_string(request->Login());
 	auto fut = m_redis_client->get(cache_key);
 	m_redis_client->sync_commit();
 	auto reply = fut.get();
 
-	DebugOut(MTLogOK, L"request cache, key: %s", cache_key.c_str());
+	DebugOut(MTLogOK, L"request cache, key: %s", s2ws(cache_key).c_str());
 
 	if (reply.ko())			return;
 	if (reply.is_null())	return;
 
 	memcpy(&cache, reply.as_string().c_str(), sizeof(request_cache));
 
-	std::vector<std::string> keys = { cache_key };
+	DebugOut(MTLogOK, L"delete cache, key: %s", s2ws(cache_key).c_str());
+
+	std::vector<std::string> keys;
+	keys.push_back(cache_key);
 	m_redis_client->del(keys, [this](cpp_redis::reply& r)
 	{
 		if (r.ko())
@@ -622,6 +633,8 @@ void CPluginInstance::OnTradeRequestProcess(const IMTRequest* request, const IMT
 			m_api->LoggerOut(MTLogErr, L"redis: %s", r.error().c_str());
 		}
 	});
+
+	m_redis_client->commit();
 	
 	// 获取做单上下文
 	position_context context;
@@ -631,24 +644,27 @@ void CPluginInstance::OnTradeRequestProcess(const IMTRequest* request, const IMT
 	sprintf(orig_pos_buf, "%lld", cache.orig_position);
 	sprintf(login_buf, "%lld", cache.login);
 
+	DebugOut(MTLogOK, L"request context, key: %s, field: %s", s2ws(orig_pos_buf).c_str(), s2ws(login_buf).c_str());
+
 	auto ctx_fut = m_redis_client->hget(orig_pos_buf, login_buf);
 	m_redis_client->sync_commit();
-	auto ctx_reply = fut.get();
-
-	DebugOut(MTLogOK, L"request cache, key: %s", cache_key.c_str());
+	auto ctx_reply = ctx_fut.get();
 
 	if (ctx_reply.ko())			return;
 	if (ctx_reply.is_null())	return;
 
+	//DebugOut(MTLogOK, L"copy context, key: %s, field: %s", s2ws(orig_pos_buf).c_str(), s2ws(login_buf).c_str());
+
 	memcpy(&context, ctx_reply.as_string().c_str(), sizeof(position_context));
 
 	// 修改跟单上下文
 	int direction = 1;
 	if (order->Type() == IMTOrder::OP_SELL)	direction = -1;
-	context.volume += direction * order->VolumeCurrent();
+	context.volume += direction * order->VolumeInitial();
 	context.position_id = position->Position();
 
-	DebugOut(MTLogOK, L"writeback context, key: %s, field: %s", orig_pos_buf, login_buf);
+	DebugOut(MTLogOK, L"writeback context, key: %s, field: %s, volume: %lld, context.position_id: %lld",
+		s2ws(orig_pos_buf).c_str(), s2ws(login_buf).c_str(), context.volume, context.position_id);
 
 	// 写入
 	m_redis_client->hset(orig_pos_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
@@ -761,6 +777,11 @@ MTAPIRES CPluginInstance::LoadParam()
 		return(MT_RET_ERR_PARAMS);
 	}
 	wcsncpy(m_groups, param->ValueString(), 1024);
+	if ((res = m_config->ParameterGet(L"DealerID", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_INT)
+	{
+		return(MT_RET_ERR_PARAMS);
+	}
+	m_dealer = param->ValueInt();
 	//if ((res = m_config->ParameterGet(L"Logins", param)) != MT_RET_OK || param->Type() != IMTConParam::TYPE_STRING)
 	//{
 	//	return(MT_RET_ERR_PARAMS);
@@ -769,6 +790,10 @@ MTAPIRES CPluginInstance::LoadParam()
 	if ((res = m_config->ParameterGet(L"DebugMode", param)) == MT_RET_OK && param->Type() == IMTConParam::TYPE_INT)
 	{
 		m_debug_mode = param->ValueInt();
+		if (m_debug_mode != 0)
+		{
+			m_api->LoggerOut(MTLogOK, L"Debug mode enabled");
+		}
 	}
 
 
@@ -861,10 +886,7 @@ bool CPluginInstance::start_redis()
 				{
 					m_api->LoggerOut(MTLogOK, L"redis server connected");
 				}
-			},
-			500,
-			10000000,
-			1000
+			}
 		);
 
 		if (m_redis_password != "")

+ 2 - 1
MT5MonkPAMM/PluginInstance.h

@@ -26,7 +26,7 @@ struct position_context
 
 struct request_cache
 {
-	UINT	request;
+	UINT64	request;
 	UINT64	orig_position;
 	//UINT64	dest_position;
 	UINT64	login;
@@ -101,6 +101,7 @@ private:
 	wchar_t			m_groups[1024];
 	//std::wstring	m_logins;
 	UINT64			m_trader;
+	UINT64			m_dealer;
 	//int				m_step;
 	//int				m_tolerance;
 	int				m_debug_mode;