Peter said:
If you're saying that the int itself is unneeded, I agree...that may very
well be. What little code we did see seems potentially awkward if not
outright flawed from a design standpoint. But the OP didn't show enough
code for us to know for sure; it could just be that we're not seeing the
whole picture.
Pete
If my code is bad, I want to know. Here is the whole thing. This is a
service. It works, and has for a while, so whatever flaw exists must be drawn
out by special circumstances that I haven't encountered yet. There isn't
anything secret going on here.
I have written the client side too, so I know what inputs are expected.
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.ServiceProcess;
using System.Text;
using SNS = System.Net.Sockets;
namespace RouteMatch.RMQS
{
public partial class RMQSService : ServiceBase
{
// Size, in bytes, of the buffer used for each socket read.
private const int Initial_Buffer_Size = 65536;

// Listening socket; created and started in OnStart.
private SNS.TcpListener listener = null;

// Index into Properties.Settings.Default.Servants of the servant used most recently.
// It is pre-incremented before every use, which is why it starts at -1 instead of 0.
private int nextServantIndex = -1;

// Monitor token guarding nextServantIndex. It is only ever passed to lock() and
// never carries data, so a plain object is all that is needed.
private readonly object nextServantIndexSerializeObject = new object();

// Service adapters keyed by "agency|database|messageTarget". The dictionary
// instance is also the lock under which messages are handled.
private readonly Dictionary<string, ServiceAdapterInterface> serviceAdapterPool =
    new Dictionary<string, ServiceAdapterInterface>();
/// <summary>
/// Creates the service instance and runs <c>InitializeComponent</c>
/// (not shown here; presumably the designer-generated half of this partial class).
/// </summary>
public RMQSService()
    : base()
{
    this.InitializeComponent();
}
/// <summary>
/// Service start hook. Points the event log at the "Application" log under the
/// "RMQS" source, starts a TCP listener on every local address at the configured
/// port, and posts the first asynchronous accept.
/// Any failure is written to the event log (when one exists) and is not propagated.
/// </summary>
/// <param name="args">Start arguments supplied to the service; not used here.</param>
protected override void OnStart(string[] args)
{
    try
    {
        eventLog1.Log = "Application";
        eventLog1.Source = "RMQS";

        listener = new SNS.TcpListener(
            System.Net.IPAddress.Any,
            Properties.Settings.Default.Port);
        listener.Start();

        // The listener itself travels as the async state so the callback can
        // complete the accept and post the next one.
        AsyncCallback onAccept = DoAcceptTcpClientCallback;
        listener.BeginAcceptTcpClient(onAccept, listener);
    }
    catch (Exception failure)
    {
        if (eventLog1 == null)
        {
            return;
        }

        eventLog1.WriteEntry(failure.ToString(), EventLogEntryType.Error);
    }
}
/// <summary>
/// Completion callback for <c>BeginAcceptTcpClient</c>. Accepts the pending
/// connection, immediately posts the next accept, then reads the client's whole
/// request (reading stops when <c>Read</c> returns 0, i.e. the client has shut
/// down its sending side).
/// In router mode the request is relayed to the next servant and the servant's
/// reply is streamed back to the client. Otherwise the request is queued to the
/// thread pool for <see cref="ProcessMessage"/>, which then owns the client socket.
/// Every failure is written to the event log; nothing is propagated.
/// </summary>
/// <param name="ar">Async result whose <c>AsyncState</c> is the listening <c>TcpListener</c>.</param>
private void DoAcceptTcpClientCallback(IAsyncResult ar)
{
    try
    {
        SNS.TcpClient clientSckt = null;
        // True once the socket has been queued for ProcessMessage, which closes it itself.
        bool clientHandedOff = false;
        try
        {
            SNS.TcpListener source = (SNS.TcpListener)ar.AsyncState;
            clientSckt = source.EndAcceptTcpClient(ar);

            // Post the next accept before doing any blocking I/O on this connection.
            source.BeginAcceptTcpClient(new AsyncCallback(DoAcceptTcpClientCallback), source);

            byte[] readbuffer = new byte[Initial_Buffer_Size];
            byte[] request = ReadAllBytes(clientSckt.GetStream(), readbuffer);
            if (request.Length == 0)
            {
                // Nothing was sent; the finally block closes the connection.
                return;
            }

            if (Properties.Settings.Default.RouterMode)
            {
                RelayThroughServant(clientSckt, request, readbuffer);
            }
            else
            {
                // Decode once, from the complete byte sequence, so a multi-byte
                // UTF-8 character that straddles two reads is not corrupted.
                ClientState cs = new ClientState(clientSckt, Encoding.UTF8.GetString(request));
                System.Threading.ThreadPool.QueueUserWorkItem(
                    new System.Threading.WaitCallback(ProcessMessage), cs);
                clientHandedOff = true;
            }
        }
        finally
        {
            if (clientSckt != null && !clientHandedOff)
            {
                try
                {
                    clientSckt.Client.Shutdown(SNS.SocketShutdown.Both);
                }
                catch (SNS.SocketException)
                {
                    // The peer is already gone; Close below still releases the handle.
                }
                catch (ObjectDisposedException)
                {
                    // Already torn down; nothing left to shut down.
                }
                finally
                {
                    clientSckt.Close();
                }
            }
        }
    }
    catch (Exception ex)
    {
        eventLog1.WriteEntry(ex.ToString(), EventLogEntryType.Error);
    }
}

/// <summary>
/// Reads <paramref name="stream"/> until <c>Read</c> returns 0 and returns
/// everything that was received.
/// </summary>
/// <param name="stream">Stream to drain.</param>
/// <param name="buffer">Scratch buffer used for the individual reads.</param>
/// <returns>The received bytes; an empty array when nothing was sent.</returns>
private static byte[] ReadAllBytes(SNS.NetworkStream stream, byte[] buffer)
{
    System.IO.MemoryStream collected = new System.IO.MemoryStream();
    int bytesread = stream.Read(buffer, 0, buffer.Length);
    while (bytesread > 0)
    {
        collected.Write(buffer, 0, bytesread);
        bytesread = stream.Read(buffer, 0, buffer.Length);
    }
    return collected.ToArray();
}

/// <summary>
/// Sends <paramref name="request"/> to the next servant in round-robin order and
/// copies the servant's reply to the client until the servant closes its side.
/// The servant connection is always closed, even when the relay fails.
/// </summary>
/// <param name="clientSckt">Connected client that receives the servant's reply.</param>
/// <param name="request">The client's request, forwarded byte-for-byte.</param>
/// <param name="buffer">Scratch buffer used while copying the reply.</param>
private void RelayThroughServant(SNS.TcpClient clientSckt, byte[] request, byte[] buffer)
{
    string servername;
    int port;

    // Several accept callbacks can reach this point at once, so the shared
    // round-robin index is advanced and read under a lock.
    lock (nextServantIndexSerializeObject)
    {
        if (++nextServantIndex >= Properties.Settings.Default.Servants.Count)
        {
            nextServantIndex = 0;
        }
        servername = Properties.Settings.Default.Servants[nextServantIndex].ServerName;
        port = Properties.Settings.Default.Servants[nextServantIndex].Port;
    }

    SNS.TcpClient qsServant = new SNS.TcpClient();
    try
    {
        qsServant.Connect(servername, port);
        SNS.NetworkStream servant_ns = qsServant.GetStream();

        // Forward the original bytes; re-encoding the text as ASCII would
        // replace every non-ASCII character.
        servant_ns.Write(request, 0, request.Length);
        // Half-close so the servant sees end-of-request.
        qsServant.Client.Shutdown(SNS.SocketShutdown.Send);

        // Stream the reply back. Both reads must come from the servant's stream.
        SNS.NetworkStream client_ns = clientSckt.GetStream();
        int bytesread = servant_ns.Read(buffer, 0, buffer.Length);
        while (bytesread > 0)
        {
            client_ns.Write(buffer, 0, bytesread);
            bytesread = servant_ns.Read(buffer, 0, buffer.Length);
        }

        qsServant.Client.Shutdown(SNS.SocketShutdown.Both);
    }
    finally
    {
        qsServant.Close();
    }
}
/// <summary>
/// This is where all the "work" gets done.
///
/// The client state consists of the message and the client socket, which
/// allows this method to respond to the caller.
///
/// Because Rse is currently not reentrant, the dictionary that keeps the Rse
/// objects is locked so that simultaneous access is disallowed.
/// </summary>
/// <remarks>
/// No exception leaves this method. Anything raised while parsing or handling
/// the message (<see cref="RMQSMessageException"/>,
/// <see cref="RMQSInvalidCommandValueException"/> or any other exception) is
/// answered with an <c>RMQSError</c>. A failure while sending the answer is
/// written to the event log. The client socket is closed in every case.
/// </remarks>
/// <param name="clientState">The <c>ClientState</c> (message plus client socket) queued for this work item.</param>
private void ProcessMessage(object clientState)
{
    ClientState state = clientState as ClientState;
    if (state == null || state.ClientSocket == null)
    {
        // Without a socket there is nobody to answer and nothing to close.
        eventLog1.WriteEntry("ProcessMessage was queued without a usable ClientState",
            EventLogEntryType.Error);
        return;
    }

    try
    {
        RMQSMessage question = null;
        RMQSResponse answer = null;
        try
        {
            question = new RMQSMessage();
            question.BuildMessage(state.Message);

            // We key the pool by the agency and the database because there could
            // be a training database in use, and that data could be different.
            // The message target is part of the key as well so that different
            // modules would be allowed to work simultaneously. Right now they are
            // all in the same pool. If another message target were created it
            // would be good to add a pool per target so that only objects of the
            // same target are locked; messages to different targets would then be
            // able to work in parallel.
            string comPoolKey = question.AgencyName + "|" +
                question.DatabaseName + "|" + question.MessageTarget;

            // This lock essentially places all queued work items in line: the
            // first one to arrive works, the others back up behind it and wait
            // for the lock to be released. When Rse is re-entrant the lock can be
            // narrowed so that the processing isn't included in it, which will
            // allow the ThreadPool to operate as expected.
            lock (serviceAdapterPool)
            {
                ServiceAdapterInterface serviceInterface;
                if (!serviceAdapterPool.TryGetValue(comPoolKey, out serviceInterface))
                {
                    // This is the only line in the server that references the RSE
                    // code. When/if there is a reason, this could be made to read
                    // from a file and dynamically load the class connected with
                    // the MessageTarget. It would also be possible to load each of
                    // those in its own app domain so isolation could be enabled;
                    // that might be a solution to the FIFO queue that is here.
                    // Note that each dynamically loaded service would need to
                    // implement ServiceAdapterInterface.
                    serviceInterface =
                        (ServiceAdapterInterface)new RouteMatch.TS.Rse.RseAdapterContext();
                    serviceInterface.Initialize(question);
                    serviceAdapterPool.Add(comPoolKey, serviceInterface);
                }
                answer = serviceInterface.HandleMessage(question);
            }
        }
        catch (RMQSMessageException ex)
        {
            if (question == null)
            {
                answer = new RMQSError("", ex.ErrorCode, ex.Message);
            }
            else
            {
                answer = new RMQSError(question, ex.ErrorCode, ex.Message);
            }
        }
        catch (Exception ex)
        {
            if (question == null)
            {
                answer = new RMQSError("", -99, ex.Message);
            }
            else
            {
                answer = new RMQSError(question, -99, ex.Message);
            }
        }

        SNS.NetworkStream ns = state.ClientSocket.GetStream();
        if (ns.CanWrite)
        {
            byte[] writebuffer = answer.ToByteArray();
            ns.Write(writebuffer, 0, writebuffer.Length);
            // Half-close so the client sees end-of-response.
            state.ClientSocket.Client.Shutdown(SNS.SocketShutdown.Send);
        }
        else
        {
            eventLog1.WriteEntry("Couldn't write");
        }
    }
    catch (Exception ex)
    {
        // This runs on a thread-pool thread: an exception escaping from here
        // (e.g. the client dropped the connection) would take the process down.
        eventLog1.WriteEntry(ex.ToString(), EventLogEntryType.Error);
    }
    finally
    {
        state.ClientSocket.Close();
    }
}
/// <summary>
/// Holds the state for one client request - the message text and the socket it
/// arrived on - so both can be handed to a thread-pool work item.
/// </summary>
private class ClientState
{
    private string _message;
    private SNS.TcpClient _socket;

    /// <summary>Creates a state object for one request.</summary>
    /// <param name="clientSocket">Socket the request arrived on; may be null.</param>
    /// <param name="message">Text of the request; may be null.</param>
    public ClientState(SNS.TcpClient clientSocket, string message)
    {
        Message = message;
        ClientSocket = clientSocket;
    }

    /// <summary>Text of the client's request.</summary>
    public string Message
    {
        get { return _message; }
        set { _message = value; }
    }

    /// <summary>Socket on which the request arrived.</summary>
    public SNS.TcpClient ClientSocket
    {
        get { return _socket; }
        set { _socket = value; }
    }
}
}
/// <summary>
/// Address of one servant: a server name and a TCP port.
/// </summary>
public class QsServant
{
    private string serverName;
    private int port;

    /// <summary>Server name of the servant.</summary>
    public string ServerName
    {
        get
        {
            return this.serverName;
        }
        set
        {
            this.serverName = value;
        }
    }

    /// <summary>TCP port of the servant.</summary>
    public int Port
    {
        get
        {
            return this.port;
        }
        set
        {
            this.port = value;
        }
    }
}
/// <summary>
/// A list of <see cref="QsServant"/> entries. Adds nothing to
/// <c>List&lt;QsServant&gt;</c> beyond re-exposing its three constructors.
/// </summary>
[Serializable]
public class QsServants : List<QsServant>
{
    /// <summary>Creates an empty list.</summary>
    public QsServants()
    {
    }

    /// <summary>Creates an empty list with the given initial capacity.</summary>
    /// <param name="capacity">Number of entries the list can initially hold.</param>
    public QsServants(int capacity)
        : base(capacity)
    {
    }

    /// <summary>Creates a list containing the entries of <paramref name="collection"/>.</summary>
    /// <param name="collection">Entries to copy into the new list.</param>
    public QsServants(IEnumerable<QsServant> collection)
        : base(collection)
    {
    }
}
}