]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/GraphSession.java
DB-client fixes
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / GraphSession.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.io.ByteArrayInputStream;\r
15 import java.io.IOException;\r
16 import java.io.InputStream;\r
17 import java.lang.ref.SoftReference;\r
18 import java.util.ArrayList;\r
19 import java.util.Collection;\r
20 import java.util.HashSet;\r
21 import java.util.Iterator;\r
22 import java.util.Map;\r
23 import java.util.TreeMap;\r
24 import java.util.Vector;\r
25 import java.util.concurrent.CopyOnWriteArrayList;\r
26 \r
27 import org.simantics.databoard.Bindings;\r
28 import org.simantics.databoard.binding.impl.TreeMapBinding;\r
29 import org.simantics.databoard.serialization.Serializer;\r
30 import org.simantics.db.ChangeSetIdentifier;\r
31 import org.simantics.db.Database;\r
32 import org.simantics.db.Operation;\r
33 import org.simantics.db.SessionReference;\r
34 import org.simantics.db.Database.Session.ChangeSetData;\r
35 import org.simantics.db.Database.Session.ChangeSetIds;\r
36 import org.simantics.db.Database.Session.ClusterChanges;\r
37 import org.simantics.db.Database.Session.ClusterIds;\r
38 import org.simantics.db.Database.Session.Refresh;\r
39 import org.simantics.db.common.UndoContextEx;\r
40 import org.simantics.db.common.utils.Logger;\r
41 import org.simantics.db.exception.DatabaseException;\r
42 import org.simantics.db.exception.InternalException;\r
43 import org.simantics.db.impl.ClusterI;\r
44 import org.simantics.db.impl.ResourceImpl;\r
45 import org.simantics.db.service.ClusterUID;\r
46 import org.simantics.db.service.ExternalOperation;\r
47 import org.simantics.db.service.ManagementSupport.ChangeSetListener;\r
48 import org.simantics.scl.runtime.function.Function1;\r
49 import org.simantics.scl.runtime.function.FunctionImpl1;\r
50 import org.simantics.utils.threads.ThreadUtils;\r
51 \r
52 import fi.vtt.simantics.procore.ProCoreSessionReference;\r
53 import gnu.trove.iterator.TLongObjectIterator;\r
54 import gnu.trove.map.hash.THashMap;\r
55 import gnu.trove.map.hash.TLongObjectHashMap;\r
56 \r
57 final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {\r
58     private final long id;\r
59     private final GraphSession graphSession;\r
60     private Map<String, byte[]> metadata;\r
61     ChangeSetIdentifierImpl(long id, GraphSession graphSession) {\r
62         this.id = id;\r
63         this.graphSession = graphSession;\r
64         this.metadata = null;\r
65     }\r
66     public final long getId() {\r
67         return id;\r
68     }\r
69     public final void setMetadata(Map<String, byte[]> metadata) {\r
70         this.metadata = metadata;\r
71     }\r
72     public final byte[] getContext()\r
73     throws DatabaseException {\r
74         if (null == graphSession)\r
75             return new byte[0];\r
76         return graphSession.getChangeSetContext(id);\r
77     }\r
78 \r
79     static Serializer METADATA_SERIALIZER =\r
80         Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,\r
81             Bindings.BYTE_ARRAY));\r
82 \r
83     @SuppressWarnings("unchecked")\r
84     public Map<String, byte[]> getMetadata() {\r
85         try {\r
86             if (metadata == null) {\r
87                 byte[] data = getContext();\r
88                 if (null != data && data.length > 0) {\r
89                     metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);\r
90                 }\r
91             }\r
92         } catch (DatabaseException e) {\r
93             Logger.defaultLogError(e);\r
94             e.printStackTrace();\r
95         } catch (IOException e) {\r
96             Logger.defaultLogError(e);\r
97             e.printStackTrace();\r
98         }\r
99         return metadata;\r
100     }\r
101 \r
102     static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {\r
103         if (null == changeSetIds)\r
104             return new long[0];\r
105         long[] t = new long[changeSetIds.size()];\r
106         int i = -1;\r
107         for (ChangeSetIdentifier id : changeSetIds)\r
108             t[++i] = ((ChangeSetIdentifierImpl) id).getId();\r
109         return t;\r
110     }\r
111     static long[] operations2ints(Collection<Operation> ops) {\r
112         if (null == ops)\r
113             return new long[0];\r
114         long[] t = new long[ops.size()];\r
115         int i = -1;\r
116         for (Operation o : ops)\r
117             t[++i] = o.getCSId();\r
118         return t;\r
119     }\r
120     static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) {\r
121         ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
122         t.ensureCapacity(count);\r
123         for (int i=0; i<count; ++i)\r
124             t.add(new ChangeSetIdentifierImpl(firstChangeSetIds + i, gs));\r
125         return t;\r
126     }\r
127     static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {\r
128         ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
129         if (max<min)\r
130             return t;\r
131         long length = max - min + 1;\r
132         t.ensureCapacity((int)length);\r
133         for (int i=0; i<length; ++i) {\r
134             t.add(new ChangeSetIdentifierImpl(min+i, gs));\r
135         }\r
136         return t;\r
137     }\r
138 }\r
139 \r
140 public abstract class GraphSession {\r
141     final protected boolean DEBUG = false;\r
142     final protected boolean VERBOSE = false;\r
143     protected Listener listener;\r
144     protected SessionImplSocket session;\r
145     protected final Database.Session dbSession;\r
146     private SessionReference sessionReference;\r
147     private TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();\r
148     protected THashMap<String, BuiltinData> builtinMap = null;\r
149     private long firstChangeSetId = 0;\r
150     protected SynchronizeContextI synchronizeContext;\r
151     \r
152     final UndoContextEx undoContext = new UndoContextEx("GraphSession");\r
153     final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();\r
154     private long lastChangeSetId = 0;\r
155     protected MetadataCache metadataCache = new MetadataCache();\r
156     \r
157     public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {\r
158         this.dbSession = dbSession;\r
159         if (null == dbSession)\r
160             throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");\r
161 //        if (!(sessionReference instanceof ProCoreSessionReference))\r
162 //            throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");\r
163         this.sessionReference = sessionReference;\r
164         this.session = sessionImpl;\r
165     }\r
166     void addChangeSetListener(ChangeSetListener csl) {\r
167         changeSetListeners.add(csl);\r
168     }\r
169     void removeChangeSetListener(ChangeSetListener csl) {\r
170         changeSetListeners.remove(csl);\r
171     }\r
172     SessionReference getSessionReference() {\r
173         return sessionReference;\r
174     }\r
175 \r
176     public void open() throws InternalException {\r
177         dbSession.open();\r
178     }\r
179 \r
180     public void close() throws InternalException {\r
181         dbSession.close();\r
182     }\r
183 \r
184     public long getSessionId() {\r
185         return sessionReference.getSessionId();\r
186     }\r
187 //\r
188 //    public String getHost() {\r
189 //        return sessionReference.serverReference.socketAddress.getHostName();\r
190 //    }\r
191 //\r
192 //    public int getPort() {\r
193 //        return sessionReference.serverReference.socketAddress.getPort();\r
194 //    }\r
195 \r
196     long getFirstChangeSetId() {\r
197         return firstChangeSetId;\r
198     }\r
199 \r
200     String getClusterFileName(long clusterId) {\r
201         String fileName = "cluster" + clusterId + ".dat";\r
202         return fileName;\r
203     }\r
204 \r
205     int computeClusterMemoryUse() {\r
206 \r
207         int size = 0;\r
208         TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
209         while (i.hasNext()) {\r
210             i.advance();\r
211             ClusterI c = i.value();\r
212             try {\r
213                 size += c.getUsedSpace();\r
214             } catch (DatabaseException e) {\r
215                 Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
216             }\r
217         }\r
218 \r
219         return size;\r
220 \r
221     }\r
222 \r
223     void printClusterSize() {\r
224 \r
225         long size = 0;\r
226 \r
227         long amount = 0;\r
228         TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
229         while (i.hasNext()) {\r
230             i.advance();\r
231             ClusterI c = i.value();\r
232             if (c.isLoaded()) {\r
233                 try {\r
234                     size += c.getUsedSpace();\r
235                 } catch (DatabaseException e) {\r
236                     Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
237                 }\r
238                 amount++;\r
239             }\r
240         }\r
241 \r
242         if (amount > 50)\r
243             loadedClusters.clear();\r
244         System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");\r
245 \r
246     }\r
247 \r
248     private HashSet<Long> loadedClusters = new HashSet<Long>();\r
249 \r
250     ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {\r
251         ClusterI c = clusterMap.remove(clusterId);\r
252         if (null != c)\r
253             c.releaseMemory();\r
254         return c;\r
255     }\r
256 \r
257     private void debugPrint(UpdateClusterFunction f) {\r
258         for (int i = 0; i < f.operation.length; ++i) {\r
259             System.out.println("op=" + f.operation[i]);\r
260         }\r
261     }\r
262 \r
263     long total = 0;\r
264 \r
265     public boolean updateCluster(UpdateClusterFunction f) {\r
266         session.clusterStream.markDirty();\r
267         assert (null != f);\r
268         try {\r
269             if (DEBUG) {\r
270                 System.out.println("update cluster");\r
271                 if (VERBOSE)\r
272                     debugPrint(f);\r
273             }\r
274 // long start = System.nanoTime();\r
275             dbSession.updateCluster(f.operation);\r
276 // long duration = System.nanoTime() - start;\r
277 // total += duration;\r
278 // System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);\r
279 // System.out.println("GraphSession.updateCluster done.");\r
280             return true; // ok\r
281         } catch (Throwable e) {\r
282             e.printStackTrace();\r
283             return false;\r
284         }\r
285     }\r
286     public byte[] getChangeSetContext(long id)\r
287     throws DatabaseException {\r
288         byte[] data = metadataCache.get(id);\r
289         if (null != data)\r
290             return data;\r
291         if (id < getServerInformation().firstChangeSetId) {\r
292             Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id);\r
293             return new byte[0];\r
294         }\r
295         return dbSession.getChangeSetMetadata(id);\r
296     }\r
297 \r
298     public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {\r
299         return new StatementImplOld(s, p, o);\r
300     }\r
301     public long newClusterId() throws DatabaseException {\r
302         long id = dbSession.reserveIds(1);\r
303         if (id <= ClusterUID.Builtin.second)\r
304             return newClusterId();\r
305         return id;\r
306     }\r
307 \r
308     long load    = 0;\r
309 \r
310     long inflate = 0;\r
311 \r
312 //    public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)\r
313 //    throws DatabaseException {\r
314 //        assert(ClusterUID.isLegal(clusterUID));\r
315 //\r
316 //              if(Development.DEVELOPMENT) {\r
317 //            if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {\r
318 //              ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);\r
319 //            }\r
320 //              }\r
321 //\r
322 //// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);\r
323 //        if (DEBUG)\r
324 //            System.out.println("DEBUG: loading cluster=" + clusterUID);\r
325 //// System.out.println("getClusterImpl " + clusterId);\r
326 //        long start = System.nanoTime();\r
327 //        Cluster t = dbSession.getCluster(clusterUID.asBytes());\r
328 //        long duration = System.nanoTime() - start;\r
329 //        load += duration;\r
330 //        //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);\r
331 //// if (Constants.NullClusterId == t.clusterId) {\r
332 //// throw new DatabaseException("Cluster " + t.clusterId + " not in server.");\r
333 //// }\r
334 //\r
335 //        ByteBuffer deflated = t.getDeflated();\r
336 //        start = System.nanoTime();\r
337 //        Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
338 //        duration = System.nanoTime() - start;\r
339 //        inflate += duration;\r
340 //        //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
341 //        long[] longs = (long[]) arrays[0];\r
342 //        int[] ints = (int[]) arrays[1];\r
343 //        byte[] bytes = (byte[]) arrays[2];\r
344 //\r
345 //// System.out.println("Got cluster " + clusterId + " from core.");\r
346 //        ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);\r
347 ////        System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));\r
348 //// task.finish();\r
349 //        return cluster;\r
350 //    }\r
351 \r
352     ClusterI getCluster(long clusterId) {\r
353         return clusterMap.get(clusterId);\r
354     }\r
355 \r
356     ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {\r
357         ClusterI c = this.getCluster(clusterId);\r
358         if (c == null)\r
359             throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");\r
360         return c;\r
361     }\r
362 \r
363     public String execute(String command) throws DatabaseException {\r
364         return dbSession.execute(command);\r
365     }\r
366 \r
367     public Collection<ChangeSetIdentifier> getChangeSets()\r
368     throws DatabaseException {\r
369         ChangeSetIds t = dbSession.getChangeSetIds();\r
370         return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());\r
371     }\r
372     public long getLastChangeSetId() {\r
373         return lastChangeSetId;\r
374     }\r
375     public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit)\r
376     throws DatabaseException {\r
377         if (from > to)\r
378             throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".");\r
379         long min = Math.max(from, 1);\r
380         long max = Math.min(to, upperLimit);\r
381         return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);\r
382     }\r
383 \r
384     public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context)\r
385     throws DatabaseException {\r
386         try {\r
387             synchronizeContext = context;\r
388             ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);\r
389             return !t.isOk();\r
390         } catch (Throwable e) {\r
391             if (DEBUG)\r
392                 e.printStackTrace();\r
393             if (e instanceof DatabaseException)\r
394                 throw (DatabaseException) e;\r
395             else\r
396                 throw new DatabaseException("GetChangeSetData call to server failed.");\r
397         } finally {\r
398             synchronizeContext = null;\r
399         }\r
400     }\r
401 \r
402     public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {\r
403         for(Operation o : operations) {\r
404                 for(ExternalOperation e : o.getExternalOperations()) {\r
405                         if(!e.isDisposed()) fn.apply(e);\r
406                 }\r
407         }\r
408     }\r
409 \r
410     public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context)\r
411     throws DatabaseException {\r
412         try {\r
413 \r
414                 forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {\r
415                         @Override\r
416                         public Boolean apply(ExternalOperation op) {\r
417                                 op.undo();\r
418                                 return true;\r
419                         }\r
420                 });\r
421 \r
422             long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);\r
423             synchronizeContext = context;\r
424             if (DEBUG)\r
425                 for (long id : cids)\r
426                     System.out.println("DEBUG: Undo cs=" + id);\r
427             return dbSession.undo(cids, context);\r
428         } catch (Throwable e) {\r
429             if (DEBUG)\r
430                 e.printStackTrace();\r
431             if (e instanceof DatabaseException)\r
432                 throw (DatabaseException) e;\r
433             else\r
434                 throw new DatabaseException("Undo call to server failed.", e);\r
435         } finally {\r
436             synchronizeContext = null;\r
437         }\r
438     }\r
439 \r
440     public ClusterUID[] getRefresh2(long csid) throws DatabaseException {\r
441         Refresh t = dbSession.getRefresh(csid);\r
442         if (DEBUG)\r
443           System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length);\r
444         ClusterUID[] clusters = new ClusterUID[t.getFirst().length];\r
445         for (int i=0; i<t.getFirst().length; ++i)\r
446             clusters[i] = new ClusterUID(t.getFirst()[i], t.getSecond()[i]);\r
447         return clusters;\r
448     }\r
449 \r
450     public ClusterChanges getClusterChanges(ClusterUID cluster, long csid) throws DatabaseException {\r
451         ClusterChanges t = dbSession.getClusterChanges(csid, cluster.asBytes());\r
452         if (DEBUG)\r
453             System.err.println("getClusterChanges old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " cluster=" + cluster + " statements=" + t.getPredicateIndex().length + " values=" + t.getValueIndex().length);\r
454         return t;\r
455     }\r
456     public long getCluster(int id) {\r
457         return session.getCluster(id);\r
458     }\r
459 \r
460     private static class ResourceSegment {\r
461         public long   valueSize;\r
462 \r
463         public byte[] bytes;\r
464 \r
465         ResourceSegment(long valueSize, byte[] bytes) {\r
466             this.valueSize = valueSize;\r
467             this.bytes = bytes;\r
468         }\r
469     }\r
470     public ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short size)\r
471     throws DatabaseException {\r
472         if (DEBUG)\r
473             System.out.println("DEBUG: getResourceSegment ri=" + resourceIndex + " cid=" + clusterUID + " offset=" + offset + " size=" + size);\r
474         org.simantics.db.Database.Session.ResourceSegment t = dbSession.getResourceSegment(clusterUID.asBytes(), resourceIndex, offset, size);\r
475         return new ResourceSegment(t.getValueSize(), t.getSegment());\r
476     }\r
477 \r
478     public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID)\r
479     throws DatabaseException {\r
480         return getResourceValue(resourceIndex, clusterUID, 0, 0);\r
481     }\r
482 \r
483     public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {\r
484 \r
485         if (DEBUG)\r
486             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
487         final int IMAX = 0xFFFF;\r
488         short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
489         final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
490         if (s.valueSize < 0)\r
491             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
492             " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
493         int ilen = (int)slen & 0xFFFF;\r
494         assert(s.bytes.length <= ilen);\r
495         if (0 == length) {\r
496             if (s.valueSize > Integer.MAX_VALUE)\r
497                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
498                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
499                 ". Value size=" + s.valueSize + " (2).");\r
500             length = (int)s.valueSize;\r
501         }\r
502         long rSize = s.valueSize - offset;\r
503         if (rSize < length)\r
504             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
505             " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
506             ". Value size=" + s.valueSize + " (3).");\r
507         else if (length <= IMAX)\r
508             return new ByteArrayInputStream(s.bytes);\r
509 \r
510         final int finalLength = length;\r
511 \r
512         return new InputStream() {\r
513 \r
514             int left = finalLength;\r
515             long valueOffset = 0;\r
516             int offset = 0;\r
517             ResourceSegment _s = s;\r
518 \r
519             @Override\r
520             public int read() throws IOException {\r
521 \r
522                 if(left <= 0) throw new IllegalStateException();\r
523 \r
524                 if(offset == _s.bytes.length) {\r
525                     short slen = (short)Math.min(left, IMAX);\r
526                     valueOffset += _s.bytes.length;\r
527                     try {\r
528                         _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);\r
529                     } catch (DatabaseException e) {\r
530                         throw new IOException(e);\r
531                     }\r
532                     offset = 0;\r
533                 }\r
534 \r
535                 left--;\r
536                 int result = _s.bytes[offset++];\r
537                 if(result < 0) result += 256;\r
538                 return result;\r
539 \r
540             }\r
541 \r
542         };\r
543     }\r
544 \r
545 \r
546     public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length)\r
547     throws DatabaseException {\r
548         if (DEBUG)\r
549             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
550         final int IMAX = 0xFFFF;\r
551         short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
552         ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
553         final long VALUE_SIZE = s.valueSize;\r
554         if (s.valueSize < 0)\r
555             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
556             " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
557         int ilen = (int)slen & 0xFFFF;\r
558         assert(s.bytes.length <= ilen);\r
559         if (0 == length) {\r
560             if (s.valueSize > Integer.MAX_VALUE)\r
561                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
562                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
563                 ". Value size=" + s.valueSize + " (2).");\r
564             length = (int)s.valueSize;\r
565         }\r
566         long rSize = s.valueSize - offset;\r
567         if (rSize < length)\r
568             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
569             " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
570             ". Value size=" + s.valueSize + " (3).");\r
571         else if (length <= IMAX)\r
572             return s.bytes;\r
573         byte[] bytes = new byte[length];\r
574         int left = (int)length - s.bytes.length;\r
575         int cur = s.bytes.length;\r
576         offset += s.bytes.length;\r
577         System.arraycopy(s.bytes, 0, bytes, 0, cur);\r
578         while (left > 0) {\r
579             slen = (short)Math.min(left, IMAX);\r
580             s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
581             ilen = (int)slen & 0xFFFF;\r
582             if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)\r
583                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
584                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
585                 ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");\r
586             System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);\r
587             left -= s.bytes.length;\r
588             cur += s.bytes.length;\r
589             offset += s.bytes.length;\r
590         }\r
591         return bytes;\r
592     }\r
593 \r
594     public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID)\r
595     throws DatabaseException {\r
596         if (DEBUG)\r
597             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);\r
598         final short SMAX = (short)0xFFFF;\r
599         ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);\r
600         if (s.valueSize < 0)\r
601             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
602             " cluster=" + clusterUID + " size=" + s.valueSize);\r
603         return s.valueSize;\r
604     }\r
605     int wait4RequestsLess(int limit)\r
606     throws DatabaseException {\r
607         dbSession.execute("");\r
608         return 0;\r
609     }\r
610     interface Listener {\r
611         void onChangeSetId(int thread, long csid, boolean refresh);\r
612     }\r
613 \r
614     void setListener(Listener listener) {\r
615         this.listener = listener;\r
616     }\r
617 \r
618     protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {\r
619         if (null != listener)\r
620             listener.onChangeSetId(thread, csid, refresh);\r
621         if (csid > lastChangeSetId) {\r
622             lastChangeSetId = csid;\r
623             final Iterator<ChangeSetListener> it = changeSetListeners.iterator();\r
624             ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {\r
625                 @Override\r
626                 public void run() {\r
627                     while (it.hasNext()) {\r
628                         ChangeSetListener l = it.next();\r
629                         try {\r
630                             l.onChanged(csid);\r
631                         } catch (Throwable t) {\r
632                             Logger.defaultLogError(t);\r
633                         }\r
634                     }\r
635                 }\r
636             });\r
637         }\r
638     }\r
639     protected abstract ServerInformationImpl getServerInformation()\r
640     throws DatabaseException;\r
641     public abstract void acceptCommit(long transactionId, long csid, byte[] metadata)\r
642     throws DatabaseException;\r
643     public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context)\r
644     throws DatabaseException;\r
645     public abstract void endTransaction(long transactionId, boolean write)\r
646     throws DatabaseException;\r
647     public abstract long askWriteTransaction(int thread, long transactionId)\r
648     throws DatabaseException;\r
649     public abstract long askReadTransaction(int thread)\r
650     throws DatabaseException;\r
651     public abstract void stop()\r
652     throws DatabaseException;\r
653 \r
654     public abstract long reserveIds(int count)\r
655     throws DatabaseException;\r
656     static class MetadataCache {\r
657         private final boolean DEBUG = false;\r
658         private final int SIZE = 10;\r
659         private int lastInd; // index of last inserted element\r
660         private long lastId; // id of last inserted element\r
661         private int count; // number of inserted elements\r
662         Vector<byte[]> buffer;\r
663         MetadataCache() {\r
664             init();\r
665         }\r
666         public int clear() {\r
667             int ret = count;\r
668             init();\r
669             return ret;\r
670         }\r
671         private void init() {\r
672             if (DEBUG)\r
673                 System.out.println("DEBUG: MetadataCache init");\r
674             lastInd = -1;\r
675             lastId = 0;\r
676             count = 0;\r
677             buffer = new Vector<byte[]>();\r
678             buffer.setSize(SIZE);\r
679         }\r
680         private boolean incLastInd() {\r
681             ++lastInd;\r
682             if (lastInd >= SIZE) {\r
683                 lastInd = 0;\r
684                 return true;\r
685             } else\r
686                 return false;\r
687         }\r
688         synchronized void addNext(long id, byte[] data)\r
689         throws DatabaseException {\r
690             if (DEBUG)\r
691                 System.out.println("DEBUG: MetadataCache add id=" + id);\r
692             if (lastId != 0 && lastId != id - 1)\r
693                 init(); // Only continuous sequence supported.\r
694             incLastInd();\r
695             buffer.set(lastInd, data);\r
696             lastId = id;\r
697             if (count < SIZE)\r
698                 ++count;\r
699         }\r
700         synchronized byte[] get(long id) {\r
701             if (DEBUG)\r
702                 System.out.println("DEBUG: MetadataCache get id=" + id);\r
703             if (id > lastId || id <= lastId - count)\r
704                 return null;\r
705             int ind = lastInd - (int)(lastId - id);\r
706             if (ind < 0)\r
707                 ind = SIZE + ind;\r
708             byte[] found = buffer.get(ind);\r
709             if (DEBUG)\r
710                 if (null != found)\r
711                     System.out.println("DEBUG: MetadataCache found id=" + id);\r
712             return found;\r
713         }\r
714     }\r
715     public ClusterUID[] listClusters() throws InternalException {\r
716         ClusterIds t = dbSession.getClusterIds();\r
717         long[] first = t.getFirst();\r
718         long[] second = t.getSecond();\r
719         int N1 = (null == first) ? 0 : first.length;\r
720         N1 = Math.min(N1, t.getStatus());\r
721         int N2 = (null == second) ? 0 : second.length;\r
722         N2 = Math.min(N1, N2);\r
723         ClusterUID[] clusters = new ClusterUID[N2];\r
724         for (int i=0; i<N2; ++i)\r
725             clusters[i] = new ClusterUID(first[i], second[i]);\r
726         return clusters;\r
727     }\r
728 }\r
729 class BuiltinData {\r
730     final long                  id;\r
731     final long                  cluster;\r
732     SoftReference<ResourceImpl> weakResource = null;\r
733     BuiltinData(long id, long cluster) {\r
734         this.id = id;\r
735         this.cluster = cluster;\r
736     }\r
737 }\r
738 class UpdateClusterFunction {\r
739     public byte[] operation = null;\r
740     public UpdateClusterFunction(byte[] operation) {\r
741         this.operation = operation;\r
742     }\r
743 }\r