mr_ped wrote on Tue, 14 October 2008 10:27 |
Is there some reason why this subset can't live alongside the full mutex/etc. machinery in the same library? |
void QueueCoWork::Manage(Callback1 &cb) { int mostFreeThread = -1; //find most free (unbusy) Thread queueThreads[mostFreeThread] << cb; } void QueueCoWork::ManageTypical(Callback1 &cb) { //simply rotate through threads to average tasks count lastUsedThread = (++lastUsedThread) % queueThreads.GetCount(); queueThreads[lastUsedThread] << cb; } //---------------------------------------------------- void MainWindow::OnPaint { queueCoWork << THISBACK(PaintLines); } void MainWindow::PaintLines() { for (int y=0; y<height; ++y) queueCoWork.ManageTypical(THISBACK1(PaintLine, y)); } void MainWindow::PaintLine(int y) { //paint the y-th line }
void QueueCoWork::Manage(Callback1 &cb) { int mostFreeThread = -1; //find most free (unbusy) Thread queueThreads[mostFreeThread] << cb; } void QueueCoWork::ManageTypical(Callback1 &cb) { //simply rotate through threads to average tasks count lastUsedThread = (++lastUsedThread) % queueThreads.GetCount(); queueThreads[lastUsedThread] << cb; } //---------------------------------------------------- void MainWindow::OnPaint { queueCoWork << THISBACK(PaintLines); } void MainWindow::PaintLines() { for (int y=0; y<height; ++y) queueCoWork.ManageTypical(THISBACK1(PaintLine, y)); } void MainWindow::PaintLine(int y) { //paint the y-th line }
#include <Core/Core.h>
#include <math.h>

using namespace Upp;

// Micro-benchmark target: a cheap but not optimizable-away member function.
class TestClass {
public:
    TestClass() : x(0) {}   // FIX: x was read-modified while uninitialized

    void func(int a, int b) {
        int c = a + b*b;
        double xx = sin(a + b*1.34 - 3.14159252596428);
        double yy = cos((double) (b - a));
        x += (int) ceil(xx*yy*(c - (b << 4) + (a*b)));   // explicit narrowing
    }

private:
    int x;   // accumulator, keeps the optimizer from dropping func()
};

CONSOLE_APP_MAIN
{
    TestClass tc;
    Callback2<int, int> cb = callback(&tc, &TestClass::func);
    const int runs = 50;
    dword cnt = 0;
    for (int j = 0; j < runs; ++j) {
        Cout() << FormatInt(j) << " ";
        dword t1 = GetTickCount();
        for (int i = 0; i < 2000000; ++i) {
            cb(100, 500);
            //tc.func(100, 500);   // direct-call variant for comparison
        }
        cnt += GetTickCount() - t1;
        Sleep(1500);   // let the machine settle between runs
    }
    // FIX: the original wrote FormatInt(cnt/j) — `j` is out of scope here
    // (it is declared in the for-init); use the run count instead.
    Cout() << "\n" << FormatInt(cnt / runs);
}
tc.func(100, 500);
cb(100,500);
Callback2<int,int> cb = callback(&tc, &TestClass::func); cb(100,500);
for (...) { Callback cb = callback2(&tc, &TestClass::func, 100,500); cb(); }
BiVector<Callback> cbv; cbv.Reserve(100); for (...) { cbv.AddTail(callback2(&tc, &TestClass::func, 100,500)); cbv.Head().Execute(); cbv.DropHead(); }
class AThreaded { public: AThreaded() { args.SetCount(0xFF+1); //yes, simple array+"hash" instead of Index. that is because Index` elements are constant } template<class OBJECT, class P1, class P2> void RequestAction(void (OBJECT::*m)(P1,P2), const P1 &p1, const P2 &p2) { typedef void (OBJECT::*Func)(P1,P2); struct Args : public Moveable<Args> { P1 p1; P2 p2; }; //using method pointer as hash value. notice that method`s pointer size may be >= plain (void *) int methodPtrSize = sizeof(m) / sizeof(unsigned); unsigned *cur = (unsigned *) (&m); unsigned hashV = 0; for (int i=0; i<methodPtrSize; ++i, ++cur) hashV+=*cur; hashV &= 0xFF; int argsI = hashV;//args[hashV]; if (args[argsI].IsEmpty()) { //creating arguments queue for new callback Any aa; aa.Create< BiVector<Args> >(); args[hashV] = aa; args[argsI].Get< BiVector<Args> >().Reserve(100); } Args newArgs; newArgs.p1 = p1; newArgs.p2 = p2; //just emulating add+execute+drop BiVector<Args> &curArgsQueue = args[argsI].Get< BiVector<Args> >(); curArgsQueue.AddTail(newArgs); Args &curArgs = curArgsQueue.Head(); (((OBJECT *) this)->*m)(curArgs.p1, curArgs.p2); curArgsQueue.DropHead(); } protected: private: Array<Any> args; };
class TestClass : public AThreaded {...}; TestClass tc; tc.RequestAction(&TestClass::func, 100, 500);
Mindtraveller wrote on Mon, 17 November 2008 16:41 |
class AThreaded { public: AThreaded() { args.SetCount(0xFF+1); //yes, simple array+"hash" instead of Index. that is because Index` elements are constant } template<class OBJECT, class P1, class P2> void RequestAction(void (OBJECT::*m)(P1,P2), const P1 &p1, const P2 &p2) { typedef void (OBJECT::*Func)(P1,P2); struct Args : public Moveable<Args> { P1 p1; P2 p2; }; //using method pointer as hash value. notice that method`s pointer size may be >= plain (void *) int methodPtrSize = sizeof(m) / sizeof(unsigned); unsigned *cur = (unsigned *) (&m); unsigned hashV = 0; for (int i=0; i<methodPtrSize; ++i, ++cur) hashV+=*cur; hashV &= 0xFF; int argsI = hashV;//args[hashV]; if (args[argsI].IsEmpty()) { //creating arguments queue for new callback Any aa; aa.Create< BiVector<Args> >(); args[hashV] = aa; args[argsI].Get< BiVector<Args> >().Reserve(100); } Args newArgs; newArgs.p1 = p1; newArgs.p2 = p2; //just emulating add+execute+drop BiVector<Args> &curArgsQueue = args[argsI].Get< BiVector<Args> >(); curArgsQueue.AddTail(newArgs); Args &curArgs = curArgsQueue.Head(); (((OBJECT *) this)->*m)(curArgs.p1, curArgs.p2); curArgsQueue.DropHead(); } protected: private: Array<Any> args; }; And execution time is... ~640 msecs. This is almost as fast as plain function call which took 600 msecs instead of 840 msecs while using classic U++ callbacks. More of that, posting callback looks rather nice for user: class TestClass : public AThreaded {...}; TestClass tc; tc.RequestAction(&TestClass::func, 100, 500); |
StaticMutex mutex;
VectorMap<String, String> data;

// Store `value` under `key`; the whole update runs under the global mutex.
void SetData(const String& key, const String& value) {
    INTERLOCKED(mutex)
        data.GetAdd(key) = value;
}

// Look `key` up under the same mutex; yields Null when the key is absent.
String GetData(const String& key) {
    INTERLOCKED(mutex)
        return data.Get(key, Null);
}
StaticMutex mutex;
VectorMap<String, String> data;

// Insert or overwrite the entry for `key`, serialized by the global mutex.
void SetData(const String& key, const String& value) {
    INTERLOCKED(mutex)
        data.GetAdd(key) = value;
}

// Fetch the value for `key` under the mutex; Null when no such key exists.
String GetData(const String& key) {
    INTERLOCKED(mutex)
        return data.Get(key, Null);
}
class CachedSQL : public CallbackThread { public: void GetDataGiveToAnswerer(String request) { int rqIndex = data.Find(request); bool needAdd = false; String answer; if (rqIndex < 0) { answer = FetchFromDatabase(request); needAdd = true; } else answer = data[rqIndex]; theAnswersProcessor.AddTask(HttpAnswersProcessor::ProcessData, request, answer); if (needAdd) data.Add(request, answer); } }; class HttpRequestsProcessor : public CallThread { public: void ProcessHttpRequest(String rq) { //some processing here //ok, you need users`s info theCachedSQL.AddTask(CachedSQL::GetDataGiveToAnswerer, user); //optional additional processing } }; class HttpAnswersProcessor : public CallThread { public: void ProcessHttpAnswer(String userInfo) { //some processing and sending answer } }; /////////////////////////////////////////////////////////////////////////// // Let`s imagine 2 users sent their requests simultaneously. // // Time offset: // 0 1 2 3 4 5 6 7 // 0123456789012345689012345678901234567890123456789012345678901234567890 //-------------------------------------------------------------------------- // Synchronous work // |--request1--|--sql1--|--answer1--|--request2--|--sql2--|--answer2--|... // user1 will wait 3.5 // user2 will wait 6.9 //-------------------------------------------------------------------------- // Asyncronous work; '%' symbol stands for queue event i/o/thread-awake // |--request1--| // %|--sql1--| *cache-miss; adding to data AFTER answer answer sent to httpAnswerer* // %|--answer1--| // |--request2--| // %|--sql2--| *value cached, no need to add this time* // %|--answer2--| // user1 will wait 3.2 // user2 will wait 4.8 //--------------------------------------------------------------------------
class JobThread1 : public CallbackThread { public: //these members may be added into thread queue and must not be called directly void Job11(); void Job12(const String &); private: //all the realization details are private //... }; class class JobThread2 : public CallbackThread { public: //these members may be added into thread queue and must not be called directly void Job21(const String &); private: //all the realization details are private //... }; CONSOLE_APP_MAIN { JobThread1 jobs1; JobThread2 jobs2; for (int i=0; i<10; ++i) { jobs1.Add(&JobThread1::Job11); jobs1.Add(&JobThread1::Job12, FormatInt(i)); jobs2.Add(&JobThread2::Job21, FormatIntHex(i)); } };
luzr wrote on Tue, 30 June 2009 00:15 |
Well, I have some experience now (I did a project based on queues, and am now planning to rewrite it to use plain old locking) and I have something to say about the topic (IMO!): Synchronization objects are simple to manage compared to the often complex race-condition relations in queued systems. |
luzr wrote on Fri, 03 July 2009 20:49 |
I would like to, but right now I seem to be unable to describe it right. The problem was that it was a user-driven application and there were problems basically with "queue lag". Maybe the heart of the problem is (was) the fact that it worked in "post" mode (not "execute") - messages (callbacks) being posted and not waiting for completion. Too often I ended up with wrong events in the queue... BTW, without posting, your method is equivalent to one mutex per instance, locking on every method call... Mirek |
Quote: |
Personally I rejected ANY types of events because I consider them absolutely artificial. The only thing which is really needed is executing some callback. So I had no problem identifying any types of events. I will appreciate any example where this approach fails (of course avoiding the boundaries mentioned). |