Конференция "Сети" » Видеопоток по idhttp. Зависание потока [D6, WinXP]
 
  • olikke (08.10.12 12:04) [0]
    Здавствуйте!

    Задача:
    Есть несколько IP камер Axis (P1344) и бесконечное количество клиентов для них. Клиент может подключиться к любой из камер, а также к нескольким из них.
    Поскольку к камерам напрямую подключаться нельзя, нужно создать что-то типа видеосервера. В задачу видеосервера входит получение от всех камер видеопотока по http и выдача их в сеть клиентам (UDP Broadcast - все клиенты в одной локальной  сети).
    Как делаю:
    Для каждой камеры создаётся свой экземпляр TThread, внутри которого живёт idHTTP. Ну и естественно свой UDPClient (тоже использую Indy10).

    Собственно код потока для HTTP:

    unit THttp;

    interface

    uses
     Windows,Classes,SysUtils,IdHTTP,SyncObjs,UDP;

    type
     TSt = class(TThread)
     private
       idHTTP1:TidHTTP;
       UDPClient1:TUDP;
       xStream: TStream;
     protected
       function LiveStreamWrite(const aBuffer; aCount: Longint): Longint;
       procedure Execute; override;
     public
       URL:String;
       Numb:byte;
       FEvent:TEvent;
     end;

    implementation
    uses
     LiveStreamer,Global;

    procedure TSt.Execute;
    begin
     FreeOnTerminate:=true;
     idHTTP1:=TidHTTP.Create(nil);
     idHTTP1.Request:=TidHTTPRequest.Create(idHTTP1);
     idHTTP1.Request.BasicAuthentication:=true;
     idHTTP1.Request.Password:='****';
     idHTTP1.Request.Username:='****';
     idHTTP1.HTTPOptions:=[hoForceEncodeParams];
     idHTTp1.Request.Connection:='Keep-Alive';
     UDPClient1:=TUDP.Create(True);
     UDPClient1.Priority:=tpNormal;
     UDPClient1.FreeOnTerminate:=True;
     UDPClient1.Port:=Table[Numb].Port;
     UDPClient1.Numb:=Numb;
     UDPClient1.Resume;
     xStream := TLiveStream.Create(nil, LiveStreamWrite);
     xStream.Size:=$8800;
     XStream.Position:=0;
     while (not fl_STOP) and (not fl_st[Numb]) do
     begin
       try idHTTP1.Get(URL, xStream);
       finally  end;
     end;
     FreeAndNil(xStream);
     UDPCLient1.Terminate;
     Event[numb].Setevent;
     UDPClient1:=nil;
    end;

    function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;
    begin
       Buffer[Numb].Lock;
     try
       Buffer[Numb].Size:=0;
       Buffer[Numb].Write(aBuffer,aCount);
       Buffer[Numb].Position:=0;
     finally
       Buffer[Numb].UnLock;
     end;
     Event[Numb].SetEvent;
    end;

    end.



    Код потока для UDP:

    unit UDP;

    interface

    uses
     Classes,IdUDPClient,IdGlobal,SyncObjs,SysUtils;

    type
     TUDP = class(TThread)
     private
       UDPClient:TidUDPClient;
     protected
       procedure Execute; override;
     public
       Port:integer;
       HOST:string;
       Numb:byte;
     end;

    implementation

    uses Global;

    procedure TUDP.Execute;
    var buff:string;
    begin
     FreeOnTerminate:=true;
     //Создание клиента для передачи данных
     UDPClient:=TIdUDPClient.Create;
     UDPClient.BufferSize:=$8800;
     UDPClient.IPVersion:=Id_IPv4;
     UDPClient.Port:=Port;
     UDPClient.ReceiveTimeout:=-1;
     UDPClient.Tag:=0;
     UDPClient.BroadcastEnabled:=true;
     UDPClient.Host:=Table[Numb].Broadcast;
     try UDPClient.Active:=True;
     except end;
     while not Terminated do
     begin
       Event[Numb].WaitFor(INFINITE);
       Event[Numb].ResetEvent;
       if terminated then
       begin
         UDPClient.Active:=false;
         UDPClient.Free;
         buff:='';
         exit;
       end;
       buff:='';
         Buffer[Numb].Lock;
       try
         Buff:=Buffer[Numb].DataString;
       finally
         Buffer[Numb].UnLock;
       end;
       try UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);
       except
       end;
       buff:='';
     end;
    end;

    end.



    Так вот, вся эта зараза прекрасно работает. Клиенты смотрят картинки с камер.
    Но где-то через час-два каринтка пропадает. Точнее от IP камеры пропадает поток (смотрела сниффером). А значит экземпояр потока TSt, привязанного к этой камере бесконечно ждёт продолжения кино (висит).

    У меня два вопроса:
    1. Почему пропадает поток от IP камеры (мой косяк или её?)
    2. Как мне грамотно прибить поток висящий? Кроме TerminateThread до него ничем не достучаться. Но в этом случае все мои ресурсы остаются незакрытыми. Да и неправильно это как-то....

    СПАСИТЕ МЕНЯ!!!
  • RWolf © (08.10.12 12:08) [1]
    > А значит экземпояр потока TSt, привязанного к этой камере бесконечно ждёт продолжения кино (висит).

    ну так пусть ждёт не бесконечно.
  • olikke (08.10.12 12:25) [2]
    Уважаемый RWolf, в том то и вопрос, что нельзя чтоб он висел. Надо либо понять почему он виснет, либо перезапустить его. Я уже думала о создании следящего потока, но поскольку зависание внутри цикла в execute мне корректно не закрыться. И вообще задача максимум обойтись без этого.
  • brother © (08.10.12 16:00) [3]
    > UDPClient.ReceiveTimeout:=-1;

    меня это смущает
  • olikke (08.10.12 21:29) [4]
    To brother
    Почему? Я не жду ответа.
  • DVM © (08.10.12 23:34) [5]

    > olikke   (08.10.12 12:04) 


    >  while (not fl_STOP) and (not fl_st[Numb]) do
    >  begin
    >    try idHTTP1.Get(URL, xStream);
    >    finally  end;
    >  end;

    URL какой?
  • DVM © (08.10.12 23:38) [6]

    > olikke   (08.10.12 12:04) 

    вообще конечно недочетов в коде огромное количество.

    вот например, что будет если произойдет ошибка в idHTTP1.Get(URL, xStream);
    Поток умрет просто. Тихо без выброса исключений.
  • olikke (09.10.12 10:04) [7]
  • DVM © (09.10.12 11:25) [8]

    > olikke   (09.10.12 10:04) [7]

    Это multipart/x-mixed-replace данные, TIDHttp вообще не поддерживает до Indy 10 включительно данный Content-Type. Поддержка обещается в будущих версиях. Для нормальной работы с данным типом данных нужно использовать TIdTCPClient  (ну или чистые сокеты) и самому реализовывать разбор потока.

    Этот поток не имеет конца, данные будут поступать и поступать до тех пор пока клиент не оборвет соединение. Я не понимаю (но догадываюсь) как это у вас может работать, если вы читаете ответ сервера таким образом:

    while (not fl_STOP) and (not fl_st[Numb]) do
    begin
      try idHTTP1.Get(URL, xStream);
      finally  end;
    end;



    Метод  idHTTP1.Get(URL, xStream) вообще никогда не вернет управление. Я догадываюсь, что xStream у вас какой то хитрый наследник TStream (кстати, потокозащищенный?) в котором как то обрабатываются поступающие данные, но так как кода его нет, что больше ничего сказать не могу.

    Короче говоря, при чтении вероятно возникает ошибка, ваш поток TSt умирает и все. При ошибке в TIdHTTP надо закрывать сокет самостоятельно, тем более что у вас  idHTTp1.Request.Connection:='Keep-Alive';
  • olikke (09.10.12 11:42) [9]
    Спасибо, DVM, за ответ. Очень похоже на то что происходит у меня. Видела тему где вы подробно разбирали реализацию данного запроса (buffer у меня ваш tsavebuffer). Буду смотреть в эту сторону.
  • DVM © (09.10.12 12:07) [10]

    > buffer у меня ваш tsavebuffer

    В принципе можно сделать и наследника TStream по аналогии с тем буфером, т.е. получить TMemoryStream у которого можно удалять данные из начала без лишних телодвижений с памятью. Вот у меня есть подобный (правда не тестировал) в него можно писать с одной стороны, а забирать данные с другой (не забывая о синхронизации если программа многопоточная)


    ////////////////////////////////////////////////////////////////////////////////
    //
    //  ****************************************************************************
    //  * Модуль       : uFlexibleMemoryStream.pas
    //  * Назначение   : Расширенный Memory Stream
    //  * Copyright    : 2012 © Дмитрий Муратов (dvmuratov@yandex.ru)
    //  ****************************************************************************
    //
    ////////////////////////////////////////////////////////////////////////////////

    {
     Данный класс представляет собой расширенный аналог TMemoryStream,
     главным отличием которого от оригинального является возможность удаления данных
     из начала. При таком удалении оставшиеся данные не перемещаются в памяти,
     перемещается лишь указатель на них (Head), и соответственно
     уменьшается Position. При добавлении данных проверяется есть ли свободное место
     в уже выделенном буфере перед или за полезными данными и если есть, оно используется
     по возможности. Если в буфере недостаточно свободного (Unused) места, то происходит
     перевыделение памяти под буфер и копирование содержимого старого буфера в новый.
     Также данный класс обладает некоторыми дополнительными методами, повышающими
     удобство работы.

     Схематично хранение данных в данном Stream можно представить:

     +--------------+---------------------------------+--------------+
     | Unused space |   Data                          | Unused space |
     +--------------+---------------------------------+--------------+
     ^              ^          ^                      ^
     Storage        Head       Position               Tail

     Storage - указатель на начало внутреннего буфера
     Head - указатель на начало данных (аналог Memory в TMemoryStream)
     Tail - указаетль на конец данных

    }


    {$I DEFINES.INC}

    unit uFlexibleMemoryStream;

    interface

    uses
    {$IFDEF DELPHIXE2_UP}
     System.SysUtils, System.Classes, System.Math;
    {$ELSE}
     SysUtils, Classes, Math;
    {$ENDIF}

    type
     TFlexibleMemoryStream = class(TStream)
     const
       DefAllocation = 1024;
     private
       FStorage: PByte;          // Указатель на начало буфера
       FHead: PByte;             // Указатель на начало полезных данных в буфере
       FAllocation: NativeInt;   // Общее количество выделенной под буфер памяти
       FSize: NativeInt;         // Размер полезных данных
       FPosition: NativeInt;     // Текущая позиция в полезных данных (смещение от FHead)
       function GetTail: PByte;
     public
       constructor Create; overload;
       constructor Create(const AAllocation: NativeInt); overload;
       constructor Create(const ABuffer; ACount: Longint); overload;
       constructor Create(const AStream: TStream); overload;
       destructor Destroy; override;
       procedure Clear;
       procedure Tidy;
       procedure Expand(const ACount: NativeInt); virtual;
       function Consume(ACount: NativeInt): NativeInt; virtual;
       function Extract(ACount: NativeInt): PByte;
       function Append(const ABuffer; ACount: Longint): Longint; overload; virtual;
       function Append(const AStream: TStream): Int64; overload; virtual;
       function Shrink(ACount: NativeInt): NativeInt; virtual;
       procedure SetSize(const ANewSize: Int64); override;
       procedure LoadFromStream(AStream: TStream); virtual;
       procedure LoadFromBuffer(const ABuffer; ACount: Longint); virtual;
       procedure LoadFromFile(const AFileName: string); virtual;
       procedure SaveToStream(AStream: TStream); virtual;
       procedure SaveToFile(const AFileName: string); virtual;
       function Seek(const AOffset: Int64; AOrigin: TSeekOrigin): Int64; override;
       function Read(var ABuffer; ACount: Longint): Longint; override;
       function Write(const ABuffer; ACount: Longint): Longint; override;
       function Pos(const ABuffer; ACount: Longint; AOffset: NativeInt = 0): NativeInt; virtual;
       function IsEmpty: boolean;
       property Head: PByte read FHead;
       property Tail: PByte read GetTail;
       property Storage: PByte read FStorage;
       property Allocation: NativeInt read FAllocation;
     end;


  • DVM © (09.10.12 12:07) [11]

    implementation

    { TFlexibleMemoryStream }

    function TFlexibleMemoryStream.Append(const ABuffer; ACount: Integer): Longint;
    begin
     FPosition := FSize;
     Result := Write(ABuffer, ACount);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Append(const AStream: TStream): Int64;
    begin
     Result := 0;
     if AStream.Size = 0 then
       Exit;
     Expand(AStream.Size);
     Seek(0, soEnd);
     Result := CopyFrom(AStream, 0);
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.Clear;
    begin
     FSize := 0;
     FHead := FStorage;
     FPosition := 0;
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create;
    begin
     Create(DefAllocation);
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create(const AAllocation: NativeInt);
    begin
     inherited Create;
     FAllocation := AAllocation;
     FSize := 0;
     GetMem(FStorage, FAllocation);
     FHead := FStorage;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Consume(ACount: NativeInt): NativeInt;
    begin
     Extract(ACount);
     Result := FSize;
    end;

    //------------------------------------------------------------------------------

    destructor TFlexibleMemoryStream.Destroy;
    begin
     FreeMem(FStorage, FAllocation);
     inherited Destroy;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.Expand(const ACount: NativeInt);
    var
     Spare: NativeInt;      // Незанятый объем внутреннего буфера
     HeadSpace: NativeInt;  // Незанятый объем в  начале буфера
     TailSpace: NativeInt;  // Незанятый объем в конце буфера
     OldAllocation: NativeInt;
     NewStorage: PByte;
    begin
     Spare := FAllocation - FSize;
     HeadSpace := FHead - FStorage;
     TailSpace := Spare - HeadSpace;
     // Если в буфере доставточно свободного места для добавления ACount байт
     if Spare >= ACount then
       begin
         // Если в хвосте свободного места меньше чем надо добавить
         if TailSpace < ACount then
           begin
             // Двигаем данные в начало буфера
             Move(FHead^, FStorage^, FSize);
             // Начало буфера теперь совпадает с началом данных
             FHead := FStorage;
           end
         else
           begin
             // Ничего не делаем, в хвосте достаточно места для добавления ACount байт
           end;
       end
     else
       // В буфере недостаточно места для добавления ACount байт, создаем новый буфер,
       // копируем в него данные из старого, старый удаляем.
       begin
         OldAllocation := FAllocation;

         //FAllocation := FAllocation + ACount;
         FAllocation := Max(FAllocation, ACount) * 2;

         GetMem(NewStorage, FAllocation);
         if FStorage <> nil then
           begin
             Move(FHead^, NewStorage^, FSize);
             FreeMem(FStorage, OldAllocation);
           end;
         FStorage := NewStorage;
         FHead := FStorage;
       end;
     FSize := FSize  + ACount;
    end;

    //------------------------------------------------------------------------------

  • DVM © (09.10.12 12:08) [12]

    function TFlexibleMemoryStream.Extract(ACount: NativeInt): PByte;
    begin
     Result := FHead;
     if ACount <= 0 then
       Exit;
     if ACount > FSize then
       ACount := FSize;
     FHead := FHead + ACount;
     FSize := FSize - ACount;
     FPosition := FPosition - ACount;
     if FPosition < 0  then
       FPosition := 0;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.GetTail: PByte;
    begin
     Result := PByte(FHead + FSize);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Pos(const ABuffer; ACount: Longint; AOffset: NativeInt = 0): NativeInt;

     function StartsBuffer(ASubBuffer: Pointer; ASubBufferSize: Longint;
       ABuffer: Pointer; ABufferSize: NativeInt): Boolean;
     begin
       if ASubBufferSize <= ABufferSize then
         Result := CompareMem(ASubBuffer, ABuffer, ASubBufferSize)
       else
         Result := False;
     end;

    var
     i: NativeInt;
     P: PByte;
    begin
     Result := -1;
     for I := AOffset to FSize - 1 do
       begin
         P := FHead + I;
         if StartsBuffer(@ABuffer, ACount, Pointer(P), FSize - I) then
           begin
             Result := i;
             Break;
           end;
       end;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.IsEmpty: boolean;
    begin
     Result := FSize = 0;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.LoadFromBuffer(const ABuffer; ACount: Integer);
    begin
     FPosition := 0;
     Write(ABuffer, ACount);
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.LoadFromFile(const AFileName: string);
    var
     Stream: TStream;
    begin
     Stream := TFileStream.Create(AFileName, fmOpenRead);
     try
       LoadFromStream(Stream);
     finally
       Stream.Free;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.LoadFromStream(AStream: TStream);
    begin
     if AStream.Size = 0 then
       Exit;
     SetSize(AStream.Size);
     Seek(0, soBeginning);
     CopyFrom(AStream, 0);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Read(var ABuffer; ACount: Integer): Longint;
    begin
     if (FPosition >= 0) and (ACount >= 0) then
     begin
       Result := FSize - FPosition;
       if Result > 0 then
       begin
         if Result > ACount then Result := ACount;
         Move((PByte(FHead) + FPosition)^, ABuffer, Result);
         Inc(FPosition, Result);
         Exit;
       end;
     end;
     Result := 0;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.SaveToFile(const AFileName: string);
    var
     Stream: TStream;
    begin
     Stream := TFileStream.Create(AFileName, fmCreate);
     try
       SaveToStream(Stream);
     finally
       Stream.Free;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.SaveToStream(AStream: TStream);
    begin
     if FSize <> 0 then AStream.WriteBuffer(FHead^, FSize);
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Seek(const AOffset: Int64;
     AOrigin: TSeekOrigin): Int64;
    begin
     case AOrigin of
       soBeginning: FPosition := AOffset;
       soCurrent: Inc(FPosition, AOffset);
       soEnd: FPosition := FSize + AOffset;
     end;
     Result := FPosition;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.SetSize(const ANewSize: Int64);
    begin
     if FSize <> ANewSize then
       begin
         if FSize < ANewSize then
           Expand(ANewSize - FSize)
         else
           begin
             FSize := ANewSize;
           end;
         if FPosition > FSize then
           FPosition := FSize;
       end;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Shrink(ACount: NativeInt): NativeInt;
    begin
     Result := FSize;
     if ACount <= 0 then
       Exit;
     if ACount > FSize then ACount := FSize;
     SetSize(FSize - ACount);
     Result := FSize;
    end;

    //------------------------------------------------------------------------------

    procedure TFlexibleMemoryStream.Tidy;
    begin
     if FHead <> FStorage then
       begin
         if FSize = 0 then
           FHead := FStorage
         else
           begin
             Move(FHead^, FStorage^, FSize);
             FHead := FStorage;
           end;
       end;
    end;

    //------------------------------------------------------------------------------

    function TFlexibleMemoryStream.Write(const ABuffer; ACount: Integer): Longint;
    var
     Pos: Int64;
    begin
     Pos := FPosition + ACount;
     if Pos > 0 then
     begin
       if Pos > FSize then
         begin
           Expand(ACount);
           FSize := Pos;
         end;
       Move(ABuffer, (FHead + FPosition)^, ACount);
       FPosition := Pos;
       Result := ACount;
       exit;
     end;
     Result := 0;
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create(const ABuffer; ACount: Integer);
    begin
     Create(ACount);
     LoadFromBuffer(ABuffer, ACount);
    end;

    //------------------------------------------------------------------------------

    constructor TFlexibleMemoryStream.Create(const AStream: TStream);
    begin
     Create(AStream.Size);
     LoadFromStream(AStream);
    end;

    //------------------------------------------------------------------------------

    end.

  • brother © (09.10.12 12:10) [13]
    > Почему? Я не жду ответа.

    -1 или всеж 0?
  • brother © (09.10.12 12:10) [14]
    или 1
  • olikke (09.10.12 16:23) [15]
    Новые подробности:


    > Метод  idHTTP1.Get(URL, xStream) вообще никогда не вернет
    > управление.


    Действительно так. И это видимо нехорошо. Обратиться к потоку (ну чтоб закрыть его) я могу только так например:


    function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;
    var str:string;
    begin
     
     if fl_Stop then
     begin
       idHTTP1.Disconnect;
       exit;
     end;
     
       Buffer[Numb].Lock;
     try
       Buffer[Numb].Size:=0;
       Buffer[Numb].Write(aBuffer,aCount);
       Buffer[Numb].Position:=0;
     finally
       Buffer[Numb].UnLock;
     end;
     Event[Numb].SetEvent;
    end;




    Однако ошибка возникает всё-таки не в



    while (not fl_STOP) and (not fl_st[Numb]) do
    begin
      try idHTTP1.Get(URL, xStream);
      finally
       { вывод сообщения  в лог - при зависании потока не возникает, только   при idHTTP1.Disconnect}
     end;
    end;



    Ошибка появляется только когда данные из Buffer я начинаю выдавать через UDPClient. Как только делаю коммент - всё работает спокойно.

    procedure TUDP.Execute;
    var buff:string;
    begin
     FreeOnTerminate:=true;
     //Создание клиента для передачи данных
     UDPClient:=TIdUDPClient.Create;
     UDPClient.BufferSize:=$8800;
     UDPClient.IPVersion:=Id_IPv4;
     UDPClient.Port:=Port;
     UDPClient.ReceiveTimeout:=-1;
     UDPClient.Tag:=0;
     UDPClient.BroadcastEnabled:=true;
     UDPClient.Host:=Table[Numb].Broadcast;
     try UDPClient.Active:=True;
     except end;
     while not Terminated do
     begin
       Event[Numb].WaitFor(INFINITE);
       Event[Numb].ResetEvent;
       if terminated then
       begin
         UDPClient.Active:=false;
         UDPClient.Free;
         buff:='';
         exit;
       end;
       buff:='';
         Buffer[Numb].Lock;
       try
         Buff:=Buffer[Numb].DataString;
       finally
         Buffer[Numb].UnLock;
       end;
    { закомментила специально
         try UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);}

       except
      // вывод сообщения - не возникает никогда
       end;
       buff:='';
     end;
    end;

    Я прекрасно понимаю, что для корректной работы, в частности при разрывах соединения и пр. ошибках, мне придётся отказаться от халявного idHTTP. Но в данной ситуации проблема явно не в нём, а в связке приёма по HTTP и передече данных по UDP.

    Код XStream (автора не помню), на всякий случай:

    unit LiveStreamer;

    interface

    uses
     Classes;

    type
     TLSOnRead = function(var vBuffer; aCount: Longint): Longint of object;
     TLSOnWrite = function(const aBuffer; aCount: Longint): Longint of object;

     TLiveStream = class(TStream)
     protected
       fOnRead: TLSOnRead;
       fOnWrite: TLSOnWrite;
       // We dont override and just ignore SetSize. Some callers might try to preallocate
       // size. Because of this we just ignore instead of throwing an error.
       // procedure SetSize(NewSize: Longint); override;
       // procedure SetSize(const NewSize: Int64); override;
     public
       constructor Create(aOnRead: TLSOnRead; aOnWrite: TLSOnWrite);
       function Read(var vBuffer; aCount: Longint): Longint; override;
       function Write(const aBuffer; aCount: Longint): Longint; override;
       // We dont implement seek either, we catch all data...
       function Seek(const aOffset: Int64; aOrigin: TSeekOrigin): Int64; override;
     end;

    implementation

    uses
     SysUtils;

    { TLiveStream }

    constructor TLiveStream.Create(aOnRead: TLSOnRead; aOnWrite: TLSOnWrite);
    begin
     inherited Create;
     fOnRead := aOnRead;
     fOnWrite := aOnWrite;
    end;

    function TLiveStream.Read(var vBuffer; aCount: Integer): Longint;
    begin
     if Assigned(fOnRead) then begin
       Result := fOnRead(vBuffer, aCount);
     end else begin
       // If not handled, we just ignore and say we handled it all so we dont err out caller
       Result := aCount;
     end;
    end;

    function TLiveStream.Seek(const aOffset: Int64; aOrigin: TSeekOrigin): Int64;
    begin
     // Must return 0 - ie no position. If not stack overflow becuase how TStream.Seek is
     // implemented.
     Result := 0;
    end;

    function TLiveStream.Write(const aBuffer; aCount: Integer): Longint;
    begin
     if Assigned(fOnWrite) then begin
       Result := fOnWrite(aBuffer, aCount);
     end else begin
       // If not handled, we just ignore and say we handled it all so we dont err out caller
       Result := aCount;
     end;
    end;

    end.

  • DVM © (09.10.12 17:31) [16]

    > olikke   (09.10.12 16:23) [15]


    > Ошибка появляется только когда данные из Buffer я начинаю
    > выдавать через UDPClient. Как только делаю коммент - всё
    > работает спокойно.

    Я уже неоднократно намекал на потокобезопасность приемного буфера (т.е. твоего TLiveStream). Он не потокобезопасный, а обращаешься ты к нему из дух потоков. Первый всегда в него пишет (метод Get выполнение которого никогда не прекращается) и второй поток пытается обратиться к TLiveStream для чтения. Тут скорее всего и возникают грабли. Надо все методы твоего TLiveStream обернуть в крит секции, типа того:


    ////////////////////////////////////////////////////////////////////////////////
    //  TThreadSafeStreamProxy
    ////////////////////////////////////////////////////////////////////////////////

     TThreadSafeStreamProxy = class(TStreamProxy)
     private
       FCS: TCriticalSection;
     public
       procedure Lock;
       procedure Unlock;
       constructor Create(AStream: TStream; AOwnsStream: Boolean = False);
       destructor Destroy; override;
       function Read(var Buffer; Count: Integer): Longint; override;
       function Write(const Buffer; Count: Integer): Longint; override;
       function Seek(Offset: Longint; Origin: Word): Longint; override;
       function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override;
     end;

    ...

    constructor TThreadSafeStreamProxy.Create(AStream: TStream;
     AOwnsStream: Boolean);
    begin
     FCS := TCriticalSection.Create;
     inherited Create(AStream, AOwnsStream);
    end;

    //------------------------------------------------------------------------------

    destructor TThreadSafeStreamProxy.Destroy;
    begin
     Lock;
     try
       inherited Destroy;
     finally
       Unlock;
       FCS.Free;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TThreadSafeStreamProxy.Lock;
    begin
     FCS.Enter;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Read(var Buffer; Count: Integer): Longint;
    begin
     Lock;
     try
       Result := inherited Read(Buffer, Count);
     finally
       Unlock;
     end;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Seek(Offset: Integer; Origin: Word): Longint;
    begin
     Lock;
     try
       Result := inherited Seek(Offset, Origin);
     finally
       Unlock;
     end;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Seek(const Offset: Int64;
     Origin: TSeekOrigin): Int64;
    begin
     Lock;
     try
       Result := inherited Seek(Offset, Origin);
     finally
       Unlock;
     end;
    end;

    //------------------------------------------------------------------------------

    procedure TThreadSafeStreamProxy.Unlock;
    begin
     FCS.Leave;
    end;

    //------------------------------------------------------------------------------

    function TThreadSafeStreamProxy.Write(const Buffer; Count: Integer): Longint;
    begin
     Lock;
     try
       Result := inherited Write(Buffer, Count);
     finally
       Unlock;
     end;
    end;

  • DVM © (09.10.12 17:34) [17]
    Здесь TStreamProxy еще один мой класс, но не суть важно, тебе нужен наследник TStream, который кроме нужной тебе функциональности переопределяет все перечисленные в моем примере методы оригинального TStream и оборачивает их крит секциями.
  • olikke (09.10.12 23:40) [18]
    Сделала критические секции. Но только в данном случае они мне ни к чему, т.к. в данном случае используется только метод Write

    function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;



    в котором данные из приёмного буфера idHTTP перебрасываются в Buffer.

    Buffer[Numb].Lock;
     try
       Buffer[Numb].Size:=0;
       Buffer[Numb].Write(aBuffer,aCount);
       Buffer[Numb].Position:=0;
     finally
       Buffer[Numb].UnLock;



    Затем ставиться Event для UDP потока, который работает уже с Buffer.
  • DVM © (09.10.12 23:53) [19]

    > olikke   (09.10.12 23:40) [18]
    > Сделала критические секции. Но только в данном случае они
    > мне ни к чему

    Критические секции нужны там, где есть общие данные, используемые 2-мя и более потоками. Насколько я понимаю, у тебя все же есть несинхронизованные места. Вот смотри. TIdHTTP читает в своем потоке данные в TLiveStream. При записи в него генерится событие, в котором ты, как говоришь перебрасываешь данные в буфер. Момент переброски в буфер должен быть обернут в критическую секцию. Далее отсылка этого буфера или любая его обработка в другом потоке тоже должна этот буфер блокировать, иначе пока ты его будешь отсылать, TIdHTTP опять попробует модифицировать его. Нет?
 
Конференция "Сети" » Видеопоток по idhttp. Зависание потока [D6, WinXP]
Есть новые Нет новых   [134435   +19][b:0][p:0.015]