Конференция "Сети" » Многонитевой TCP-сервер на асинхронных событиях [D7]
 
  • xss22 © (15.02.12 20:54) [0]
    Сервер на асинхронных событиях. В нем  есть нить для обработки
    подключений клиентов и по одной нити на каждого клиента, а главная нить только создает слушающий сокет и запускает обслуживающую его нить.

    Проблема в том, что на кадое новое подключение создается новый поток.
    А подключившихся клиентов будет несколько тысяч ?
    Сервер неприменно помрет.

    Помогите переделать сервер так, чтобы клиентский поток обслуживал не одного клиента, а пакет из нескольких клиентов.

    Написанный ниже пример рабочий и стабильный.
    Надеюсь найдутся ребята с доброй душой.
    Кому нужно, вышлю полностью исходник.
  • xss22 © (15.02.12 20:55) [1]
    unit ListenThread;
    { Нить, следящая за подключением клиента к слушающему сокету.
     При обнаружении подключения создаёт новую нить для работы
     с подключившимся клиентом, а сама продолжает обслуживать
     "слушающий" сокет.
    }


    interface

    uses
     SysUtils, Classes, WinSock, WinSock2_Events;

    type
     TListenThread = class(TThread)
     private
       // Сообщение, которое нужно добавить в лог.
       // Хранится в отдельном поле, т.к. метод, вызывающийся через Synchronize,
       // не может иметь параметров.
       FMessage: string;
       // Сокет, находящийся в режиме прослушивания
       FServerSocket: TSocket;
       // События нити
       //  FEvents[0] используется для остановки нити
       //  FEvents[1] связывается с событием FD_ACCEPT
       FEvents: array[0..1] of TWSAEvent;
       // Список нитей клиентов
       FClientThreads: TList;
       // Если True, сервер посылает клиенту сообщения
       // по собственной инициативе
       FServerMsg: Boolean;
       // Вспомогательный метод для вызова через Synchronize
       procedure DoLogMessage;
     protected
       procedure Execute; override;
       // Вывод сообщения в лог главной формы
       procedure LogMessage(const Msg: string);
     public
       constructor Create(ServerSocket: TSocket; ServerMsg: Boolean);
       destructor Destroy; override;
       // Вызывается извне для остановки сервера
       procedure StopServer;
     end;

    implementation

    uses
     MainServerUnit, ClientThread;

    { TListenThread }

    // "Слушающий" сокет создаётся в главной нити,
    // а сюда передаётся через параметр конструктора
    constructor TListenThread.Create(ServerSocket: TSocket; ServerMsg: Boolean);
    begin
     FServerSocket := ServerSocket;
     FServerMsg := ServerMsg;
     // Создаём события
     FEvents[0] := WSACreateEvent;
     if FEvents[0] = WSA_INVALID_EVENT then
       raise ESocketError.Create('Ошибка при создании события для сервера: ' + GetErrorString);
     FEvents[1] := WSACreateEvent;
     if FEvents[1] = WSA_INVALID_EVENT then
       raise ESocketError.Create('Ошибка при создании события для сервера: ' + GetErrorString);
     if WSAEventSelect(FServerSocket, FEvents[1], FD_ACCEPT) = SOCKET_ERROR then
       raise ESocketError.Create('Ошибка при привязывании серверного сокета к событию: ' + GetErrorString);
     FClientThreads := TList.Create;
     inherited Create(False);
    end;

    destructor TListenThread.Destroy;
    begin
     // Убираем за собой
     FClientThreads.Free;
     WSACloseEvent(FEvents[0]);
     WSACloseEvent(FEvents[1]);
     inherited;
    end;


  • xss22 © (15.02.12 20:56) [2]
    продолжение модуля ListenThread

    procedure TListenThread.Execute;
    var
     // Сокет, созданный для общения с подключившимся клиентом
     ClientSocket: TSocket;
     // Адрес подключившегося клиента
     ClientAddr: TSockAddr;
     ClientAddrLen: Integer;
     NetEvents: TWSANetworkEvents;
     I: Integer;
     WaitRes: Cardinal;
    begin
     LogMessage('Сервер начал работу');
     // Начинаем бесконечный цикл
     repeat
       // Ожидание события с 15-секундным таймаутом
       WaitRes := WSAWaitForMultipleEvents(2, @FEvents, False, 15000, False);
       case WaitRes of
         WSA_WAIT_EVENT_0:
           // Событие FEvents[0] взведено - это означает, что
           // сервер должен остановиться.
           begin
             LogMessage('Сервер получил сигнал завершения работы');
             // Просто выходим из цикла, остальное сделает код после цикла
             Break;
           end;
         WSA_WAIT_EVENT_0 + 1:
           // Событие FEvents[1] взведено.
           // Это должно означать наступление события FD_ACCEPT.
           begin
             // Проверяем, почему событие взведено,
             // и заодно сбрасываем его
             if WSAEnumNetworkEvents(FServerSocket, FEvents[1], NetEvents) = SOCKET_ERROR then
             begin
               LogMessage('Ошибка при получении списка событий: ' + GetErrorString);
               Break;
             end;
             // Защита от "тупой" ошибки - проверка того, что наступило нужное событие
             if NetEvents.lNetworkEvents and FD_ACCEPT = 0 then
             begin
               LogMessage('Внутренняя ошибка сервера - неизвестное событие');
               Break;
             end;
             // Проверка, не было ли ошибок
             if NetEvents.iErrorCode[FD_ACCEPT_BIT] <> 0 then
             begin
               LogMessage('Ошибка при подключении клиента: ' + GetErrorString(NetEvents.iErrorCode[FD_ACCEPT_BIT]));
               Break;
             end;
             ClientAddrLen := SizeOf(ClientAddr);
             // Проверяем наличие подключения
             ClientSocket := accept(FServerSocket, @ClientAddr, @ClientAddrLen);
             if ClientSocket = INVALID_SOCKET then
             begin
               // Ошибка в функции accept возникает только тогда, когда
               // происходит нечто экстраординарное. Продолжать работу
               // в этом случае бессмысленно. Единственное возможное
               // в нашем случае исключение - ошибка WSAEWOULDBLOCK,
               // которая может возникнуть, если срабатывание события
               // было ложным, и подключение от клиента отсутствует.
               if WSAGetLastError <> WSAEWOULDBLOCK then
               begin
                 LogMessage('Ошибка при подключении клиента: ' + GetErrorString);
                 Break;
               end;
             end;
             // Создаём новую нить для обслуживания подключившегося клиента
             // и передаём ей сокет, созданный для взаимодействия с ним.
             // Указатель на нить сохраняем в списке
             FClientThreads.Add(TClientThread.Create(ClientSocket, ClientAddr));
           end;
         WSA_WAIT_TIMEOUT:
           // Ожидание завершено по таймауту
           begin
             // Проверяем, есть ли клиентские нити, завершившие работу.
             // Если есть такие нити, удаляем их из списка и освобождаем объекты.
             for I := FClientThreads.Count - 1 downto 0 do
               if TClientThread(FClientThreads[I]).Finished then
               begin
                 TClientThread(FClientThreads[I]).Free;
                 FClientThreads.Delete(I);
               end;
             // Если разрешены сообщения от сервера, отправляем всем клиентам
             // сообщение с текущим временем
             if FServerMsg then
               for I := 0 to FClientThreads.Count - 1 do
                 TClientThread(FClientThreads[I]).SendString('Время на сервере ' + TimeToStr(Now));
           end;
         WSA_WAIT_FAILED:
           // При ожидании возникла ошибка. Это может означать
           // только какой-то серьёзный сбой в библиотеке сокетов.
           begin
             LogMessage('Ошибка при ожидании события сервера: ' + GetErrorString);
             Break;
           end;
         else
           // Неожиданный результат при ожидании
           begin
             LogMessage('Внутренняя ошибка сервера - неожиданный результат ожидания ' + IntToStr(WaitRes));
             Break;
           end;
       end;
     until False;
     // Останавливаем и уничтожаем все нити клиентов
     for I := 0 to FClientThreads.Count - 1 do
     begin
       TClientThread(FClientThreads[I]).StopThread;
       TClientThread(FClientThreads[I]).WaitFor;
       TClientThread(FClientThreads[I]).Free;
     end;
     closesocket(FServerSocket);
     LogMessage('Сервер завершил работу');
     Synchronize(ServerForm.OnStopServer);
    end;

    // Завершение работы сервера. Просто взводим соответствующее
    // событие, а остальное делает код в методе Execute.
    procedure TListenThread.StopServer;
    begin
     WSASetEvent(FEvents[0]);
    end;

    procedure TListenThread.LogMessage(const Msg: string);
    begin
     FMessage := Msg;
     Synchronize(DoLogMessage);
    end;

    procedure TListenThread.DoLogMessage;
    begin
     ServerForm.AddMessageToLog(FMessage);
    end;

    end.

  • xss22 © (15.02.12 20:57) [3]
    unit ClientThread;
    { Нить, обслуживающая одного клиента.
     Выполняет цикл, выход из которого возможен по внешнему сигналу
     или при возникновении ошибки на сокете. Умеет отправлять
     клиенту сообщения по внешнему сигналу.
    }


    interface

    uses
     Windows, Classes, WinSock, Winsock2_Events, ShutdownConst, SysUtils, SyncObjs;

    type
     TClientThread = class(TThread)
     private
       // Сообщение, которое нужно добавить в лог.
       // Хранится в отдельном поле, т.к. метод, вызывающийся через Synchronize,
       // не может иметь параметров.
       FMessage: string;
       // Префикс для всех сообщений лога, связанных с данным клиентом
       FHeader: string;
       // Сокет для взаимодействия с клиентом
       FSocket: TSocket;
       // События нити
       //  FEvents[0] используется для остановки нити
       //  FEvents[1] используется для отправки сообщения
       //  FEvents[2] связывается с событиями FD_READ, FD_WRITE и FD_CLOSE
       FEvents: array[0..2] of TWSAEvent;
       // Критическая секция для доступа к буферу исходящих
       FSendBufSection: TCriticalSection;
       // Буфер исходящих
       FSendBuf: string;
       // Вспомогательный метод для вызова через Synchronize
       procedure DoLogMessage;
       // Функция, проверяющая, завершила ли нить работу
       function GetFinished: Boolean;
     protected
       procedure Execute; override;
       // Вывод сообщения в лог главной формы
       procedure LogMessage(const Msg: string);
       // Отправка клиенту данных из буфера исходящих
       function DoSendBuf: Boolean;
     public
       constructor Create(ClientSocket: TSocket; const ClientAddr: TSockAddr);
       destructor Destroy; override;
       // Добавление строки в буфер исходящих
       procedure SendString(const S: string);
       // Остановка нити извне
       procedure StopThread;
       property Finished: Boolean read GetFinished;
     end;

     ESocketError = class(Exception);

    implementation

    uses
     MainServerUnit;

    { TClientThread }

    // Сокет для взаимодействия с клиентом создаётся в главной нити,
    // а сюда передаётся через параметр конструктора. Для формирования
    // заголовка сюда же передаётся адрес подключившегося клиента
    constructor TClientThread.Create(ClientSocket: TSocket; const ClientAddr: TSockAddr);
    begin
     FSocket := ClientSocket;
     // Заголовок содержит адрес и номер порта клиента.
     // Этот заголовок будет добавляться ко всем сообщениям в лог
     // от данного клиента.
     FHeader := 'Сообщение от клиента ' + inet_ntoa(ClientAddr.sin_addr) + ':' +
       IntToStr(ntohs(ClientAddr.sin_port)) + ': ';
     // Создаём события и привязываем первое из них к сокету
     FEvents[0] := WSACreateEvent;
     if FEvents[0] = WSA_INVALID_EVENT then
       raise ESocketError.Create(FHeader + 'Ошибка при создании события: ' + GetErrorString);
     FEvents[1] := WSACreateEvent;
     if FEvents[1] = WSA_INVALID_EVENT then
       raise ESocketError.Create(FHeader + 'Ошибка при создании события: ' + GetErrorString);
     FEvents[2] := WSACreateEvent;
     if FEvents[2] = WSA_INVALID_EVENT then
       raise ESocketError.Create(FHeader + 'Ошибка при создании события: ' + GetErrorString);
     if WSAEventSelect(FSocket, FEvents[2], FD_READ or FD_WRITE or FD_CLOSE) = SOCKET_ERROR then
       raise ESocketError.Create(FHeader + 'Ошибка при привязывании сокета к событию: ' + GetErrorString);
     FSendBufSection := TCriticalSection.Create;
     // Объект этой нити не должен удаляться сам
     FreeOnTerminate := False;
     inherited Create(False);
    end;

    destructor TClientThread.Destroy;
    begin
     FSendBufSection.Free;
     WSACloseEvent(FEvents[0]);
     WSACloseEvent(FEvents[1]);
     WSACloseEvent(FEvents[2]);
     inherited;
    end;

    // Функция добавляет строку в буфер для отправки
    procedure TClientThread.SendString(const S: string);
    begin
     FSendBufSection.Enter;
     try
       FSendBuf := FSendBuf + S + #0;
     finally
       FSendBufSection.Leave;
     end;
     LogMessage('Сообщение \"' + S + '\" поставлено в очередь для отправки');
     // Взводим событие, которое говорит, что надо отправлять данные
     WSASetEvent(FEvents[1]);
    end;

    // Отправка всех данных, накопленных в буфере
    // Функция возвращает False, если произошла ошибка,
    // и True, если всё в порядке
    function TClientThread.DoSendBuf: Boolean;
    var
     SendRes: Integer;
    begin
     FSendBufSection.Enter;
     try
       // Если отправлять нечего, выходим
       if FSendBuf = '' then
       begin
         Result := True;
         Exit;
       end;
       // Пытаемся отправить всё, что есть в буфере
       SendRes := send(FSocket, FSendBuf[1], Length(FSendBuf), 0);
       if SendRes > 0 then
       begin
         // Удаляем из буфера ту часть, которая отправилась клиенту
         Delete(FSendBuf, 1, SendRes);
         Result := True;
       end
       else
       begin
         Result := WSAGetLastError = WSAEWOULDBLOCK;
         if not Result then
           LogMessage('Ошибка при отправке данных: ' + GetErrorString);
       end;
     finally
       FSendBufSection.Leave;
     end;
    end;


  • xss22 © (15.02.12 20:58) [4]
    продолжение модуля ClientThread


    procedure TClientThread.Execute;
    const
     // Размер буфера для приёма сообщений
     RecvBufSize = 4096;
    var
     // Буфер для приёма сообщений
     RecvBuf: array[0..RecvBufSize - 1] of Byte;
     RecvRes: Integer;
     NetEvents: TWSANetworkEvents;
     // Полученная строка
     Str: string;
     // Длина полученной строки
     StrLen: Integer;
     // Если ReadLength = True, идёт чтение длины строки,
     // если False - самой строки
     ReadLength: Boolean;
     // Смещение от начала приёмника
     Offset: Integer;
     // Число байт, оставшихся при получении длины строки или самой строки
     BytesLeft: Integer;
     P: Integer;
     L: Integer;
     LoopExit: Boolean;
     WaitRes: Cardinal;
    begin
     LogMessage('Соединение установлено');
     ReadLength := True;
     Offset := 0;
     BytesLeft := SizeOf(Integer);
     repeat
       WaitRes := WSAWaitForMultipleEvents(3, @FEvents, False, WSA_INFINITE, False);
       case  WaitRes of
         WSA_WAIT_EVENT_0:
           begin
             // Закрываем соединение с клиентом и останавливаем нить
             LogMessage('Получен сигнал об остановке нити');
             shutdown(FSocket, SD_BOTH);
             Break;
           end;
         WSA_WAIT_EVENT_0 + 1:
           begin
             // Сбрасываем событие и отправляем данные
             WSAResetEvent(FEvents[1]);
             if not DoSendBuf then
               Break;
           end;
         WSA_WAIT_EVENT_0 + 2:
           begin
             // Произошло событие, связанное с сокетом.
             // Проверяем, какое именно, и заодно сбрасываем его
             if WSAEnumNetworkEvents(FSocket, FEvents[2], NetEvents) = SOCKET_ERROR then
             begin
               LogMessage('Ошибка при получении списка событий: ' + GetErrorString);
               Break;
             end;
             if NetEvents.lNetworkEvents and FD_READ <> 0 then
             begin
               if NetEvents.iErrorCode[FD_READ_BIT] <> 0 then
               begin
                 LogMessage('Ошибка в событии FD_READ: ' + GetErrorString(NetEvents.iErrorCode[FD_READ_BIT]));
                 Break;
               end;
               // В буфере сокета есть данные.
               // Копируем данные из буфера сокета в свой буфер RecvBuf
               RecvRes := recv(FSocket, RecvBuf, SizeOf(RecvBuf), 0);
               if RecvRes > 0 then
               begin
                 P := 0;
                 // Эта переменная нужна потому, что здесь появляется вложенный цикл,
                 // при возникновении ошибки в котором нужно выйти и из внешнего цикла
                 // тоже. Так как в Delphi нет конструкции типа Break(2) в Аде,
                 // приходится прибегать к таким способам: если нужен выход из внешнего
                 // цикла, во внутреннем цикле выполняется LoopExit := True, а после
                 // выполнения внутреннего цикла проверяется значение этой переменной,
                 // и при необходимости выполняется выход и из главного цикла.
                 LoopExit := False;
                 // В этом цикле мы извлекаем данные из буфера
                 // и раскидываем их по приёмникам - Str и StrLen
                 while P < RecvRes do
                 begin
                   // Определяем, сколько байт нам хотелось бы скопировать
                   L := BytesLeft;
                   // Если в буфере нет такого количества, довольствуемся тем, что есть
                   if P + L > RecvRes then
                     L := RecvRes - P;
                   // Копируем в соответствующий приёмник
                   if ReadLength then
                     Move(RecvBuf[P], (PChar(@StrLen) + Offset)^, L)
                   else
                     Move(RecvBuf[P], Str[Offset + 1], L);
                   Dec(BytesLeft, L);
                   // Если прочитали всё, что хотели, переходим к следующему этапу
                   if BytesLeft = 0 then
                   begin
                     ReadLength := not ReadLength;
                     Offset := 0;
                     // Если закончено чтение строки, надо вывести её в лог
                     if ReadLength then
                     begin
                       LogMessage('Получена строка: ' + Str);
                       BytesLeft := SizeOf(Integer);
                       // Формируем ответ и записываем его в буфер
                       Str := AnsiUpperCase(StringReplace(Str, #0, '#0', [rfReplaceAll]))  + ' (EventSelect server)';
                       SendString(Str);
                       Str := '';
                     end
                     else
                     begin
                       if StrLen <= 0 then
                       begin
                         LogMessage('Неверная длина строки от клиента: ' + IntToStr(StrLen));
                         LoopExit := True;
                         Break;
                       end;
                       BytesLeft := StrLen;
                       SetLength(Str, StrLen);
                     end;
                   end
                   else
                     Inc(Offset, L);
                   Inc(P, L);
                 end;
                 // Проверяем, был ли аварийный выход из внутреннего цикла,
                 // и если был, выходим и из внешнего, завершая работу с клиентом
                 if LoopExit then
                   Break;
               end
               else if RecvRes = 0 then
               begin
                 LogMessage('Клиент закрыл соединение');
                 Break;
               end
               else
               begin
                 if WSAGetLastError <> WSAEWOULDBLOCK then
                 begin
                   LogMessage('Ошибка при получении данных от клиента: ' + GetErrorString);
                 end;
               end;
             end;
             // Сокет готов к передаче данных
             if NetEvents.lNetworkEvents and FD_WRITE <> 0 then
             begin
               if NetEvents.iErrorCode[FD_WRITE_BIT] <> 0 then
               begin
                 LogMessage('Ошибка в событии FD_WRITE: ' + GetErrorString(NetEvents.iErrorCode[FD_WRITE_BIT]));
                 Break;
               end;
               // Отправляем то, что лежит в буфере
               if not DoSendBuf then
                 Break;
             end;
             if NetEvents.lNetworkEvents and FD_CLOSE <> 0 then
             begin
               // Клиент закрыл соединение
               if NetEvents.iErrorCode[FD_CLOSE_BIT] <> 0 then
               begin
                 LogMessage('Ошибка в событии FD_CLOSE: ' + GetErrorString(NetEvents.iErrorCode[FD_CLOSE_BIT]));
                 Break;
               end;
               LogMessage('Клиент закрыл соединение');
               shutdown(FSocket, SD_BOTH);
               Break;
             end;
           end;
         WSA_WAIT_FAILED:
           begin
             LogMessage('Ошибка при ожидании сообщения: ' + GetErrorString);
             Break;
           end;
         else
           begin
             LogMessage('Внутренняя ошибка сервера - неверный результат ожидания ' + IntToStr(WaitRes));
             Break;
           end;
       end;
     until False;
     closesocket(FSocket);
     LogMessage('Нить остановлена');
    end;


  • xss22 © (15.02.12 20:58) [5]
    продолжение модуля ClientThread


    // Функция возвращает True, если нить завершилась
    function TClientThread.GetFinished: Boolean;
    begin
     // Ждём окончания работы нити с нулевым таймаутом.
     // Если нить завершена, вернётся WAIT_OBJECT_0.
     // Если ещё работает, вернётся WAIT_TIMEOUT.
     Result := WaitForSingleObject(Handle, 0) = WAIT_OBJECT_0;
    end;

    // Метод для остановки нити извне.
    // Взводим соответствующее событие, а остальное сделаем при обработке события
    procedure TClientThread.StopThread;
    begin
     WSASetEvent(FEvents[0]);
    end;

    procedure TClientThread.LogMessage(const Msg: string);
    begin
     FMessage := FHeader + Msg;
     Synchronize(DoLogMessage);
    end;

    procedure TClientThread.DoLogMessage;
    begin
     ServerForm.AddMessageToLog(FMessage);
    end;

    end.

  • xss22 © (15.02.12 20:58) [6]
    unit MainServerUnit;
    { Пример TCP-сервера, использующего отдельную нить
     для работы с каждым клиентом и ещё одну - для
     слушающего сокета. Главная нить только создаёт сокеты,
     но не работает с ними.
    }


    interface

    uses
     Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms, Dialogs,
     StdCtrls, WinSock, ListenThread;

    type
     TServerForm = class(TForm)
       GroupBox1: TGroupBox;
       LabelPortNumber: TLabel;
       EditPortNumber: TEdit;
       BtnStartServer: TButton;
       LabelServerState: TLabel;
       MemoLog: TMemo;
       BtnStopServer: TButton;
       ChkBoxServerMsg: TCheckBox;
       procedure FormCreate(Sender: TObject);
       procedure BtnStartServerClick(Sender: TObject);
       procedure BtnStopServerClick(Sender: TObject);
       procedure FormClose(Sender: TObject; var Action: TCloseAction);
     private
       FListenThread: TListenThread;
       procedure StopServer;
     public
       procedure AddMessageToLog(const Msg: string);
       procedure OnStopServer;
     end;

    var
     ServerForm: TServerForm;

    // Получение сообщения об ошибке. Функция используется
    // и в других нитях, поэтому здесь мы её экспортируем.
    function GetErrorString(Error: Integer = 0): string;

    implementation

    {$R+}

    {$R *.DFM}

    // Функция GetErrorString возвращает сообщение об ошибке,
    // сформированное системой на основе значения, которое
    // передано в качестве параметра. Если это значение
    // равно нулю (по умолчанию), функция сама определяет
    // код ошибки, используя функцию WSAGetLastError.
    // Для получения сообщения используется системная функция
    // FormatMessage.
    function GetErrorString(Error: Integer): string;
    var
     Buffer: array[0..2047] of Char;
    begin
     if Error = 0 then
       Error := WSAGetLastError;
     FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, nil, Error, $400,
       @Buffer, SizeOf(Buffer), nil);
     Result := Buffer;
    end;

    procedure TServerForm.AddMessageToLog(const Msg: string);
    begin
     MemoLog.Lines.Add(Msg);
     MemoLog.SelStart := Length(MemoLog.Text);
    end;

    // Установка свойств элементов управления в состояние
    // "Сервер не работает"
    procedure TServerForm.OnStopServer;
    begin
     LabelPortNumber.Enabled := True;
     EditPortNumber.Enabled := True;
     BtnStartServer.Enabled := True;
     BtnStopServer.Enabled := False;
     ChkBoxServerMsg.Enabled := True;
     LabelServerState.Caption := 'Сервер не работает';
     FListenThread := nil;
    end;

    procedure TServerForm.FormCreate(Sender: TObject);
    var
     WSAData: TWSAData;
    begin
     // Инициализация библиотеки сокетов
     if WSAStartup($101, WSAData) <> 0 then
     begin
       // Если инициализация не удалась, смысла продолжать работу нет
       MessageDlg('Ошибка при инициализации библиотеки WinSock', mtError, [mbOK], 0);
       Application.Terminate;
     end;
     OnStopServer;
    end;

    // Реакция на кнопку "Запустить" - запуск сервера
    procedure TServerForm.BtnStartServerClick(Sender: TObject);
    var
     // Сокет, который будет "слушать"
     ServerSocket: TSocket;
     // Адрес, к которому привязывается слушающий сокет
     ServerAddr: TSockAddr;
    begin
     // Формируем адрес для привязки.
     FillChar(ServerAddr.sin_zero, SizeOf(ServerAddr.sin_zero), 0);
     ServerAddr.sin_family := AF_INET;
     ServerAddr.sin_addr.S_addr := INADDR_ANY;
     try
       ServerAddr.sin_port := htons(StrToInt(EditPortNumber.Text));
       if ServerAddr.sin_port = 0 then
       begin
         MessageDlg('Номер порта должен лежать в диапазоне 1-65535', mtError, [mbOK], 0);
         Exit;
       end;
       // Создание сокета
       ServerSocket := socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
       if ServerSocket = INVALID_SOCKET then
       begin
         MessageDlg('Ошибка при создании сокета:'#13#10 + GetErrorString, mtError, [mbOK], 0);
         Exit;
       end;
       // Привязка сокета к адресу
       if bind(ServerSocket, ServerAddr, SizeOf(ServerAddr)) = SOCKET_ERROR then
       begin
         MessageDlg('Ошибка при привязке сокета к адресу:'#13#10 + GetErrorString, mtError, [mbOK], 0);
         closesocket(ServerSocket);
         Exit;
       end;
       // Перевод сокета в режим прослушивания
       if listen(ServerSocket, SOMAXCONN) = SOCKET_ERROR then
       begin
         MessageDlg('Ошибка при перводе сокета в режим прослушивания:'#13#10 + GetErrorString, mtError, [mbOK], 0);
         closesocket(ServerSocket);
         Exit;
       end;
       // Запуск нити, обслуживающей слушающий сокет
       FListenThread := TListenThread.Create(ServerSocket, ChkBoxServerMsg.Checked);
       // Перевод элементов управления в состояние "Сервер работает"
       LabelPortNumber.Enabled := False;
       EditPortNumber.Enabled := False;
       BtnStartServer.Enabled := False;
       BtnStopServer.Enabled := True;
       ChkBoxServerMsg.Enabled := False;
       LabelServerState.Caption := 'Сервер работает';
     except
       on EConvertError do
         // Это исключение может возникнуть только в одном месте -
         // при вызове StrToInt(EditPortNumber.Text)
         MessageDlg('\"' + EditPortNumber.Text + '\" не является целым числом', mtError, [mbOK], 0);
       on ERangeError do
         // Это исключение может возникнуть только в одном месте -
         // при присваивании значения номеру порта
         MessageDlg('Номер порта должен лежать в диапазоне 1-65535', mtError, [mbOK], 0);
     end;
    end;

    // Остановка сервера
    procedure TServerForm.StopServer;
    begin
     // Запрещаем кнопку, чтобы пользователь не мог нажать её
     // ещё раз, пока сервер не остановится.
     BtnStopServer.Enabled := False;
     // Ожидаем завершения слушающей нити. Так как вывод сообщений
     // эта нить осуществляет через Synchronize, выполняемый главной
     // нитью в петле сообщений, вызов метода WaitFor мог бы привести
     // к взаимной блокировке: главная нить ждала бы, когда завершится
     // нить TListenThread, а та, в свою очередь - когда главная нить
     // выполнит Synchronize. Чтобы этого не происходило, организуется
     // ожидание с локальной петлёй сообщений.
     if Assigned(FListenThread) then
     begin
       FListenThread.StopServer;
       while Assigned(FListenThread) do
       begin
         Application.ProcessMessages;
         Sleep(10);
       end;
     end;
    end;

    procedure TServerForm.BtnStopServerClick(Sender: TObject);
    begin
     StopServer;
    end;

    procedure TServerForm.FormClose(Sender: TObject; var Action: TCloseAction);
    begin
     StopServer;
    end;

    end.

  • _ (16.02.12 00:13) [7]
    AcceptEx
    ConnectEx
    CreateIoCompletionPort
    GetQueuedCompletionStatus

    и 1000+ клиентов и серверов в одном доп. потоке.

    Коли шибко нужно - могу и исходник предложить
  • xss22 © (16.02.12 08:03) [8]
    Да, очень нужно!
    Исходником был бы очень рад.
    Мой ящик xss22@mail.ru

    Заранее огромнейшее спасибо
  • Anatoly Podgoretsky © (16.02.12 09:37) [9]
    > xss22  (15.02.2012 20:54:00)  [0]

    Как бы не так, ассинхронные события и потоки.
    Делай все в главной нити, по событиям, а потоки только для длительных
    операций работы с базой, да и то только пул потоков
  • Anatoly Podgoretsky © (16.02.12 09:39) [10]
    > xss22  (16.02.2012 08:03:08)  [8]

    Есть такой продукт как ICS, вот примеры к нему легко доступны, возьми их для
    понимания как все это делается в одном потоке.
  • Сергей М. © (16.02.12 11:29) [11]
    Если объем данных в запросах к серверу и его соотв.ответах на эти запросы сравнительно небольшой, но при этом обработка сервером клиентских запросов времяемкая, то вне зависимости от модели работы с гнездами (синхронная или асинхронная) можно реализовать след.схему:

    - транспортную логику  - акцептирование кл.запросов на соединение, прием инф.запросов и передача инф.ответов - реализует один-единственный поток "ListeningTransportThread" (например, основной)

    - логику обработки принятых инф.запросов и формирования инф.ответов реализуют доп.потоки "RequestProcessingThread", помещенные в пул, размер которого не превышает определенного значения.

    - логику распределения принятых инф.запросов по свободным потокам из пула реализует доп.поток "ThreadPoolManager"
  • xss22 © (16.02.12 11:58) [12]
    А где по ICS можно документацию найти?
    Желательно на русском.
  • Anatoly Podgoretsky © (16.02.12 12:14) [13]
    > xss22  (16.02.2012 11:58:12)  [12]

    Документпции практически нет, это халява сэр.
  • xss22 © (16.02.12 13:13) [14]
    Anatoly Podgoretsky, ты можешь дать свой ICQ или скайп или майл агент?
    Чтобы иногда задать вопросы.
    Кстати, меня зовут Алексей.
  • Anatoly Podgoretsky © (16.02.12 13:46) [15]
    > xss22  (16.02.2012 13:13:14)  [14]

    майл агента нет как класс.
    ICQ и скайп только представительские.
    email никому не даю.

    Для вопросов существует форум, на личные вопросы ругаюсь и не отвечаю.
  • xss22 © (16.02.12 13:56) [16]
    Тогда у меня вопрос по ICS.
    Разбираю демку сервера OverbyteIcsThrdSrvV3
    понятно как работать с подключенным клиентом.

    Но как найти в Client as  TMyClient конкретно какого то клиента по Nick ?

    Прилагаю код демки:

    unit OverbyteIcsThrdSrvV3_1;

    interface

    uses
     Windows, Messages, SysUtils, Classes, Graphics, Controls, Forms,
     Dialogs, OverbyteIcsIniFiles, StdCtrls, ExtCtrls, OverbyteIcsWSocket, OverbyteIcsWSocketS,
     OverbyteIcsWSocketTS, OverbyteIcsWndControl;

    const
     WM_APPSTARTUP  = WM_USER + 1;
     WM_LOG_MESSAGE = WM_USER + 2;
    var
     LockDisplay : TRtlCriticalSection;

    type

     TMyClient = class(TWSocketThrdClient)
     public
       RcvdLine1    : String;
       ConnectTime : TDateTime;
       Nick    : String;
     end;

     TThrdSrvForm = class(TForm)
       ToolPanel: TPanel;
       DisplayMemo: TMemo;
       ClientsPerThreadEdit: TEdit;
       Label1: TLabel;
       DisconnectAllButton: TButton;
       ClearMemoButton: TButton;
       WSocketThrdServer1: TWSocketThrdServer;
       procedure FormShow(Sender: TObject);
       procedure FormClose(Sender: TObject; var Action: TCloseAction);
       procedure FormCreate(Sender: TObject);
       procedure WSocketThrdServer1ClientConnect(Sender: TObject;
         Client: TWSocketClient; Error: Word);
       procedure WSocketThrdServer1ClientDisconnect(Sender: TObject;
         Client: TWSocketClient; Error: Word);
       procedure WSocketThrdServer1BgException(Sender: TObject; E: Exception;
         var CanClose: Boolean);
       procedure FormDestroy(Sender: TObject);
       procedure WSocketThrdServer1ClientCreate(Sender: TObject;
         Client: TWSocketClient);
       procedure ClientsPerThreadEditChange(Sender: TObject);
       procedure DisconnectAllButtonClick(Sender: TObject);
       procedure ClearMemoButtonClick(Sender: TObject);
       procedure WSocketThrdServer1ThreadException(Sender: TObject;
         AThread: TWsClientThread; const AErrMsg: String);
     private
       FIniFileName : String;
       FInitialized : Boolean;
       FLogList  : TStringList;
       procedure Display(const Msg : String);
       procedure WmLogMessage(var Msg: TMessage); message WM_LOG_MESSAGE;
       procedure WMAppStartup(var Msg: TMessage); message WM_APPSTARTUP;
       procedure ClientDataAvailable(Sender: TObject; Error: Word);
       procedure ProcessData(Client : TMyClient);
       procedure ClientBgException(Sender       : TObject;
                                   E            : Exception;
                                   var CanClose : Boolean);
       procedure ClientLineLimitExceeded(Sender        : TObject;
                                         Cnt           : LongInt;
                                         var ClearData : Boolean);
     public
       property IniFileName : String read FIniFileName write FIniFileName;
     end;

    var
     ThrdSrvForm : TThrdSrvForm;

    implementation

    {$R *.DFM}
    uses
       OverbyteIcsUtils;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.FormCreate(Sender: TObject);
    begin
       FIniFileName := OverbyteIcsIniFiles.GetIcsIniFileName;
       FLogList     := nil;
       FLogList     := TStringList.Create;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.FormDestroy(Sender: TObject);
    begin
       EnterCriticalSection(LockDisplay);
       try
           if Assigned(FLogList) then
               FLogList.Free;
       finally
           LeaveCriticalSection(LockDisplay);
       end;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.FormShow(Sender: TObject);
    begin
       if not FInitialized then begin
           FInitialized := TRUE;
           DisplayMemo.Clear;
           { Delay startup code until our UI is ready and visible }
           PostMessage(Handle, WM_APPSTARTUP, 0, 0);
       end;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.FormClose(Sender: TObject; var Action: TCloseAction);
    begin

    end;

    { Display a message in our display memo. Delete lines to be sure to not     }
    { overflow the memo which may have a limited capacity.                      }
    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.WmLogMessage(var Msg: TMessage);
    var
       I : Integer;
    begin
       DisplayMemo.Lines.BeginUpdate;
       try
           if DisplayMemo.Lines.Count > 200 then begin
               for I := 1 to 50 do
                   DisplayMemo.Lines.Delete(0);
           end;
           EnterCriticalSection(LockDisplay);
           try
               DisplayMemo.Lines.AddStrings(FLogList);
               FLogList.Clear;
           finally
               LeaveCriticalSection(LockDisplay);
           end;
       finally
           DisplayMemo.Lines.EndUpdate;
           DisplayMemo.Perform(EM_SCROLLCARET, 0, 0);
       end;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.Display(const Msg : String);
    begin
       EnterCriticalSection(LockDisplay);
       try
           FLogList.Add(Msg);
           PostMessage(Handle, WM_LOG_MESSAGE, 0, 0);
       finally
           LeaveCriticalSection(LockDisplay);
       end;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    { This is our custom message handler. We posted a WM_APPSTARTUP message     }
    { from FormShow event handler. Now UI is ready and visible.                 }
    procedure TThrdSrvForm.WMAppStartup(var Msg: TMessage);
    begin
       WSocketThrdServer1.Proto       := 'tcp';                { Use TCP protocol  }
       WSocketThrdServer1.Port        := '2211';             { Use telnet port   }
       WSocketThrdServer1.Addr        := '0.0.0.0';            { Use any interface }
       WSocketThrdServer1.ClientClass := TMyClient;            { Use our component }
       WSocketThrdServer1.Listen;                              { Start listening    }
       Display('Waiting for clients...');
    end;


  • xss22 © (16.02.12 13:57) [17]
    продолжение кода из демки OverbyteIcsThrdSrvV3


    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.WSocketThrdServer1ClientCreate(Sender: TObject;
     Client: TWSocketClient);
    var
       Cli : TMyClient;
    begin
       Cli := Client as  TMyClient;
       Cli.LineMode            := TRUE;
       Cli.LineEdit            := TRUE;
       Cli.LineLimit           := 255; { Do not accept long lines }
       Cli.OnDataAvailable     := ClientDataAvailable;
       Cli.OnLineLimitExceeded := ClientLineLimitExceeded;
       Cli.OnBgException       := ClientBgException;
       Cli.ConnectTime         := Now;
     
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.WSocketThrdServer1ClientConnect(
       Sender : TObject;
       Client : TWSocketClient;
       Error  : Word);
    begin
       with Client as TMyClient do begin
           Display('Client connected.' +
                   ' Remote: ' + PeerAddr + '/' + PeerPort +
                   ' Local: '  + GetXAddr + '/' + GetXPort +
                   ' ThrdID : $' + IntToStr(ClientThread.ThreadID) +
                   ' ThrdCnt: #' + IntToStr(WSocketThrdServer1.ThreadCount) + #13#10 +
                   'There is now ' +
                   IntToStr(TWSocketThrdServer(Sender).ClientCount) +
                   ' clients connected.');

           Client.LineMode            := TRUE;
           Client.LineEdit            := TRUE;
           Client.LineLimit           := 255; { Do not accept long lines }
           Client.OnDataAvailable     := ClientDataAvailable;
           Client.OnLineLimitExceeded := ClientLineLimitExceeded;
           Client.OnBgException       := ClientBgException;
           TMyClient(Client).ConnectTime  := Now;
       end;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.WSocketThrdServer1ClientDisconnect(
       Sender : TObject;
       Client : TWSocketClient;
       Error  : Word);
    var
       MyClient       : TMyClient;
       ClientThreadID : Integer;
    begin
       MyClient := Client as TMyClient;
       if Assigned(MyClient.ClientThread) then ClientThreadID := MyClient.ClientThread.ThreadID
       else ClientThreadID := -1;
       
       Display('Client disconnecting: ' + MyClient.PeerAddr + '   ' +
               'Duration: ' + FormatDateTime('hh:nn:ss',
               Now - MyClient.ConnectTime) + ' Error: ' + IntTostr(Error) +
               ' ThrdID: $' + IntToStr(ClientThreadID) +
               ' ThrdCnt: #' + IntToStr(TWSocketThrdServer(Sender).ThreadCount) + #13#10 +
               'There is now ' +
               IntToStr(TWSocketThrdServer(Sender).ClientCount - 1) +
               ' clients connected.');
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.ClientLineLimitExceeded(
       Sender        : TObject;
       Cnt           : LongInt;
       var ClearData : Boolean);
    begin
       with Sender as TMyClient do begin
           Display('Line limit exceeded from ' + GetPeerAddr + '. Closing.');
           ClearData := TRUE;
           Close;
       end;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.ClientDataAvailable(
       Sender : TObject;
       Error  : Word);
    var
       Cli : TMyClient;
    begin
       Cli := Sender as TMyClient;
       { We use line mode. We will receive complete lines }

       Cli.RcvdLine1 := Cli.ReceiveStr;
           { Remove trailing CR/LF }
       while (Length(Cli.RcvdLine1) > 0) and
                 IsCharInSysCharSet(Cli.RcvdLine1[Length(Cli.RcvdLine1)], [ #13, #10]) do
               Cli.RcvdLine1 := Copy(Cli.RcvdLine1, 1, Length(Cli.RcvdLine1) - 1);
       Display('Received from ' + Cli.GetPeerAddr + ': ''' + Cli.RcvdLine1 + '''');
       ProcessData(Cli);
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.ProcessData(Client : TMyClient);
    var
       I       : Integer;
       AClient : TMyClient;
    begin
       { We could replace all those CompareText with a table lookup }
       if CompareText(Client.RcvdLine1, 'help') = 0 then
           Client.SendStr('Commands are:' + #13#10 +
                          '  exit' + #13#10 +
                          '  who' + #13#10 +
                          '  time' + #13#10 +
                          '  threadexception' + #13#10 )
                          //'  exception' + #13#10 )
       else if CompareText(Client.RcvdLine1, 'exit') = 0 then
           { We can't call Client.Close here because we will immediately }
           { reenter DataAvailable event handler with same line because  }
           { a line is removed from buffer AFTER it has been processed.  }
           { Using CloseDelayed will delay Close until we are out of     }
           { current event handler.                                      }
           Client.CloseDelayed
       else if CompareText(Client.RcvdLine1, '
    time') = 0 then
           { Send server date and time to client }
           Client.SendStr(DateTimeToStr(Now) + #13#10)
       else if CompareText(Client.RcvdLine1, '
    who') = 0 then begin
           { Send client list to client }
           Client.SendStr('
    There are ' + IntToStr(WSocketThrdServer1.ClientCount) +
                          '
    connected users:' + #13#10);
           for I := WSocketThrdServer1.ClientCount - 1 downto 0 do begin
               AClient := TMyClient(WSocketThrdServer1.Client[I]);
               Client.SendStr(AClient.PeerAddr + '
    :' + AClient.GetPeerPort + ' ' +
                              DateTimeToStr(AClient.ConnectTime) + #13#10);
           end;
       end
       (*
       else if CompareText(Client.RcvdLine, '
    exception') = 0 then
           { This will trigger a background exception for client }
           PostMessage(Client.Handle, WM_TRIGGER_EXCEPTION, 0, 0)
       *)

       else if CompareText(Client.RcvdLine1, '
    threadexception') = 0 then
           { This will trigger a background exception for client }
           PostThreadMessage(GetCurrentThreadID, WM_THREAD_EXCEPTION_TEST, 0, 0)
       else
           if Client.State = wsConnected then
               Client.SendStr('
    Unknown command: ''' + Client.RcvdLine1 + '''' + #13#10);
    end;


  • xss22 © (16.02.12 13:57) [18]
    продолжение кода из демки OverbyteIcsThrdSrvV3

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    { This event handler is called when listening (server) socket experienced   }
    { a background exception. Should normally never occurs.                     }
    procedure TThrdSrvForm.WSocketThrdServer1BgException(
       Sender       : TObject;
       E            : Exception;
       var CanClose : Boolean);
    begin
       Display('Server exception occured: ' + E.ClassName + ': ' + E.Message);
       CanClose := FALSE;  { Hoping that server will still work ! }
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    { This event handler is called when a client socket experience a background }
    { exception. It is likely to occurs when client aborted connection and data }
    { has not been sent yet.                                                    }
    procedure TThrdSrvForm.ClientBgException(
       Sender       : TObject;
       E            : Exception;
       var CanClose : Boolean);
    begin
       with Sender as TMyClient do begin
           Display('Client exception occured: ' + E.ClassName + ': ' + E.Message);
           CanClose := TRUE;   { Goodbye client ! }
       end;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.ClientsPerThreadEditChange(Sender: TObject);
    var
       Num : Integer;
    begin
       try
           Num := StrToInt((Sender as TEdit).Text);
       except
           Num := 1;
       end;
       WSocketThrdServer1.ClientsPerThread := Num;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.DisconnectAllButtonClick(Sender: TObject);
    begin
       WSocketThrdServer1.DisconnectAll;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.ClearMemoButtonClick(Sender: TObject);
    begin
       DisplayMemo.Clear;
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    procedure TThrdSrvForm.WSocketThrdServer1ThreadException(
       Sender        : TObject;
       AThread       : TWsClientThread;
       const AErrMsg : String);
    begin
       Display(AErrMsg);
    end;

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    initialization
       InitializeCriticalSection(LockDisplay);

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    finalization
       DeleteCriticalSection(LockDisplay);

    {* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *}
    end.

  • Anatoly Podgoretsky © (16.02.12 14:13) [19]
    > xss22  (16.02.2012 13:56:16)  [16]

    А чего его искать, когда это поле в потоке     Nick: String;
    Оно сразу доступно.
 
Конференция "Сети" » Многонитевой TCP-сервер на асинхронных событиях [D7]
Есть новые Нет новых   [134435   +14][b:0][p:0.042]