Конференция "Сети" » Видеопоток по idhttp. Зависание потока [D6, WinXP]
 
  • olikke (08.10.12 12:04) [0]
    Здавствуйте!

    Задача:
    Есть несколько IP камер Axis (P1344) и бесконечное количество клиентов для них. Клиент может подключиться к любой из камер, а также к нескольким из них.
    Поскольку к камерам напрямую подключаться нельзя, нужно создать что-то типа видеосервера. В задачу видеосервера входит получение от всех камер видеопотока по http и выдача их в сеть клиентам (UDP Broadcast - все клиенты в одной локальной  сети).
    Как делаю:
    Для каждой камеры создаётся свой экземпляр TThread, внутри которого живёт idHTTP. Ну и естественно свой UDPClient (тоже использую Indy10).

    Собственно код потока для HTTP:

    unit THttp;

    interface

    uses
     Windows,Classes,SysUtils,IdHTTP,SyncObjs,UDP;

    type
     TSt = class(TThread)
     private
       idHTTP1:TidHTTP;
       UDPClient1:TUDP;
       xStream: TStream;
     protected
       function LiveStreamWrite(const aBuffer; aCount: Longint): Longint;
       procedure Execute; override;
     public
       URL:String;
       Numb:byte;
       FEvent:TEvent;
     end;

    implementation
    uses
     LiveStreamer,Global;

    procedure TSt.Execute;
    begin
     FreeOnTerminate:=true;
     idHTTP1:=TidHTTP.Create(nil);
     idHTTP1.Request:=TidHTTPRequest.Create(idHTTP1);
     idHTTP1.Request.BasicAuthentication:=true;
     idHTTP1.Request.Password:='****';
     idHTTP1.Request.Username:='****';
     idHTTP1.HTTPOptions:=[hoForceEncodeParams];
     idHTTp1.Request.Connection:='Keep-Alive';
     UDPClient1:=TUDP.Create(True);
     UDPClient1.Priority:=tpNormal;
     UDPClient1.FreeOnTerminate:=True;
     UDPClient1.Port:=Table[Numb].Port;
     UDPClient1.Numb:=Numb;
     UDPClient1.Resume;
     xStream := TLiveStream.Create(nil, LiveStreamWrite);
     xStream.Size:=$8800;
     XStream.Position:=0;
     while (not fl_STOP) and (not fl_st[Numb]) do
     begin
       try idHTTP1.Get(URL, xStream);
       finally  end;
     end;
     FreeAndNil(xStream);
     UDPCLient1.Terminate;
     Event[numb].Setevent;
     UDPClient1:=nil;
    end;

    function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;
    begin
       Buffer[Numb].Lock;
     try
       Buffer[Numb].Size:=0;
       Buffer[Numb].Write(aBuffer,aCount);
       Buffer[Numb].Position:=0;
     finally
       Buffer[Numb].UnLock;
     end;
     Event[Numb].SetEvent;
    end;

    end.



    Код потока для UDP:

    unit UDP;

    interface

    uses
     Classes,IdUDPClient,IdGlobal,SyncObjs,SysUtils;

    type
     TUDP = class(TThread)
     private
       UDPClient:TidUDPClient;
     protected
       procedure Execute; override;
     public
       Port:integer;
       HOST:string;
       Numb:byte;
     end;

    implementation

    uses Global;

    procedure TUDP.Execute;
    var buff:string;
    begin
     FreeOnTerminate:=true;
     //Создание клиента для передачи данных
     UDPClient:=TIdUDPClient.Create;
     UDPClient.BufferSize:=$8800;
     UDPClient.IPVersion:=Id_IPv4;
     UDPClient.Port:=Port;
     UDPClient.ReceiveTimeout:=-1;
     UDPClient.Tag:=0;
     UDPClient.BroadcastEnabled:=true;
     UDPClient.Host:=Table[Numb].Broadcast;
     try UDPClient.Active:=True;
     except end;
     while not Terminated do
     begin
       Event[Numb].WaitFor(INFINITE);
       Event[Numb].ResetEvent;
       if terminated then
       begin
         UDPClient.Active:=false;
         UDPClient.Free;
         buff:='';
         exit;
       end;
       buff:='';
         Buffer[Numb].Lock;
       try
         Buff:=Buffer[Numb].DataString;
       finally
         Buffer[Numb].UnLock;
       end;
       try UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);
       except
       end;
       buff:='';
     end;
    end;

    end.



    Так вот, вся эта зараза прекрасно работает. Клиенты смотрят картинки с камер.
    Но где-то через час-два каринтка пропадает. Точнее от IP камеры пропадает поток (смотрела сниффером). А значит экземпояр потока TSt, привязанного к этой камере бесконечно ждёт продолжения кино (висит).

    У меня два вопроса:
    1. Почему пропадает поток от IP камеры (мой косяк или её?)
    2. Как мне грамотно прибить поток висящий? Кроме TerminateThread до него ничем не достучаться. Но в этом случае все мои ресурсы остаются незакрытыми. Да и неправильно это как-то....

    СПАСИТЕ МЕНЯ!!!
  • RWolf © (08.10.12 12:08) [1]
    > А значит экземпояр потока TSt, привязанного к этой камере бесконечно ждёт продолжения кино (висит).

    ну так пусть ждёт не бесконечно.
  • olikke (08.10.12 12:25) [2]
    Уважаемый RWolf, в том то и вопрос, что нельзя чтоб он висел. Надо либо понять почему он виснет, либо перезапустить его. Я уже думала о создании следящего потока, но поскольку зависание внутри цикла в execute мне корректно не закрыться. И вообще задача максимум обойтись без этого.
  • brother © (08.10.12 16:00) [3]
    > UDPClient.ReceiveTimeout:=-1;

    меня это смущает
  • olikke (08.10.12 21:29) [4]
    To brother
    Почему? Я не жду ответа.
  • DVM © (08.10.12 23:34) [5]

    > olikke   (08.10.12 12:04) 


    >  while (not fl_STOP) and (not fl_st[Numb]) do
    >  begin
    >    try idHTTP1.Get(URL, xStream);
    >    finally  end;
    >  end;

    URL какой?
  • DVM © (08.10.12 23:38) [6]

    > olikke   (08.10.12 12:04) 

    вообще конечно недочетов в коде огромное количество.

    вот например, что будет если произойдет ошибка в idHTTP1.Get(URL, xStream);
    Поток умрет просто. Тихо без выброса исключений.
  • olikke (09.10.12 10:04) [7]
  • DVM © (09.10.12 11:25) [8]

    > olikke   (09.10.12 10:04) [7]

    Это multipart/x-mixed-replace данные, TIDHttp вообще не поддерживает до Indy 10 включительно данный Content-Type. Поддержка обещается в будущих версиях. Для нормальной работы с данным типом данных нужно использовать TIdTCPClient  (ну или чистые сокеты) и самому реализовывать разбор потока.

    Этот поток не имеет конца, данные будут поступать и поступать до тех пор пока клиент не оборвет соединение. Я не понимаю (но догадываюсь) как это у вас может работать, если вы читаете ответ сервера таким образом:

    while (not fl_STOP) and (not fl_st[Numb]) do
    begin
      try idHTTP1.Get(URL, xStream);
      finally  end;
    end;



    Метод  idHTTP1.Get(URL, xStream) вообще никогда не вернет управление. Я догадываюсь, что xStream у вас какой то хитрый наследник TStream (кстати, потокозащищенный?) в котором как то обрабатываются поступающие данные, но так как кода его нет, что больше ничего сказать не могу.

    Короче говоря, при чтении вероятно возникает ошибка, ваш поток TSt умирает и все. При ошибке в TIdHTTP надо закрывать сокет самостоятельно, тем более что у вас  idHTTp1.Request.Connection:='Keep-Alive';
  • olikke (09.10.12 11:42) [9]
    Спасибо, DVM, за ответ. Очень похоже на то что происходит у меня. Видела тему где вы подробно разбирали реализацию данного запроса (buffer у меня ваш tsavebuffer). Буду смотреть в эту сторону.
  • DVM © (09.10.12 12:07) [10]

    > buffer у меня ваш tsavebuffer

    В принципе можно сделать и наследника TStream по аналогии с тем буфером, т.е. получить TMemoryStream у которого можно удалять данные из начала без лишних телодвижений с памятью. Вот у меня есть подобный (правда не тестировал) в него можно писать с одной стороны, а забирать данные с другой (не забывая о синхронизации если программа многопоточная)


    ////////////////////////////////////////////////////////////////////////////////
    //
    //  ****************************************************************************
    //  * Модуль       : uFlexibleMemoryStream.pas
    //  * Назначение   : Расширенный Memory Stream
    //  * Copyright    : 2012 © Дмитрий Муратов (dvmuratov@yandex.ru)
    //  ****************************************************************************
    //
    ////////////////////////////////////////////////////////////////////////////////

    {
     Данный класс представляет собой расширенный аналог TMemoryStream,
     главным отличием которого от оригинального является возможность удаления данных
     из начала. При таком удалении оставшиеся данные не перемещаются в памяти,
     перемещается лишь указатель на них (Head), и соответственно
     уменьшается Position. При добавлении данных проверяется есть ли свободное место
     в уже выделенном буфере перед или за полезными данными и если есть, оно используется
     по возможности. Если в буфере недостаточно свободного (Unused) места, то происходит
     перевыделение памяти под буфер и копирование содержимого старого буфера в новый.
     Также данный класс обладает некоторыми дополнительными методами, повышающими
     удобство работы.

     Схематично хранение данных в данном Stream можно представить:

     +--------------+---------------------------------+--------------+
     | Unused space |   Data                          | Unused space |
     +--------------+---------------------------------+--------------+
     ^              ^          ^                      ^
     Storage        Head       Position               Tail

     Storage - указатель на начало внутреннего буфера
     Head - указатель на начало данных (аналог Memory в TMemoryStream)
     Tail - указаетль на конец данных

    }


    {$I DEFINES.INC}

    unit uFlexibleMemoryStream;

    interface

    uses
    {$IFDEF DELPHIXE2_UP}
     System.SysUtils, System.Classes, System.Math;
    {$ELSE}
     SysUtils, Classes, Math;
    {$ENDIF}

    type
     TFlexibleMemoryStream = class(TStream)
     const
       DefAllocation = 1024;
     private
       FStorage: PByte;          // Указатель на начало буфера
       FHead: PByte;             // Указатель на начало полезных данных в буфере
       FAllocation: NativeInt;   // Общее количество выделенной под буфер памяти
       FSize: NativeInt;         // Размер полезных данных
       FPosition: NativeInt;     // Текущая позиция в полезных данных (смещение от FHead)
       function GetTail: PByte;
     public
       constructor Create; overload;
       constructor Create(const AAllocation: NativeInt); overload;
       constructor Create(const ABuffer; ACount: Longint); overload;
       constructor Create(const AStream: TStream); overload;
       destructor Destroy; override;
       procedure Clear;
       procedure Tidy;
       procedure Expand(const ACount: NativeInt); virtual;
       function Consume(ACount: NativeInt): NativeInt; virtual;
       function Extract(ACount: NativeInt): PByte;
       function Append(const ABuffer; ACount: Longint): Longint; overload; virtual;
       function Append(const AStream: TStream): Int64; overload; virtual;
       function Shrink(ACount: NativeInt): NativeInt; virtual;
       procedure SetSize(const ANewSize: Int64); override;
       procedure LoadFromStream(AStream: TStream); virtual;
       procedure LoadFromBuffer(const ABuffer; ACount: Longint); virtual;
       procedure LoadFromFile(const AFileName: string); virtual;
       procedure SaveToStream(AStream: TStream); virtual;
       procedure SaveToFile(const AFileName: string); virtual;
       function Seek(const AOffset: Int64; AOrigin: TSeekOrigin): Int64; override;
       function Read(var ABuffer; ACount: Longint): Longint; override;
       function Write(const ABuffer; ACount: Longint): Longint; override;
       function Pos(const ABuffer; ACount: Longint; AOffset: NativeInt = 0): NativeInt; virtual;
       function IsEmpty: boolean;
       property Head: PByte read FHead;
       property Tail: PByte read GetTail;
       property Storage: PByte read FStorage;
       property Allocation: NativeInt read FAllocation;
     end;


  • DVM © (09.10.12 12:07) [11]

    implementation

    { TFlexibleMemoryStream }

    function TFlexibleMemoryStream.Append(const ABuffer; ACount: Integer): Longint;
    begin
     FPosition := FSize;
     Result := Write(ABuffer, ACount);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Append(const AStream: TStream): Int64;
    begin
     Result := 0;
     if AStream.Size = 0 then
       Exit;
     Expand(AStream.Size);
     Seek(0, soEnd);
     Result := CopyFrom(AStream, 0);
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.Clear;
    begin
     FSize := 0;
     FHead := FStorage;
     FPosition := 0;
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create;
    begin
     Create(DefAllocation);
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create(const AAllocation: NativeInt);
    begin
     inherited Create;
     FAllocation := AAllocation;
     FSize := 0;
     GetMem(FStorage, FAllocation);
     FHead := FStorage;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Consume(ACount: NativeInt): NativeInt;
    begin
     Extract(ACount);
     Result := FSize;
    end;

    //------------------------------------------------------------------------------

    destructor TFlexibleMemoryStream.Destroy;
    begin
     FreeMem(FStorage, FAllocation);
     inherited Destroy;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.Expand(const ACount: NativeInt);
    var
     Spare: NativeInt;      // Незанятый объем внутреннего буфера
     HeadSpace: NativeInt;  // Незанятый объем в  начале буфера
     TailSpace: NativeInt;  // Незанятый объем в конце буфера
     OldAllocation: NativeInt;
     NewStorage: PByte;
    begin
     Spare := FAllocation - FSize;
     HeadSpace := FHead - FStorage;
     TailSpace := Spare - HeadSpace;
     // Если в буфере доставточно свободного места для добавления ACount байт
     if Spare >= ACount then
       begin
         // Если в хвосте свободного места меньше чем надо добавить
         if TailSpace < ACount then
           begin
             // Двигаем данные в начало буфера
             Move(FHead^, FStorage^, FSize);
             // Начало буфера теперь совпадает с началом данных
             FHead := FStorage;
           end
         else
           begin
             // Ничего не делаем, в хвосте достаточно места для добавления ACount байт
           end;
       end
     else
       // В буфере недостаточно места для добавления ACount байт, создаем новый буфер,
       // копируем в него данные из старого, старый удаляем.
       begin
         OldAllocation := FAllocation;

         //FAllocation := FAllocation + ACount;
         FAllocation := Max(FAllocation, ACount) * 2;

         GetMem(NewStorage, FAllocation);
         if FStorage <> nil then
           begin
             Move(FHead^, NewStorage^, FSize);
             FreeMem(FStorage, OldAllocation);
           end;
         FStorage := NewStorage;
         FHead := FStorage;
       end;
     FSize := FSize  + ACount;
    end;

    //------------------------------------------------------------------------------

  • DVM © (09.10.12 12:08) [12]

    function TFlexibleMemoryStream.Extract(ACount: NativeInt): PByte;
    begin
     Result := FHead;
     if ACount <= 0 then
       Exit;
     if ACount > FSize then
       ACount := FSize;
     FHead := FHead + ACount;
     FSize := FSize - ACount;
     FPosition := FPosition - ACount;
     if FPosition < 0  then
       FPosition := 0;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.GetTail: PByte;
    begin
     Result := PByte(FHead + FSize);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Pos(const ABuffer; ACount: Longint; AOffset: NativeInt = 0): NativeInt;

     function StartsBuffer(ASubBuffer: Pointer; ASubBufferSize: Longint;
       ABuffer: Pointer; ABufferSize: NativeInt): Boolean;
     begin
       if ASubBufferSize <= ABufferSize then
         Result := CompareMem(ASubBuffer, ABuffer, ASubBufferSize)
       else
         Result := False;
     end;

    var
     i: NativeInt;
     P: PByte;
    begin
     Result := -1;
     for I := AOffset to FSize - 1 do
       begin
         P := FHead + I;
         if StartsBuffer(@ABuffer, ACount, Pointer(P), FSize - I) then
           begin
             Result := i;
             Break;
           end;
       end;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.IsEmpty: boolean;
    begin
     Result := FSize = 0;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.LoadFromBuffer(const ABuffer; ACount: Integer);
    begin
     FPosition := 0;
     Write(ABuffer, ACount);
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.LoadFromFile(const AFileName: string);
    var
     Stream: TStream;
    begin
     Stream := TFileStream.Create(AFileName, fmOpenRead);
     try
       LoadFromStream(Stream);
     finally
       Stream.Free;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.LoadFromStream(AStream: TStream);
    begin
     if AStream.Size = 0 then
       Exit;
     SetSize(AStream.Size);
     Seek(0, soBeginning);
     CopyFrom(AStream, 0);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Read(var ABuffer; ACount: Integer): Longint;
    begin
     if (FPosition >= 0) and (ACount >= 0) then
     begin
       Result := FSize - FPosition;
       if Result > 0 then
       begin
         if Result > ACount then Result := ACount;
         Move((PByte(FHead) + FPosition)^, ABuffer, Result);
         Inc(FPosition, Result);
         Exit;
       end;
     end;
     Result := 0;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.SaveToFile(const AFileName: string);
    var
     Stream: TStream;
    begin
     Stream := TFileStream.Create(AFileName, fmCreate);
     try
       SaveToStream(Stream);
     finally
       Stream.Free;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.SaveToStream(AStream: TStream);
    begin
     if FSize <> 0 then AStream.WriteBuffer(FHead^, FSize);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Seek(const AOffset: Int64;
     AOrigin: TSeekOrigin): Int64;
    begin
     case AOrigin of
       soBeginning: FPosition := AOffset;
       soCurrent: Inc(FPosition, AOffset);
       soEnd: FPosition := FSize + AOffset;
     end;
     Result := FPosition;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.SetSize(const ANewSize: Int64);
    begin
     if FSize <> ANewSize then
       begin
         if FSize < ANewSize then
           Expand(ANewSize - FSize)
         else
           begin
             FSize := ANewSize;
           end;
         if FPosition > FSize then
           FPosition := FSize;
       end;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Shrink(ACount: NativeInt): NativeInt;
    begin
     Result := FSize;
     if ACount <= 0 then
       Exit;
     if ACount > FSize then ACount := FSize;
     SetSize(FSize - ACount);
     Result := FSize;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.Tidy;
    begin
     if FHead <> FStorage then
       begin
         if FSize = 0 then
           FHead := FStorage
         else
           begin
             Move(FHead^, FStorage^, FSize);
             FHead := FStorage;
           end;
       end;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Write(const ABuffer; ACount: Integer): Longint;
    var
     Pos: Int64;
    begin
     Pos := FPosition + ACount;
     if Pos > 0 then
     begin
       if Pos > FSize then
         begin
           Expand(ACount);
           FSize := Pos;
         end;
       Move(ABuffer, (FHead + FPosition)^, ACount);
       FPosition := Pos;
       Result := ACount;
       exit;
     end;
     Result := 0;
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create(const ABuffer; ACount: Integer);
    begin
     Create(ACount);
     LoadFromBuffer(ABuffer, ACount);
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create(const AStream: TStream);
    begin
     Create(AStream.Size);
     LoadFromStream(AStream);
    end;

    //------------------------------------------------------------------------------

    end.

  • brother © (09.10.12 12:10) [13]
    > Почему? Я не жду ответа.

    -1 или всеж 0?
  • brother © (09.10.12 12:10) [14]
    или 1
  • olikke (09.10.12 16:23) [15]
    Новые подробности:


    > Метод  idHTTP1.Get(URL, xStream) вообще никогда не вернет
    > управление.


    Действительно так. И это видимо нехорошо. Обратиться к потоку (ну чтоб закрыть его) я могу только так например:


    function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;
    var str:string;
    begin
     
     if fl_Stop then
     begin
       idHTTP1.Disconnect;
       exit;
     end;
     
       Buffer[Numb].Lock;
     try
       Buffer[Numb].Size:=0;
       Buffer[Numb].Write(aBuffer,aCount);
       Buffer[Numb].Position:=0;
     finally
       Buffer[Numb].UnLock;
     end;
     Event[Numb].SetEvent;
    end;




    Однако ошибка возникает всё-таки не в



    while (not fl_STOP) and (not fl_st[Numb]) do
    begin
      try idHTTP1.Get(URL, xStream);
      finally
       { вывод сообщения  в лог - при зависании потока не возникает, только   при idHTTP1.Disconnect}
     end;
    end;



    Ошибка появляется только когда данные из Buffer я начинаю выдавать через UDPClient. Как только делаю коммент - всё работает спокойно.

    procedure TUDP.Execute;
    var buff:string;
    begin
     FreeOnTerminate:=true;
     //Создание клиента для передачи данных
     UDPClient:=TIdUDPClient.Create;
     UDPClient.BufferSize:=$8800;
     UDPClient.IPVersion:=Id_IPv4;
     UDPClient.Port:=Port;
     UDPClient.ReceiveTimeout:=-1;
     UDPClient.Tag:=0;
     UDPClient.BroadcastEnabled:=true;
     UDPClient.Host:=Table[Numb].Broadcast;
     try UDPClient.Active:=True;
     except end;
     while not Terminated do
     begin
       Event[Numb].WaitFor(INFINITE);
       Event[Numb].ResetEvent;
       if terminated then
       begin
         UDPClient.Active:=false;
         UDPClient.Free;
         buff:='';
         exit;
       end;
       buff:='';
         Buffer[Numb].Lock;
       try
         Buff:=Buffer[Numb].DataString;
       finally
         Buffer[Numb].UnLock;
       end;
    { закомментила специально
         try UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);}

       except
      // вывод сообщения - не возникает никогда
       end;
       buff:='';
     end;
    end;

    Я прекрасно понимаю, что для корректной работы, в частности при разрывах соединения и пр. ошибках, мне придётся отказаться от халявного idHTTP. Но в данной ситуации проблема явно не в нём, а в связке приёма по HTTP и передече данных по UDP.

    Код XStream (автора не помню), на всякий случай:

    unit LiveStreamer;

    interface

    uses
     Classes;

    type
     TLSOnRead = function(var vBuffer; aCount: Longint): Longint of object;
     TLSOnWrite = function(const aBuffer; aCount: Longint): Longint of object;

     TLiveStream = class(TStream)
     protected
       fOnRead: TLSOnRead;
       fOnWrite: TLSOnWrite;
       // We dont override and just ignore SetSize. Some callers might try to preallocate
       // size. Because of this we just ignore instead of throwing an error.
       // procedure SetSize(NewSize: Longint); override;
       // procedure SetSize(const NewSize: Int64); override;
     public
       constructor Create(aOnRead: TLSOnRead; aOnWrite: TLSOnWrite);
       function Read(var vBuffer; aCount: Longint): Longint; override;
       function Write(const aBuffer; aCount: Longint): Longint; override;
       // We dont implement seek either, we catch all data...
       function Seek(const aOffset: Int64; aOrigin: TSeekOrigin): Int64; override;
     end;

    implementation

    uses
     SysUtils;

    { TLiveStream }

    constructor TLiveStream.Create(aOnRead: TLSOnRead; aOnWrite: TLSOnWrite);
    begin
     inherited Create;
     fOnRead := aOnRead;
     fOnWrite := aOnWrite;
    end;

    function TLiveStream.Read(var vBuffer; aCount: Integer): Longint;
    begin
     if Assigned(fOnRead) then begin
       Result := fOnRead(vBuffer, aCount);
     end else begin
       // If not handled, we just ignore and say we handled it all so we dont err out caller
       Result := aCount;
     end;
    end;

    function TLiveStream.Seek(const aOffset: Int64; aOrigin: TSeekOrigin): Int64;
    begin
     // Must return 0 - ie no position. If not stack overflow becuase how TStream.Seek is
     // implemented.
     Result := 0;
    end;

    function TLiveStream.Write(const aBuffer; aCount: Integer): Longint;
    begin
     if Assigned(fOnWrite) then begin
       Result := fOnWrite(aBuffer, aCount);
     end else begin
       // If not handled, we just ignore and say we handled it all so we dont err out caller
       Result := aCount;
     end;
    end;

    end.

  • DVM © (09.10.12 17:31) [16]

    > olikke   (09.10.12 16:23) [15]


    > Ошибка появляется только когда данные из Buffer я начинаю
    > выдавать через UDPClient. Как только делаю коммент - всё
    > работает спокойно.

    Я уже неоднократно намекал на потокобезопасность приемного буфера (т.е. твоего TLiveStream). Он не потокобезопасный, а обращаешься ты к нему из дух потоков. Первый всегда в него пишет (метод Get выполнение которого никогда не прекращается) и второй поток пытается обратиться к TLiveStream для чтения. Тут скорее всего и возникают грабли. Надо все методы твоего TLiveStream обернуть в крит секции, типа того:


    ////////////////////////////////////////////////////////////////////////////////
    //  TThreadSafeStreamProxy
    ////////////////////////////////////////////////////////////////////////////////

     TThreadSafeStreamProxy = class(TStreamProxy)
     private
       FCS: TCriticalSection;
     public
       procedure Lock;
       procedure Unlock;
       constructor Create(AStream: TStream; AOwnsStream: Boolean = False);
       destructor Destroy; override;
       function Read(var Buffer; Count: Integer): Longint; override;
       function Write(const Buffer; Count: Integer): Longint; override;
       function Seek(Offset: Longint; Origin: Word): Longint; override;
       function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override;
     end;

    ...

    constructor TThreadSafeStreamProxy.Create(AStream: TStream;
     AOwnsStream: Boolean);
    begin
     FCS := TCriticalSection.Create;
     inherited Create(AStream, AOwnsStream);
    end;

    //------------------------------------------------------------------------------

    destructor TThreadSafeStreamProxy.Destroy;
    begin
     Lock;
     try
       inherited Destroy;
     finally
       Unlock;
       FCS.Free;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TThreadSafeStreamProxy.Lock;
    begin
     FCS.Enter;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Read(var Buffer; Count: Integer): Longint;
    begin
     Lock;
     try
       Result := inherited Read(Buffer, Count);
     finally
       Unlock;
     end;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Seek(Offset: Integer; Origin: Word): Longint;
    begin
     Lock;
     try
       Result := inherited Seek(Offset, Origin);
     finally
       Unlock;
     end;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Seek(const Offset: Int64;
     Origin: TSeekOrigin): Int64;
    begin
     Lock;
     try
       Result := inherited Seek(Offset, Origin);
     finally
       Unlock;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TThreadSafeStreamProxy.Unlock;
    begin
     FCS.Leave;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Write(const Buffer; Count: Integer): Longint;
    begin
     Lock;
     try
       Result := inherited Write(Buffer, Count);
     finally
       Unlock;
     end;
    end;

  • DVM © (09.10.12 17:34) [17]
    Здесь TStreamProxy еще один мой класс, но не суть важно, тебе нужен наследник TStream, который кроме нужной тебе функциональности переопределяет все перечисленные в моем примере методы оригинального TStream и оборачивает их крит секциями.
  • olikke (09.10.12 23:40) [18]
    Сделала критические секции. Но только в данном случае они мне ни к чему, т.к. в данном случае используется только метод Write

    function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;



    в котором данные из приёмного буфера idHTTP перебрасываются в Buffer.

    Buffer[Numb].Lock;
     try
       Buffer[Numb].Size:=0;
       Buffer[Numb].Write(aBuffer,aCount);
       Buffer[Numb].Position:=0;
     finally
       Buffer[Numb].UnLock;



    Затем ставиться Event для UDP потока, который работает уже с Buffer.
  • DVM © (09.10.12 23:53) [19]

    > olikke   (09.10.12 23:40) [18]
    > Сделала критические секции. Но только в данном случае они
    > мне ни к чему

    Критические секции нужны там, где есть общие данные, используемые 2-мя и более потоками. Насколько я понимаю, у тебя все же есть несинхронизованные места. Вот смотри. TIdHTTP читает в своем потоке данные в TLiveStream. При записи в него генерится событие, в котором ты, как говоришь перебрасываешь данные в буфер. Момент переброски в буфер должен быть обернут в критическую секцию. Далее отсылка этого буфера или любая его обработка в другом потоке тоже должна этот буфер блокировать, иначе пока ты его будешь отсылать, TIdHTTP опять попробует модифицировать его. Нет?
  • olikke (10.10.12 00:04) [20]

    >  Момент переброски в буфер должен быть обернут в критическую
    > секцию

    Сам TLiveStream обёрнут как в [16]
    function TLiveStream.Write(const aBuffer; aCount: Integer): Longint;
    begin
     Lock;
     try
       if Assigned(fOnWrite) then begin
         Result := fOnWrite(aBuffer, aCount);
       end else begin
         Result := aCount;
       end;
     finally
       UnLock;
     end;
    end;



    Буфер также обёрнут в потоке приёма данных по http
    Buffer[Numb].Lock;
     try
       Buffer[Numb].Size:=0;
       Buffer[Numb].Write(aBuffer,aCount);
       Buffer[Numb].Position:=0;
     finally
       Buffer[Numb].UnLock;
     end;



    И в единственном месте где он читается - в потоке UDP
    Buffer[Numb].Lock;
       try
         Buff:=Buffer[Numb].DataString;
       finally
         Buffer[Numb].UnLock;
       end;

  • olikke (10.10.12 00:16) [21]
    Я не понимаю вот чего:
    Есть у меня два потока  - Tst и UDP. Единственное чем они связаны - общим защищённым буфером и TEvent для пробуждения UDP.
    В такой связке они не могут друг другу мешать жить.
    Почему тогда:
    1) Если UDP просыпается по Event, читает буфер в промежуточную переменную (buff:String), но не делает непосредственно отправки -
    UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);

    , то всё работает сутками при всех недостатках idHTTP для приёма данных этого типа.
    2) А если всё то же, но отправка по UDP есть, то всё медленно умирает.
  • DVM © (10.10.12 00:31) [22]

    > olikke   (10.10.12 00:04) [20]

    Поставь отправку сразу за строкой Buff:=Buffer[Numb].DataString; внутри крит секции. Есть тут у меня одно подозрение.
  • olikke (10.10.12 00:35) [23]
    Ok. Завтра на работе проверю.
    И большое спасибо за ваше внимание.
  • olikke (10.10.12 12:25) [24]
    Поставила отправку UDP внутри критической секции.
    Видеосервер работал 30 минут. За это время отправлено 149491 пакетов по UDP, 73 пакета потеряны. В моём логе это выглядит так:

    { чтение данных TSt.LiveStreamWrite, запись в Buffer, установка tEvent }
    10:32:07:640 LiveStreamwrite 32768    
    {отправка данных UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);
    Если бы было исключение при отправке, то оно было бы здесь}

    10:32:07:656     UDP Execute 32768    
    10:32:07:671 LiveStreamwrite 32768
    10:32:07:671     UDP Execute 32768
    {Ну а вот пример пропуска фрейма. Меня он не смущает, т.к. бывает редко, и
    клиенты-приемники UDP потока умеют с ними справляться}

    10:32:07:671 LiveStreamwrite 32768  
    10:32:07:703 LiveStreamwrite 32768
    10:32:07:703     UDP Execute 32768
    10:32:07:703 LiveStreamwrite 32768
    10:32:07:718     UDP Execute 32768



    Последнее действие в логе - отправка данных по UDP, вот его окончание

    11:03:36:531 LiveStreamwrite 32768
    11:03:36:531     UDP Execute 32768
    11:03:36:562 LiveStreamwrite 32768
    11:03:36:562     UDP Execute 32768
    11:03:36:562 LiveStreamwrite 32768
    11:03:36:562     UDP Execute 32768



    Исключений нет. После этого событие TSt.LiveStreamWrite не наступает никогда.

    какое было подозрение?
  • DVM © (10.10.12 15:08) [25]

    > olikke   (10.10.12 12:25) [24]

    а у тебя логгирование потокобезопасное?
  • olikke (10.10.12 15:34) [26]
    Лог в критических секциях.
  • olikke (10.10.12 16:33) [27]
    Поскольку как я понимаю, с помощью индейского http добиться причины ошибки не удается, сделала следующий шаг:
    1. Взяла заведомо рабочий код, т.е. DVM Thttpinputthread из ветки " передача видео и звука с помощью indy".  
    2. Немного изменила - поставила в Thttpinputthread.readdata прямо отправку всех полученных данных (без выделения кадра) через idudpclient. Ну и вобработках всех исключений вписала лог. ( это только чтоб разобраться в причинах ошибки).
    3. Получила ошибку в thttpinputthread.readdata на функции found = select(...).    Found вернулся =0!!! Это при таймауте в 20 сек.
    4. Далее понятно socketdisconnect (вот теперь я на собственном опыте убедилась чем удобен winsock) и создание его заново.

    Чем был занят мой сокет в течение 20 сек?
  • olikke (10.10.12 16:38) [28]
    Какой минимальный таймаут на функцию select будет корректно поставить? Если я планирую получать поток порядка 20 кадров в сек, я могу поставить хотя бы 0.1 сек?
  • DVM © (11.10.12 11:04) [29]

    > olikke   (10.10.12 16:33) [27]

    Ну я самого начала и говорил, что multipart/x-mixed-replace не поддерживается TIdHTTP и Реми Лебо писал как то про это. Так что либо чистые сокеты, либо TIdTCP - с ним проблем нет тоже.


    > 3. Получила ошибку в thttpinputthread.readdata на функции
    > found = select(...).    Found вернулся =0!!! Это при таймауте
    > в 20 сек.

    Очень странно. И что это регулярно происходит со всем камерами?


    > Какой минимальный таймаут на функцию select будет корректно
    > поставить? Если я планирую получать поток порядка 20 кадров
    > в сек, я могу поставить хотя бы 0.1 сек?

    20 секунд и оставь, он не связан никак с фпс.
  • olikke (11.10.12 13:04) [30]

    > DVM ©   (11.10.12 11:04) [29]



    > 20 секунд и оставь, он не связан никак с фпс.


    Используя чистые сокеты я могу обработать ошибку
    > found = select(...).

    И соответственно переподключиться к моей камере максимально быстро. Если я уверена, что в нормальном режиме работы у меня кадры приходят с частотой 20 фпс, зачем мне ждать 20 сек? Жду 0.1 сек, создаю новый сокет и получаю поток дальше. Картинка на клиентах замораживается на 0,5 сек, что конечно некрасиво, но всё же лучше, чем было до этого.


    > Очень странно. И что это регулярно происходит со всем камерами?

    Ну реально к рабочей машине у меня сейчас подключена только одна Axis P1344. Я решила сначала добиться результата от неё.
    Частота вылетов в функции select напрямую зависит от частоты фрймов в потоке. Т.Е. если я ставлю resolution=1024x768&compression=20&fps=30 (фактически максимум для этой камеры), то вылетаю часто (~раза в минуту). Ну это я конечно для проверки такие параметры выставила.
    Сейчас установила B>resolution=1024x768&compression=40&fps=25, получаю ошибку раз в пол-часа (примерно). Т.е. пару раз в час моя картинка подвисает на 0,5 сек.
  • DVM © (11.10.12 13:48) [31]

    > olikke   (11.10.12 13:04) [30]
    >


    > Жду 0.1 сек, создаю новый сокет и получаю поток дальше.
    > Картинка на клиентах замораживается на 0,5 сек, что конечно
    > некрасиво, но всё же лучше, чем было до этого.

    Не, так мало ждать нельзя. Если будет загружена сеть, загружен компьютер, загружена камера, медленная сеть или интернет и т.д., то вполне вероятно, что некоторое время данные в сокет поступать не будут и select будет ждать их. Да и надо ждать!. Если FPS камеры снизится из-за нагрузки на нее (а он снизится может), то пауза между кадрами и пакетами возрастет и твоя программа постоянно только и будет делать что отключаться и подключаться к камерам, что в результате приведет к исчерпанию лимита на соединения (так как система отдает сокеты обратно не сразу).

    Ты борешься с симптомами, а надо искать причину по которой данные перестают поступать от камеры столь длительне время. Ты говоришь, что проблема возникает когда ты включаешь ретрансляцию по UDP. Может эти широковещательные пакеты забивают всю сеть? Может есть какой то антифирус или файерволл? Их надо обязательно отключать при работе с камерами или добавлять программу в исключения.
  • olikke (11.10.12 16:39) [32]
    Антивирус и брандмауэр отключены.

    Настройки моей сети
    IP камера: ip 192.168.2.4 mask 255.255.255.0 100Мбит
    Видеосервер: ip 192.168.2.1 mask 255.255.255.128 1Гбит
    Клиент: ip 192.168.2.7 mask 255.255.255.128 1Гбит
    всё через коммутатор D-Link DGS-1060 1Гбит.

    Ретранслирую UDP на адрес 192.168.2.127 - данные от камеры перестают поступать.
    Ретранслирую UDP прямо клиенту 192.168.2.7 - всё работает.

    Мне очень стыдно. Но я не понимаю в чём дело....

    Самое главное, что я не хочу передавать udp для каждого клиента отдельно. Почему же не работает Broadcast?
  • DVM © (11.10.12 18:00) [33]

    > olikke   (11.10.12 16:39) [32]


    > Ретранслирую UDP на адрес 192.168.2.127 - данные от камеры
    > перестают поступать.

    А в тот момент когда данные с камеры перестают поступать она вообще доступна, через браузер например с нее видеопоток идет? Она случайно не перезагружается там?

    А порт UDP не пробовала менять? Может выбранный порт камера слушает и как то неадекватно реагирует на кучу пакетов. Ее бы попробовать отрезать от широковещательного UDP трафика твоего и посмотреть как она себя поведет.
  • olikke (12.10.12 11:55) [34]
    Порты я пробовала менять, не помогает.

    Тестировала сегодня свою систему и вот что интересного у меня получилось:

    Включила одновременно браузер (IE6) и свою программу с ретрансляцией UDP broadcast. Как обычнов моей программе начала появляться ошибка на функции select. При этом с браузером было вот что: видеопоток на нем один раз оборвался, причём вылезла ошибка что-то типа "не может быть произведён reconnect" (помогло обновление страницы). И ещё сниффером заметила, что порт подключения браузера к камере несколько раз изменился. Вывод: IP камера действительно обрывала поток (ошибки у меня и браузера возникали асинхронно). Браузер в этом случае предпринимал попытку переподключиться к ней самостоятельно, и только один раз для этого потребовалось участие пользователя (пока я это писала браузер уже два раза автоматически переподключился).
    Т.е. источник проблемы ясен.
    Теперь ищем причину.
    Как распространяется широковещательное сообщение по локальной сети?
    1:Широковещательный адрес распознаётся по IP-адресу, в котором все биты хоста установлены в 1.
    2:При широковещательной передаче все сетевые адаптеры, находящиеся в подсети соответствующей широковещательному запросу должны проверить нужен ли им входящий пакет. Для этого каждый широковещательный пакет должен быть принят и рассмотрен на транспортном уровне (UDP), где прописан порт-получатель. Только после этого пакет может быть принят или отторгнут.
    Здесь нужно обязательно добавить, что IP камеры 100МБитные, в отличие от остальных устройств в сети. А поток я сних беру немаленький + ретрансляция по UDP, итого получаю порядка 9-11% загрузки ГБитной сети.
    Именно поэтому я установила разные подсети для группы клиентов + видеосервер и для группы камер. Т.е. для сетевого подключения с маской 255.255.255.0 (для IP камер) пакет с адресом 192.168.2.127 не может быть широковещательным. В то время как для видеосервера и его клиентов маска выбрана 255.255.255.128. Таким образом я была твердо убеждена что мои широковещательные запросы не будут забивать сетевые подключения камер.
    Но сегодня мой мир перевернулся.
    Широковещательный запрос с адресом 192.168.2.127 проходит на сетевое подключение моей камеры (ip 192.168.2.4 mask 255.255.255.0)!!!!!
    Вывод:
    Как правильно предложил DVM, надо отрезать группу камер от широковещательного трафика. Для этого я хочу попробовать 2 варианта:
    1. Маршрутизатор.
    2.Ретрансляция не широковещательных запросов, а групповых.
  • DVM © (12.10.12 16:22) [35]

    > olikke   (12.10.12 11:55) [34]


    > широковещательный пакет должен быть принят и рассмотрен
    > на транспортном уровне (UDP), где прописан порт-получатель.
    >  Только после этого пакет может быть принят или отторгнут.
    >
    > Здесь нужно обязательно добавить, что IP камеры 100МБитные,
    >  в отличие от остальных устройств в сети. А поток я сних
    > беру немаленький + ретрансляция по UDP, итого получаю порядка
    > 9-11% загрузки ГБитной сети.

    Вот я и говорю, завалила ты камеры пакетами. Найди такую же камеру в интернет и попробуй с ней. Можно еще сделать так, на сайте аксиса есть эмулятор камер, его можно скачать, дать ему записи с камер и заменить камеры им, и запустить его на мощном ПК. Если надо дам ссылку на него.
  • olikke (12.10.12 22:32) [36]
    DVM, эмулятор камер был бы мне очень полезен. Если не затруднит дайте ссылку.
  • DVM © (15.10.12 23:06) [37]

    > olikke   (12.10.12 22:32) [36]


    > DVM, эмулятор камер был бы мне очень полезен

    как оказалось, он на их сайте находится в закрытом разделе для разработчиков
 
Конференция "Сети" » Видеопоток по idhttp. Зависание потока [D6, WinXP]
Есть новые Нет новых   [134430   +43][b:0][p:0.014]