Overview
Examples
Screenshots
Comparisons
Applications
Download
Documentation
Tutorials
Bazaar
Status & Roadmap
FAQ
Authors & License
Forums
Funding Ultimate++
Search on this site
Search in forums












SourceForge.net Logo
Home » Developing U++ » UppHub » Job package: A lightweight worker thread for non-blocking operations. (A)
Job package: A lightweight worker thread for non-blocking operations. [message #48749] Sun, 10 September 2017 12:29 Go to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Hello,

Below you will find a worker thread implementation which -hopefully- can simplify creating non-blocking or asynchronous applications,
or allow you to easily port your applications to MT-era. I also supplied an example code demonstrating its basic usage pattern.

Job package for Ultimate++
---------------------------

This template class implements a scope bound, single worker thread based on RAII principle. 
It provides a return semantics for result gathering, functionally similar to promise/future 
pattern (including void type specialization). Also it provides a convenient error management 
and exception propagation mechanisms for worker threads, and it is compatible with U++ 
single-threaded mode.

Note that while Job is a general purpose multithreading tool, for high performance loop 
parallelization scenarios CoWork would be a more suitable option. This class is mainly designed 
to allow applications and libraries to gain an easily managable, optional non-blocking behavior 
where high latency is expected such as network operations and file I/O, and a safe, 
container-style access to the data processed by the worker threads is preferred.

Features and Highlights
-----------------------

- A safe way to gather results from worker threads.
- Simple and easy-to-use thread halting, and error reporting mechanism.
- Exception propagation.
- External blocking is possible.
- Optional constant reference access to job results.
- Compatible with U++ single-threaded environment.
- All Job instances are scope bound and will forced to finish job when they get out of scope.

Known Issues
-----------------------

- Currently none.

History
-----------------------

- 2017-10-07: Compatibility with U++ single-threaded mode is added.

- 2017-10-01: Global variables moved into JobGlobal namespace in order to avoid multiple
              definitions error. Accordingly, global functions are defined in Job.cpp.

- 2017-09-22: Exception propagation mechanism for job is properly added. From now on
              worker threads will pass exceptions to their caller.
              Void template specialization is re-implemented (without using future/promise).
              Constant reference access operator is added. This is especially useful
              where the data is a container with iterators (such as Vector or Array).

- 2017-09-19: std::exception class exceptions are handled, and treaated as errors. 
              (For the time being.)
              void instantiation is now possible.
              Jobs will notify their workers on shutdown. 
              Clean up & cosmetics...

- 2017-09-18: Clear() method is added. Worker id generator is using int64. Documentation 
              updated.

- 2017-09-17: Future/promise mechanism, and std template library code completely removed.
              From now on Job has its own result gathering mechanism with zero copy/move
              overhead.

- 2017-09-16: Job is redesigned. It is now a proper worker thread.

- 2017-09-10: Initial public beta version is released.



Hope you'll find it useful.

Best Regards,
Oblivion


[Updated on: Sat, 07 October 2017 14:54]

Report message to a moderator

Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48750 is a reply to message #48749] Sun, 10 September 2017 15:08 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
Trying to figure out what is relative advantage of using Job vs CoWork vs C++11 future/promise....

For now, it looks like Job is not using worker threads, creating thread for each run. Which actually can have advantage if there are blocking operations (file I/O etc), but that is meant to be dealt with in CoWork by increasing the number of threads.

Other than that, your examples can be rewritten with CoWork easily. "WaitForJobs" is outright ugly, being single global function for all Jobs; I think CoWork::Finish has a clear upper hand here...

	{
		// Print a message to stdout.
		// Returns void.
		CoWork job;
		job & [=] { Cout() << "Hello world!\n"; };
		job.Finish();
	}
	
	{
		// Print all the divisors of random numbers.
		// Returns a String.
		CoWork jobs;
		Vector<String> results;
		for(int i = 0; i < 5; i++)
			jobs & [=, &results] { String h = GetDivisors2(); CoWork::FinLock(); results.At(i) = GetDivisors(); };
		jobs.Finish();
		for(auto r : results)
			Cout() << r << '\n';
	}


Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48751 is a reply to message #48750] Sun, 10 September 2017 15:56 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
BTW, as exercise:

#include <CtrlLib/CtrlLib.h>

#include <future>

using namespace Upp;

template< class Function, class... Args>
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
Async(Function&& f, Args&&... args )
{
	std::promise<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>> p;
	auto ftr = p.get_future();
	CoWork::Schedule([=, p = pick(p)]() mutable {
		p.set_value(f(args...));
	});
	return ftr;
}

GUI_APP_MAIN
{
	DDUMP(Async([] { return "Hello world"; }).get());
}


Still not sure about real world scenarion where I would prefer using future/promise over CoWork.

Maybe my problem with future/promise really is that fact that usually the "result" of async operation as a change in some data that gets into it as reference. future forces me to do a copy to store the function result.
Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48753 is a reply to message #48750] Sun, 10 September 2017 17:05 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Hello Mirek,

Thank you very much for your comments.


Quote:
For now, it looks like Job is not using worker threads, creating thread for each run.



Yes, this is because Job is not a queue or pool. Maybe I have given you the wrong impression: It is not even meant to be an alternative or rival to CoWork which works just well. (I used a similar interface, because CoWork is already part of core U++ with a suitable interce design.)

Job is designed to be an interface to plain Thread, with a simplified error and result handling. (I admit the Job/Worker metaphor can be somewhat misleading.)


Quote:
Which actually can have advantage if there are blocking operations (file I/O etc), but that is meant to be dealt with in CoWork by increasing the number of threads.



This is why I designed it for in the first place: For file and network operations. (That's why I supplied SocketClients example).

As for the CoWork's capabilities: Sure, CoWork can deal with such operations as well.


Quote:
"WaitForJobs" is outright ugly, being single global function for all Jobs



I can turn it into Job::FinishAll(). However, Jobs, by default, wait for their workers to finish when they go out of scope.(Unless their workers are explicitly detached.)
And Finish & IsFinished will wait for each job to be done. (Though they can be improved.)

Consider this one:
	{
		Job<String> filejob([=]{
			FileIn fi("/home/test_session/Very_Large_Test_File.txt");
			if(fi.IsError())
				JobError(fi.GetError(), fi.GetErrorText());
			Cout() << "Reading file...";
			return LoadStream(fi);
		});
		while(!filejob.IsFinished()) Cout() << "....";
		Cout() << (filejob.IsError() ? filejob.GetErrorDesc() : filejob.GetResult()) << '\n';
	}



Now, of course this can be written in CoWork, but for such operations as above, I believe the interface of Job is simpler for individual thread operations, and would be less error prone. (No locks, no sharing, individual asynchronous operations, and their results...)

As a side note I run a simple test (I'm not sure if it's legitimate, but was curious.):

Below is the timing results for the same operations carried out by Job and CoWork (I see them as somewhat different, and complementary tools, but wanted to see how well they perform.)
When I reduce Sleep value in WaitForJosb() (I know it's ugly) to 1, I get this for 1500 computations (Under latest GCC):

TIMING Job            : 137.00 ms - 137.00 ms (137.00 ms / 1 ), min: 137.00 ms, max: 137.00 ms, nesting: 1 - 1
TIMING CoWork         : 206.00 ms - 206.00 ms (206.00 ms / 1 ), min: 206.00 ms, max: 206.00 ms, nesting: 1 - 1




Code is:


String GetDivisors()
{
	String s;
	int number = (int) 1000;
	Vector<int> divisors;
	for(auto i = 1, j = 0; i < number + 1; i++) {
		auto d = number % i;
		if(d == 0){
			divisors.Add(i);
			j++;
		}
		if(i == number)
			s = Format("Worker Id: %d, Number: %d, Divisors (count: %d): %s",
						GetWorkerId(),
						number,
						j,
						divisors.ToString());
	}
	return pick(s);
}

CONSOLE_APP_MAIN
{
	{
		CoWork jobs;
		jobs.SetPoolSize(1500);
		Vector<String> results;
		TIMING("CoWork");
		for(int i = 0; i < 1500; i++)
			jobs & [=, &results] { String h = GetDivisors(); CoWork::FinLock(); results.At(i) = h; };
		jobs.Finish();
		for(auto r : results)
			Cout() << r << '\n';
	

	}
	
	{
		Array<Job<String>> jobs;
		jobs.SetCount(1500);
		TIMING("Job");
		for(int i = 0; i < 1500; i++)
		 jobs[i].Start([=]{ return GetDivisors(); });
		WaitForJobs();
		for(auto& job : jobs) 
				Cout() << job.GetResult()    << '\n';
		
	}

}



Best regards,
Oblivion


[Updated on: Sun, 10 September 2017 19:28]

Report message to a moderator

Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48754 is a reply to message #48753] Sun, 10 September 2017 19:08 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
And below is where CoWork starts to outperform Job (As you pointed out, because of the additional copying involved, I guess.):

I changed the code to see other options:
{
		CoWork jobs;
		Vector<String> results;
		TIMING("CoWork");
		jobs & [=, &results] {
			Vector<String> s;
			for(int i = 0; i < 50000; i++)
				 s.Add() = GetDivisors();
				CoWork::FinLock();
				results = pick(s);
		};
		jobs.Finish();
	}
	
	{
		Job<Vector<String>> job;
		TIMING("Job");
		job.Start([=]{
			Vector<String> s;
			for(int i = 0; i < 50000; i++)
				s.Add() = GetDivisors();
			return pick(s);
		});
		job.Finish();
		auto s = job.GetResult();
	}




TIMING Job            :  1.42 s  -  1.42 s  ( 1.42 s  / 1 ), min:  1.42 s , max:  1.42 s , nesting: 1 - 1
TIMING CoWork         :  1.39 s  -  1.39 s  ( 1.39 s  / 1 ), min:  1.39 s , max:  1.39 s , nesting: 1 - 1



Job is not an alternative to CoWork, but it's not a bad tool either. It does simplify writing high performance MT code in a convenient way, thanks to U++.
It is suitable for such asynchronous operations mainly where a high latency is expected (IO/sockets, etc.) and where the code needs to be easily managable (errors, and results should be easily and immediately dealt with.)

Best regards,
Oblivion


[Updated on: Sun, 10 September 2017 20:00]

Report message to a moderator

Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48755 is a reply to message #48753] Sun, 10 September 2017 20:09 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
Oblivion wrote on Sun, 10 September 2017 17:05

jobs.SetPoolSize(1500);


I think above is potentionally performance killer.
Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48756 is a reply to message #48754] Sun, 10 September 2017 20:13 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
OK, so the difference is

- "return" semantics (implemented using promise/future)
- error states

Right?

If true, what about using future / promise directly? IMO the only problem is to have them interfaced with Thread and/or CoWork (for different kinds of usage).
Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48757 is a reply to message #48756] Sun, 10 September 2017 21:16 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Hello Mirek, and thank you for your patience.

Quote:
mirek
OK, so the difference is

- "return" semantics (implemented using promise/future)
- error states

Right?

If true, what about using future / promise directly? IMO the only problem is to have them interfaced with Thread and/or CoWork (for different kinds of usage).


Well, Job is one way of having them interfaced with U++ Thread. Using f/p pair directly is somewhat cumbersome.
However, TBH, what I really see lacking in multithreading tools in general is a simple error handling mechanism, and a per-thread shutdown mechanism.
This is the one of the reasons why I've come up with the Job class. It is also an attempt to solve these problems.

E.g.
Thread.IsError()
Thread.GetError()
Thread.GetErrorDesc()
Thread.ShutDown()
and
Thread::Error (exception)

As you can see in the Job.hpp these are not very hard to implement. (I don't see why it should be difficult for Thread at least (performance?). Cowork, being a job queue, is naturally a more complex beast.)

Best regards,
Oblivion


[Updated on: Sun, 10 September 2017 21:34]

Report message to a moderator

Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48759 is a reply to message #48757] Mon, 11 September 2017 00:06 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
Oblivion wrote on Sun, 10 September 2017 21:16
and a per-thread shutdown mechanism.


That one is a borderline to impossible.

The problem are all those destructors that need to called. It is hard to abort thread that is does not know about it...

Mirek
Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48760 is a reply to message #48759] Mon, 11 September 2017 00:25 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Quote:


That one is a borderline to impossible.

The problem are all those destructors that need to called. It is hard to abort thread that is does not know about it...


Ah no, what I mean is setting a unique flag for each thread (maybe using ThreadId), so that it can be checked from within like IsShutdownThreads.
(I do this with Job:Cancel() method and IsJobCancelled() global function, by keeping an index of their IDs, which are integer numbers, incremented with each new thread, using a thread local variable as the unique id.)
I was just talking about signalling the shutdown to specific targets so that user can design his/her MT code and handle its shutdown conditions easily.

And this is exactly where Job comes handy. It is a higher level interface, a convenience wrapper, if yow will, with a simpler return semantics, error management, and a more refined shutdown mechanism. With a reasonably small overhead.


Best regards,
Oblivion


[Updated on: Mon, 11 September 2017 11:40]

Report message to a moderator

RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48780 is a reply to message #48759] Sun, 17 September 2017 23:20 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Hello Mirek (and all U++ community),

After your review and criticism of Job class. I went back to design board and come up with a new version, mostly re-written.
I updated the description and the package in the first post, as usual, but allow me to copy/paste it's description here:

This package contains a lightweight and easy-to-use multithreading tool for U++ framework: Job.
Job template class implements a scope bound, single worker thread based on RAII principle.
It provides a return semantics for result gathering functionally similar to promise/future
pattern but with three major differences:

1)  future/promise pair requires at least moving of the resulted data, which can be
    relatively expensive depending on the object type. On the other hand, Job acts as a simple
    container and uses a reference based result gathering method.  This makes it possible to
    reduce move/copy overhead involved (nearly down to zero).

2)  Job does not allow the T to be of plain void type (of course, void pointer is allowed).

3)  Trying to access the resulting data while it is still invalid will not throw.
    Resources are allocated during construction (including the job data).

Note that for higher performance loop parallelization scenarios, CoWork would be a more
suitable option. This class is mainly designed to allow the applications and libraries to gain 
an easily managable, optional non-blocking behaviour where high latency is expected (Such as
network operations and file I/O), and a safe "referential access" to the objects processed
by the worker threads is preferred.


- It is now a proper single worker thread. (performance gain, and memory reduction is visible.) By design Job has no work scheduling (it is not meant to be a queue, not directly at least.)

- It is re-designed around the RAII principle: A scope-bound single worker thread that only gets destroyed when it is out-of-its scope.

- Most importantly, thanks to your criticisim, I ditched the future/promise mechanism completely, in favour of "Upp-native" way: Job instance are from now on basically simple data containers with referential acces to their data (result). Yet I've kept the alternative return semantics. It is really useful.

Granted, none of these are impossible to implement with CoWork or Thread. AFAIK CoWork is scope bound too. But Job's purpose is different. Although it can be used as a general parallelization tool, it is really meant to simplify writing non-blocking applications, or porting exisiting ones to them, providing a simple yet convenient interface.

For example, with this new design it took around 2 hours for me to port my own FTP class fully into MT environment, using a simple switch (Ftp::Blocking(false)) (News: Upcoming version (2.0) will support MT internally.).
Again, I've begun porting (an experiment for now) the SSH package to MT using Job, and it solves nearly every problem that I ran against wrapping SSH (non-blocking), and also both source code and interface is reduced drastically. It is very clean now. (all those Startxxx() and xxx() method pairs are gone, there are now only xxx ones. E.g. Ssh:Connect() ) You can see this uniform programming/porting pattern emerging int the new SocketClients example I provided:

class Client : public Job<String> {
public:
	Client&	Blocking(bool b = true)	{ blocking = b; return *this; }
	String	Request(const String& host, int port);
	
private:
	String	Run(Event<>&& cmd);
	bool	blocking = true;
};

String Client::Run(Event<>&& cmd)
{
	Start(pick(cmd));
	if(blocking) Finish();
	return blocking && !IsError() ? GetResult() : GetErrorDesc();
}

String Client::Request(const String& host, int port)
{
	auto cmd = [=]{
		TcpSocket	socket;
		auto& output = Job<String>::Data(); // This method allows referential access to the data of respective job.
		output = Format("Client #%d: ", GetWorkerId());
		
		INTERLOCKED {	Cout() << output << "Starting...\n"; }
		
		if(socket.Timeout(10000).Connect(host, port))
			output.Cat(socket.GetLine());
		if(socket.IsError())
			throw JobError(socket.GetError(), socket.GetErrorDesc());
	};
	return Run(cmd);
}

CONSOLE_APP_MAIN
{
        //.....

        // Requesting in a simple, blocking way.
	{
		Cout() << "----- Processing individual blocking requests...\n";
		Cout() << c1.Request(host1, 21) << '\n';
		Cout() << c2.Request(host2, 21)	<< '\n';
	}
	
	// Reuse workers and make requests in a simple, non-blocking way.
	{
		Cout() << "----- Processing individual non-blocking requests...\n";
		// We can "clear" the data (String):
		c1.GetResult().Clear();
		c2.GetResult().Clear();

		c1.Blocking(false).Request(host1, 21);
		c2.Blocking(false).Request(host2, 21);

		while(!c1.IsFinished() || !c2.IsFinished())
			;
		if(c1.IsError()) Cerr() << c1.GetErrorDesc() << '\n';
		else Cout() << ~c1 << '\n';
		
		if(c2.IsError()) Cerr() << c2.GetErrorDesc() << '\n';
		else Cout() << ~c2 << '\n';

	}
        
        //....

}



Please also take a look into the full code.


As always, review, bug reports, criticism, feedback are greatly appreciated.

Best regards,
Oblivion


[Updated on: Mon, 18 September 2017 00:10]

Report message to a moderator

Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48782 is a reply to message #48780] Mon, 18 September 2017 08:23 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
Oblivion wrote on Sun, 17 September 2017 23:20


- Most importantly, thanks to your criticisim, I ditched the future/promise mechanism completely, in favour of "Upp-native" way: Job instance are from now on basically simple data containers with referential acces to their data (result). Yet I've kept the alternative return semantics. It is really useful.



That's intereseting, because I actually suggested adding future/promise to U++ facilities... Smile

Anyway, I might check it, but I do not see any archive / link posted.

Mirek
Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48783 is a reply to message #48782] Mon, 18 September 2017 08:29 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Quote:
That's intereseting, because I actually suggested adding future/promise to U++ facilities... Smile

Anyway, I might check it, but I do not see any archive / link posted.

Mirek


Laughing That can also be done, I can work on it separately, or re-introduce it as an option if you think it is worth it. (In it's current state Job package does not cover a shared_future alternative. Maybe I should consider adding a SharedJob class, but 1) there is already CoWork for paralellization, 2) I don't quite know a real life scenario where it would be more useful than CoWork.)


Nevertheless, recently I've found my "reference based" approach much more performant and convenient than future/promise.

Consider this (pseudo) code:

    double sum_total = 0;
    Job<Vector<double>> job;
    job.Start(DoSomeHeavyCalculationAndFillTheVector);

    //....

    if(job.IsFinishe() && !job.IsError()) {
       for(auto& e : ~job)          // Reference access to Vector. Data is not copied or moved. Vector is filled via a direct -yet safe- access.
             sum_total += e;        // And retrieved in same fashion as well. Note also that there is no requirement for the data type to be contained.
                                    // In fact, using One or Any, or Value, we can even differentiate between the job 
                                    // results for given operations in a single Job easily. :)
    }




Package's link is on the bottom of the first post of this topic.


Best regards,
Oblivion


[Updated on: Mon, 18 September 2017 11:49]

Report message to a moderator

Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48787 is a reply to message #48782] Tue, 19 September 2017 00:01 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Here is the code:

[Updated on: Tue, 19 September 2017 08:12]

Report message to a moderator

Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48788 is a reply to message #48782] Tue, 19 September 2017 08:12 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Hello Mirek,

(Thank you for your comments. I deeply appreciate it.)

I was curious about if the new design of Job class will pay off, whether it is also a resonable general paralellization tool, and did some benchmarking with the Divisors example.

I assumed Job and CoWork to be functionally and effectively identical in this example (In the sense that, regardless of their internals, they are both doing the same thing: Calculating divisiors for the number 1000, 10.000 times in available worker threads, then printing the results to the screen.)


I simply changed the jobs loop to take advantage of new return semantics (I don't know if CoWork can be put into a similar loop, so I am taking this with a grain of salt):
The loop for the job is simply a very crude slot manager for 8 Job workers. (Tested on AMD FX 6100, six core processor.)

	Array<Job<String>> jobs;
	jobs.SetCount(CPU_Cores() + 2);

	CoWork cowork;
//	cowork.SetPoolSize(CPU_Cores() + 2);
	
	Vector<String> results;
	DUMP(CPU_Cores());
	{

		TIMING("CoWork -- With stdout output");
		for(int i = 0; i < 10000; i++)
			cowork & [=, &results] { String h = GetDivisors(); CoWork::FinLock(); results.At(i) = h; };
		cowork.Finish();
		// Stdout output section.
		for(auto& r : results)
			Cout() << r << '\n';

	}
	{
		TIMING("Job -- With stdout output");

		int i = 0;
		while(i < 10000) {
			for(auto& job : jobs) {
				if(!job.IsFinished()) {
					continue;
				}
				job & [=]{ Job<String>::Data() = GetDivisors(); };
				if(!(~job).IsEmpty()) {
					Cout() << ~job << '\n';
					if(++i == 10000) break;
				}
			}
	
		}
	}


Results (consistent):

For 10000 computiation.
CPU_Cores() = 6
TIMING Job -- With stdout output: 370.00 ms - 370.00 ms (370.00 ms / 1 ), min: 370.00 ms, max: 370.00 ms, nesting: 1 - 1
TIMING CoWork -- With stdout output: 461.00 ms - 461.00 ms (461.00 ms / 1 ), min: 461.00 ms, max: 461.00 ms, nesting: 1 - 1

CPU_Cores() = 6
TIMING Job -- Without stdout output: 228.00 ms - 228.00 ms (228.00 ms / 1 ), min: 228.00 ms, max: 228.00 ms, nesting: 1 - 1
TIMING CoWork -- Without stdout output: 234.00 ms - 234.00 ms (234.00 ms / 1 ), min: 234.00 ms, max: 234.00 ms, nesting: 1 - 1


for 1000 compuitation.

CPU_Cores() = 6
TIMING Job -- With stdout output: 34.00 ms - 34.00 ms (34.00 ms / 1 ), min: 34.00 ms, max: 34.00 ms, nesting: 1 - 1
TIMING CoWork -- With stdout output: 53.00 ms - 53.00 ms (53.00 ms / 1 ), min: 53.00 ms, max: 53.00 ms, nesting: 1 - 1

CPU_Cores() = 6
TIMING Job -- Without stdout output: 24.00 ms - 24.00 ms (24.00 ms / 1 ), min: 24.00 ms, max: 24.00 ms, nesting: 1 - 1
TIMING CoWork -- Without stdout output: 31.00 ms - 31.00 ms (31.00 ms / 1 ), min: 31.00 ms, max: 31.00 ms, nesting: 1 - 1


What do you think?



Best regards,
Oblivion


[Updated on: Tue, 19 September 2017 09:51]

Report message to a moderator

Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48789 is a reply to message #48787] Tue, 19 September 2017 08:37 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
This is ugly:

	// Reuse workers and make requests in a simple, non-blocking way.
	{
		Cout() << "----- Processing individual non-blocking requests...\n";
		// We can "clear" the data. Useful especially if the data is "picked".
		c1.Clear();
		c2.Clear();

		c1.Blocking(false).Request(host1, 21);
		c2.Blocking(false).Request(host2, 21);

		while(!c1.IsFinished() || !c2.IsFinished())
			;
		if(c1.IsError()) Cerr() << c1.GetErrorDesc() << '\n';
		else Cout() << ~c1 << '\n';
		
		if(c2.IsError()) Cerr() << c2.GetErrorDesc() << '\n';
		else Cout() << ~c2 << '\n';

	}


I think this should work without IsFinished loop.

Anyway, all thing still feels very much like future, with implicit promise connected to Thread. In that regard, full future/promise still sounds more powerful.

Mirek
Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48790 is a reply to message #48789] Tue, 19 September 2017 08:53 Go to previous messageGo to next message
Oblivion is currently offline  Oblivion
Messages: 1091
Registered: August 2007
Senior Contributor
Yes that does work withoud IsFinished loop.
I put that simply to show that it is possible to do someting else in that while() loop (some checks, calculations, whatever is needed.) while the requests are being processed.
It gives per-job control over the async works.

Example:
while(!c1.IsFinished() || !c2.IsFinished()) {
if(c2.IsFinished())
    Cout() << second request is complete before the first\n";
   // Etc...

}


Best regards,

Oblivion


[Updated on: Tue, 19 September 2017 08:58]

Report message to a moderator

Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48791 is a reply to message #48790] Tue, 19 September 2017 10:12 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
Small issues with the code:

sData template does not compile with MSC (I believe it is not legit C++ code, probably GCC eats it but it is not OK). Replaced by

template<class T>
One<T>*& sData()
{
	static thread_local One<T>* sData = NULL;
	return sData;
}



GetNewWorkerId has int64 internal counter (which IMO is reasonable), but the rest of the code is using int.

Anyway, as you are using "TIMING", it appears you are benchmarking in debug mode. I have done some changes:

#include <Core/Core.h>
#include <Job/Job.h>

using namespace Upp;

String GetDivisors()
{
	String s;
	int number = (int) 1000;
	Vector<int> divisors;
	for(auto i = 1, j = 0; i < number + 1; i++) {
		auto d = number % i;
		if(d == 0){
			divisors.Add(i);
			j++;
		}
		if(i == number)
			s = Format("Worker Id: %d, Number: %d, Divisors (count: %d): %s",
						GetWorkerId(),
						number,
						j,
						divisors.ToString());
	}
	return pick(s);
}

// #define OUTPUT
#define N 100000

CONSOLE_APP_MAIN
{
	Array<Job<String>> jobs;
	jobs.SetCount(CPU_Cores() + 2);

	CoWork cowork;
//	cowork.SetPoolSize(CPU_Cores() + 2);
	
	Vector<String> results;
	RDUMP(CPU_Cores());
	{

		RTIMING("CoWork -- With stdout output");
		for(int i = 0; i < N; i++)
			cowork & [=, &results] { String h = GetDivisors(); CoWork::FinLock(); results.At(i) = h; };
		cowork.Finish();
		// Stdout output section.
	#ifdef OUTPUT
		for(auto& r : results)
			Cout() << r << '\n';
	#endif
	}
	{
		RTIMING("Job -- With stdout output");

		int i = 0;
		while(i < N) {
			for(auto& job : jobs) {
				if(!job.IsFinished()) {
					continue;
				}
				job & [=]{ Job<String>::Data() = GetDivisors(); };
				if(!(~job).IsEmpty()) {
				#ifdef OUTPUT
					Cout() << ~job << '\n';
				#endif
					if(++i == N) break;
				}
			}
	
		}
	}
}


with results in _RELEASE_ mode:

* d:\upp.out\MyApps\MSC15\JobBenchmark.exe 19.09.2017 10:01:27, user: cxl

CPU_Cores() = 8
TIMING Job -- With stdout output: 123.00 ms - 123.00 ms (123.00 ms / 1 ), min: 123.00 ms, max: 123.00 ms, nesting: 1 - 1
TIMING CoWork -- With stdout output: 103.00 ms - 103.00 ms (103.00 ms / 1 ), min: 103.00 ms, max: 103.00 ms, nesting: 1 - 1


Now I fully understand that performance is not the reason for Job, but as I have seen no technical reasons why CoWork should be that much slower, I had to check....
Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48792 is a reply to message #48780] Tue, 19 September 2017 10:15 Go to previous messageGo to next message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
Oblivion wrote on Sun, 17 September 2017 23:20

[code]
1) future/promise pair requires at least moving of the resulted data, which can be
relatively expensive depending on the object type. On the other hand, Job acts as a simple
container and uses a reference based result gathering method. This makes it possible to
reduce move/copy overhead involved (nearly down to zero).


This is not quite true (unlike Job, which in fact requires you to move the data if you need it outside the Job). E.g.:

#include <Core/Core.h>
#include <future>

using namespace Upp;

template <class D, class P, class F, class... Args>
void set_future_value(D *dummy, P&& p, F&& f, Args&&... args)
{
	p.set_value(f(args...));
}

template <class P, class F, class... Args>
void set_future_value(void *dummy, P&& p, F&& f, Args&&... args)
{
	f(args...);
	p.set_value();
}

template< class Function, class... Args>
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
Async(Function&& f, Args&&... args )
{
	std::promise<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>> p;
	auto ftr = p.get_future();
	CoWork::Schedule([=, p = pick(p)]() mutable {
		std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)> *dummy = NULL;
		set_future_value(dummy, pick(p), pick(f));
	});
	return ftr;
}

CONSOLE_APP_MAIN
{
// with reference
	String h;
	auto f = Async([&h] { h = "Hello world"; });
	f.get();
	DDUMP(h);

// using return value
	DDUMP(Async([] { return "Hello world"; }).get());
}


BTW, I am mostly arguing there because it is a good way for me to investigate these issues....
Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48793 is a reply to message #48792] Tue, 19 September 2017 10:24 Go to previous messageGo to previous message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
future / promise also seems to have superior error handling facilities (exceptions can easily be passed from thread to caller).

The one thing that seems to be missing is future -> promise abort.
Previous Topic: Added SysExec package
Next Topic: firebird: CharSet
Goto Forum:
  


Current Time: Fri Mar 29 06:56:38 CET 2024

Total time taken to generate the page: 0.01481 seconds