Home » Developing U++ » UppHub » Job package: A lightweight worker thread for non-blocking operations. (A)
| Job package: A lightweight worker thread for non-blocking operations. [message #48749] |
Sun, 10 September 2017 12:29  |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello,
Below you will find a worker thread implementation which -hopefully- can simplify creating non-blocking or asynchronous applications,
or allow you to easily port your applications to MT-era. I also supplied an example code demonstrating its basic usage pattern.
Job package for Ultimate++
---------------------------
This template class implements a scope bound, single worker thread based on RAII principle.
It provides a return semantics for result gathering, functionally similar to promise/future
pattern (including void type specialization). Also it provides a convenient error management
and exception propagation mechanisms for worker threads, and it is compatible with U++
single-threaded mode.
Note that while Job is a general purpose multithreading tool, for high performance loop
parallelization scenarios CoWork would be a more suitable option. This class is mainly designed
to allow applications and libraries to gain an easily managable, optional non-blocking behavior
where high latency is expected such as network operations and file I/O, and a safe,
container-style access to the data processed by the worker threads is preferred.
Features and Highlights
-----------------------
- A safe way to gather results from worker threads.
- Simple and easy-to-use thread halting, and error reporting mechanism.
- Exception propagation.
- External blocking is possible.
- Optional constant reference access to job results.
- Compatible with U++ single-threaded environment.
- All Job instances are scope bound and will forced to finish job when they get out of scope.
Known Issues
-----------------------
- Currently none.
History
-----------------------
- 2017-10-07: Compatibility with U++ single-threaded mode is added.
- 2017-10-01: Global variables moved into JobGlobal namespace in order to avoid multiple
definitions error. Accordingly, global functions are defined in Job.cpp.
- 2017-09-22: Exception propagation mechanism for job is properly added. From now on
worker threads will pass exceptions to their caller.
Void template specialization is re-implemented (without using future/promise).
Constant reference access operator is added. This is especially useful
where the data is a container with iterators (such as Vector or Array).
- 2017-09-19: std::exception class exceptions are handled, and treaated as errors.
(For the time being.)
void instantiation is now possible.
Jobs will notify their workers on shutdown.
Clean up & cosmetics...
- 2017-09-18: Clear() method is added. Worker id generator is using int64. Documentation
updated.
- 2017-09-17: Future/promise mechanism, and std template library code completely removed.
From now on Job has its own result gathering mechanism with zero copy/move
overhead.
- 2017-09-16: Job is redesigned. It is now a proper worker thread.
- 2017-09-10: Initial public beta version is released.
Hope you'll find it useful.
Best Regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sat, 07 October 2017 14:54] Report message to a moderator
|
|
|
|
|
|
|
|
| Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48753 is a reply to message #48750] |
Sun, 10 September 2017 17:05   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek,
Thank you very much for your comments.
Quote:For now, it looks like Job is not using worker threads, creating thread for each run.
Yes, this is because Job is not a queue or pool. Maybe I have given you the wrong impression: It is not even meant to be an alternative or rival to CoWork which works just well. (I used a similar interface, because CoWork is already part of core U++ with a suitable interce design.)
Job is designed to be an interface to plain Thread, with a simplified error and result handling. (I admit the Job/Worker metaphor can be somewhat misleading.)
Quote:Which actually can have advantage if there are blocking operations (file I/O etc), but that is meant to be dealt with in CoWork by increasing the number of threads.
This is why I designed it for in the first place: For file and network operations. (That's why I supplied SocketClients example).
As for the CoWork's capabilities: Sure, CoWork can deal with such operations as well.
Quote:"WaitForJobs" is outright ugly, being single global function for all Jobs
I can turn it into Job::FinishAll(). However, Jobs, by default, wait for their workers to finish when they go out of scope.(Unless their workers are explicitly detached.)
And Finish & IsFinished will wait for each job to be done. (Though they can be improved.)
Consider this one:
{
Job<String> filejob([=]{
FileIn fi("/home/test_session/Very_Large_Test_File.txt");
if(fi.IsError())
JobError(fi.GetError(), fi.GetErrorText());
Cout() << "Reading file...";
return LoadStream(fi);
});
while(!filejob.IsFinished()) Cout() << "....";
Cout() << (filejob.IsError() ? filejob.GetErrorDesc() : filejob.GetResult()) << '\n';
}
Now, of course this can be written in CoWork, but for such operations as above, I believe the interface of Job is simpler for individual thread operations, and would be less error prone. (No locks, no sharing, individual asynchronous operations, and their results...)
As a side note I run a simple test (I'm not sure if it's legitimate, but was curious.):
Below is the timing results for the same operations carried out by Job and CoWork (I see them as somewhat different, and complementary tools, but wanted to see how well they perform.)
When I reduce Sleep value in WaitForJosb() (I know it's ugly) to 1, I get this for 1500 computations (Under latest GCC):
TIMING Job : 137.00 ms - 137.00 ms (137.00 ms / 1 ), min: 137.00 ms, max: 137.00 ms, nesting: 1 - 1
TIMING CoWork : 206.00 ms - 206.00 ms (206.00 ms / 1 ), min: 206.00 ms, max: 206.00 ms, nesting: 1 - 1
Code is:
String GetDivisors()
{
String s;
int number = (int) 1000;
Vector<int> divisors;
for(auto i = 1, j = 0; i < number + 1; i++) {
auto d = number % i;
if(d == 0){
divisors.Add(i);
j++;
}
if(i == number)
s = Format("Worker Id: %d, Number: %d, Divisors (count: %d): %s",
GetWorkerId(),
number,
j,
divisors.ToString());
}
return pick(s);
}
CONSOLE_APP_MAIN
{
{
CoWork jobs;
jobs.SetPoolSize(1500);
Vector<String> results;
TIMING("CoWork");
for(int i = 0; i < 1500; i++)
jobs & [=, &results] { String h = GetDivisors(); CoWork::FinLock(); results.At(i) = h; };
jobs.Finish();
for(auto r : results)
Cout() << r << '\n';
}
{
Array<Job<String>> jobs;
jobs.SetCount(1500);
TIMING("Job");
for(int i = 0; i < 1500; i++)
jobs[i].Start([=]{ return GetDivisors(); });
WaitForJobs();
for(auto& job : jobs)
Cout() << job.GetResult() << '\n';
}
}
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sun, 10 September 2017 19:28] Report message to a moderator
|
|
|
|
| Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48754 is a reply to message #48753] |
Sun, 10 September 2017 19:08   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
And below is where CoWork starts to outperform Job (As you pointed out, because of the additional copying involved, I guess.):
I changed the code to see other options:
{
CoWork jobs;
Vector<String> results;
TIMING("CoWork");
jobs & [=, &results] {
Vector<String> s;
for(int i = 0; i < 50000; i++)
s.Add() = GetDivisors();
CoWork::FinLock();
results = pick(s);
};
jobs.Finish();
}
{
Job<Vector<String>> job;
TIMING("Job");
job.Start([=]{
Vector<String> s;
for(int i = 0; i < 50000; i++)
s.Add() = GetDivisors();
return pick(s);
});
job.Finish();
auto s = job.GetResult();
}
TIMING Job : 1.42 s - 1.42 s ( 1.42 s / 1 ), min: 1.42 s , max: 1.42 s , nesting: 1 - 1
TIMING CoWork : 1.39 s - 1.39 s ( 1.39 s / 1 ), min: 1.39 s , max: 1.39 s , nesting: 1 - 1
Job is not an alternative to CoWork, but it's not a bad tool either. It does simplify writing high performance MT code in a convenient way, thanks to U++.
It is suitable for such asynchronous operations mainly where a high latency is expected (IO/sockets, etc.) and where the code needs to be easily managable (errors, and results should be easily and immediately dealt with.)
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sun, 10 September 2017 20:00] Report message to a moderator
|
|
|
|
|
|
|
|
| Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48757 is a reply to message #48756] |
Sun, 10 September 2017 21:16   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek, and thank you for your patience.
Quote:mirek
OK, so the difference is
- "return" semantics (implemented using promise/future)
- error states
Right?
If true, what about using future / promise directly? IMO the only problem is to have them interfaced with Thread and/or CoWork (for different kinds of usage).
Well, Job is one way of having them interfaced with U++ Thread. Using f/p pair directly is somewhat cumbersome.
However, TBH, what I really see lacking in multithreading tools in general is a simple error handling mechanism, and a per-thread shutdown mechanism.
This is the one of the reasons why I've come up with the Job class. It is also an attempt to solve these problems.
E.g.
Thread.IsError()
Thread.GetError()
Thread.GetErrorDesc()
Thread.ShutDown()
and
Thread::Error (exception)
As you can see in the Job.hpp these are not very hard to implement. (I don't see why it should be difficult for Thread at least (performance?). Cowork, being a job queue, is naturally a more complex beast.)
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sun, 10 September 2017 21:34] Report message to a moderator
|
|
|
|
|
|
| Re: Job package: A lightweight multithreading tool, using promise/future mechanism [message #48760 is a reply to message #48759] |
Mon, 11 September 2017 00:25   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Quote:
That one is a borderline to impossible.
The problem are all those destructors that need to called. It is hard to abort thread that is does not know about it...
Ah no, what I mean is setting a unique flag for each thread (maybe using ThreadId), so that it can be checked from within like IsShutdownThreads.
(I do this with Job:Cancel() method and IsJobCancelled() global function, by keeping an index of their IDs, which are integer numbers, incremented with each new thread, using a thread local variable as the unique id.)
I was just talking about signalling the shutdown to specific targets so that user can design his/her MT code and handle its shutdown conditions easily.
And this is exactly where Job comes handy. It is a higher level interface, a convenience wrapper, if yow will, with a simpler return semantics, error management, and a more refined shutdown mechanism. With a reasonably small overhead.
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Mon, 11 September 2017 11:40] Report message to a moderator
|
|
|
|
| RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48780 is a reply to message #48759] |
Sun, 17 September 2017 23:20   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek (and all U++ community),
After your review and criticism of Job class. I went back to design board and come up with a new version, mostly re-written.
I updated the description and the package in the first post, as usual, but allow me to copy/paste it's description here:
This package contains a lightweight and easy-to-use multithreading tool for U++ framework: Job.
Job template class implements a scope bound, single worker thread based on RAII principle.
It provides a return semantics for result gathering functionally similar to promise/future
pattern but with three major differences:
1) future/promise pair requires at least moving of the resulted data, which can be
relatively expensive depending on the object type. On the other hand, Job acts as a simple
container and uses a reference based result gathering method. This makes it possible to
reduce move/copy overhead involved (nearly down to zero).
2) Job does not allow the T to be of plain void type (of course, void pointer is allowed).
3) Trying to access the resulting data while it is still invalid will not throw.
Resources are allocated during construction (including the job data).
Note that for higher performance loop parallelization scenarios, CoWork would be a more
suitable option. This class is mainly designed to allow the applications and libraries to gain
an easily managable, optional non-blocking behaviour where high latency is expected (Such as
network operations and file I/O), and a safe "referential access" to the objects processed
by the worker threads is preferred.
- It is now a proper single worker thread. (performance gain, and memory reduction is visible.) By design Job has no work scheduling (it is not meant to be a queue, not directly at least.)
- It is re-designed around the RAII principle: A scope-bound single worker thread that only gets destroyed when it is out-of-its scope.
- Most importantly, thanks to your criticisim, I ditched the future/promise mechanism completely, in favour of "Upp-native" way: Job instance are from now on basically simple data containers with referential acces to their data (result). Yet I've kept the alternative return semantics. It is really useful.
Granted, none of these are impossible to implement with CoWork or Thread. AFAIK CoWork is scope bound too. But Job's purpose is different. Although it can be used as a general parallelization tool, it is really meant to simplify writing non-blocking applications, or porting exisiting ones to them, providing a simple yet convenient interface.
For example, with this new design it took around 2 hours for me to port my own FTP class fully into MT environment, using a simple switch (Ftp::Blocking(false)) (News: Upcoming version (2.0) will support MT internally.).
Again, I've begun porting (an experiment for now) the SSH package to MT using Job, and it solves nearly every problem that I ran against wrapping SSH (non-blocking), and also both source code and interface is reduced drastically. It is very clean now. (all those Startxxx() and xxx() method pairs are gone, there are now only xxx ones. E.g. Ssh:Connect() ) You can see this uniform programming/porting pattern emerging int the new SocketClients example I provided:
class Client : public Job<String> {
public:
Client& Blocking(bool b = true) { blocking = b; return *this; }
String Request(const String& host, int port);
private:
String Run(Event<>&& cmd);
bool blocking = true;
};
String Client::Run(Event<>&& cmd)
{
Start(pick(cmd));
if(blocking) Finish();
return blocking && !IsError() ? GetResult() : GetErrorDesc();
}
String Client::Request(const String& host, int port)
{
auto cmd = [=]{
TcpSocket socket;
auto& output = Job<String>::Data(); // This method allows referential access to the data of respective job.
output = Format("Client #%d: ", GetWorkerId());
INTERLOCKED { Cout() << output << "Starting...\n"; }
if(socket.Timeout(10000).Connect(host, port))
output.Cat(socket.GetLine());
if(socket.IsError())
throw JobError(socket.GetError(), socket.GetErrorDesc());
};
return Run(cmd);
}
CONSOLE_APP_MAIN
{
//.....
// Requesting in a simple, blocking way.
{
Cout() << "----- Processing individual blocking requests...\n";
Cout() << c1.Request(host1, 21) << '\n';
Cout() << c2.Request(host2, 21) << '\n';
}
// Reuse workers and make requests in a simple, non-blocking way.
{
Cout() << "----- Processing individual non-blocking requests...\n";
// We can "clear" the data (String):
c1.GetResult().Clear();
c2.GetResult().Clear();
c1.Blocking(false).Request(host1, 21);
c2.Blocking(false).Request(host2, 21);
while(!c1.IsFinished() || !c2.IsFinished())
;
if(c1.IsError()) Cerr() << c1.GetErrorDesc() << '\n';
else Cout() << ~c1 << '\n';
if(c2.IsError()) Cerr() << c2.GetErrorDesc() << '\n';
else Cout() << ~c2 << '\n';
}
//....
}
Please also take a look into the full code.
As always, review, bug reports, criticism, feedback are greatly appreciated.
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Mon, 18 September 2017 00:10] Report message to a moderator
|
|
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48783 is a reply to message #48782] |
Mon, 18 September 2017 08:29   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Quote:That's intereseting, because I actually suggested adding future/promise to U++ facilities... Smile
Anyway, I might check it, but I do not see any archive / link posted.
Mirek
That can also be done, I can work on it separately, or re-introduce it as an option if you think it is worth it. (In it's current state Job package does not cover a shared_future alternative. Maybe I should consider adding a SharedJob class, but 1) there is already CoWork for paralellization, 2) I don't quite know a real life scenario where it would be more useful than CoWork.)
Nevertheless, recently I've found my "reference based" approach much more performant and convenient than future/promise.
Consider this (pseudo) code:
double sum_total = 0;
Job<Vector<double>> job;
job.Start(DoSomeHeavyCalculationAndFillTheVector);
//....
if(job.IsFinishe() && !job.IsError()) {
for(auto& e : ~job) // Reference access to Vector. Data is not copied or moved. Vector is filled via a direct -yet safe- access.
sum_total += e; // And retrieved in same fashion as well. Note also that there is no requirement for the data type to be contained.
// In fact, using One or Any, or Value, we can even differentiate between the job
// results for given operations in a single Job easily. :)
}
Package's link is on the bottom of the first post of this topic.
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Mon, 18 September 2017 11:49] Report message to a moderator
|
|
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48788 is a reply to message #48782] |
Tue, 19 September 2017 08:12   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek,
(Thank you for your comments. I deeply appreciate it.)
I was curious about if the new design of Job class will pay off, whether it is also a resonable general paralellization tool, and did some benchmarking with the Divisors example.
I assumed Job and CoWork to be functionally and effectively identical in this example (In the sense that, regardless of their internals, they are both doing the same thing: Calculating divisiors for the number 1000, 10.000 times in available worker threads, then printing the results to the screen.)
I simply changed the jobs loop to take advantage of new return semantics (I don't know if CoWork can be put into a similar loop, so I am taking this with a grain of salt):
The loop for the job is simply a very crude slot manager for 8 Job workers. (Tested on AMD FX 6100, six core processor.)
Array<Job<String>> jobs;
jobs.SetCount(CPU_Cores() + 2);
CoWork cowork;
// cowork.SetPoolSize(CPU_Cores() + 2);
Vector<String> results;
DUMP(CPU_Cores());
{
TIMING("CoWork -- With stdout output");
for(int i = 0; i < 10000; i++)
cowork & [=, &results] { String h = GetDivisors(); CoWork::FinLock(); results.At(i) = h; };
cowork.Finish();
// Stdout output section.
for(auto& r : results)
Cout() << r << '\n';
}
{
TIMING("Job -- With stdout output");
int i = 0;
while(i < 10000) {
for(auto& job : jobs) {
if(!job.IsFinished()) {
continue;
}
job & [=]{ Job<String>::Data() = GetDivisors(); };
if(!(~job).IsEmpty()) {
Cout() << ~job << '\n';
if(++i == 10000) break;
}
}
}
}
Results (consistent):
For 10000 computiation.
CPU_Cores() = 6
TIMING Job -- With stdout output: 370.00 ms - 370.00 ms (370.00 ms / 1 ), min: 370.00 ms, max: 370.00 ms, nesting: 1 - 1
TIMING CoWork -- With stdout output: 461.00 ms - 461.00 ms (461.00 ms / 1 ), min: 461.00 ms, max: 461.00 ms, nesting: 1 - 1
CPU_Cores() = 6
TIMING Job -- Without stdout output: 228.00 ms - 228.00 ms (228.00 ms / 1 ), min: 228.00 ms, max: 228.00 ms, nesting: 1 - 1
TIMING CoWork -- Without stdout output: 234.00 ms - 234.00 ms (234.00 ms / 1 ), min: 234.00 ms, max: 234.00 ms, nesting: 1 - 1
for 1000 compuitation.
CPU_Cores() = 6
TIMING Job -- With stdout output: 34.00 ms - 34.00 ms (34.00 ms / 1 ), min: 34.00 ms, max: 34.00 ms, nesting: 1 - 1
TIMING CoWork -- With stdout output: 53.00 ms - 53.00 ms (53.00 ms / 1 ), min: 53.00 ms, max: 53.00 ms, nesting: 1 - 1
CPU_Cores() = 6
TIMING Job -- Without stdout output: 24.00 ms - 24.00 ms (24.00 ms / 1 ), min: 24.00 ms, max: 24.00 ms, nesting: 1 - 1
TIMING CoWork -- Without stdout output: 31.00 ms - 31.00 ms (31.00 ms / 1 ), min: 31.00 ms, max: 31.00 ms, nesting: 1 - 1
What do you think?
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Tue, 19 September 2017 09:51] Report message to a moderator
|
|
|
|
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48791 is a reply to message #48790] |
Tue, 19 September 2017 10:12   |
 |
mirek
Messages: 14290 Registered: November 2005
|
Ultimate Member |
|
|
Small issues with the code:
sData template does not compile with MSC (I believe it is not legit C++ code, probably GCC eats it but it is not OK). Replaced by
template<class T>
One<T>*& sData()
{
static thread_local One<T>* sData = NULL;
return sData;
}
GetNewWorkerId has int64 internal counter (which IMO is reasonable), but the rest of the code is using int.
Anyway, as you are using "TIMING", it appears you are benchmarking in debug mode. I have done some changes:
#include <Core/Core.h>
#include <Job/Job.h>
using namespace Upp;
String GetDivisors()
{
String s;
int number = (int) 1000;
Vector<int> divisors;
for(auto i = 1, j = 0; i < number + 1; i++) {
auto d = number % i;
if(d == 0){
divisors.Add(i);
j++;
}
if(i == number)
s = Format("Worker Id: %d, Number: %d, Divisors (count: %d): %s",
GetWorkerId(),
number,
j,
divisors.ToString());
}
return pick(s);
}
// #define OUTPUT
#define N 100000
CONSOLE_APP_MAIN
{
Array<Job<String>> jobs;
jobs.SetCount(CPU_Cores() + 2);
CoWork cowork;
// cowork.SetPoolSize(CPU_Cores() + 2);
Vector<String> results;
RDUMP(CPU_Cores());
{
RTIMING("CoWork -- With stdout output");
for(int i = 0; i < N; i++)
cowork & [=, &results] { String h = GetDivisors(); CoWork::FinLock(); results.At(i) = h; };
cowork.Finish();
// Stdout output section.
#ifdef OUTPUT
for(auto& r : results)
Cout() << r << '\n';
#endif
}
{
RTIMING("Job -- With stdout output");
int i = 0;
while(i < N) {
for(auto& job : jobs) {
if(!job.IsFinished()) {
continue;
}
job & [=]{ Job<String>::Data() = GetDivisors(); };
if(!(~job).IsEmpty()) {
#ifdef OUTPUT
Cout() << ~job << '\n';
#endif
if(++i == N) break;
}
}
}
}
}
with results in _RELEASE_ mode:
* d:\upp.out\MyApps\MSC15\JobBenchmark.exe 19.09.2017 10:01:27, user: cxl
CPU_Cores() = 8
TIMING Job -- With stdout output: 123.00 ms - 123.00 ms (123.00 ms / 1 ), min: 123.00 ms, max: 123.00 ms, nesting: 1 - 1
TIMING CoWork -- With stdout output: 103.00 ms - 103.00 ms (103.00 ms / 1 ), min: 103.00 ms, max: 103.00 ms, nesting: 1 - 1
Now I fully understand that performance is not the reason for Job, but as I have seen no technical reasons why CoWork should be that much slower, I had to check....
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48792 is a reply to message #48780] |
Tue, 19 September 2017 10:15   |
 |
mirek
Messages: 14290 Registered: November 2005
|
Ultimate Member |
|
|
Oblivion wrote on Sun, 17 September 2017 23:20
[code]
1) future/promise pair requires at least moving of the resulted data, which can be
relatively expensive depending on the object type. On the other hand, Job acts as a simple
container and uses a reference based result gathering method. This makes it possible to
reduce move/copy overhead involved (nearly down to zero).
This is not quite true (unlike Job, which in fact requires you to move the data if you need it outside the Job). E.g.:
#include <Core/Core.h>
#include <future>
using namespace Upp;
template <class D, class P, class F, class... Args>
void set_future_value(D *dummy, P&& p, F&& f, Args&&... args)
{
p.set_value(f(args...));
}
template <class P, class F, class... Args>
void set_future_value(void *dummy, P&& p, F&& f, Args&&... args)
{
f(args...);
p.set_value();
}
template< class Function, class... Args>
std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
Async(Function&& f, Args&&... args )
{
std::promise<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>> p;
auto ftr = p.get_future();
CoWork::Schedule([=, p = pick(p)]() mutable {
std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)> *dummy = NULL;
set_future_value(dummy, pick(p), pick(f));
});
return ftr;
}
CONSOLE_APP_MAIN
{
// with reference
String h;
auto f = Async([&h] { h = "Hello world"; });
f.get();
DDUMP(h);
// using return value
DDUMP(Async([] { return "Hello world"; }).get());
}
BTW, I am mostly arguing there because it is a good way for me to investigate these issues....
|
|
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48794 is a reply to message #48793] |
Tue, 19 September 2017 11:12   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek,
Thanks for all the corrections. I find your opinions really valuable and useful. I was suspicios there was something wrong with that benchmark too.
Quote:This is not quite true (unlike Job, which in fact requires you to move the data if you need it outside the Job).
Not necessarily. I can always use a pointer or Ref(), or std::ref. (In it's current state it wouldn't be as clean as capturing, I admit, but can be improved)
Or I can simply pass reference, using lambda capture. That is not forbidden. But then I have to take into account the object's life time too.
Job solves some of those head aches. (Of course, tt may introduce its own).
For the sake of discussion (ugly):
In Job.h
template<class V = T>
Job(V v) : Job() { *data = v; }
then, in Example.cpp:
int i;
Job<int*> job(&i);
job.Start([]{ auto& n = *Job<int*>::Data() = 100;});
job.Finish();
Cout() << "Number is " << i << '\n';
Quote:future / promise also seems to have superior error handling facilities (exceptions can easily be passed from thread to caller).
Not really hard to add. I can: process JobErrors, and re-throw plain Upp::Exc, or std::exception category.
Quote:
The one thing that seems to be missing is future -> promise abort.
Job has a per-job cancel mechanism.
So what do you suggest?
Your Async wrapper is really simple and nice. Are you going to add it to U++?
Or should I improve Job? (add exception forwarding, easier way to pass references and pointers...)
My favourite : Or should I re-introduce Future/promise pair as was before? After all Job was inially a future/promise wrapper. But it lacked RAII and worker thead model.
Now they are in place. All I need to do is to incorporate it to the current model using the Async example you provided (except cowork).
(Without changing the public api of Job much. IMO It looks familiar and acts fine.)
Or should I simply abandon it? This tool is born out of need, which plain future/promise mechanism couldn't satisfy (i.e. a fine-grained control over threads for non-blocking behaviour.)
I don't see a reson to hang onto it if there will be a better solution.
Again, thank you very much for taking time to discuss the issue.
Job currently lacks shared states but it is not what Job is meant for anyway. IMO CoWork is there for it (and for bunch of other things).
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Tue, 19 September 2017 11:37] Report message to a moderator
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48795 is a reply to message #48793] |
Tue, 19 September 2017 12:31   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
How about adding An Async/Result pair, the way you suggested.
ASync will be an improved version of the example you provided with (using CoWork), addition of result id generator (for cancellation), exception handling:
Result will be a wrapper for future where we can do such things as follows:
auto result1 = Async([=]{ return GetDivisors(); })
while(!result.IsReady())
; // Do something;
Cout() << result.IsValid() ? result.Get() : result.GetErrorDesc();
// and of course...
Cout() << Async([=]{ return "Hello World\n"; }).Get();
Result's Interface may be consisted of:
Result::Get();
Result::IsReady();
Result::IsValid();
Result::GetError();
Result::GetErrorDesc():
Result::GetId();
Result::Cancel();
If you'd like, I can implement it this way. (IMO Result is a more meaningful name, but naming can change...)
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Tue, 19 September 2017 12:35] Report message to a moderator
|
|
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48807 is a reply to message #48796] |
Sat, 23 September 2017 14:17   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek (and Upp community),
Job package is updated in accordance with the above discussion.
To sum up, it is heavily improved (tested on GCC 7.2.0, MinGW (supplied with U++), and latest MinGWx64, and MSC 2017):
- Void type instantiation is re-introduced for good (does not rely on anything other than plain template specializaton rules.)
- Value return semantics is re-introduced for good.
- Constant reference access operator is added. (This is especially useful when using Job with containers such as Vector, or array (e.g. Job<Vector<int>>) in a loop. See JobExample test app provided)
- Exception propagation mechanism is added. Workers will propagate raised exceptions to their caller. (when one of the GeResult(), operator~(), or IsError() methods is called. )
- JobExample test application is added to the package. This example demonstrates the above mentioned traits.
I believe with the recent update Job is now slightly superior to async/future/promise trio (not shared_future), in that it has full control over its thread (which is a proper worker thread) worker cancelleation/abortion mechanics,
and optional container-like reference access to its result which can reduce copying/moving where unnecessary. (Except Job doesn'T support reference specialization. Job<T&> is not possible for now. But that is not a big deal, really) 
Bug reports, criticism and feedback is deeply appreciated.
[ See below message for new version ]
Best regards.
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sat, 07 October 2017 14:55] Report message to a moderator
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48829 is a reply to message #48796] |
Sat, 07 October 2017 15:03   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello,
Job package is now compatible with single-threaded U++ environment. (Yet -almost- all methods and global functions retain their functionality under ST env.)
Documentation and provided example is updated accordingly.
The code below is a part of JobExample.cpp and demonstrates both non-blocking behaviour and Job cancelling feature in a single-threaded environment:
void CancelJob()
{
// Singlethreaded version. (non-blocking operation)
// Below example is one of the simplest ways to achive non-blocking operations with Job in a
// single-threaded environment. A finer-grained operation would involve handling of return
// codes. (e.g. using Job<int>)
Job<void> job;
auto work = [=] {
if(IsJobCancelled()) {
Cout() << "Worker #" << GetWorkerId() << " received cancel signal. Cancelling job...\n";
throw JobError("Operation cancelled.");
}
};
Cout() << "Worker #" << GetWorkerId() << " started. (Waiting the cancellation signal...)\n";
const int timeout = 5000;
auto start_time = msecs();
while(!job.IsError()) {
job.Start(work);
if(msecs(start_time) >= timeout)
job.Cancel(); // Or if you have more than one non-blocking operation in progress,
// you can call CancelJobs() global function to cancel all running jobs.
}
}
As always, reviews, criticism, bug reports, etc. are deeply appreciated.
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sat, 07 October 2017 15:09] Report message to a moderator
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48830 is a reply to message #48829] |
Sat, 07 October 2017 15:51   |
 |
mirek
Messages: 14290 Registered: November 2005
|
Ultimate Member |
|
|
Oblivion wrote on Sat, 07 October 2017 15:03Hello,
Job package is now compatible with single-threaded U++ environment. (Yet -almost- all methods and global functions retain their functionality under ST env.)
ST is now deprecated...
BTW, have you noticed that, inspired by this discussion thread, CoWork now has cancelation and exception propagation?
IMO, that makes it very similar to job, the only part that is really missing is the return value.
Actually, the main reason I have not implemented that yet is that I do not like the explicit specification of return value, as in Job. Which requires to implement it as function, which in turn requires some form of 'future'.
Mirek
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48831 is a reply to message #48830] |
Sat, 07 October 2017 16:04   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek,
Quote:ST is now deprecated...
Sure, The reason I added ST support is that I have some code that needs to be backward (ST) compatible. And for those who want't to keep their code as such. (It wasn't very difficult to add, anyway).
Quote:
BTW, have you noticed that, inspired by this discussion thread, CoWork now has cancelation and exception propagation?
IMO, that makes it very similar to job, the only part that is really missing is the return value.
Actually, the main reason I have not implemented that yet is that I do not like the explicit specification of return value, as in Job. Which requires to implement it as function, which in turn requires some form of 'future'.
Mirek
No, I didn't notice it till now, that's great, one less reason for me to use Job, thanks! 
I'll try and see if I can use CoWork instead. (It shouldn't be very difficult)
Job's return semantics proved really useful, so for the time being I intend to maintain it.
And in the meantime let's see if I can come up with a suitable alternative to std::future (to use it with CoWork).
Best regards,
Oblivion.
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sat, 07 October 2017 16:33] Report message to a moderator
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48832 is a reply to message #48830] |
Sun, 08 October 2017 15:01   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek,
I wrote a prototype for U++ implementation of promise/future mechanism. It is 104 LOCs, and it took an hour for me to to design and write it (It is in its current state not perfect but it works.)
It is consisted of three classes (please do not dwell on naming, I didn't think them through, they'll change):
- WorkEntry -> std:promise (This class is not explicitly called. It is allocated on the heap (using new), and its ownership is passed to WorkResult.)
- WorkResult -> std: future (With pick semantics, void type specialization, error handling, exception propagation, and optional constant reference access to the result.)
- Work (Contains a CoWork instance, and exposes Do() and Finish() methods.)
I re-implemented your Async() code in Work class, using CoWork::Do() instead of CoWork::Schedule(). The rationale behind this decision is that sometimes asynchronous operations need to be externally blocked, all at once ("wait for all works to finish"). And I found no other reliable way. Also, works use a single semaphore to wait and run.
Example code:
// Error handling, picking/moving.
auto r1 = work.Do([=] { throw Work::Error("Worker failed."); });
work.Finish();
auto r2 = pick(r1); // r1 is picked.
if(r2.IsError()) {
Cout() << r2.GetErrorDesc() << '\n';
}
There is room for improvements, and probably some issues I didn't encounter yet or I overlooked. Feel free to comment on it. (One area that needs improving and to be made more error resistant is moving/picking behaviour. I'll improve them)
Below you can find the Work, and WorkExample, which is basically JobBencmark adapted to Work.
Best regards,
Oblivion
-
Attachment: Work.zip
(Size: 3.19KB, Downloaded 445 times)
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Sun, 08 October 2017 15:07] Report message to a moderator
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48835 is a reply to message #48830] |
Tue, 10 October 2017 11:51   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
A further refinement would be to explicity relate CoWork to promise/future pattern:
My suggested namings:
-
- WorkResult (former WorkEntry): Unlike std::promise, this is called privately. I don't really see any use in calling it explicitly
- Work (former WorkResult): This can be the U++ std::future counterpart (see the first prototype packageI provided above) This can be a helper class to CoWork, representing a single, isolated (semantically) thread.
- CoWork::Work(): This is the thread-starter method which will return, well, Work :)
I propose adding CoWork::Work() as a (non-static) method to CoWork, because from my experience with Job,Thread, future/promise and CoWork, and as I noted on my previous message, keeping workers contained in CoWork instances, using a CoWork::Do() call, have some advantages over using a static method such as CoWork::Schedule(): Such as the ability to use CoWork::Finish() on demand, and waiting the workers to be finished automatically on class instance destruction.
As for the error management mechanism using a specific exception type such as Work::Error, along with IsError(), GetError() and GetErrorDesc() methods, IMO they would be a very useful addition, but they are not necessary, can be removed.
What do you think?
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Tue, 10 October 2017 12:01] Report message to a moderator
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48836 is a reply to message #48835] |
Tue, 10 October 2017 16:10   |
 |
mirek
Messages: 14290 Registered: November 2005
|
Ultimate Member |
|
|
My try.
#include <Core/Core.h>
using namespace Upp;
template <class Ret>
class AsyncWork {
template <class Ret>
struct Imp {
CoWork co;
Ret ret;
template< class Function, class... Args>
void Do(Function&& f, Args&&... args) { co.Do([=]() { ret = f(args...); }); }
const Ret& Get() { return ret; }
};
template <>
struct Imp<void> {
CoWork co;
template< class Function, class... Args>
void Do(Function&& f, Args&&... args) { co.Do([=]() { f(args...); }); }
void Get() {}
};
One<Imp<Ret>> imp;
public:
template< class Function, class... Args>
void Do(Function&& f, Args&&... args) { imp.Create().Do(f, args...); }
void Cancel() { if(imp) imp->co.Cancel(); }
Ret Get() { ASSERT(imp); imp->co.Finish(); return imp->Get(); }
Ret operator~() { return Get(); }
AsyncWork(AsyncWork&&) = default;
AsyncWork() {}
~AsyncWork() { if(imp) imp->co.Cancel(); }
};
template< class Function, class... Args>
AsyncWork<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
Async(Function&& f, Args&&... args)
{
AsyncWork<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>> h;
h.Do(f, args...);
return pick(h);
}
CONSOLE_APP_MAIN
{
auto h = Async([] { return "Hello world"; });
DDUMP(h.Get());
DDUMP(~Async([](int x) { return x * x; }, 9));
auto x = Async([] { throw "error"; });
try {
x.Get();
}
catch(...) {
DLOG("Exception caught");
}
}
I guess it is quite similar now.
[Updated on: Tue, 10 October 2017 16:17] Report message to a moderator
|
|
|
|
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48840 is a reply to message #48839] |
Tue, 10 October 2017 23:48   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek,
tested Async/AsyncWork with MSC 2017, MinGW on windows, and with GCC on linux.
- It compiles on MSC without any hiccup.
- It does not compile on GCC (7.2) or MinGW unless the nested classes are moved out, (That's why I wrote my prototype that way.) and "Ret" is changed to some other parameter name. Here are the error codes I get:
\AsyncTest.cpp (8): error: declaration of template parameter 'Ret' shadows template parameter
\AsyncTest.cpp (18): error: explicit specialization in non-namespace scope 'class AsyncWork<Ret>'
\AsyncTest.cpp (19): error: template parameters not deducible in partial specialization:
\AsyncTest.cpp (31): error: too many template-parameter-lists
\AsyncTest.cpp (47): error: 'class AsyncWork<const char*>' has no member named 'Do' (): h.Do(f, args...);
\AsyncTest.cpp: In instantiation of 'AsyncWork<typename std::result_of<typename std::decay<_Tp>::type(std::decay_t<Args>...)>::type> Async(Function&&, Args&& ...) [wit
h Function = ConsoleMainFn_()::<lambda(int)>; Args = {int}; typename std::result_of<typename std::decay<_Tp>::type(std::decay_t<Args>...)>::type = int]':
\AsyncTest.cpp (56): required from here
\AsyncTest.cpp (47): error: 'class AsyncWork<int>' has no member named 'Do'
\AsyncTest.cpp (47): error: 'class AsyncWork<void>' has no member named 'Do'
\AsyncTest.cpp (11): error: 'AsyncWork<Ret>::Imp<Ret>::ret' has incomplete type
\AsyncTest.cpp (11): error: invalid use of 'void'
\AsyncTest.cpp (15): error: forming reference to void
\AsyncTest.cpp (34): error: 'struct AsyncWork<void>::Imp<void>' has no member named 'Get'; did you mean 'ret'?
\AsyncTest.cpp (34): error: return-statement with a value, in function returning 'void' [-fpermissive]
- More importantly there seems to be something wrong with the exception propagation mechanism. For,
1) Sometimes it fails to catch the exception, and the application crashes with that exception.
2) When it catches the exception the application hangs at the end (after the "exception caught" message is printed.)
3) Sometimes the application simply hangs.
I got this erratic behaviour both on windows and on linux, on a single machine, so it maybe a local hardware problem, I need to investigate it further...
Quote:s a tricky catch with IsFinished:
template <class Range>
ValueTypeOf<Range> ASum(const Range& rng, const ValueTypeOf<Range>& zero)
{
int n = rng.GetCount();
if(n == 1)
return rng[0];
if(n == 0)
return 0;
auto l = Async([&] { return ASum(SubRange(rng, 0, n / 2)); });
auto r = Async([&] { return ASum(SubRange(rng, n / 2, n - n / 2)); });
while(!l.IsFinished() || !r.IsFinished())
Sleep(1);
return l.Get() + r.Get();
}
What do you think is wrong with this code? Smile
Mirek
Sure, but can this really be attributed to a design flaw?
I mean, f I'm not really missing anything else, it seems that here we simply have a careless programming.
Recursion is potentially tricky by nature, and requires the developer to be extra cautious with his/her assumptions.
A proper use of IsFinished() can be found in my JobBenchmark example , where it is simply used to check the worker, and move on to others if the job is not finished... (at least, that's what I have in my mind in the first place)
Best regards.
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Wed, 11 October 2017 08:44] Report message to a moderator
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48841 is a reply to message #48840] |
Wed, 11 October 2017 09:23   |
 |
mirek
Messages: 14290 Registered: November 2005
|
Ultimate Member |
|
|
Oblivion wrote on Tue, 10 October 2017 23:48Hello Mirek,
tested Async/AsyncWork with MSC 2017, MinGW on windows, and with GCC on linux.
- It compiles on MSC without any hiccup.
- It does not compile on GCC (7.2) or MinGW unless the nested classes are moved out, (That's why I wrote my prototype that way.) and "Ret" is changed to some other parameter name. Here are the error codes I get:
After a bit of wrestling with C++11, it now compiles.
Quote:
- More importantly there seems to be something wrong with the exception propagation mechanism. For,
1) Sometimes it fails to catch the exception, and the application crashes with that exception.
2) When it catches the exception the application hangs at the end (after the "exception caught" message is printed.)
3) Sometimes the application simply hangs.
I got this erratic behaviour both on windows and on linux, on a single machine, so it maybe a local hardware problem, I need to investigate it further...
I would like to investigate, but need a testcase.
Quote:
Sure, but can this really be attributed to a design flaw?
Recursion is potentially tricky by nature, and requires the developer to be extra cautious with his/her assumptions.
Well, recursion aside, I think that in general, we want the mechanism reentrant. I mean, it should not be a part of contract/function documentation whether is it using IsFinished internally.
I can actually fix the issue, but then IsFinished will not be doing the same thing.
BTW, the very same issue is true for CoWork. So something to think about..
Mirek
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48842 is a reply to message #48841] |
Wed, 11 October 2017 09:42   |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Quote:
Well, recursion aside, I think that in general, we want the mechanism reentrant. I mean, it should not be a part of contract/function documentation whether is it using IsFinished internally.
Ah, I see. Apparently I'm still not thinking out of my Job-mindset (It was theoretically reentrant, although I did not actually test it.).
CoWork is a different beast internally. I forgot that.
As for the erratic exception handling, I didn't use any code other than you provided above. Simply put, it sometimes works and sometimes doesn't. I didn't notice any regular pattern.
Best regards,
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Wed, 11 October 2017 09:44] Report message to a moderator
|
|
|
|
|
|
|
|
|
|
| Re: RE: Job package: A scope-bound worker thread for non-blocking operations. [message #48847 is a reply to message #48846] |
Wed, 11 October 2017 20:14  |
Oblivion
Messages: 1264 Registered: August 2007
|
Senior Contributor |
|
|
Hello Mirek,
Quote:Anyway, (maybe I forgot to mention to AsyncWork is now in U++): Are you trying with current trunk?
This is great news! Thank you very much for your patience and efforts. This is a really handy tool which obsoletes Job. 
As for the weird error I get, it seems to be stopped after I purged UPPOUT.
IF I encounter it again, I'll report it.
By the way, I noticed that there are missing topic titles in Core/src.tpp : (Huge, Function, Unicode Support). Also, Huge class title is missing in it's doc.
Best regards.
Oblivion
Github page: https://github.com/ismail-yilmaz
Bobcat the terminal emulator: https://github.com/ismail-yilmaz/Bobcat
[Updated on: Wed, 11 October 2017 20:53] Report message to a moderator
|
|
|
|
Goto Forum:
Current Time: Sat Apr 25 18:15:59 GMT+2 2026
Total time taken to generate the page: 0.01346 seconds
|