Overview
Examples
Screenshots
Comparisons
Applications
Download
Documentation
Tutorials
Bazaar
Status & Roadmap
FAQ
Authors & License
Forums
Funding Ultimate++
Search on this site
Search in forums












SourceForge.net Logo
Home » Developing U++ » U++ Developers corner » Kqueue/epoll based interface for TcpSocket and WebSocket
Re: Kqueue/epoll based interface for TcpSocket and WebSocket [message #49774 is a reply to message #49698] Mon, 30 April 2018 11:20 Go to previous messageGo to previous message
mirek is currently offline  mirek
Messages: 13975
Registered: November 2005
Ultimate Member
Sorry for the delay...

Quote:

template <class T_SOCKET>
class SocketEventQueue: NoCopy
{
public:
	SocketEventQueue(): errorCode(NOERR) { InitEventQueue(); }
	~SocketEventQueue();

	bool ClearEventQueue();
	QueueHandler GetQueueHandler() const;

	bool IsError() const { return errorCode != NOERR; }
	ErrorCode GetErrorCode();

	bool SubscribeSocketRead(const T_SOCKET &sock);
	bool SubscribeSocketWrite(const T_SOCKET &sock);
	bool SubscribeSocketReadWrite(const T_SOCKET &sock);

	bool DisableSocketRead(const T_SOCKET &sock);
	bool DisableSocketWrite(const T_SOCKET &sock);
	bool DisableSocketReadWrite(const T_SOCKET &sock);

	bool RemoveSocket(const T_SOCKET &sock);

	bool IsSocketSubscribedRead(const T_SOCKET &sock) const { return IsSockeSubscribed(sock, WAIT_READ); }
	bool IsSocketSubscribedWrite(const T_SOCKET &sock) const { return IsSockeSubscribed(sock, WAIT_WRITE); }

	bool IsSocketDisabledRead(const T_SOCKET &sock) const { return IsSockeDisabled(sock, WAIT_READ); }
	bool IsSocketDisabledWrite(const T_SOCKET &sock) const { return IsSockeDisabled(sock, WAIT_WRITE); }

	Vector<SocketEvent<T_SOCKET>> Wait(int timeout);
};



Why is T_SOCKET a template parameter? Because of websocket?

What is DisableSocketRead supposed to do? Opposite of Subscribe?

If I am right about T_SOCKET, I have to disagree with the interface a bit.

Particulary, I think

Vector<SocketEvent<T_SOCKET>> Wait(int timeout);

is clumsy - this will IMO cause problems with mapping T_SOCKET back to its "processes".

When I was thinking about how to proceed with this, I was considering to simply expand SocketWaitEvent. I believe that the best would probably be to treat it as full array of sockets, using indices to identify the 'process'. Something like

class SocketWaitEvent {
......
public:
	void  Clear()                                            { socket.Clear(); }
	void  Add(SOCKET s, dword events)                        { socket.Add(MakeTuple((int)s, events)); }
	void  Add(TcpSocket& s, dword events)                    { Add(s.GetSOCKET(), events); }

	int   Wait(int timeout);
	dword Get(int i) const;
	dword operator[](int i) const                            { return Get(i); }

// new:
        void  Set(int ii, TcpSocket& s, dword events);
        void  Insert(int ii, TcpSocket& s, dword events);
        void  Remove(int ii, TcpSocket& s, dowrd events);

        Vector<int> WaitEvent(int timeout);
   // or perhaps
        Vector<Tuple<int, dword>> WaitEvent(int timeout); // dword part contains Get bitmask

// maybe:
        void  Clear(int ii); // makes index empty
        int   FindEmpty() const; // finds the first index that is empty

	
	SocketWaitEvent();
};


Is there a reason to make things more complicated that this?

Quote:

The first problem is related to the current implementation of TcpSocket::RawWait(...). It uses select(...) system call for determining possibility of reading/writing data or exceptional state of socket. As far as I can understand, it means that server with large nubmer of sockets will work slowly anyway. I patched TcpSocket::RawWait(...) for BSD platform on my local machine (see below) before I started to implement the interface. Now I think that it's possible to use SocketEventQueue in purpose of determining socket state instead of raw kqueue/epoll/select. What do you think about this?


I agree.

Quote:


But there is another problem related to kqueue/epoll reaction on socket closing for both my patch and SocketEventQueue. So here's the patch:

#ifdef PLATFORM_BSD
	timespec *tvalp = NULL;
	timespec tval;
	if(end_time != INT_MAX || WhenWait) {
		to = max(to, 0);
		tval.tv_sec = to / 1000;
		tval.tv_nsec = 1000000 * (to % 1000);
		tvalp = &tval;
		if (to)
			LLOG("RawWait timeout: " << to);
	}

	struct kevent eventrx, eventw;
	struct kevent triggeredEvents[2];
	int kq;
	int eventFlags = EV_ADD | EV_ONESHOT;

	if( ( kq = kqueue() ) == -1 ) // queue fd should be created once at the moment of socket opening
	{			      // and closed at the moment of socket closing
				      // the same is for SocketEventQueue object

		LLOG("kq = kqueue() returned -1");
		SetSockError("wait");
		return false;
	}

	if(flags & WAIT_READ)
	{
		EV_SET( &eventrx, socket, EVFILT_READ, eventFlags, 0, 0, NULL );
		if( kevent( kq, &eventrx, 1, NULL, 0, NULL ) == -1 )
		{
			LLOG("kevent( kq, &eventrx, 1, NULL, 0, NULL ) returned -1");
			SetSockError("wait");
			close(kq);
			return false;
		}
	}

	if(flags & WAIT_WRITE)
	{
		EV_SET( &eventw, socket, EVFILT_WRITE, eventFlags, 0, 0, NULL );
		if( kevent( kq, &eventw, 1, NULL, 0, NULL ) == -1 )
		{
			LLOG("kevent( kq, &eventw, 1, NULL, 0, NULL ) returned -1");
			SetSockError("wait");
			close(kq);
			return false;
		}
	}

	int avail = kevent( kq, nullptr, 0, triggeredEvents, 2, tvalp ); // here is the problem if socket																	 
                                                                         // works in blocking mode																	
                                                                         // or if timeout is too long
	close( kq );
#else
    // default select implementation


Now let's imagine the situation:

TcpSocket server; // passes through TcpSocket::Listen()

...

void Server() // runs in several threads
{
	static StaticMutex serverMutex;
	
	while(!Thread::IsShutdownThreads())
	{
		TcpSocket		   client;
		bool acceptStatus;
		
		{
			Mutex::Lock __(serverMutex);
			//acception is in blocking mode
			acceptStatus = client.Accept(server); // calls TcpSocket::RawWait(...)
		}
	
		... // connection handling
	}
}

...

void SignalHandler(int sig)
{
	server.Close(); // Doesn't interrupt kevent system call in TcpSocket::RawWait(...)
			// close(socket) just makes kqueue to delete all events
			// associated with socket descriptor from it's kernel queue
	
	Thread::ShutdownThreads();
}



So I can't normally terminate the server if I work with sockets in blocking mode.
Do you have any ideas how to interrupt kevent waiting loop?
I've tried to call shutdown(socket, SD_BOTH) for sockets that hadn't been passed through TcpSocket::Listen(), and it works for me.
But I still can't deal with listening socket. Solution I've found is to use pipe-trick: read-end descriptor attaches to kqueue/epoll, and write-end descriptor attaches to socket. When socket closes, it writes some data in pipe with write-end descriptor.
But it means that socket must hold all queue write-end descriptors it was attached.
Could you help me with my problem?[/quote]

Well, I was fighting with this one too, years ago. Thats nasty little problem there.

I the end, I believe that the best solution is to make Accept return until there are any active threads by doing localhost connect.

	TcpSocket s;
	s.Connect("127.0.0.1", port);


You can check Skylart/App.cpp.

Another option, not always applicable, is not to bother and let the signal kill the application just as it is supposed to.... Smile

[Updated on: Mon, 30 April 2018 11:23]

Report message to a moderator

 
Read Message
Read Message
Read Message
Read Message
Read Message
Previous Topic: Help!!! Building 32 bit apps on Ubuntu64
Next Topic: SortedVectorMap - attempting to reference a deleted function error
Goto Forum:
  


Current Time: Sun May 05 23:37:52 CEST 2024

Total time taken to generate the page: 0.02837 seconds