Реализация off-line репликации в Interbase/Firebird

© 2005 Морянцев М.А.
Компания НеРуСофт

Часто в задачи автоматизации управления предприятиями входит задача синхронизации состояния информации баз данных в головном офисе и региональных офисах. Примерами могут служить управление распределенным складом или филиалами банка.

В системе без выделенного головного офиса задача может состоять в передаче изменений заданных таблиц БД всем или выделенному списку территориальных управлений(ТУ), работающих с БД той же структуры.

Для решения этой задачи в СУБД FireBird можно поступать следующим образом.

Для всех таблиц БД использовать суррогатные первичные ключи представляющие счетики, задаваемые генераторами, а ограничение уникальности сочетаний полей задавать дополнительно. Для каждого ТУ задавать такие значения генераторов, чтобы гарантировать отсутствие пересечений множеств значений генераторов разных ТУ, например все генераторы ТУ1 имеют значения в отрезке целых чисел [1, N] , для ТУ2 [N+1, 2N] и т.д.

Для отслеживания изменений в базе данных воспользуемся менеджером протоколов данных IBExpress. На Рис 1. показано каким образом можно поставить на логирование таблицы базы данных ROffice.gdb.


Рис 1.

Учет изменений ведется в четырех системных таблицах изображенных на Рис 2.


Рис 2.

Например , при внесении изменений в таблице CITY , в таблицах IBE$Log_Tables, IBE$Log_Keys, IBE$Log_Fields, IBE$Log_Blob_Fields появятся записи за счет срабатывания триггеров логирования. На Рис. 3 показаны данные таблицы IBE$Log_Tables, где видно, что 09.09.2005 выполнялись операции INSERT и UPDATE с таблицей CITY. Соответствующие этим операциям записи будут присутствовать и в таблицах IBE$Log_Keys, IBE$Log_Fields, IBE$Log_Blob_Fields.


Рис 3.

Информацию об измененных данных одной БД за определенный промежуток времени для заданного набора таблиц будем формировать в файл данных off-line репликации, который будем называть D - файл.

Имена файлов D имеют структуру:

D_CODSENDER_CODREC_TIMESTAMP_IDPredFileD_IDFileD

CODSENDER – код территориального управления отправителя
CODREC – код территориального управления получателя
IDPredFileD номер ID последнего сквитованного без ошибок файла
IDFileD номер ID отправляемого файла

При приеме файла D происходит проверка синтаксиса файла, номера сеанса и даты в имени файла, номера сеансов должны удовлетворять своей последовательности от каждого ТУ (см. формирование имени файла D). По результату контороля формируется файл квитанции S.

Имя файла квитанции формируется из имени файла добавлением слева буквы S.

Для обмена D и S файлами создаюся каталоги.

.. ROOT IN
.. ROOT OUT, где ROOT – корневая директория

Для управления сеансами репликации нужно создать БД , содержащую по крайней мере три таблицы, подобных изображенным на Рис. 4.


Рис. 4.

и таблицы Project и Tables, содержащие информацию о базах и таблицах , подлежащих репликации (Рис 5).


Рис 5.

Здесь AliasName имя файла базы данных. Replication = 1 означает, что объект подлежит репликации а Replication = 0 нет.

Очередной файл репликации D может быть построен только при наличии сквитованного без ошибок предыдущего файла D, или если он является первым.

При успешном формировании файла он помещается в Connections.Files, каталог "..ROOT " и копируется в ".. ROOT OUT" для почтовой программы. В Connections.Errors пишется протокол. При этом выполняются преобразования в таблице сеансов Connections.

При обнаружении ошибки при приеме D файла формируется квитанция с кодом ошибки , а Connections.State := stateReceived , Connections.Errid := <> 0.

При обнаружении ошибки при приеме S, квитанция бракуется Connections.State := stateReceived , Connections.Errid := <> 0.

При нормальном приеме S Connections.State := stateTICKED (Сквитован) для файла D и stateRECEIVE (принята) для квитанции и Connections.Errid := 0 для обоих.

Если для принимаемого файла существует запись Connections.NameFile = NameFile, Connections.State = stateReceive, Connections.Errid = 0 , файл удаляется как повторный.

Файл D содержит следующие данные:

Формат передачи данных ClientDataset используется для удобства передачи значений BLOB полей чего не позволяет наиболее популярный для этих целей XML формат.

// функция построения файла D для заданного отрезка времени
// ProjectID - номер проекта
// NKeyMin – минимальное значение ключей БД отправителя
// NKeyMax - максимальное значение ключей БД отправителя
// TablesList – список таблиц, разделенных запятой, подлежащих репликации
function TfmSkladTun.CreateContentDViacds( DateB, DateE: TDateTime; NameDataBase: string; ProjectID, NKeyMin, NKeyMax: integer;TablesList: string ): string;

Const // для последовательности операций

  SIBT =    ' select IBT.table_name, IBT.ID, IBT.OPERATION from    ibe$log_tables IBT inner join ibe$log_keys IBK on IBT.ID = IBK.log_tables_id '+
   ' where DATE_TIME >= :D1 and DATE_TIME < :D2'+
   ' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
   ' and Cast(IBK.KEY_VALUE as numeric) < %d';

   SIBKUI = // для удаленных записей
   ' select IBT.table_name, IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID from ibe$log_keys IBK, ibe$log_tables IBT where log_tables_id in '+
   ' (select id from ibe$log_tables where DATE_TIME >= :D1 and DATE_TIME < :D2 '+
   ' and OPERATION <> 'D' ) '+
   ' and IBK.log_tables_id = IBT.id '+
   ' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
   ' and Cast(IBK.KEY_VALUE as numeric) < %d';

   SIBKD = // для удаленных записей
   ' select IBT.table_name, IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID from ibe$log_keys IBK, ibe$log_tables IBT where log_tables_id in '+
   ' (select id from ibe$log_tables where DATE_TIME >= :D1 and DATE_TIME < :D2 '+
   ' and OPERATION = 'D' ) '+
   'and IBK.log_tables_id = IBT.id '+
' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
   ' and Cast(IBK.KEY_VALUE as numeric) < %d';

   // для измененных значений полей operation <>'D'
   // для вставленных значений полей
SIBFU =
   ' select IBT.table_name, IBF.FIELD_NAME, IBF.OLD_VALUE, IBF.NEW_VALUE, IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID from ibe$log_tables IBT, ibe$log_fields IBF, ibe$log_keys IBK '+
   ' where IBF.log_tables_id in '+
   ' (select id from ibe$log_tables where DATE_TIME >= :D1 and DATE_TIME < :D2 '+
   ' and OPERATION <> 'D' ) '+
   ' and IBT.ID = IBK.log_tables_id and IBT.ID = IBF.log_tables_id '+
   ' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
   ' and Cast(IBK.KEY_VALUE as numeric) < %d';

   // для измененных значений Blob полей operation <>'D'
   // для вставленных значений Blob полей
   SIBBU =
   ' select IBT.table_name, IBB.FIELD_NAME, IBB.OLD_CHAR_VALUE, IBB.NEW_CHAR_VALUE,'+
   'IBB.OLD_BLOB_VALUE, IBB.NEW_BLOB_VALUE,'+
   'IBK.KEY_FIELD, IBK.KEY_VALUE, IBT.ID from ibe$log_tables IBT, ibe$log_blob_fields IBB, ibe$log_keys IBK '+
   'where IBB.log_tables_id in (select id from ibe$log_tables where DATE_TIME >= :D1 and DATE_TIME < :D2 and OPERATION <> 'D')'+
   ' and IBT.ID = IBK.log_tables_id and IBT.ID = IBB.log_tables_id '+
   ' and Cast(IBK.KEY_VALUE as numeric) >= %d '+
   ' and Cast(IBK.KEY_VALUE as numeric) < %d';

var
   i,j: integer;
   StrTmpAll: string;
   ListSql: TStringList;
   TableList: TStringList;
   DBNameSaved: string;
   FDelimiter, FDelimiter1: string;
   guid: TGUID;
   StrStream: TStringStream;
   sTemp: string;
begin
   Result := ';
   try
     try
       StrTmpAll := Padl(Trim(IntToStr(ProjectID)), 10)+#13#10+ // Номер реплицируемого проекта
       FormatDateTime('DD.MM.YYYY HH:NN:SS',DateB)+#13#10+
       FormatDateTime('DD.MM.YYYY HH:NN:SS',DateE)+#13#10+
      IntToStr(DeltaGen) +#13#10;
      // – DeltaGen отрицательное или положительное целое число, представляющее разность значений генераторов в БД отправителя и БД получателя.

       if CreateGuid(guid) = S_OK then
         FDelimiter := GuidToString(guid)
       else begin
         MessageDlg('Не могу получить строку разделителя',          mtError, [mbOk], []);
         Exit;
       end; //
       StrTmpAll := StrTmpAll + FDelimiter+#13#10;
       TableList := TStringList.Create();
       TableList.CommaText := TablesList;
       TableList.CaseSensitive := False;
       ListSql := TStringList.Create();
       ListSql.Add(Format(SIBT, [ NKeyMin, NKeyMax]));
       ListSql.Add(Format(SIBKUI, [ NKeyMin, NKeyMax]));
       ListSql.Add(Format(SIBKD, [ NKeyMin, NKeyMax]) );
       ListSql.Add(Format(SIBFU, [ NKeyMin, NKeyMax]));
       ListSql.Add(Format(SIBBU, [ NKeyMin, NKeyMax]));
       if dmReplConn.fbConstr.Connected then
         dmReplConn.fbConstr.Connected := False;
       DBNameSaved := dmReplConn.fbConstr.DBName;
       dmReplConn.fbConstr.DBName := NameDataBase;
       dmReplConn.fbConstr.Connected := True;
       for j := 0 to ListSql.Count-1 do
       begin
         sTemp := ';
         StrStream := TStringStream.Create(sTemp);
         Createcds(j, cdsReplik);
         if dmReplConn.fdCommon.Active then
           dmReplConn.fdCommon.Close;
         dmReplConn.fdCommon.SelectSQL.Clear;
         dmReplConn.fdCommon.SelectSQL.Add(ListSql[j]);
         dmReplConn.fdCommon.OpenWP([DateB, DateE]);
         with (dmReplConn.fdCommon) Do
         begin
           First;
           while not Eof Do
           begin
             cdsReplik.Append;
             if TableList.IndexOf(Fields[0].AsString) <> -1 then              // включаем только
             // таблицы подлежащие репликации
             For i:=0 to Fields.Count-1 Do
               cdsReplik.Fields[i].AsString := Fields[i].AsString;
             Next;
           end;
         end;
         cdsReplik.SaveToStream(StrStream, dfBinary);
         cdsReplik.Close;
         StrTmpAll := StrTmpAll+StrStream.DataString+FDelimiter+#13#10;
         StrStream.Free;
         // конец DEL части
       end;
       Result := StrTmpAll;
     except
       on e: exception do
         ErrorDlg(e);
       end;
   finally
     ListSql.Free;
     TableList.Free;
     if dmReplConn.fdCommon.Active then
       dmReplConn.fdCommon.Close;
     if dmReplConn.fbConstr.Connected then
       dmReplConn.fbConstr.Connected := False;
     dmReplConn.fbConstr.DBName := DBNameSaved;
   end;
end;

// построение ClientDataSet для форматирования данных
procedure TfmSkladTun.Createcds(TipLog: smallint; cds: TClientDataSet);
var
   i: integer;
begin
   with cds do
   begin
     if Active then
       Close;
     FieldDefs.Clear;
     case TipLog of
     0: // для ID
     begin
       FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
       FieldDefs.Add( 'OPERATION', ftString, 1 );
       FieldDefs.Add( 'ID', ftLargeint);
     end;
     1, 2: // для insert, update и delete ключей
     begin
       FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
       FieldDefs.Add( 'KEY_FIELD', ftString, 70 );
       FieldDefs.Add( 'KEY_VALUE', ftString, 255);
       FieldDefs.Add( 'ID', ftLargeint);
     end;
     3: // для insert и Update полей
     begin
       FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
       FieldDefs.Add( 'FIELD_NAME', ftString, 70 );
       FieldDefs.Add( 'OLD_VALUE', ftMemo);
       FieldDefs.Add( 'NEW_VALUE', ftMemo);
       FieldDefs.Add( 'KEY_FIELD', ftString, 70 );
       FieldDefs.Add( 'KEY_VALUE', ftString, 255);
       FieldDefs.Add( 'ID', ftLargeint);
     end;
     4: // для insert и Update Blob полей
     begin
       FieldDefs.Add( 'TABLE_NAME', ftString, 70 );
       FieldDefs.Add( 'FIELD_NAME', ftString, 70 );
       FieldDefs.Add( 'OLD_CHAR_VALUE', ftMemo);
       FieldDefs.Add( 'NEW_CHAR_VALUE', ftMemo);
       FieldDefs.Add( 'OLD_BLOB_VALUE', ftBlob);
       FieldDefs.Add( 'NEW_BLOB_VALUE', ftBlob);
       FieldDefs.Add( 'KEY_FIELD', ftString, 70 );
       FieldDefs.Add( 'KEY_VALUE', ftString, 255);
       FieldDefs.Add( 'ID', ftLargeint);
     end;
   end;
   CreateDataSet;
   IndexFieldNames := 'ID';
end;

Файл квитанции S содержит следующие данные:

Очередной файл D, кроме первого, может быть принят только при наличии сквитованного без ошибок предыдущего принятого файла D.

Функция реализации изменений при приеме файла D AllUpdateDatabaseGetContentD() генерирует и выполняет необходимые SQL команды в БД получателя, в одной транзакции, и в точно такой же последовательности, как они выполнялись в базе ТУ отправителя .

Процедура контроля файла D , которая выполняет проверку имени и содержимого файла, последовательности сеансов и т.п. ( в статье не приведена ) получает переменные приведенные ниже из строки содержимого файла D FileString .

ProjectID := StrToInt(Trim(Copy(FileString, 1, 10)));
StringID - строка , соответствующая файлу ClientDataSet для SIBUI
FStringForInsUpd - строка , соответствующая файлу ClientDataSet для SIBKUI
FStringForDel - строка , соответствующая файлу ClientDataSet для SIBKD
FStringForField - строка , соответствующая файлу ClientDataSet для SIBFU.
FStringForBlob - строка , соответствующая файлу ClientDataSet для SIBBU
DeltaGen – отрицательное или положительное целое число, представляющее разность значений генераторов в БД отправителя и БД получателя.
function TFileHandling.AllUpdateDatabaseGetContentD(): boolean;
var
   Sds: TSqlDataSet;
   TD: TTransactionDesc;
   OldDatabase,NameDatabase: string;
   cdsID: TClientDataSet;
   sTemp: string;
   StrStream: TStringStream;
   ProjectID: integer;
begin
   Result := true;
   NameDataBase := fmSkladTun.GetNameDatabase(ProjectID); //    получаем имя файла базы данных получателя по номеру проекта
   try
     try
       with dmReplConn do
       begin
         Sds := TSqlDataSet.Create(dmReplConn);
         if cnRep.Connected then
           cnRep.Close;
         OldDatabase := cnRep.Params.Values['DataBase'];
         cnRep.Params.Values['DataBase'] := NameDatabase;
         if not cnRep.Connected then
           cnRep.Open;
         While cnRep.InTransaction do
           Continue;
         TD.TransactionID := 1;
         TD.IsolationLevel := xilREADCOMMITTED;
         cnRep.StartTransaction(TD);
         Sds.SqlConnection := cnRep;
         Sds.CommandType := ctQuery;
         cdsID := TClientdataset.Create(fmSkladTun);
         fmSkladTun.Createcds(0, cdsID);
         sTemp := StringID;
         StrStream := TStringStream.Create(sTemp);
         cdsID.LoadFromStream(StrStream);
         FreeAndNil(StrStream);
         cdsID.First;
         While not cdsID.Eof do
         begin
           UpdateDatabaseGetContentD(cdsID.Fields[0].AsInteger, cdsID.Fields[2].AsString[1], Sds);
           cdsID.Next;
         end;
       end;
     except
       on E: Exception do
         begin
           if dmReplConn.cnRep.InTransaction then
             dmReplConn.cnRep.Rollback(TD);
           result := false;
           ErrorDlg(E);
           exit;
         end;
     end;
     dmReplConn.cnRep.Commit(TD);
   finally
     if dmReplConn.cnRep.Connected then
       dmReplConn.cnRep.Close;
     dmReplConn.cnRep.Params.Values['DataBase'] := OldDatabase;
     Screen.Cursor := crDefault;
     if Sds.Active then
       Sds.Close;
     if Assigned(Sds) then
       FreeAndNil(Sds);
     if Assigned(cdsID) then
       FreeAndNil(cdsID);
   end;
end;

procedure TFileHandling.UpdateDatabaseGetContent(IDCUR: integer; Operation: char; Sds: TSqlDataSet);
var
   i: integer;
   NBLOBVALUE: string;
   LOG_TABLES_ID: integer;
   TABLE_NAME,KEY_FIELD,KEY_VALUE: string;
   FIELD_NAME, OLD_VALUE, NEW_VALUE: string;
   OLD_CHAR_VALUE, NEW_CHAR_VALUE, OLD_BLOB_VALUE,    NEW_BLOB_VALUE: string;
   pos1, pos2: integer;
   sTemp, sTemp1: string;
   FieldType: TFieldType;
   StrStream, StrStream1: TStringStream;
   StrValues: string;
   TN: string;
   cdsKeys, cdsFieldInsert, cdsBlobInsert: TClientDataset;
   TempParams: TParams;
   sTempNames, sTempValues, sTempNamesB, sTempValuesB: string;
   sTempNamesValues, sTempNamesValuesB: string;

   function GetPartInsertStatement(ForBlob: boolean; Id: integer; Tip: smallint ): string;
   var
     sPart: string;
     i: integer;
     TABLE_NAME,FIELD_NAME,NEW_VALUE, KEY_FIELD,KEY_VALUE: string;
   begin
     Result:= ';
     sPart := ';
     if ForBlob then
     with cdsBlobInsert do
     begin
       TempParams := TParams.Create;
       Sds.Params.Clear;
       First;
       while not Eof do
       begin
         if Id = Fields[8].AsInteger then
           if Tip = 0 then
           begin // получить строку для значений вставляемых BLOB полей
             Sds.Params.Add;
             if sPart = ' then
               sPart := sPart+' :p'+Trim(IntToStr(Sds.Params.Count))
             else
               sPart := sPart+', :p'+Trim(IntToStr(Sds.Params.Count));
             if Fields[3].AsString <> ' then
               NBLOBVALUE := Fields[3].AsString
             else
               NBLOBVALUE := Fields[5].AsString;
             Sds.Params[Sds.Params.Count-1].AsBlob := NBLOBVALUE;
           end
           else // получить строку для имен вставляемых BLOB полей
           if TIP = 1 then
             if sPart = '' then
               sPart := sPart+ Fields[1].AsString
             else
               sPart := sPart+','+ Fields[1].AsString
           else // для update
           begin
             Sds.Params.Add;
             if Trim(sPart) = ' then
               sPart := sPart+Fields[1].AsString+'=:p'+Trim(IntToStr(Sds.Params.Count-1))
             else
               sPart := sPart+','+Fields[1].AsString+'= :p'+Trim(IntToStr(Sds.Params.Count));
             if Fields[3].AsString <> ' then
               NBLOBVALUE := Fields[3].AsString
             else
               NBLOBVALUE := Fields[5].AsString;
             Sds.Params[Sds.Params.Count-1].AsBlob := NBLOBVALUE;
           end;
         Next;
       end;
     end
     else //для обычных полей
     with cdsFieldInsert do
     begin
       First;
       while not Eof do
       begin
         if (Id = Fields[6].AsInteger) then
           if Tip = 0 then
           // получить строку для значений вставляемых полей
             begin
               TABLE_NAME := Fields[0].AsString;
               FIELD_NAME := Fields[1].AsString;
               NEW_VALUE := Fields[3].AsString;
               If FIELD_NAME = Fields[4].AsString then // если это ключевое поле
                 NEW_VALUE := IntToStr(Fields[3].AsInteger + DeltaGen);
               if not GetFieldType(TABLE_NAME, FIELD_NAME, FieldType) then
                 raise Exception.Create(Format('Невозможно получить тип поля %s для таблицы %s ',
                 [FIELD_NAME, TABLE_NAME]));
               if FieldType in [ftString, ftWideString,ftDate, ftDateTime, ftTimeStamp] then
                 NEW_VALUE := QuotedStr(NEW_VALUE);
               if sPart = '' then
                 sPart := sPart + NEW_VALUE
               else
                 sPart := sPart +','+ NEW_VALUE;
             end
             else // получить строку для имен вставляемых полей
             if TIP = 1 then
               if sPart = ' then
                 sPart := sPart+ Fields[1].AsString
               else
                 sPart := sPart+','+ Fields[1].AsString
             else // Tip = 2 для update
             begin
               TABLE_NAME := Fields[0].AsString;
               FIELD_NAME := Fields[1].AsString;
               NEW_VALUE := Fields[3].AsString;
               if not GetFieldType(TABLE_NAME, FIELD_NAME, FieldType) then
                 raise Exception.Create(Format('Невозможно получить тип поля %s для таблицы %s ',
                 [FIELD_NAME, TABLE_NAME]));
               if FieldType in [ftString, ftWideString,ftDate, ftDateTime, ftTimeStamp] then
                 NEW_VALUE := QuotedStr(NEW_VALUE);
               if sPart = ' then
                 sPart := sPart + Fields[1].AsString+' = '+ NEW_VALUE
               else
                 sPart := sPart+','+ Fields[1].AsString+' = '+ NEW_VALUE;
             end;
           Next;
         end;
       end;
       for i := 0 to Sds.Params.Count-1 do
         TempParams.AddParam (Sds.Params[i]);
       Result := sPart;
     end;
begin
   try
   // получение и выполнение запросов на вставку и изменение записей для обычных и BLOB полей
     cdsKeys := TClientdataset.Create(fmSkladTun);
     cdsFieldInsert:=      TClientdataset.Create(fmSkladTun);
     cdsBlobInsert := TClientdataset.Create(fmSkladTun);
     sTemp := StringForInsUpd;
     StrStream := TStringStream.Create(sTemp);
     fmSkladTun.Createcds(1, cdsKeys);
     cdsKeys.LoadFromStream(StrStream);
     FreeAndNil(StrStream);
     sTemp := StringForField;
     sTemp1 := StringForBlob;
     StrStream := TStringStream.Create(sTemp);
     StrStream1 := TStringStream.Create(sTemp1);
     fmSkladTun.Createcds(3, cdsFieldInsert);
     cdsFieldInsert.LoadFromStream(StrStream);
     fmSkladTun.Createcds(4, cdsBlobInsert ); // для BLOBOV
     cdsBlobInsert.LoadFromStream(StrStream1);
     cdsKeys.First;
     with cdsKeys do
       while not Eof do
         begin
           if Fields[3].AsInteger = IDCUR then
           begin
             TABLE_NAME := Fields[0].AsString;
             KEY_FIELD := Fields[1].AsString;
             KEY_VALUE := Fields[2].AsString;
             LOG_TABLES_ID := Fields[3].AsInteger;
             case Operation of
             'I':
               begin
                 sTempNames := GetPartInsertStatement(false, LOG_TABLES_ID, 1 );
                 sTempValues := GetPartInsertStatement(false, LOG_TABLES_ID, 0 );
                 sTempNamesB := GetPartInsertStatement(true, LOG_TABLES_ID, 1 );
                 sTempValuesB := GetPartInsertStatement(true, LOG_TABLES_ID, 0 );
                 Sds.CommandText := 'Insert INTO ' + TABLE_NAME + '( ' +sTempNames+
                 IIf_Str(sTempNames=',',IIf_Str(sTempNamesB=',',','))+{строка наименований вставляемых Не BLOB полей}
                 sTempNamesB+{строка наименований вставляемых BLOB полей}
                 ') VALUES ('+ sTempValues+
                 IIf_Str(sTempValues=',',IIf_Str(sTempValuesB=',',','))+{строка значений вставляемых Не BLOB полей}
                 sTempValuesB+{строка значений вставляемых BLOB полей}
                 ')';
               end;
             'U':
               begin
                 sTempNamesValues := GetPartInsertStatement(false, LOG_TABLES_ID, 2 );
                 sTempNamesValuesB := GetPartInsertStatement(true, LOG_TABLES_ID, 2 );
                 Sds.CommandText := 'UPDATE ' + TABLE_NAME + ' SET ' + sTempNamesValues+
                 IIf_Str(sTempNamesValues=',',IIf_Str(sTempNamesValuesB=',',','))+
                {строка наименований = значение изменяемых Не BLOB полей}
                 sTempNamesValuesB+ {строка наименований = значение изменяемых BLOB полей}
                 ' WHERE '+ Fields[1].AsString+' = '+IntToStr(Fields[2].AsInteger+ DeltaGen);
               end;
           end;
           for i := 0 to Sds.Params.Count-1 do
             Sds.Params[i]:= TempParams[i];
           Sds.ExecSql;
           if Assigned(TempParams) then
             TempParams.Free;
         end;
       Next;
     end;
   FreeAndNil(StrStream);
   FreeAndNil(StrStream1);
   // получение запросов на удаление записей
   FreeAndNil(cdsKeys);
   cdsKeys := TClientdataset.Create(fmSkladTun);
   sTemp := StringForDel;
   StrStream := TStringStream.Create(sTemp);
   fmSkladTun.Createcds(0, cdsKeys);
   cdsKeys.LoadFromStream(StrStream);
   cdsKeys.First;
   with cdsKeys do
     // if FindKey([IDCUR]) then
       while not Eof do
       begin
         if Fields[3].AsInteger = IDCUR then
           begin
             TABLE_NAME := Fields[0].AsString;
             KEY_FIELD := Fields[1].AsString;
             KEY_VALUE := Fields[2].AsString;
             LOG_TABLES_ID := Fields[3].AsInteger;
             Sds.CommandText := 'delete from '+TABLE_NAME+ ' where '+KEY_FIELD+' = ' + KEY_VALUE;
             Sds.ExecSql;
           end;
         Next;
       end;
   FreeAndNil(StrStream);
   finally
     if Assigned(cdsKeys) then
       FreeAndNil(cdsKeys);
     if Assigned(cdsFieldInsert) then
       FreeAndNil(cdsFieldInsert);
     if Assigned(cdsBlobInsert) then
       FreeAndNil(cdsBlobInsert);
   end;
end;

Сервер off–line репликации имеет возможность выполнять настройки режимов репликации, выбирать списки ТУ, на которые выполняется рассылка TU.Replication = True, списки проектов в каждом ТУ, и списки таблиц баз данных проектов, подлежащих репликации. При выполнении пункта меню 'Создание файла репликации' выполняется построение и отправка файла D за период от введенного времени до текущего. При запуске автоматического режима выполняются циклы приема и передачи данных и квитанций с периодичностью TU. PeriodConnect. В любой момент можно остановить сервер off – line репликации.


Рис. 6.

На рис. 6. изображено состояние после отправки на Склад 2 очередного файла D. Файл сформировался после изменения данных в одной из таблиц проекта 3, поставленных на репликацию. На Рис. 7. показано, что следующий файл не формируется до прихода квитанции S от склада 2. На Рис. 8. показано, что все таблицы проекта 3 поставлены на репликацию по передаче для склада 1 . После приема файла D на стороне склада 2 и формирования квитанции Рис. 9. сервер склада 1 готов снова формировать сеанс D (Рис. 10.) , при возникновении изменений в данных одной из таблиц, поставленных на репликацию . В соответствующей БД и таблицах на стороне склада 2 выполнится та же транзакция, но со значениями ключей , отличающимися на DeltaGen.


Рис. 7.


Рис. 8.


Рис. 9.


Рис. 10.

Пример программы, реализующей данный механизм Вы можете найти на сайте: http://nerusoft.com.

Copyright© 2005 НеРуСофт, Все права защищены.


Пожалуйста, оцените статью
Отлично
Хорошо
Средне
Плохо
Очень плохо

Rambler's Top100