J
Jonas Hei
In all the samples that illustrate the use of Socket.BeginReceiveFrom(),
once a BeginReceiveFrom() call is issued, then we wait for an incoming
message, and then
(a) either another BeginReceiveFrom() is issued from the
AsyncCallback or
(b) in the AsyncCallback sets an event which signals the main thread
to continue and issue another BeginReceiveFrom().
My concern is that if we need to be highly scalable (say 100s or 1000s
of incoming UDP messages) then this way isn't really going to cut it.
I've thought of quite a few optimizations such as allocating buffer
space at startup (to avoid troubling GC with pinned memory) and also
posting the incoming message to a local queue and processing it later
and also playing with ThreadPool.SetMinThreads to setup optimum values.
My question is:
Is it possible to issue multiple calls to Socket.BeginReceiveFrom() at
one go? If yes, what is the maximum number? I've tried this with 20 or
so and it seems to work fine. I've illustrated the technique in the code
below. Any comments?
public class Communicator {
Socket skt = new Socket(AddressFamily.InterNetwork,
SocketType.Dgram, ProtocolType.Udp);
private Thread listenerThread;
int initialListeners;
public Communicator() { }
public void Start(int initialNumberOfListeners) {
initialListeners = initialNumberOfListeners;
listenerThread = new Thread(new ThreadStart(Listen));
listenerThread.Start();
}
private void Listen() {
IPEndPoint localEndPoint = new IPEndPoint(
IPAddress.Parse("0.0.0.0"), 9020);
skt.SetSocketOption(SocketOptionLevel.Socket,
SocketOptionName.ReceiveBuffer,
65536);
skt.Bind(localEndPoint);
StateObject[] so2 = new StateObject[initialListeners];
for(int i=0; i<initialListeners; i++)
{
so2 = new StateObject(4096);
so2.workSocket = skt;
skt.BeginReceiveFrom(
so2.buffer,
0,
so2.BufferSize,
0,
ref so2.tempRemoteEP,
new AsyncCallback(ReceiveFromCallback),
so2);
}
}
public void Stop() {
bStop = true
}
public void ReceiveFromCallback(IAsyncResult ar) {
try {
StateObject so = (StateObject) ar.AsyncState;
Socket listenerSocket = so.workSocket;
StateObject newst = new StateObject(4096);
newst.workSocket = listenerSocket;
listenerSocket.BeginReceiveFrom(
newst.buffer,
0,
newst.BufferSize,
0,
ref newst.tempRemoteEP,
new AsyncCallback(ReceiveFromCallback),
newst);
IPEndPoint sender = new IPEndPoint(IPAddress.Any, 0);
EndPoint tempRemoteEP = (EndPoint)sender;
int read = listenerSocket.EndReceiveFrom(
ar,
ref tempRemoteEP);
if(read > 0) {
so.sb.Append(Encoding.ASCII.GetString(
so.buffer,
0,
read));
string strContent;
strContent = so.sb.ToString();
//post strContent to a Queue for later processing
}
}
catch(ObjectDisposedException) { }
}
}
public class StateObject {
public Socket workSocket = null;
public int BufferSize;
public byte[] buffer = null;
public StringBuilder sb = new StringBuilder();
public StateObject(int buffersize) {
BufferSize = buffersize;
buffer = new byte[BufferSize];
}
public EndPoint tempRemoteEP = (EndPoint)(new
IPEndPoint(IPAddress.Any, 0));
}
once a BeginReceiveFrom() call is issued, then we wait for an incoming
message, and then
(a) either another BeginReceiveFrom() is issued from the
AsyncCallback or
(b) in the AsyncCallback sets an event which signals the main thread
to continue and issue another BeginReceiveFrom().
My concern is that if we need to be highly scalable (say 100s or 1000s
of incoming UDP messages) then this way isn't really going to cut it.
I've thought of quite a few optimizations such as allocating buffer
space at startup (to avoid troubling GC with pinned memory) and also
posting the incoming message to a local queue and processing it later
and also playing with ThreadPool.SetMinThreads to setup optimum values.
My question is:
Is it possible to issue multiple calls to Socket.BeginReceiveFrom() at
one go? If yes, what is the maximum number? I've tried this with 20 or
so and it seems to work fine. I've illustrated the technique in the code
below. Any comments?
public class Communicator {
Socket skt = new Socket(AddressFamily.InterNetwork,
SocketType.Dgram, ProtocolType.Udp);
private Thread listenerThread;
int initialListeners;
public Communicator() { }
public void Start(int initialNumberOfListeners) {
initialListeners = initialNumberOfListeners;
listenerThread = new Thread(new ThreadStart(Listen));
listenerThread.Start();
}
private void Listen() {
IPEndPoint localEndPoint = new IPEndPoint(
IPAddress.Parse("0.0.0.0"), 9020);
skt.SetSocketOption(SocketOptionLevel.Socket,
SocketOptionName.ReceiveBuffer,
65536);
skt.Bind(localEndPoint);
StateObject[] so2 = new StateObject[initialListeners];
for(int i=0; i<initialListeners; i++)
{
so2 = new StateObject(4096);
so2.workSocket = skt;
skt.BeginReceiveFrom(
so2.buffer,
0,
so2.BufferSize,
0,
ref so2.tempRemoteEP,
new AsyncCallback(ReceiveFromCallback),
so2);
}
}
public void Stop() {
bStop = true
}
public void ReceiveFromCallback(IAsyncResult ar) {
try {
StateObject so = (StateObject) ar.AsyncState;
Socket listenerSocket = so.workSocket;
StateObject newst = new StateObject(4096);
newst.workSocket = listenerSocket;
listenerSocket.BeginReceiveFrom(
newst.buffer,
0,
newst.BufferSize,
0,
ref newst.tempRemoteEP,
new AsyncCallback(ReceiveFromCallback),
newst);
IPEndPoint sender = new IPEndPoint(IPAddress.Any, 0);
EndPoint tempRemoteEP = (EndPoint)sender;
int read = listenerSocket.EndReceiveFrom(
ar,
ref tempRemoteEP);
if(read > 0) {
so.sb.Append(Encoding.ASCII.GetString(
so.buffer,
0,
read));
string strContent;
strContent = so.sb.ToString();
//post strContent to a Queue for later processing
}
}
catch(ObjectDisposedException) { }
}
}
public class StateObject {
public Socket workSocket = null;
public int BufferSize;
public byte[] buffer = null;
public StringBuilder sb = new StringBuilder();
public StateObject(int buffersize) {
BufferSize = buffersize;
buffer = new byte[BufferSize];
}
public EndPoint tempRemoteEP = (EndPoint)(new
IPEndPoint(IPAddress.Any, 0));
}