فهرست منبع

优化回调中的工作

KarsusNeko 6 سال پیش
والد
کامیت
8b9256fd98
2فایلهای تغییر یافته به همراه157 افزوده شده و 133 حذف شده
  1. 154 133
      MT5MonkMAM/ManagerExtension.cpp
  2. 3 0
      MT5MonkMAM/ManagerExtension.h

+ 154 - 133
MT5MonkMAM/ManagerExtension.cpp

@@ -233,166 +233,177 @@ void CManagerExtension::OnOrderDelete(const IMTOrder * order)
 	LogOut(tools::logger::LVTRACK, "OnOrderDelete, login: %lld, ord: %lld, pos: %lld, state: %d, vol_init: %lld, vol_cur: %lld",
 		order->Login(), order->Order(), order->PositionID(), order->State(), order->VolumeInitial(), order->VolumeCurrent());
 
-	IMTAccount* account = m_manager->UserCreateAccount();
-	IMTRequest* request = m_manager->RequestCreate();
-	ScopeGuard guard([request, account]()
-	{
-		if (request)
-			request->Release();
-		if (account)
-			account->Release();
-	});
-
 	UINT64 orig_position = order->PositionID();
-	char position_buf[128];
-	sprintf(position_buf, "%lld", order->PositionID());
-
-	char request_buf[64] = { 0 };
-	char login_buf[128] = { 0 };
-	if (order->Order() == order->PositionID())
-	{
-
-		for (auto login : m_logins)
+	UINT64 orig_order = order->Order();
+	UINT orig_ordtype = order->Type();
+	UINT64 orig_volume_init = order->VolumeInitial();
+	DOUBLE orig_price = order->PriceOrder();
+	std::wstring orig_symbol = order->Symbol();
+
+	m_tasks.push(
+	[this, orig_position, orig_order, orig_ordtype, orig_volume_init, orig_symbol, orig_price]()
+	{
+		IMTAccount* account = m_manager->UserCreateAccount();
+		IMTRequest* request = m_manager->RequestCreate();
+		ScopeGuard guard([request, account]()
+		{
+			if (request)
+				request->Release();
+			if (account)
+				account->Release();
+		});
+
+		char position_buf[128];
+		sprintf(position_buf, "%lld", orig_position);
+
+		char request_buf[64] = { 0 };
+		char login_buf[128] = { 0 };
+		if (orig_order == orig_position)
 		{
-			MTAPIRES res = MT_RET_OK;
 
-			if (m_manager_pumping->UserAccountGet(login, account) != MT_RET_OK)
+			for (auto login : m_logins)
 			{
-				// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
-				LogOut(tools::logger::LVTRACK, "failed to get account: %lld", login);
-				continue;
-			}
+				MTAPIRES res = MT_RET_OK;
 
-			UINT level = (UINT)(account->Balance() / m_step);
-			UINT64 volume = round((double)level * order->VolumeInitial() / 10000) * 100;
+				if (m_manager_pumping->UserAccountGet(login, account) != MT_RET_OK)
+				{
+					// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
+					LogOut(tools::logger::LVTRACK, "failed to get account: %lld", login);
+					continue;
+				}
 
-			LogOut(tools::logger::LVTRACK, "start position, login: %lld, level: %d, vol: %lld", login, level, volume);
-			if (volume <= 0)	continue;
+				UINT level = (UINT)(account->Balance() / m_step);
+				UINT64 volume = round((double)level * orig_volume_init / 10000) * 100;
 
-			request->Clear();
-			request->Login(login);
-			request->SourceLogin(m_dealer_login);
-			request->Action(IMTRequest::TA_DEALER_POS_EXECUTE);
-			request->Type(order->Type());
-			request->Symbol(order->Symbol());
-			request->PriceOrder(order->PriceOrder());
-			request->Volume(volume);
+				LogOut(tools::logger::LVTRACK, "start position, login: %lld, level: %d, vol: %lld", login, level, volume);
+				if (volume <= 0)	continue;
 
-			UINT request_id = 0;
-			res = m_manager->DealerSend(request, this, request_id);
+				request->Clear();
+				request->Login(login);
+				request->SourceLogin(m_dealer_login);
+				request->Action(IMTRequest::TA_DEALER_POS_EXECUTE);
+				request->Type(orig_ordtype);
+				request->Symbol(orig_symbol.c_str());
+				request->PriceOrder(orig_price);
+				request->Volume(volume);
 
-			LogOut(tools::logger::LVTRACK, "start position, login: %lld, req id: %d", login, request_id);
+				UINT request_id = 0;
+				res = m_manager->DealerSend(request, this, request_id);
 
-			// 存储下这个请求的cache,给插件使用
-			// TODO:这里有可能插件的OnTradeRequestAdd回调比写入缓存更早调用
-			request_cache cache;
-			cache.request = request_id;
-			cache.orig_position = orig_position;
-			cache.login = login;
+				LogOut(tools::logger::LVTRACK, "start position, login: %lld, req id: %d", login, request_id);
 
-			sprintf(request_buf, "reqid_%d_%lld", request_id, login);
-			m_redis_client->set(request_buf, std::string((char*)&cache, sizeof(request_cache)), [this](cpp_redis::reply& r)
-			{
-				if (r.ko())
+				// 存储下这个请求的cache,给插件使用
+				// TODO:这里有可能插件的OnTradeRequestAdd回调比写入缓存更早调用
+				request_cache cache;
+				cache.request = request_id;
+				cache.orig_position = orig_position;
+				cache.login = login;
+
+				sprintf(request_buf, "reqid_%d_%lld", request_id, login);
+				m_redis_client->set(request_buf, std::string((char*)&cache, sizeof(request_cache)), [this](cpp_redis::reply& r)
 				{
-					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
-				}
-			});
+					if (r.ko())
+					{
+						LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+					}
+				});
 
-			m_redis_client->commit();
+				m_redis_client->commit();
 
-			position_context context;
-			context.request_id = request_id;
-			context.level = level;
-			//int direction = 1;
-			//if (order->Type() == IMTOrder::OP_SELL)
-			//	direction = -1;
+				position_context context;
+				context.request_id = request_id;
+				context.level = level;
+				//int direction = 1;
+				//if (order->Type() == IMTOrder::OP_SELL)
+				//	direction = -1;
 
-			sprintf(login_buf, "%lld", login);
-			m_redis_client->hset(position_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
-			{
-				if (r.ko())
+				sprintf(login_buf, "%lld", login);
+				m_redis_client->hset(position_buf, login_buf, std::string((char*)&context, sizeof(position_context)), [this](cpp_redis::reply& r)
 				{
-					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
-				}
-			});
+					if (r.ko())
+					{
+						LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+					}
+				});
 
-			m_redis_client->commit();
+				m_redis_client->commit();
+			}
 		}
-	}
-	else
-	{
-		for (auto login : m_logins)
+		else
 		{
-			// 获取跟单持仓的上下文
-			sprintf(login_buf, "%lld", login);
-			auto fut = m_redis_client->hget(position_buf, login_buf);
-			m_redis_client->sync_commit();
-			auto reply = fut.get();
-
-			LogOut(tools::logger::LVTRACK, "request context, key %s, field %s", position_buf, login_buf);
+			for (auto login : m_logins)
+			{
+				// 获取跟单持仓的上下文
+				sprintf(login_buf, "%lld", login);
+				auto fut = m_redis_client->hget(position_buf, login_buf);
+				m_redis_client->sync_commit();
+				auto reply = fut.get();
 
-			// 如果不存在,忽略
-			if (reply.ko())	continue;
-			if (reply.is_null())	continue;
-			MTAPIRES res = MT_RET_OK;
+				LogOut(tools::logger::LVTRACK, "request context, key %s, field %s", position_buf, login_buf);
 
-			// 读取跟单持仓
-			position_context context;
-			memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
+				// 如果不存在,忽略
+				if (reply.ko())	continue;
+				if (reply.is_null())	continue;
+				MTAPIRES res = MT_RET_OK;
 
-			LogOut(tools::logger::LVTRACK, "orig position: %s, login: %s, volume: %lld, dest position: %lld", position_buf, login_buf, context.volume, context.position_id);
+				// 读取跟单持仓
+				position_context context;
+				memcpy(&context, reply.as_string().c_str(), sizeof(position_context));
 
-			if (m_manager_pumping->UserAccountGet(login, account) != MT_RET_OK)
-			{
-				// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
-				LogOut(tools::logger::LVTRACK, "failed to get account: %lld", login);
-				continue;
-			}
+				LogOut(tools::logger::LVTRACK, "orig position: %s, login: %s, volume: %lld, dest position: %lld", position_buf, login_buf, context.volume, context.position_id);
 
-			UINT level = context.level;
-			UINT volume = round((double)level * order->VolumeInitial() / 10000) * 100;
-
-			LogOut(tools::logger::LVTRACK, "add order, login: %lld, level: %d, vol: %lld", login, level, volume);
-			if (volume <= 0)	continue;
-
-			request->Clear();
-			request->Login(login);
-			request->SourceLogin(m_dealer_login);
-			request->Action(IMTRequest::TA_DEALER_POS_EXECUTE);
-			request->Type(order->Type());
-			request->Symbol(order->Symbol());
-			request->PriceOrder(order->PriceOrder());
-			request->Volume(volume);
-			// 加入position id
-			request->Position(context.position_id);
-
-			UINT request_id = 0;
-			res = m_manager->DealerSend(request, this, request_id);
-
-			LogOut(tools::logger::LVTRACK, "add order, login: %lld, req id: %d", login, request_id);
-
-			request_cache cache;
-			cache.request = request_id;
-			cache.orig_position = orig_position;
-			//cache.dest_position = context.position_id;
-			cache.login = login;
-			int direction = 1;
-			if (order->Type() == IMTOrder::OP_SELL)
-				direction = -1;
-
-			sprintf(request_buf, "reqid_%d", request_id);
-			m_redis_client->hset(request_buf, login_buf, std::string((char*)&cache, sizeof(position_context)), [this](cpp_redis::reply& r)
-			{
-				if (r.ko())
+				if (m_manager_pumping->UserAccountGet(login, account) != MT_RET_OK)
 				{
-					LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+					// TODO: 这里失败直接跳过没有做其他处理,下一次进来可能依然会遇到一样的问题
+					LogOut(tools::logger::LVTRACK, "failed to get account: %lld", login);
+					continue;
 				}
-			});
 
-			m_redis_client->commit();
+				UINT level = context.level;
+				UINT volume = round((double)level * orig_volume_init / 10000) * 100;
+
+				LogOut(tools::logger::LVTRACK, "add order, login: %lld, level: %d, vol: %lld", login, level, volume);
+				if (volume <= 0)	continue;
+
+				request->Clear();
+				request->Login(login);
+				request->SourceLogin(m_dealer_login);
+				request->Action(IMTRequest::TA_DEALER_POS_EXECUTE);
+				request->Type(orig_ordtype);
+				request->Symbol(orig_symbol.c_str());
+				request->PriceOrder(orig_price);
+				request->Volume(volume);
+				// 加入position id
+				request->Position(context.position_id);
+
+				UINT request_id = 0;
+				res = m_manager->DealerSend(request, this, request_id);
+
+				LogOut(tools::logger::LVTRACK, "add order, login: %lld, req id: %d", login, request_id);
+
+				request_cache cache;
+				cache.request = request_id;
+				cache.orig_position = orig_position;
+				//cache.dest_position = context.position_id;
+				cache.login = login;
+				int direction = 1;
+				if (orig_ordtype == IMTOrder::OP_SELL)
+					direction = -1;
+
+				sprintf(request_buf, "reqid_%d", request_id);
+				m_redis_client->hset(request_buf, login_buf, std::string((char*)&cache, sizeof(position_context)), [this](cpp_redis::reply& r)
+				{
+					if (r.ko())
+					{
+						LogOut(tools::logger::LVERROR, "redis: %s", r.error().c_str());
+					}
+				});
+
+				m_redis_client->commit();
+			}
 		}
-	}
+
+	});
 }
 
 void CManagerExtension::OnConnect()
@@ -594,6 +605,16 @@ void CManagerExtension::keep_alive()
 
 	while (m_run)
 	{
+		// 
+		auto ptask = m_tasks.try_pop();
+		if (ptask != nullptr)
+		{
+			(*ptask)();
+
+			//
+			continue;
+		}
+
 		if (counter-- <= 0)
 		{
 			counter = 500;
@@ -652,7 +673,7 @@ MTAPIRES CManagerExtension::LoadLogins()
 	std::vector<std::wstring> groups;
 
 	char groups_buf[256] = { 0 };
-	sprintf(groups_buf, "%s", m_groups);
+	sprintf(groups_buf, "%s", m_groups.c_str());
 	for (int i = 0; i < total; i++)
 	{
 		group->Clear();

+ 3 - 0
MT5MonkMAM/ManagerExtension.h

@@ -3,6 +3,7 @@
 #include <memory>
 #include <mutex>
 #include <thread>
+#include <functional>
 
 #include <ScopeGuard.hpp>
 
@@ -86,6 +87,8 @@ private:
 	std::mutex m_lock;
 	std::atomic<bool>	m_run;
 
+	threadsafe_queue<std::function<void()>> m_tasks;
+
 	std::shared_ptr<cpp_redis::client>	m_redis_client;
 	bool		m_redis_conn;
 	bool		start_redis();