DB client state gets corrupted
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / GraphSession.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.ByteArrayInputStream;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.lang.ref.SoftReference;
18 import java.util.ArrayList;
19 import java.util.Collection;
20 import java.util.HashSet;
21 import java.util.Iterator;
22 import java.util.Map;
23 import java.util.TreeMap;
24 import java.util.Vector;
25 import java.util.concurrent.CopyOnWriteArrayList;
26
27 import org.simantics.databoard.Bindings;
28 import org.simantics.databoard.binding.impl.TreeMapBinding;
29 import org.simantics.databoard.serialization.Serializer;
30 import org.simantics.db.ChangeSetIdentifier;
31 import org.simantics.db.Database;
32 import org.simantics.db.Database.Session.ChangeSetData;
33 import org.simantics.db.Database.Session.ChangeSetIds;
34 import org.simantics.db.Database.Session.ClusterChanges;
35 import org.simantics.db.Database.Session.ClusterIds;
36 import org.simantics.db.Database.Session.Refresh;
37 import org.simantics.db.Operation;
38 import org.simantics.db.SessionReference;
39 import org.simantics.db.common.UndoContextEx;
40 import org.simantics.db.common.utils.Logger;
41 import org.simantics.db.exception.DatabaseException;
42 import org.simantics.db.exception.InternalException;
43 import org.simantics.db.impl.ClusterI;
44 import org.simantics.db.impl.ResourceImpl;
45 import org.simantics.db.service.ClusterUID;
46 import org.simantics.db.service.ExternalOperation;
47 import org.simantics.db.service.ManagementSupport.ChangeSetListener;
48 import org.simantics.scl.runtime.function.Function1;
49 import org.simantics.scl.runtime.function.FunctionImpl1;
50 import org.simantics.utils.threads.ThreadUtils;
51
52 import gnu.trove.iterator.TLongObjectIterator;
53 import gnu.trove.map.hash.THashMap;
54 import gnu.trove.map.hash.TLongObjectHashMap;
55
56 final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {
57     private final long id;
58     private final GraphSession graphSession;
59     private Map<String, byte[]> metadata;
60     ChangeSetIdentifierImpl(long id, GraphSession graphSession) {
61         this.id = id;
62         this.graphSession = graphSession;
63         this.metadata = null;
64     }
65     public final long getId() {
66         return id;
67     }
68     public final void setMetadata(Map<String, byte[]> metadata) {
69         this.metadata = metadata;
70     }
71     public final byte[] getContext()
72     throws DatabaseException {
73         if (null == graphSession)
74             return new byte[0];
75         return graphSession.getChangeSetContext(id);
76     }
77
78     static Serializer METADATA_SERIALIZER =
79         Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
80             Bindings.BYTE_ARRAY));
81
82     @SuppressWarnings("unchecked")
83     public Map<String, byte[]> getMetadata() {
84         try {
85             if (metadata == null) {
86                 byte[] data = getContext();
87                 if (null != data && data.length > 0) {
88                     metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);
89                 }
90             }
91         } catch (DatabaseException e) {
92             Logger.defaultLogError(e);
93             e.printStackTrace();
94         } catch (IOException e) {
95             Logger.defaultLogError(e);
96             e.printStackTrace();
97         }
98         return metadata;
99     }
100
101     static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {
102         if (null == changeSetIds)
103             return new long[0];
104         long[] t = new long[changeSetIds.size()];
105         int i = -1;
106         for (ChangeSetIdentifier id : changeSetIds)
107             t[++i] = ((ChangeSetIdentifierImpl) id).getId();
108         return t;
109     }
110     static long[] operations2ints(Collection<Operation> ops) {
111         if (null == ops)
112             return new long[0];
113         long[] t = new long[ops.size()];
114         int i = -1;
115         for (Operation o : ops)
116             t[++i] = o.getCSId();
117         return t;
118     }
119     static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) {
120         ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
121         t.ensureCapacity(count);
122         for (int i=0; i<count; ++i)
123             t.add(new ChangeSetIdentifierImpl(firstChangeSetIds + i, gs));
124         return t;
125     }
126     static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {
127         ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
128         if (max<min)
129             return t;
130         long length = max - min + 1;
131         t.ensureCapacity((int)length);
132         for (int i=0; i<length; ++i) {
133             t.add(new ChangeSetIdentifierImpl(min+i, gs));
134         }
135         return t;
136     }
137 }
138
139 public abstract class GraphSession {
140     final protected boolean DEBUG = false;
141     final protected boolean VERBOSE = false;
142     protected Listener listener;
143     protected SessionImplSocket session;
144     protected final Database.Session dbSession;
145     private SessionReference sessionReference;
146     private TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();
147     protected THashMap<String, BuiltinData> builtinMap = null;
148     private long firstChangeSetId = 0;
149     protected SynchronizeContextI synchronizeContext;
150     
151     final UndoContextEx undoContext = new UndoContextEx("GraphSession");
152     final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();
153     private long lastChangeSetId = 0;
154     protected MetadataCache metadataCache = new MetadataCache();
155     
156     public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {
157         this.dbSession = dbSession;
158         if (null == dbSession)
159             throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");
160 //        if (!(sessionReference instanceof ProCoreSessionReference))
161 //            throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");
162         this.sessionReference = sessionReference;
163         this.session = sessionImpl;
164     }
165     void addChangeSetListener(ChangeSetListener csl) {
166         changeSetListeners.add(csl);
167     }
168     void removeChangeSetListener(ChangeSetListener csl) {
169         changeSetListeners.remove(csl);
170     }
171     SessionReference getSessionReference() {
172         return sessionReference;
173     }
174
175     public void open() throws InternalException {
176         dbSession.open();
177     }
178
179     public void close() throws InternalException {
180         dbSession.close();
181     }
182
183     public long getSessionId() {
184         return sessionReference.getSessionId();
185     }
186 //
187 //    public String getHost() {
188 //        return sessionReference.serverReference.socketAddress.getHostName();
189 //    }
190 //
191 //    public int getPort() {
192 //        return sessionReference.serverReference.socketAddress.getPort();
193 //    }
194
195     long getFirstChangeSetId() {
196         return firstChangeSetId;
197     }
198
199     String getClusterFileName(long clusterId) {
200         String fileName = "cluster" + clusterId + ".dat";
201         return fileName;
202     }
203
204     int computeClusterMemoryUse() {
205
206         int size = 0;
207         TLongObjectIterator<ClusterI> i = clusterMap.iterator();
208         while (i.hasNext()) {
209             i.advance();
210             ClusterI c = i.value();
211             try {
212                 size += c.getUsedSpace();
213             } catch (DatabaseException e) {
214                 Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);
215             }
216         }
217
218         return size;
219
220     }
221
222     void printClusterSize() {
223
224         long size = 0;
225
226         long amount = 0;
227         TLongObjectIterator<ClusterI> i = clusterMap.iterator();
228         while (i.hasNext()) {
229             i.advance();
230             ClusterI c = i.value();
231             if (c.isLoaded()) {
232                 try {
233                     size += c.getUsedSpace();
234                 } catch (DatabaseException e) {
235                     Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);
236                 }
237                 amount++;
238             }
239         }
240
241         if (amount > 50)
242             loadedClusters.clear();
243         System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");
244
245     }
246
247     private HashSet<Long> loadedClusters = new HashSet<Long>();
248
249     ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {
250         ClusterI c = clusterMap.remove(clusterId);
251         if (null != c)
252             c.releaseMemory();
253         return c;
254     }
255
256     private void debugPrint(UpdateClusterFunction f) {
257         for (int i = 0; i < f.operation.length; ++i) {
258             System.out.println("op=" + f.operation[i]);
259         }
260     }
261
262     long total = 0;
263
264     public boolean updateCluster(UpdateClusterFunction f) {
265         session.clusterStream.markDirty();
266         assert (null != f);
267         try {
268             if (DEBUG) {
269                 System.out.println("update cluster");
270                 if (VERBOSE)
271                     debugPrint(f);
272             }
273 // long start = System.nanoTime();
274             dbSession.updateCluster(f.operation);
275 // long duration = System.nanoTime() - start;
276 // total += duration;
277 // System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);
278 // System.out.println("GraphSession.updateCluster done.");
279             return true; // ok
280         } catch (Throwable e) {
281             e.printStackTrace();
282             return false;
283         }
284     }
285     public byte[] getChangeSetContext(long id)
286     throws DatabaseException {
287         byte[] data = metadataCache.get(id);
288         if (null != data)
289             return data;
290         if (id < getServerInformation().firstChangeSetId) {
291             Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id);
292             return new byte[0];
293         }
294         return dbSession.getChangeSetMetadata(id);
295     }
296
297     public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {
298         return new StatementImplOld(s, p, o);
299     }
300     public long newClusterId() throws DatabaseException {
301         long id = dbSession.reserveIds(1);
302         if (id <= ClusterUID.Builtin.second)
303             return newClusterId();
304         return id;
305     }
306
307     long load    = 0;
308
309     long inflate = 0;
310
311 //    public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)
312 //    throws DatabaseException {
313 //        assert(ClusterUID.isLegal(clusterUID));
314 //
315 //              if(Development.DEVELOPMENT) {
316 //            if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {
317 //              ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);
318 //            }
319 //              }
320 //
321 //// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);
322 //        if (DEBUG)
323 //            System.out.println("DEBUG: loading cluster=" + clusterUID);
324 //// System.out.println("getClusterImpl " + clusterId);
325 //        long start = System.nanoTime();
326 //        Cluster t = dbSession.getCluster(clusterUID.asBytes());
327 //        long duration = System.nanoTime() - start;
328 //        load += duration;
329 //        //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);
330 //// if (Constants.NullClusterId == t.clusterId) {
331 //// throw new DatabaseException("Cluster " + t.clusterId + " not in server.");
332 //// }
333 //
334 //        ByteBuffer deflated = t.getDeflated();
335 //        start = System.nanoTime();
336 //        Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
337 //        duration = System.nanoTime() - start;
338 //        inflate += duration;
339 //        //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
340 //        long[] longs = (long[]) arrays[0];
341 //        int[] ints = (int[]) arrays[1];
342 //        byte[] bytes = (byte[]) arrays[2];
343 //
344 //// System.out.println("Got cluster " + clusterId + " from core.");
345 //        ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);
346 ////        System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));
347 //// task.finish();
348 //        return cluster;
349 //    }
350
351     ClusterI getCluster(long clusterId) {
352         return clusterMap.get(clusterId);
353     }
354
355     ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {
356         ClusterI c = this.getCluster(clusterId);
357         if (c == null)
358             throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");
359         return c;
360     }
361
362     public String execute(String command) throws DatabaseException {
363         return dbSession.execute(command);
364     }
365
366     public Collection<ChangeSetIdentifier> getChangeSets()
367     throws DatabaseException {
368         ChangeSetIds t = dbSession.getChangeSetIds();
369         return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());
370     }
371     public long getLastChangeSetId() {
372         return lastChangeSetId;
373     }
374     public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit)
375     throws DatabaseException {
376         if (from > to)
377             throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".");
378         long min = Math.max(from, 1);
379         long max = Math.min(to, upperLimit);
380         return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);
381     }
382
383     public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context)
384     throws DatabaseException {
385         try {
386             synchronizeContext = context;
387             ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);
388             return !t.isOk();
389         } catch (Throwable e) {
390             if (DEBUG)
391                 e.printStackTrace();
392             if (e instanceof DatabaseException)
393                 throw (DatabaseException) e;
394             else
395                 throw new DatabaseException("GetChangeSetData call to server failed.");
396         } finally {
397             synchronizeContext = null;
398         }
399     }
400
401     public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {
402         for(Operation o : operations) {
403                 for(ExternalOperation e : o.getExternalOperations()) {
404                         if(!e.isDisposed()) fn.apply(e);
405                 }
406         }
407     }
408
409     public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context)
410     throws DatabaseException {
411         try {
412
413                 forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {
414                         @Override
415                         public Boolean apply(ExternalOperation op) {
416                                 op.undo();
417                                 return true;
418                         }
419                 });
420
421             long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);
422             synchronizeContext = context;
423             if (DEBUG)
424                 for (long id : cids)
425                     System.out.println("DEBUG: Undo cs=" + id);
426             return dbSession.undo(cids, context);
427         } catch (Throwable e) {
428             if (DEBUG)
429                 e.printStackTrace();
430             if (e instanceof DatabaseException)
431                 throw (DatabaseException) e;
432             else
433                 throw new DatabaseException("Undo call to server failed.", e);
434         } finally {
435             synchronizeContext = null;
436         }
437     }
438
439     public ClusterUID[] getRefresh2(long csid) throws DatabaseException {
440         Refresh t = dbSession.getRefresh(csid);
441         if (DEBUG)
442           System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length);
443         ClusterUID[] clusters = new ClusterUID[t.getFirst().length];
444         for (int i=0; i<t.getFirst().length; ++i)
445             clusters[i] = new ClusterUID(t.getFirst()[i], t.getSecond()[i]);
446         return clusters;
447     }
448
449     public ClusterChanges getClusterChanges(ClusterUID cluster, long csid) throws DatabaseException {
450         ClusterChanges t = dbSession.getClusterChanges(csid, cluster.asBytes());
451         if (DEBUG)
452             System.err.println("getClusterChanges old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " cluster=" + cluster + " statements=" + t.getPredicateIndex().length + " values=" + t.getValueIndex().length);
453         return t;
454     }
455     public long getCluster(int id) {
456         return session.getCluster(id);
457     }
458
459     private static class ResourceSegment {
460         public long   valueSize;
461
462         public byte[] bytes;
463
464         ResourceSegment(long valueSize, byte[] bytes) {
465             this.valueSize = valueSize;
466             this.bytes = bytes;
467         }
468     }
469     public ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short size)
470     throws DatabaseException {
471         if (DEBUG)
472             System.out.println("DEBUG: getResourceSegment ri=" + resourceIndex + " cid=" + clusterUID + " offset=" + offset + " size=" + size);
473         org.simantics.db.Database.Session.ResourceSegment t = dbSession.getResourceSegment(clusterUID.asBytes(), resourceIndex, offset, size);
474         return new ResourceSegment(t.getValueSize(), t.getSegment());
475     }
476
477     public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID)
478     throws DatabaseException {
479         return getResourceValue(resourceIndex, clusterUID, 0, 0);
480     }
481
482     public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {
483
484         if (DEBUG)
485             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
486         final int IMAX = 0xFFFF;
487         short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);
488         final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
489         if (s.valueSize < 0)
490             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
491             " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
492         int ilen = (int)slen & 0xFFFF;
493         assert(s.bytes.length <= ilen);
494         if (0 == length) {
495             if (s.valueSize > Integer.MAX_VALUE)
496                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
497                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
498                 ". Value size=" + s.valueSize + " (2).");
499             length = (int)s.valueSize;
500         }
501         long rSize = s.valueSize - offset;
502         if (rSize < length)
503             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
504             " cluster=" + clusterUID + " off=" + offset + " len=" + length +
505             ". Value size=" + s.valueSize + " (3).");
506         else if (length <= IMAX)
507             return new ByteArrayInputStream(s.bytes);
508
509         final int finalLength = length;
510
511         return new InputStream() {
512
513             int left = finalLength;
514             long valueOffset = 0;
515             int offset = 0;
516             ResourceSegment _s = s;
517
518             @Override
519             public int read() throws IOException {
520
521                 if(left <= 0) return -1;
522
523                 if(offset == _s.bytes.length) {
524                     short slen = (short)Math.min(left, IMAX);
525                     valueOffset += _s.bytes.length;
526                     try {
527                         _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);
528                     } catch (DatabaseException e) {
529                         throw new IOException(e);
530                     }
531                     offset = 0;
532                 }
533
534                 left--;
535                 int result = _s.bytes[offset++];
536                 if(result < 0) result += 256;
537                 return result;
538
539             }
540
541         };
542     }
543
544
545     public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length)
546     throws DatabaseException {
547         if (DEBUG)
548             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
549         final int IMAX = 0xFFFF;
550         short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);
551         ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
552         final long VALUE_SIZE = s.valueSize;
553         if (s.valueSize < 0)
554             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
555             " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
556         int ilen = (int)slen & 0xFFFF;
557         assert(s.bytes.length <= ilen);
558         if (0 == length) {
559             if (s.valueSize > Integer.MAX_VALUE)
560                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
561                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
562                 ". Value size=" + s.valueSize + " (2).");
563             length = (int)s.valueSize;
564         }
565         long rSize = s.valueSize - offset;
566         if (rSize < length)
567             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
568             " cluster=" + clusterUID + " off=" + offset + " len=" + length +
569             ". Value size=" + s.valueSize + " (3).");
570         else if (length <= IMAX)
571             return s.bytes;
572         byte[] bytes = new byte[length];
573         int left = (int)length - s.bytes.length;
574         int cur = s.bytes.length;
575         offset += s.bytes.length;
576         System.arraycopy(s.bytes, 0, bytes, 0, cur);
577         while (left > 0) {
578             slen = (short)Math.min(left, IMAX);
579             s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
580             ilen = (int)slen & 0xFFFF;
581             if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)
582                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
583                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +
584                 ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");
585             System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);
586             left -= s.bytes.length;
587             cur += s.bytes.length;
588             offset += s.bytes.length;
589         }
590         return bytes;
591     }
592
593     public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID)
594     throws DatabaseException {
595         if (DEBUG)
596             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);
597         final short SMAX = (short)0xFFFF;
598         ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);
599         if (s.valueSize < 0)
600             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
601             " cluster=" + clusterUID + " size=" + s.valueSize);
602         return s.valueSize;
603     }
604     int wait4RequestsLess(int limit)
605     throws DatabaseException {
606         dbSession.execute("");
607         return 0;
608     }
609     interface Listener {
610         void onChangeSetId(int thread, long csid, boolean refresh);
611     }
612
613     void setListener(Listener listener) {
614         this.listener = listener;
615     }
616
617     protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {
618         if (null != listener)
619             listener.onChangeSetId(thread, csid, refresh);
620         if (csid > lastChangeSetId) {
621             lastChangeSetId = csid;
622             final Iterator<ChangeSetListener> it = changeSetListeners.iterator();
623             ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {
624                 @Override
625                 public void run() {
626                     while (it.hasNext()) {
627                         ChangeSetListener l = it.next();
628                         try {
629                             l.onChanged(csid);
630                         } catch (Throwable t) {
631                             Logger.defaultLogError(t);
632                         }
633                     }
634                 }
635             });
636         }
637     }
638     protected abstract ServerInformationImpl getServerInformation()
639     throws DatabaseException;
640     public abstract void acceptCommit(long transactionId, long csid, byte[] metadata)
641     throws DatabaseException;
642     public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context)
643     throws DatabaseException;
644     public abstract void endTransaction(long transactionId, boolean write)
645     throws DatabaseException;
646     public abstract long askWriteTransaction(int thread, long transactionId)
647     throws DatabaseException;
648     public abstract long askReadTransaction(int thread)
649     throws DatabaseException;
650     public abstract void stop()
651     throws DatabaseException;
652
653     public abstract long reserveIds(int count)
654     throws DatabaseException;
655     static class MetadataCache {
656         private final boolean DEBUG = false;
657         private final int SIZE = 10;
658         private int lastInd; // index of last inserted element
659         private long lastId; // id of last inserted element
660         private int count; // number of inserted elements
661         Vector<byte[]> buffer;
662         MetadataCache() {
663             init();
664         }
665         public int clear() {
666             int ret = count;
667             init();
668             return ret;
669         }
670         private void init() {
671             if (DEBUG)
672                 System.out.println("DEBUG: MetadataCache init");
673             lastInd = -1;
674             lastId = 0;
675             count = 0;
676             buffer = new Vector<byte[]>();
677             buffer.setSize(SIZE);
678         }
679         private boolean incLastInd() {
680             ++lastInd;
681             if (lastInd >= SIZE) {
682                 lastInd = 0;
683                 return true;
684             } else
685                 return false;
686         }
687         synchronized void addNext(long id, byte[] data)
688         throws DatabaseException {
689             if (DEBUG)
690                 System.out.println("DEBUG: MetadataCache add id=" + id);
691             if (lastId != 0 && lastId != id - 1)
692                 init(); // Only continuous sequence supported.
693             incLastInd();
694             buffer.set(lastInd, data);
695             lastId = id;
696             if (count < SIZE)
697                 ++count;
698         }
699         synchronized byte[] get(long id) {
700             if (DEBUG)
701                 System.out.println("DEBUG: MetadataCache get id=" + id);
702             if (id > lastId || id <= lastId - count)
703                 return null;
704             int ind = lastInd - (int)(lastId - id);
705             if (ind < 0)
706                 ind = SIZE + ind;
707             byte[] found = buffer.get(ind);
708             if (DEBUG)
709                 if (null != found)
710                     System.out.println("DEBUG: MetadataCache found id=" + id);
711             return found;
712         }
713     }
714     public ClusterUID[] listClusters() throws InternalException {
715         ClusterIds t = dbSession.getClusterIds();
716         long[] first = t.getFirst();
717         long[] second = t.getSecond();
718         int N1 = (null == first) ? 0 : first.length;
719         N1 = Math.min(N1, t.getStatus());
720         int N2 = (null == second) ? 0 : second.length;
721         N2 = Math.min(N1, N2);
722         ClusterUID[] clusters = new ClusterUID[N2];
723         for (int i=0; i<N2; ++i)
724             clusters[i] = new ClusterUID(first[i], second[i]);
725         return clusters;
726     }
727     public boolean rolledback() {
728         return dbSession.rolledback();
729     }
730 }
731 class BuiltinData {
732     final long                  id;
733     final long                  cluster;
734     SoftReference<ResourceImpl> weakResource = null;
735     BuiltinData(long id, long cluster) {
736         this.id = id;
737         this.cluster = cluster;
738     }
739 }
740 class UpdateClusterFunction {
741     public byte[] operation = null;
742     public UpdateClusterFunction(byte[] operation) {
743         this.operation = operation;
744     }
745 }