BeginInvokeOnMainThread - TradeOperations

Created at 29 May 2022, 17:43
How’s your experience with the cTrader Platform?
Your feedback is crucial to cTrader's development. Please take a few seconds to share your opinion and help us improve your trading experience. Thanks!
M4

m4trader4

Joined 19.09.2021

BeginInvokeOnMainThread - TradeOperations
29 May 2022, 17:43


Ahmad

Scenario:

I make trade operation (Buy/Sell/Modify/Close) manually or through same cBot or any other cBot using BeginInvokeOnMainThread() . First time the operation is successful, but when doing the same or either trade operations second time the BeginInvokeOnMainThread  code block doesnt get executed.

 Same as the issue reported 

Regards

Ahmed

 

 if (comparestring[1].Equals("CloseTrade", StringComparison.CurrentCultureIgnoreCase) )
            {
                Print("StringCompare CLoseTrade executed");
                BeginInvokeOnMainThread(() =>
               CloseTrade(GlobalSchedulerMessageWriteFile, SchedulerMessage)
               );
            }



  public void CloseTrade(string SchMsgFile, string[] SchedulerMessage)
        {
            Print("ReadBroadCastCloseTrade executed..");

            //var positionsCBS = Positions.FindAll("", Symbol.Name);
            var positionsCBS = Positions.ToArray();


            //Print(arg2 + " " + "Yes message processed");
            foreach (var psnCBS in positionsCBS)
            {

                Print("BuySell Positon= " + psnCBS.SymbolName);
                if (psnCBS.SymbolName.Equals(GlobalSymbolName, StringComparison.Ordinal)) 
                 {

                    ClosePositionAsync(psnCBS, CallBackClosePositionAsync);  
                 }

                
            }

          
        }

 


@m4trader4
Replies

m4trader4
29 May 2022, 20:13

RE:

Even with the following its the same

 

 public void CLoseTrade(string SchMsgFile, string[] SchedulerMessage)
        {
            
            Print("ReadBroadCastCloseTrade executed..");

            try
            {
                
                BeginInvokeOnMainThread(() =>
                {

                    var positionsCBS = Positions.ToArray();


                    Print("Yes message processed");
                    foreach (var psnCBS in positionsCBS)
                    {

                        Print("BuySell Positon= " + psnCBS.SymbolName);
                        if (psnCBS.SymbolName.Equals(GlobalSymbolName, StringComparison.Ordinal))
                        {

                            ClosePositionAsync(psnCBS, CallBackClosePositionAsync);
                        }


                    }


                }) ;



            }
            catch (Exception ex)
            {
                Print ("beginInvokeclose=" + ex.Message);

            }

           
        }

 


@m4trader4

amusleh
30 May 2022, 10:05

Hi,

I tried but I couldn't reproduce the issue you are facing.

The code I tested:

using System.Linq;
using cAlgo.API;
using System.Threading;

namespace cAlgo
{
    [Robot(TimeZone = TimeZones.UTC, AccessRights = AccessRights.None)]
    public class AdvancedHedgingandScalpingcBot : Robot
    {
        protected override void OnStart()
        {
            for (int i = 0; i < 10; i++)
            {
                ExecuteMarketOrder(i % 2 == 0 ? TradeType.Buy : TradeType.Sell, SymbolName, Symbol.VolumeInUnitsMin, "Test" + i);
            }

            var thread = new Thread(() =>
            {
                CLoseTrade(TradeType.Buy);
                CLoseTrade(TradeType.Sell);
            });

            thread.Start();

            // This also works fine
            //CLoseTrade(TradeType.Buy);
            //CLoseTrade(TradeType.Sell);
        }

        public void CLoseTrade(TradeType tradeType)
        {
            BeginInvokeOnMainThread(() =>
            {
                var positionsCBS = Positions.ToArray();

                foreach (var psnCBS in positionsCBS)
                {
                    if (psnCBS.TradeType != tradeType) continue;

                    ClosePosition(psnCBS);
                }
            });
        }
    }
}

You use BeginInvokeOnMainThread when you are calling or accessing an API member from another thread, on above example I call the CloseTrade method two times from another thread and it closes the positions.

Please post a full cBot example that can reproduce the issue you are facing.


@amusleh

m4trader4
30 May 2022, 17:29

RE:

Ahmad

Attached code

You need install NATS www.nats.io, 

1. Start Nats server from command pompt nats-server.exe

2. download Nats Cli Client https://github.com/nats-io/natscli/releases/download/v0.0.33/nats-0.0.33-windows-amd64.zip

3. Start cBot

4. Manually or through cBot place trades

5. Go to Nats client directory cmd> nats-pub.exe BroadCast<AccountNumber>Renk0 Renko0,CloseTrade,<All>

6. Go to Nats client directory cmd> nats-pub.exe BroadCast<AccountNumber>Renk0 Renko0,CloseTrade,<symbolname>

7. Repeat steps 4-6

 

There are other trade operations and change of parameters performed using NATS message that will be invoked using BeginInvokeOnMainThread()

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using cAlgo.API;
using cAlgo.API.Collections;
using cAlgo.API.Indicators;
using cAlgo.API.Internals;
using NATS.Client;

namespace cAlgo.Robots
{
    [Robot(AccessRights = AccessRights.None)]
    public class NatsTestV1 : Robot
    {
        [Parameter(DefaultValue = "Hello world!")]
        public string Message { get; set; }

        private string GlobalSymbolName;
        #region nats
        public IAsyncSubscription GlobalSchedulersub;
        public IAsyncSubscription GlobalBroadCastsub;
        public IConnection GlobalNormalConnection;
        public IConnection GlobalBroadCastConnection;
        public static string GlobalScheduleSubscribeMsg;
        public static string GlobalSubscribeBroadcastMsg;
        public static string GlobalPublishAlertSubject;
        public static string GlobalSchedulerMessageWriteFile;


        #endregion
        public void ReadBroadCastmsg(string subjj)
        {

            Options opts = ConnectionFactory.GetDefaultOptions();
            Options.ReconnectForever = 1;
            opts.Url = Defaults.Url;
            opts.Verbose = true;
            opts.AllowReconnect = true;
            GlobalBroadCastConnection = new ConnectionFactory().CreateConnection(opts);

            GlobalBroadCastsub = GlobalBroadCastConnection.SubscribeAsync(subjj);
            GlobalBroadCastsub.MessageHandler += BroadCastsub_MessageHandler;
            GlobalBroadCastsub.Start();
            Print(Symbol.Name + "::" + DateTime.UtcNow.ToString("HH:mm:ss.ffff") + "::" + subjj);

        }



        private void BroadCastsub_MessageHandler(object sender, MsgHandlerEventArgs e)
        {

            var msg = Encoding.UTF8.GetString(e.Message.Data);

            Print("NatsMessage : " + DateTime.UtcNow.ToString("HH:mm:ss:fff") + ":" + e.Message.Subject + ":" + msg);
            //var SchedulerMessage = msg.Split(',');

            var SchedulerMessage = msg.Split(new char[]
            {
                ','
            });



            if (SchedulerMessage[0].Equals("Renko0", StringComparison.CurrentCultureIgnoreCase) && SchedulerMessage[1].Equals("CloseTrade", StringComparison.CurrentCultureIgnoreCase) && (SchedulerMessage[2].Equals("All", StringComparison.CurrentCultureIgnoreCase) || SchedulerMessage[2].Equals(GlobalSymbolName, StringComparison.CurrentCultureIgnoreCase)))
            {
                Print("StringCompare CLoseTrade");
                // BeginInvokeOnMainThread(() =>

                try
                {
                    ReadBroadCastCloseTrade(GlobalSchedulerMessageWriteFile, SchedulerMessage);

                }
                catch (Exception ex)
                {

                    Print("exep close trade=" + ex.Message);
                }


                //);
            }


          


        }

        public void ReadBroadCastCloseTrade(string SchMsgFile, string[] SchedulerMessage)
        {
            //String SchedulerMessageStatusWrite = "";
            //string SchedulerMessageStatusDisplay = "";
            //SchedulerMessageStatusDisplay = DateTime.UtcNow.ToString("HH:mm:ss") + ":" + SchedulerMessage[0] + ":" + SchedulerMessage[1];
            //SchedulerMessageStatusWrite += SchedulerMessageStatusDisplay;
            Print("ReadBroadCastCloseTrade executed..");

            try
            {

                BeginInvokeOnMainThread(() =>
                {

                    var positionsCBS = Positions.ToArray();


                    Print("Yes message processed");
                    foreach (var psnCBS in positionsCBS)
                    {

                        Print("BuySell Positon= " + psnCBS.SymbolName);
                        if (psnCBS.SymbolName.Equals(GlobalSymbolName, StringComparison.Ordinal))
                        {

                            ClosePositionAsync(psnCBS, CallBackClosePositionAsync);
                        }


                    }


                });



            }
            catch (Exception ex)
            {
                Print("beginInvokeclose=" + ex.Message);

            }








            //var positionsCBS = Positions.FindAll("", Symbol.Name);


            //File.WriteAllText(GlobalSchedulerMessageWriteFile + Symbol.Name + SchedulerMessage[0] + SchedulerMessage[1] + DateTime.UtcNow.ToString("ddMMyyyyHHmmssfff") + ".csv", Symbol.Name + SchedulerMessageStatusWrite);
        }

        private void CallBackClosePositionAsync(TradeResult tradeResult)
        {


            if (!tradeResult.IsSuccessful)
            {
                ClosePositionAsync(tradeResult.Position, CallBackClosePositionAsync);
            }

        }

        protected override void OnStart()
        {
            GlobalSymbolName = Symbol.Name;
            var SubscribeBroadcastMsg = "BroadCast" + Account.Number + "Renko0";
            ReadBroadCastmsg(SubscribeBroadcastMsg);
        }

        protected override void OnTick()
        {
            // Handle price updates here
        }

        protected override void OnStop()
        {
            // Handle cBot stop here
        }
    }
}

 


@m4trader4

m4trader4
30 May 2022, 17:34

RE: RE:

Using the same concept there are lot interactions happening between other bots for sharing parameters or trades


@m4trader4

amusleh
31 May 2022, 10:54

Hi,

On your posted code, does GlobalBroadCastsub.Start() blocks the thread? or it's asynchronous? 

It it blocks then use a different thread for it and dispatch the calls from that thread to cBot main thread via BeginInvokeOnMainThread.


@amusleh

m4trader4
31 May 2022, 13:48

RE:

Ahmad

Close trade works only once , second time it doesnt work. Were you able to reproduce the issue?

 

 

 

   // Alternatively, create an asynchronous subscriber on subject foo,
            // assign a message handler, then start the subscriber.   When
            // multicasting delegates, this allows all message handlers
            // to be setup before messages start arriving.
            IAsyncSubscription sAsync = c.SubscribeAsync("foo");
            sAsync.MessageHandler += h;
            sAsync.Start();

 

 

 


@m4trader4

amusleh
01 Jun 2022, 10:58

Hi,

I followed your steps but when I start the cBot it throws connection time out exception:

01/06/2022 07:58:02.004 | Crashed in OnStart with NATSConnectionException: timeout

 


@amusleh

m4trader4
01 Jun 2022, 11:35

RE:

Nats Server is not running. First start the Nats server with default options then start the cBot.

To check whether server is running use

cmd window1> Nats-subs.exe Test 

Listening on [Test]

 

cmdwindow2>Nats-pub.exe Test msg1

 

On the cmdwindow1 you should get the following message

Listening on [Test]
[#1] Received on [Test]: 'msg1'

 

 

 

 


@m4trader4

amusleh
01 Jun 2022, 12:23

Hi,

I tested it and it worked fine, all message commands got executed without any issue.

I tested on Spotware cTrader 4.2.7.

Video:

On the video I sent some message with symbols other than chart symbol which results on nothing.

cBot Code I used:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using cAlgo.API;
using cAlgo.API.Collections;
using cAlgo.API.Indicators;
using cAlgo.API.Internals;
using NATS.Client;
using System.Threading;

namespace cAlgo.Robots
{
    [Robot(AccessRights = AccessRights.FullAccess)]
    public class NatsTestV1 : Robot
    {
        [Parameter(DefaultValue = "Hello world!")]
        public string Message { get; set; }

        private string GlobalSymbolName;

        #region nats

        public IAsyncSubscription GlobalSchedulersub;
        public IAsyncSubscription GlobalBroadCastsub;
        public IConnection GlobalNormalConnection;
        public IConnection GlobalBroadCastConnection;
        public static string GlobalScheduleSubscribeMsg;
        public static string GlobalSubscribeBroadcastMsg;
        public static string GlobalPublishAlertSubject;
        public static string GlobalSchedulerMessageWriteFile;

        #endregion nats

        public void ReadBroadCastmsg(string subjj)
        {
            Options opts = ConnectionFactory.GetDefaultOptions();
            Options.ReconnectForever = 1;
            opts.Url = Defaults.Url;
            opts.Verbose = true;
            opts.AllowReconnect = true;
            GlobalBroadCastConnection = new ConnectionFactory().CreateConnection(opts);

            GlobalBroadCastsub = GlobalBroadCastConnection.SubscribeAsync(subjj);
            GlobalBroadCastsub.MessageHandler += BroadCastsub_MessageHandler;
            GlobalBroadCastsub.Start();
            Print(Symbol.Name + "::" + DateTime.UtcNow.ToString("HH:mm:ss.ffff") + "::" + subjj);
        }

        private void BroadCastsub_MessageHandler(object sender, MsgHandlerEventArgs e)
        {
            var msg = Encoding.UTF8.GetString(e.Message.Data);

            Print("NatsMessage : " + DateTime.UtcNow.ToString("HH:mm:ss:fff") + ":" + e.Message.Subject + ":" + msg);
            //var SchedulerMessage = msg.Split(',');

            var SchedulerMessage = msg.Split(new char[]
            {
                ','
            });

            if (SchedulerMessage[0].Equals("Renko0", StringComparison.CurrentCultureIgnoreCase) && SchedulerMessage[1].Equals("CloseTrade", StringComparison.CurrentCultureIgnoreCase) && (SchedulerMessage[2].Equals("All", StringComparison.CurrentCultureIgnoreCase) || SchedulerMessage[2].Equals(GlobalSymbolName, StringComparison.CurrentCultureIgnoreCase)))
            {
                Print("StringCompare CLoseTrade");
                // BeginInvokeOnMainThread(() =>

                try
                {
                    ReadBroadCastCloseTrade(GlobalSchedulerMessageWriteFile, SchedulerMessage);
                }
                catch (Exception ex)
                {
                    Print("exep close trade=" + ex.Message);
                }

                //);
            }
        }

        public void ReadBroadCastCloseTrade(string SchMsgFile, string[] SchedulerMessage)
        {
            //String SchedulerMessageStatusWrite = "";
            //string SchedulerMessageStatusDisplay = "";
            //SchedulerMessageStatusDisplay = DateTime.UtcNow.ToString("HH:mm:ss") + ":" + SchedulerMessage[0] + ":" + SchedulerMessage[1];
            //SchedulerMessageStatusWrite += SchedulerMessageStatusDisplay;
            Print($"ReadBroadCastCloseTrade Thread ID: {Thread.CurrentThread.ManagedThreadId}");

            BeginInvokeOnMainThread(() =>
            {
                var positionsCBS = Positions.ToArray();

                Print("Yes message processed");

                foreach (var psnCBS in positionsCBS)
                {
                    Print("BuySell Positon= " + psnCBS.SymbolName);

                    if (SchedulerMessage[2].Equals("All", StringComparison.CurrentCultureIgnoreCase) || psnCBS.SymbolName.Equals(GlobalSymbolName, StringComparison.Ordinal))
                    {
                        ClosePositionAsync(psnCBS, CallBackClosePositionAsync);

                        Print($"Position Closed {psnCBS.SymbolName}");
                    }
                }
            });

            //var positionsCBS = Positions.FindAll("", Symbol.Name);

            //File.WriteAllText(GlobalSchedulerMessageWriteFile + Symbol.Name + SchedulerMessage[0] + SchedulerMessage[1] + DateTime.UtcNow.ToString("ddMMyyyyHHmmssfff") + ".csv", Symbol.Name + SchedulerMessageStatusWrite);
        }

        private void CallBackClosePositionAsync(TradeResult tradeResult)
        {
            if (!tradeResult.IsSuccessful)
            {
                ClosePositionAsync(tradeResult.Position, CallBackClosePositionAsync);
            }
        }

        protected override void OnStart()
        {
            GlobalSymbolName = Symbol.Name;
            var SubscribeBroadcastMsg = "BroadCast" + Account.Number + "Renko0";

            Print(SubscribeBroadcastMsg);

            Print($"Thread ID: {Thread.CurrentThread.ManagedThreadId}");

            ReadBroadCastmsg(SubscribeBroadcastMsg);
        }

        protected override void OnTick()
        {
            // Handle price updates here
        }

        protected override void OnStop()
        {
            // Handle cBot stop here
        }
    }
}

Regarding your code, you don't have to put the BeginInvokeOnMainThread call inside try/catch, it's useless as the delegate you pass to BeginInvokeOnMainThread will be executed on another thread and you can't catch the exception, instead put the try/catch inside your delegate code.


@amusleh

m4trader4
01 Jun 2022, 19:41

RE:

Ahmad

am on version 4.2.5.5087

I executed your code. It worls only when only one instance of cBot is running on a symbol. 

 

1. Run the cBot on all symbols

2. place trade in combination of manual and cbot 

Not all the trades are closed

Regards

 

 


@m4trader4

m4trader4
01 Jun 2022, 19:42

RE: RE:

Why dint the usdjpy trade got close?


@m4trader4

m4trader4
01 Jun 2022, 20:36

RE: RE: RE:

After closing trades through command line (Nats Client) then stopping the cBot get error "aborted by timeout". It doesnt even print the statement of execution.

There is no timeout error if cBot is closed without closing/or placing trades.

 

 protected override void OnStop()
        {
            Print("on stop executed");

            GlobalBroadCastConnection.Close();
            Print("Connection Closed =" + GlobalBroadCastConnection.IsClosed());
            // Handle cBot stop here
        }

 


@m4trader4

amusleh
03 Jun 2022, 08:09

RE: RE: RE:

Hi,

m4trader4 said:

Why dint the usdjpy trade got close?

Because the chart was not a USDJPY chart.

I tested again and here is the result:

 

The cBot code I used:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using cAlgo.API;
using cAlgo.API.Collections;
using cAlgo.API.Indicators;
using cAlgo.API.Internals;
using NATS.Client;
using System.Threading;
using System.Reflection;
using System.IO;

namespace cAlgo.Robots
{
    [Robot(AccessRights = AccessRights.FullAccess)]
    public class NatsTestV1 : Robot
    {
        private string GlobalSymbolName;

        #region nats

        public IAsyncSubscription GlobalSchedulersub;
        public IAsyncSubscription GlobalBroadCastsub;
        public IConnection GlobalNormalConnection;
        public IConnection GlobalBroadCastConnection;
        public static string GlobalScheduleSubscribeMsg;
        public static string GlobalSubscribeBroadcastMsg;
        public static string GlobalPublishAlertSubject;
        public static string GlobalSchedulerMessageWriteFile;

        #endregion nats

        public void ReadBroadCastmsg(string subjj)
        {
            Options opts = ConnectionFactory.GetDefaultOptions();
            Options.ReconnectForever = 1;
            opts.Url = Defaults.Url;
            opts.Verbose = true;
            opts.AllowReconnect = true;
            GlobalBroadCastConnection = new ConnectionFactory().CreateConnection(opts);

            GlobalBroadCastsub = GlobalBroadCastConnection.SubscribeAsync(subjj);
            GlobalBroadCastsub.MessageHandler += BroadCastsub_MessageHandler;
            GlobalBroadCastsub.Start();
        }

        private void BroadCastsub_MessageHandler(object sender, MsgHandlerEventArgs e)
        {
            var msg = Encoding.UTF8.GetString(e.Message.Data);

            Print("NatsMessage : " + DateTime.UtcNow.ToString("HH:mm:ss:fff") + ":" + e.Message.Subject + ":" + msg);
            //var SchedulerMessage = msg.Split(',');

            var SchedulerMessage = msg.Split(new char[]
            {
                ','
            });

            if (SchedulerMessage[0].Equals("Renko0", StringComparison.CurrentCultureIgnoreCase) && SchedulerMessage[1].Equals("CloseTrade", StringComparison.CurrentCultureIgnoreCase) && !string.IsNullOrWhiteSpace(SchedulerMessage[2]))
            {
                // BeginInvokeOnMainThread(() =>

                try
                {
                    ReadBroadCastCloseTrade(GlobalSchedulerMessageWriteFile, SchedulerMessage);
                }
                catch (Exception ex)
                {
                    Print("exep close trade=" + ex.Message);
                }

                //);
            }
        }

        public void ReadBroadCastCloseTrade(string SchMsgFile, string[] SchedulerMessage)
        {
            //String SchedulerMessageStatusWrite = "";
            //string SchedulerMessageStatusDisplay = "";
            //SchedulerMessageStatusDisplay = DateTime.UtcNow.ToString("HH:mm:ss") + ":" + SchedulerMessage[0] + ":" + SchedulerMessage[1];
            //SchedulerMessageStatusWrite += SchedulerMessageStatusDisplay;

            BeginInvokeOnMainThread(() =>
            {
                var positionsCBS = Positions.ToArray();

                foreach (var psnCBS in positionsCBS)
                {
                    Print("BuySell Positon= " + psnCBS.SymbolName);

                    if (SchedulerMessage[2].Equals("All", StringComparison.CurrentCultureIgnoreCase) || psnCBS.SymbolName.Equals(GlobalSymbolName, StringComparison.Ordinal))
                    {
                        ClosePositionAsync(psnCBS, CallBackClosePositionAsync);

                        Print($"Position Closed {psnCBS.SymbolName}");
                    }
                }
            });

            //var positionsCBS = Positions.FindAll("", Symbol.Name);

            //File.WriteAllText(GlobalSchedulerMessageWriteFile + Symbol.Name + SchedulerMessage[0] + SchedulerMessage[1] + DateTime.UtcNow.ToString("ddMMyyyyHHmmssfff") + ".csv", Symbol.Name + SchedulerMessageStatusWrite);
        }

        private void CallBackClosePositionAsync(TradeResult tradeResult)
        {
            if (!tradeResult.IsSuccessful)
            {
                ClosePositionAsync(tradeResult.Position, CallBackClosePositionAsync);
            }
        }

        protected override void OnStart()
        {
            GlobalSymbolName = Symbol.Name;
            var SubscribeBroadcastMsg = string.Format("Nats_{0}_{1}_{2}", Account.Number, SymbolName, TimeFrame);

            Print(SubscribeBroadcastMsg);

            ReadBroadCastmsg(SubscribeBroadcastMsg);
        }

        protected override void OnStop()
        {
            Print("on stop executed");

            GlobalBroadCastConnection.Close();

            Print("Connection Closed =" + GlobalBroadCastConnection.IsClosed());
        }
    }
}

You have to use different connection names for each of your cBot instances, otherwise it will cause conflict and you will not be able to close the connection properly which result on timeout error.

I consider this thread closed, as it's now getting out of cTrader Automate API scope.


@amusleh

m4trader4
04 Jun 2022, 04:19

After the i got update to 4.2.8.5759 its working

 


@m4trader4