-
Здавствуйте! Задача: Есть несколько IP камер Axis (P1344) и бесконечное количество клиентов для них. Клиент может подключиться к любой из камер, а также к нескольким из них. Поскольку к камерам напрямую подключаться нельзя, нужно создать что-то типа видеосервера. В задачу видеосервера входит получение от всех камер видеопотока по http и выдача их в сеть клиентам (UDP Broadcast - все клиенты в одной локальной сети). Как делаю: Для каждой камеры создаётся свой экземпляр TThread, внутри которого живёт idHTTP. Ну и естественно свой UDPClient (тоже использую Indy10). Собственно код потока для HTTP:
unit THttp;
interface
uses
Windows,Classes,SysUtils,IdHTTP,SyncObjs,UDP;
type
TSt = class(TThread)
private
idHTTP1:TidHTTP;
UDPClient1:TUDP;
xStream: TStream;
protected
function LiveStreamWrite(const aBuffer; aCount: Longint): Longint;
procedure Execute; override;
public
URL:String;
Numb:byte;
FEvent:TEvent;
end;
implementation
uses
LiveStreamer,Global;
procedure TSt.Execute;
begin
FreeOnTerminate:=true;
idHTTP1:=TidHTTP.Create(nil);
idHTTP1.Request:=TidHTTPRequest.Create(idHTTP1);
idHTTP1.Request.BasicAuthentication:=true;
idHTTP1.Request.Password:='****';
idHTTP1.Request.Username:='****';
idHTTP1.HTTPOptions:=[hoForceEncodeParams];
idHTTp1.Request.Connection:='Keep-Alive';
UDPClient1:=TUDP.Create(True);
UDPClient1.Priority:=tpNormal;
UDPClient1.FreeOnTerminate:=True;
UDPClient1.Port:=Table[Numb].Port;
UDPClient1.Numb:=Numb;
UDPClient1.Resume;
xStream := TLiveStream.Create(nil, LiveStreamWrite);
xStream.Size:=$8800;
XStream.Position:=0;
while (not fl_STOP) and (not fl_st[Numb]) do
begin
try idHTTP1.Get(URL, xStream);
finally end;
end;
FreeAndNil(xStream);
UDPCLient1.Terminate;
Event[numb].Setevent;
UDPClient1:=nil;
end;
function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;
begin
Buffer[Numb].Lock;
try
Buffer[Numb].Size:=0;
Buffer[Numb].Write(aBuffer,aCount);
Buffer[Numb].Position:=0;
finally
Buffer[Numb].UnLock;
end;
Event[Numb].SetEvent;
end;
end.
Код потока для UDP:
unit UDP;
interface
uses
Classes,IdUDPClient,IdGlobal,SyncObjs,SysUtils;
type
TUDP = class(TThread)
private
UDPClient:TidUDPClient;
protected
procedure Execute; override;
public
Port:integer;
HOST:string;
Numb:byte;
end;
implementation
uses Global;
procedure TUDP.Execute;
var buff:string;
begin
FreeOnTerminate:=true;
UDPClient:=TIdUDPClient.Create;
UDPClient.BufferSize:=$8800;
UDPClient.IPVersion:=Id_IPv4;
UDPClient.Port:=Port;
UDPClient.ReceiveTimeout:=-1;
UDPClient.Tag:=0;
UDPClient.BroadcastEnabled:=true;
UDPClient.Host:=Table[Numb].Broadcast;
try UDPClient.Active:=True;
except end;
while not Terminated do
begin
Event[Numb].WaitFor(INFINITE);
Event[Numb].ResetEvent;
if terminated then
begin
UDPClient.Active:=false;
UDPClient.Free;
buff:='';
exit;
end;
buff:='';
Buffer[Numb].Lock;
try
Buff:=Buffer[Numb].DataString;
finally
Buffer[Numb].UnLock;
end;
try UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);
except
end;
buff:='';
end;
end;
end.
Так вот, вся эта зараза прекрасно работает. Клиенты смотрят картинки с камер. Но где-то через час-два каринтка пропадает. Точнее от IP камеры пропадает поток (смотрела сниффером). А значит экземпояр потока TSt, привязанного к этой камере бесконечно ждёт продолжения кино (висит). У меня два вопроса: 1. Почему пропадает поток от IP камеры (мой косяк или её?) 2. Как мне грамотно прибить поток висящий? Кроме TerminateThread до него ничем не достучаться. Но в этом случае все мои ресурсы остаются незакрытыми. Да и неправильно это как-то.... СПАСИТЕ МЕНЯ!!!
-
> А значит экземпояр потока TSt, привязанного к этой камере бесконечно ждёт продолжения кино (висит).
ну так пусть ждёт не бесконечно.
-
Уважаемый RWolf, в том то и вопрос, что нельзя чтоб он висел. Надо либо понять почему он виснет, либо перезапустить его. Я уже думала о создании следящего потока, но поскольку зависание внутри цикла в execute мне корректно не закрыться. И вообще задача максимум обойтись без этого.
-
> UDPClient.ReceiveTimeout:=-1;
меня это смущает
-
To brother Почему? Я не жду ответа.
-
> olikke (08.10.12 12:04)
> while (not fl_STOP) and (not fl_st[Numb]) do > begin > try idHTTP1.Get(URL, xStream); > finally end; > end;
URL какой?
-
> olikke (08.10.12 12:04)
вообще конечно недочетов в коде огромное количество.
вот например, что будет если произойдет ошибка в idHTTP1.Get(URL, xStream); Поток умрет просто. Тихо без выброса исключений.
-
-
> olikke (09.10.12 10:04) [7]
Это multipart/x-mixed-replace данные, TIDHttp вообще не поддерживает до Indy 10 включительно данный Content-Type. Поддержка обещается в будущих версиях. Для нормальной работы с данным типом данных нужно использовать TIdTCPClient (ну или чистые сокеты) и самому реализовывать разбор потока. Этот поток не имеет конца, данные будут поступать и поступать до тех пор пока клиент не оборвет соединение. Я не понимаю (но догадываюсь) как это у вас может работать, если вы читаете ответ сервера таким образом: while (not fl_STOP) and (not fl_st[Numb]) do
begin
try idHTTP1.Get(URL, xStream);
finally end;
end; Метод idHTTP1.Get(URL, xStream) вообще никогда не вернет управление. Я догадываюсь, что xStream у вас какой то хитрый наследник TStream (кстати, потокозащищенный?) в котором как то обрабатываются поступающие данные, но так как кода его нет, что больше ничего сказать не могу. Короче говоря, при чтении вероятно возникает ошибка, ваш поток TSt умирает и все. При ошибке в TIdHTTP надо закрывать сокет самостоятельно, тем более что у вас idHTTp1.Request.Connection:='Keep-Alive';
-
Спасибо, DVM, за ответ. Очень похоже на то что происходит у меня. Видела тему где вы подробно разбирали реализацию данного запроса (buffer у меня ваш tsavebuffer). Буду смотреть в эту сторону.
-
> buffer у меня ваш tsavebuffer
В принципе можно сделать и наследника TStream по аналогии с тем буфером, т.е. получить TMemoryStream у которого можно удалять данные из начала без лишних телодвижений с памятью. Вот у меня есть подобный (правда не тестировал) в него можно писать с одной стороны, а забирать данные с другой (не забывая о синхронизации если программа многопоточная)
unit uFlexibleMemoryStream;
interface
uses
System.SysUtils, System.Classes, System.Math;
SysUtils, Classes, Math;
type
TFlexibleMemoryStream = class(TStream)
const
DefAllocation = 1024;
private
FStorage: PByte; FHead: PByte; FAllocation: NativeInt; FSize: NativeInt; FPosition: NativeInt; function GetTail: PByte;
public
constructor Create; overload;
constructor Create(const AAllocation: NativeInt); overload;
constructor Create(const ABuffer; ACount: Longint); overload;
constructor Create(const AStream: TStream); overload;
destructor Destroy; override;
procedure Clear;
procedure Tidy;
procedure Expand(const ACount: NativeInt); virtual;
function Consume(ACount: NativeInt): NativeInt; virtual;
function Extract(ACount: NativeInt): PByte;
function Append(const ABuffer; ACount: Longint): Longint; overload; virtual;
function Append(const AStream: TStream): Int64; overload; virtual;
function Shrink(ACount: NativeInt): NativeInt; virtual;
procedure SetSize(const ANewSize: Int64); override;
procedure LoadFromStream(AStream: TStream); virtual;
procedure LoadFromBuffer(const ABuffer; ACount: Longint); virtual;
procedure LoadFromFile(const AFileName: string); virtual;
procedure SaveToStream(AStream: TStream); virtual;
procedure SaveToFile(const AFileName: string); virtual;
function Seek(const AOffset: Int64; AOrigin: TSeekOrigin): Int64; override;
function Read(var ABuffer; ACount: Longint): Longint; override;
function Write(const ABuffer; ACount: Longint): Longint; override;
function Pos(const ABuffer; ACount: Longint; AOffset: NativeInt = 0): NativeInt; virtual;
function IsEmpty: boolean;
property Head: PByte read FHead;
property Tail: PByte read GetTail;
property Storage: PByte read FStorage;
property Allocation: NativeInt read FAllocation;
end;
-
implementation
function TFlexibleMemoryStream.Append(const ABuffer; ACount: Integer): Longint;
begin
FPosition := FSize;
Result := Write(ABuffer, ACount);
end;
function TFlexibleMemoryStream.Append(const AStream: TStream): Int64;
begin
Result := 0;
if AStream.Size = 0 then
Exit;
Expand(AStream.Size);
Seek(0, soEnd);
Result := CopyFrom(AStream, 0);
end;
procedure TFlexibleMemoryStream.Clear;
begin
FSize := 0;
FHead := FStorage;
FPosition := 0;
end;
constructor TFlexibleMemoryStream.Create;
begin
Create(DefAllocation);
end;
constructor TFlexibleMemoryStream.Create(const AAllocation: NativeInt);
begin
inherited Create;
FAllocation := AAllocation;
FSize := 0;
GetMem(FStorage, FAllocation);
FHead := FStorage;
end;
function TFlexibleMemoryStream.Consume(ACount: NativeInt): NativeInt;
begin
Extract(ACount);
Result := FSize;
end;
destructor TFlexibleMemoryStream.Destroy;
begin
FreeMem(FStorage, FAllocation);
inherited Destroy;
end;
procedure TFlexibleMemoryStream.Expand(const ACount: NativeInt);
var
Spare: NativeInt; HeadSpace: NativeInt; TailSpace: NativeInt; OldAllocation: NativeInt;
NewStorage: PByte;
begin
Spare := FAllocation - FSize;
HeadSpace := FHead - FStorage;
TailSpace := Spare - HeadSpace;
if Spare >= ACount then
begin
if TailSpace < ACount then
begin
Move(FHead^, FStorage^, FSize);
FHead := FStorage;
end
else
begin
end;
end
else
begin
OldAllocation := FAllocation;
FAllocation := Max(FAllocation, ACount) * 2;
GetMem(NewStorage, FAllocation);
if FStorage <> nil then
begin
Move(FHead^, NewStorage^, FSize);
FreeMem(FStorage, OldAllocation);
end;
FStorage := NewStorage;
FHead := FStorage;
end;
FSize := FSize + ACount;
end;
-
function TFlexibleMemoryStream.Extract(ACount: NativeInt): PByte;
begin
Result := FHead;
if ACount <= 0 then
Exit;
if ACount > FSize then
ACount := FSize;
FHead := FHead + ACount;
FSize := FSize - ACount;
FPosition := FPosition - ACount;
if FPosition < 0 then
FPosition := 0;
end;
function TFlexibleMemoryStream.GetTail: PByte;
begin
Result := PByte(FHead + FSize);
end;
function TFlexibleMemoryStream.Pos(const ABuffer; ACount: Longint; AOffset: NativeInt = 0): NativeInt;
function StartsBuffer(ASubBuffer: Pointer; ASubBufferSize: Longint;
ABuffer: Pointer; ABufferSize: NativeInt): Boolean;
begin
if ASubBufferSize <= ABufferSize then
Result := CompareMem(ASubBuffer, ABuffer, ASubBufferSize)
else
Result := False;
end;
var
i: NativeInt;
P: PByte;
begin
Result := -1;
for I := AOffset to FSize - 1 do
begin
P := FHead + I;
if StartsBuffer(@ABuffer, ACount, Pointer(P), FSize - I) then
begin
Result := i;
Break;
end;
end;
end;
function TFlexibleMemoryStream.IsEmpty: boolean;
begin
Result := FSize = 0;
end;
procedure TFlexibleMemoryStream.LoadFromBuffer(const ABuffer; ACount: Integer);
begin
FPosition := 0;
Write(ABuffer, ACount);
end;
procedure TFlexibleMemoryStream.LoadFromFile(const AFileName: string);
var
Stream: TStream;
begin
Stream := TFileStream.Create(AFileName, fmOpenRead);
try
LoadFromStream(Stream);
finally
Stream.Free;
end;
end;
procedure TFlexibleMemoryStream.LoadFromStream(AStream: TStream);
begin
if AStream.Size = 0 then
Exit;
SetSize(AStream.Size);
Seek(0, soBeginning);
CopyFrom(AStream, 0);
end;
function TFlexibleMemoryStream.Read(var ABuffer; ACount: Integer): Longint;
begin
if (FPosition >= 0) and (ACount >= 0) then
begin
Result := FSize - FPosition;
if Result > 0 then
begin
if Result > ACount then Result := ACount;
Move((PByte(FHead) + FPosition)^, ABuffer, Result);
Inc(FPosition, Result);
Exit;
end;
end;
Result := 0;
end;
procedure TFlexibleMemoryStream.SaveToFile(const AFileName: string);
var
Stream: TStream;
begin
Stream := TFileStream.Create(AFileName, fmCreate);
try
SaveToStream(Stream);
finally
Stream.Free;
end;
end;
procedure TFlexibleMemoryStream.SaveToStream(AStream: TStream);
begin
if FSize <> 0 then AStream.WriteBuffer(FHead^, FSize);
end;
function TFlexibleMemoryStream.Seek(const AOffset: Int64;
AOrigin: TSeekOrigin): Int64;
begin
case AOrigin of
soBeginning: FPosition := AOffset;
soCurrent: Inc(FPosition, AOffset);
soEnd: FPosition := FSize + AOffset;
end;
Result := FPosition;
end;
procedure TFlexibleMemoryStream.SetSize(const ANewSize: Int64);
begin
if FSize <> ANewSize then
begin
if FSize < ANewSize then
Expand(ANewSize - FSize)
else
begin
FSize := ANewSize;
end;
if FPosition > FSize then
FPosition := FSize;
end;
end;
function TFlexibleMemoryStream.Shrink(ACount: NativeInt): NativeInt;
begin
Result := FSize;
if ACount <= 0 then
Exit;
if ACount > FSize then ACount := FSize;
SetSize(FSize - ACount);
Result := FSize;
end;
procedure TFlexibleMemoryStream.Tidy;
begin
if FHead <> FStorage then
begin
if FSize = 0 then
FHead := FStorage
else
begin
Move(FHead^, FStorage^, FSize);
FHead := FStorage;
end;
end;
end;
function TFlexibleMemoryStream.Write(const ABuffer; ACount: Integer): Longint;
var
Pos: Int64;
begin
Pos := FPosition + ACount;
if Pos > 0 then
begin
if Pos > FSize then
begin
Expand(ACount);
FSize := Pos;
end;
Move(ABuffer, (FHead + FPosition)^, ACount);
FPosition := Pos;
Result := ACount;
exit;
end;
Result := 0;
end;
constructor TFlexibleMemoryStream.Create(const ABuffer; ACount: Integer);
begin
Create(ACount);
LoadFromBuffer(ABuffer, ACount);
end;
constructor TFlexibleMemoryStream.Create(const AStream: TStream);
begin
Create(AStream.Size);
LoadFromStream(AStream);
end;
end.
-
> Почему? Я не жду ответа.
-1 или всеж 0?
-
или 1
-
Новые подробности: > Метод idHTTP1.Get(URL, xStream) вообще никогда не вернет > управление.
Действительно так. И это видимо нехорошо. Обратиться к потоку (ну чтоб закрыть его) я могу только так например:
function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint;
var str:string;
begin
if fl_Stop then
begin
idHTTP1.Disconnect;
exit;
end;
Buffer[Numb].Lock;
try
Buffer[Numb].Size:=0;
Buffer[Numb].Write(aBuffer,aCount);
Buffer[Numb].Position:=0;
finally
Buffer[Numb].UnLock;
end;
Event[Numb].SetEvent;
end;
Однако ошибка возникает всё-таки не в
while (not fl_STOP) and (not fl_st[Numb]) do
begin
try idHTTP1.Get(URL, xStream);
finally
end;
end;
Ошибка появляется только когда данные из Buffer я начинаю выдавать через UDPClient. Как только делаю коммент - всё работает спокойно. procedure TUDP.Execute; var buff:string; begin FreeOnTerminate:=true; //Создание клиента для передачи данных UDPClient:=TIdUDPClient.Create; UDPClient.BufferSize:=$8800; UDPClient.IPVersion:=Id_IPv4; UDPClient.Port:=Port; UDPClient.ReceiveTimeout:=-1; UDPClient.Tag:=0; UDPClient.BroadcastEnabled:=true; UDPClient.Host:=Table[Numb].Broadcast; try UDPClient.Active:=True; except end; while not Terminated do begin Event[Numb].WaitFor(INFINITE); Event[Numb].ResetEvent; if terminated then begin UDPClient.Active:=false; UDPClient.Free; buff:=''; exit; end; buff:=''; Buffer[Numb].Lock; try Buff:=Buffer[Numb].DataString; finally Buffer[Numb].UnLock; end; { закомментила специально try UDPClient.Send(TRIM(Table[Numb].Broadcast),UDPClient.Port,buff);} except // вывод сообщения - не возникает никогда end; buff:=''; end; end; Я прекрасно понимаю, что для корректной работы, в частности при разрывах соединения и пр. ошибках, мне придётся отказаться от халявного idHTTP. Но в данной ситуации проблема явно не в нём, а в связке приёма по HTTP и передече данных по UDP. Код XStream (автора не помню), на всякий случай: unit LiveStreamer;
interface
uses
Classes;
type
TLSOnRead = function(var vBuffer; aCount: Longint): Longint of object;
TLSOnWrite = function(const aBuffer; aCount: Longint): Longint of object;
TLiveStream = class(TStream)
protected
fOnRead: TLSOnRead;
fOnWrite: TLSOnWrite;
public
constructor Create(aOnRead: TLSOnRead; aOnWrite: TLSOnWrite);
function Read(var vBuffer; aCount: Longint): Longint; override;
function Write(const aBuffer; aCount: Longint): Longint; override;
function Seek(const aOffset: Int64; aOrigin: TSeekOrigin): Int64; override;
end;
implementation
uses
SysUtils;
constructor TLiveStream.Create(aOnRead: TLSOnRead; aOnWrite: TLSOnWrite);
begin
inherited Create;
fOnRead := aOnRead;
fOnWrite := aOnWrite;
end;
function TLiveStream.Read(var vBuffer; aCount: Integer): Longint;
begin
if Assigned(fOnRead) then begin
Result := fOnRead(vBuffer, aCount);
end else begin
Result := aCount;
end;
end;
function TLiveStream.Seek(const aOffset: Int64; aOrigin: TSeekOrigin): Int64;
begin
Result := 0;
end;
function TLiveStream.Write(const aBuffer; aCount: Integer): Longint;
begin
if Assigned(fOnWrite) then begin
Result := fOnWrite(aBuffer, aCount);
end else begin
Result := aCount;
end;
end;
end.
-
> olikke (09.10.12 16:23) [15]
> Ошибка появляется только когда данные из Buffer я начинаю > выдавать через UDPClient. Как только делаю коммент - всё > работает спокойно.
Я уже неоднократно намекал на потокобезопасность приемного буфера (т.е. твоего TLiveStream). Он не потокобезопасный, а обращаешься ты к нему из дух потоков. Первый всегда в него пишет (метод Get выполнение которого никогда не прекращается) и второй поток пытается обратиться к TLiveStream для чтения. Тут скорее всего и возникают грабли. Надо все методы твоего TLiveStream обернуть в крит секции, типа того:
TThreadSafeStreamProxy = class(TStreamProxy)
private
FCS: TCriticalSection;
public
procedure Lock;
procedure Unlock;
constructor Create(AStream: TStream; AOwnsStream: Boolean = False);
destructor Destroy; override;
function Read(var Buffer; Count: Integer): Longint; override;
function Write(const Buffer; Count: Integer): Longint; override;
function Seek(Offset: Longint; Origin: Word): Longint; override;
function Seek(const Offset: Int64; Origin: TSeekOrigin): Int64; override;
end;
...
constructor TThreadSafeStreamProxy.Create(AStream: TStream;
AOwnsStream: Boolean);
begin
FCS := TCriticalSection.Create;
inherited Create(AStream, AOwnsStream);
end;
destructor TThreadSafeStreamProxy.Destroy;
begin
Lock;
try
inherited Destroy;
finally
Unlock;
FCS.Free;
end;
end;
procedure TThreadSafeStreamProxy.Lock;
begin
FCS.Enter;
end;
function TThreadSafeStreamProxy.Read(var Buffer; Count: Integer): Longint;
begin
Lock;
try
Result := inherited Read(Buffer, Count);
finally
Unlock;
end;
end;
function TThreadSafeStreamProxy.Seek(Offset: Integer; Origin: Word): Longint;
begin
Lock;
try
Result := inherited Seek(Offset, Origin);
finally
Unlock;
end;
end;
function TThreadSafeStreamProxy.Seek(const Offset: Int64;
Origin: TSeekOrigin): Int64;
begin
Lock;
try
Result := inherited Seek(Offset, Origin);
finally
Unlock;
end;
end;
procedure TThreadSafeStreamProxy.Unlock;
begin
FCS.Leave;
end;
function TThreadSafeStreamProxy.Write(const Buffer; Count: Integer): Longint;
begin
Lock;
try
Result := inherited Write(Buffer, Count);
finally
Unlock;
end;
end;
-
Здесь TStreamProxy еще один мой класс, но не суть важно, тебе нужен наследник TStream, который кроме нужной тебе функциональности переопределяет все перечисленные в моем примере методы оригинального TStream и оборачивает их крит секциями.
-
Сделала критические секции. Но только в данном случае они мне ни к чему, т.к. в данном случае используется только метод Write function TSt.LiveStreamWrite(const aBuffer; aCount: Integer): Longint; в котором данные из приёмного буфера idHTTP перебрасываются в Buffer. Buffer[Numb].Lock;
try
Buffer[Numb].Size:=0;
Buffer[Numb].Write(aBuffer,aCount);
Buffer[Numb].Position:=0;
finally
Buffer[Numb].UnLock; Затем ставиться Event для UDP потока, который работает уже с Buffer.
-
> olikke (09.10.12 23:40) [18] > Сделала критические секции. Но только в данном случае они > мне ни к чему
Критические секции нужны там, где есть общие данные, используемые 2-мя и более потоками. Насколько я понимаю, у тебя все же есть несинхронизованные места. Вот смотри. TIdHTTP читает в своем потоке данные в TLiveStream. При записи в него генерится событие, в котором ты, как говоришь перебрасываешь данные в буфер. Момент переброски в буфер должен быть обернут в критическую секцию. Далее отсылка этого буфера или любая его обработка в другом потоке тоже должна этот буфер блокировать, иначе пока ты его будешь отсылать, TIdHTTP опять попробует модифицировать его. Нет?
|