Многопоточный доступ к базам данных

© 2002 Галимарзанов Фанис

По этой теме очень мало информации, особенно в части, касающейся доступа к SQL-серверам (например IB). Мне пришлось несколько дней активно заниматься всем этим – не нашел достойной замены для VirtualTree и решил заполнять дерево с помощью потока. Отмечу, что мои ранние попытки использовать потоки для обращения к Interbase не увенчались успехом, да и дискуссии по теме на форуме epsylon.public.interbase не особо вдохновляли. Жизнь заставила пересмотреть подходы к проблеме и вот что получилось.

Как известно, запрос в потоке должен выполняться в отдельном контексте, т.е. поток должен иметь как минимум IBDataset, IBTransaction и IBSQL. Можно использовать IBDataBase.InternalTransaction, но лучше таки создать в потоке отдельный IBTransaction.

Рассмотрим процесс создания потока, способного обеспечить требуемые условия – создадим базовый класс и затем породим от него поток, способный, например, подсчитать количество записей в запросе.

unit basedbtr;
interface
uses Windows,Classes, Messages,IBDatabase,IBSQL;
const
{Идентификаторы сообщений, используемых для передачи информации форме, создавшей поток}
WM_THREAD_TEXT=WM_USER+1000;
WM_THREAD_INTEGER=WM_THREAD_TEXT+1;
type
TBaseDB_Thread = class(TThread)
private
{Локальные поля, хранящие информацию об окне владельца потока}
fCurForm:HWND;
{SQL текст}
fNewSqlText:string;
{Флаг нового запроса}
fSetNewQuery:boolean;
{Далее три компоненты, обеспечивающие наличие собственного контекста потока}
Base:TIBdatabase;
trWrite:TIbTransaction;
ThreadSql:TIBSQL;
{Процедуры для передачи информации владельцу потока}
procedure Post_Text(MsgData:string);
procedure Post_Integer(MsgData:integer);
protected
{Метод Execute пока ничего, кроме как перехода в состояние Suspend, не умеет}
procedure Execute; override;
public
property CurForm:HWND read fCurForm write fCurForm;
property NewSqlText:string read fNewSqlText write fNewSqlText;
property SetNewQuery:boolean read fSetNewQuery write fSetNewQuery;
constructor Create(OwnerHWND:HWND; //Дескриптор окна-родителя потока aBaseName:String; //DatabaseName
aBaseParams:TStrings; //Информация для IBDatabase.Params
TransParams:TStrings); //Информация для IBTransactin.Params
destructor Destroy; override;
end;

{Породим от базового потока новый поток с требуемой функциональностью, которая реализуется в методе Execute}
TGetRecordCount_Thread = class(TBaseDB_Thread)
protected
procedure Execute; override;
end;

{Глобальная переменная, хранящая текст для передачи окну-владельцу}
var MsgText:string;
implementation

constructor TBaseDB_Thread.Create;
begin
fCurForm:=OwnerHWND;
NewSqlText:='';
FreeOnTerminate :=False;
base:=TIBdatabase.Create(nil);
Base.DatabaseName:=aBaseName;
Base.LoginPrompt:=false;
Base.Params.Assign(aBaseParams);
trWrite:=TIBTransaction.Create(nil);
trWrite.DefaultDatabase:=Base;
trWrite.Params.Assign(TransParams);
ThreadSql:=TIBSQL.Create(nil);
ThreadSql.SQL.Text:='';
ThreadSql.Transaction:=trWrite;
Base.Connected:=true;
inherited Create(true); {true - переведем поток в состояние Suspended}
end;

destructor TBaseDB_Thread.Destroy;
begin
{Остановим поток}
if not Suspended then Suspend;
fNewSqlText:='';
fSetNewQuery:=false;
Terminate;
Resume;
WaitFor;
ThreadSql.Free;
trWrite.Free;
Base.Free;
inherited Destroy;
end;

{Ниже процедуры для передачи информации владельцу}
procedure TBaseDB_Thread.Post_Text(MsgData:string);
begin
MsgText:=msgData;
PostMessage(CurForm,WM_THREAD_TEXT,0,Integer(PChar(MsgText)));
end;

procedure TBaseDB_Thread.Post_Integer(MsgData:integer);
begin
PostMessage(CurForm,WM_THREAD_INTEGER,0,MsgData);
end;

{"Пустой" метод базового класса}
procedure TBaseDB_Thread.Execute;
begin
while not Terminated do
suspend;
end;

{Здесь Execute наследника}
procedure TGetRecordCount_Thread.Execute;
begin
while not Terminated do begin
{Если установлен флаг нового запроса}
if fSetNewQuery then
begin
{Сбросим флаг}
fSetNewQuery:=false;
{Сообщим владельцу о начале работы потока }
Post_text('Resume');
try
if not trWrite.InTransaction then trWrite.StartTransaction;
ThreadSql.Sql.Text:=NewSqlText;
ThreadSql.ExecQuery;
{Сообщим владельцу результат}
Post_Integer(ThreadSql.Fields[0].AsInteger);
trWrite.Commit;
ThreadSql.Close;
except
raise;
Post_Text('Error');
end;
end;
{Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
if not Terminated and not SetNewQuery then begin
{Вначале сообщим владельцу}
Post_Text('Suspend');
suspend;
end;
end;
end;
end.

В прилагаемом к статье проекте на Delphi (10.4K) реализованы дополнительные функции для обеспечения нормальной работы потока. Рассмотрим их.

procedure TForm1.IBQuery1AfterOpen(DataSet: TDataSet);
begin
if Trs.Suspended
then SetTrsData
else Timer1.Enabled:=true;
end;

Событие должно вызвать метод потока Resume для выполнения запроса о количестве записей. Но мы не знаем, стоит ли это делать, может прежний запрос касается миллионов записей и выражение WHERE столь сложен, чтоб его выполнить за секунды. Проверим состояние потока – если он Suspended, то можно сходу инициализировать поток новыми параметрами и запустить его, иначе – воспользуемся таймером Timer1.

procedure TForm1.Timer1Timer(Sender: TObject);
begin
if Trs.Suspended then begin
Timer1.Enabled:=false;
SetTrsData;
end;
end;

Здесь процедура рестарта потока

procedure TForm1.SetTrsData;
begin
Trs.NewSqlText:=Memo2.Lines.Text;
Trs.SetNewQuery:=true;
Trs.Resume;
end;

Перед тем, как переоткрыть запрос для Grid, остановим таймер – вдруг он сработает и нарушит порядок

procedure TForm1.IBQuery1BeforeOpen(DataSet: TDataSet);
begin
Timer1.Enabled:=false;
end;

Вот и все. Данный пример показывает, как создавать поток для использования его в качестве дополнительного инструмента как для мелких «поручений» типа рассмотренного выше, так и для исполнения тяжелых процедур, связанных с изменениями (INSERT or UPDATE), способных ввести основной поток приложения в ступор.

Предположим, что необходимо заполнять TREE данными из запроса.

TMySql_Thread = class(TBaseDB_Thread)
Private
{Это флаг актуальности данных}
FActual:boolean;
{Набор параметров для IBSQL}
FParam1:integer;
FParam2:integer;
protected
procedure Execute; override;
public
property Actual:boolean read fActual write fActual;
property Param1:integer read fParam1 write fParam1;
property Param2:integer read fParam2 write fParam2;
end;

procedure TMySql_Thread.Execute;
begin
while not Terminated do begin
{Если установлен флаг нового запроса}
if fSetNewQuery then
begin
{Сбросим флаг}
fSetNewQuery:=false;
fActual:=true;
{Сообщим владельцу о начале работы потока }
Post_text('Resume');
try
if not trWrite.InTransaction then trWrite.StartTransaction;
ThreadSql.Sql.Text:=NewSqlText;
ThreadSql.Params[0].asInteger:=fParam1;
ThreadSql.Params[1].asInteger:=fParam2;
ThreadSql.ExecQuery;
{Здесь цикл вставки данных в Tree, причем анализируются Terminated и флаг актуальности потока}
While not Terminated and fActual and not ThreadSql.Eof do
Begin
{Вызываем процедуру вставки}
Synchronize(AddToList);
ThreadSql.Next;
End;
trWrite.Commit;
ThreadSql.Close;
except
raise;
Post_Text('Error');
end;
end;
{Если в процессе выполнения не установлен флаг нового запроса, то остановимся в этой точке}
if not Terminated and not SetNewQuery then begin
{Вначале сообщим владельцу}
Post_Text('Suspend');
suspend;
end;
end;
end;

В прилагаемом к статье проекте на Delphi (10.4K) реализованы все изложенные идеи. Надеюсь, что все это поможет использовать потоки в полной мере.

С уважением Галимарзанов Фанис (Fanis).

Будут вопросы - меня всегда найдете на epsylon.public.interbase

P.S. Автор – профессиональный судоводитель, бывший капитан, посему прошу особо не пинать за упрощенное изложение.

Copyright© 2002 Галимарзанов Фанис Специально для Delphi Plus



Новости за месяц

  • Сентябрь
    2019
  • Пн
  • Вт
  • Ср
  • Чт
  • Пт
  • Сб
  • Вс