]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/GraphSession.java
Merge commit 'd7afa23'
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / GraphSession.java
1 /*******************************************************************************\r
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     VTT Technical Research Centre of Finland - initial API and implementation\r
11  *******************************************************************************/\r
12 package fi.vtt.simantics.procore.internal;\r
13 \r
14 import java.io.ByteArrayInputStream;\r
15 import java.io.IOException;\r
16 import java.io.InputStream;\r
17 import java.lang.ref.SoftReference;\r
18 import java.util.ArrayList;\r
19 import java.util.Collection;\r
20 import java.util.HashSet;\r
21 import java.util.Iterator;\r
22 import java.util.Map;\r
23 import java.util.TreeMap;\r
24 import java.util.Vector;\r
25 import java.util.concurrent.CopyOnWriteArrayList;\r
26 \r
27 import org.simantics.databoard.Bindings;\r
28 import org.simantics.databoard.binding.impl.TreeMapBinding;\r
29 import org.simantics.databoard.serialization.Serializer;\r
30 import org.simantics.db.ChangeSetIdentifier;\r
31 import org.simantics.db.Database;\r
32 import org.simantics.db.Database.Session.ChangeSetData;\r
33 import org.simantics.db.Database.Session.ChangeSetIds;\r
34 import org.simantics.db.Database.Session.ClusterChanges;\r
35 import org.simantics.db.Database.Session.ClusterIds;\r
36 import org.simantics.db.Database.Session.Refresh;\r
37 import org.simantics.db.Operation;\r
38 import org.simantics.db.SessionReference;\r
39 import org.simantics.db.common.UndoContextEx;\r
40 import org.simantics.db.common.utils.Logger;\r
41 import org.simantics.db.exception.DatabaseException;\r
42 import org.simantics.db.exception.InternalException;\r
43 import org.simantics.db.impl.ClusterI;\r
44 import org.simantics.db.impl.ResourceImpl;\r
45 import org.simantics.db.service.ClusterUID;\r
46 import org.simantics.db.service.ExternalOperation;\r
47 import org.simantics.db.service.ManagementSupport.ChangeSetListener;\r
48 import org.simantics.scl.runtime.function.Function1;\r
49 import org.simantics.scl.runtime.function.FunctionImpl1;\r
50 import org.simantics.utils.threads.ThreadUtils;\r
51 \r
52 import gnu.trove.iterator.TLongObjectIterator;\r
53 import gnu.trove.map.hash.THashMap;\r
54 import gnu.trove.map.hash.TLongObjectHashMap;\r
55 \r
56 final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {\r
57     private final long id;\r
58     private final GraphSession graphSession;\r
59     private Map<String, byte[]> metadata;\r
60     ChangeSetIdentifierImpl(long id, GraphSession graphSession) {\r
61         this.id = id;\r
62         this.graphSession = graphSession;\r
63         this.metadata = null;\r
64     }\r
65     public final long getId() {\r
66         return id;\r
67     }\r
68     public final void setMetadata(Map<String, byte[]> metadata) {\r
69         this.metadata = metadata;\r
70     }\r
71     public final byte[] getContext()\r
72     throws DatabaseException {\r
73         if (null == graphSession)\r
74             return new byte[0];\r
75         return graphSession.getChangeSetContext(id);\r
76     }\r
77 \r
78     static Serializer METADATA_SERIALIZER =\r
79         Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,\r
80             Bindings.BYTE_ARRAY));\r
81 \r
82     @SuppressWarnings("unchecked")\r
83     public Map<String, byte[]> getMetadata() {\r
84         try {\r
85             if (metadata == null) {\r
86                 byte[] data = getContext();\r
87                 if (null != data && data.length > 0) {\r
88                     metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);\r
89                 }\r
90             }\r
91         } catch (DatabaseException e) {\r
92             Logger.defaultLogError(e);\r
93             e.printStackTrace();\r
94         } catch (IOException e) {\r
95             Logger.defaultLogError(e);\r
96             e.printStackTrace();\r
97         }\r
98         return metadata;\r
99     }\r
100 \r
101     static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {\r
102         if (null == changeSetIds)\r
103             return new long[0];\r
104         long[] t = new long[changeSetIds.size()];\r
105         int i = -1;\r
106         for (ChangeSetIdentifier id : changeSetIds)\r
107             t[++i] = ((ChangeSetIdentifierImpl) id).getId();\r
108         return t;\r
109     }\r
110     static long[] operations2ints(Collection<Operation> ops) {\r
111         if (null == ops)\r
112             return new long[0];\r
113         long[] t = new long[ops.size()];\r
114         int i = -1;\r
115         for (Operation o : ops)\r
116             t[++i] = o.getCSId();\r
117         return t;\r
118     }\r
119     static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) {\r
120         ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
121         t.ensureCapacity(count);\r
122         for (int i=0; i<count; ++i)\r
123             t.add(new ChangeSetIdentifierImpl(firstChangeSetIds + i, gs));\r
124         return t;\r
125     }\r
126     static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {\r
127         ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
128         if (max<min)\r
129             return t;\r
130         long length = max - min + 1;\r
131         t.ensureCapacity((int)length);\r
132         for (int i=0; i<length; ++i) {\r
133             t.add(new ChangeSetIdentifierImpl(min+i, gs));\r
134         }\r
135         return t;\r
136     }\r
137 }\r
138 \r
139 public abstract class GraphSession {\r
140     final protected boolean DEBUG = false;\r
141     final protected boolean VERBOSE = false;\r
142     protected Listener listener;\r
143     protected SessionImplSocket session;\r
144     protected final Database.Session dbSession;\r
145     private SessionReference sessionReference;\r
146     private TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();\r
147     protected THashMap<String, BuiltinData> builtinMap = null;\r
148     private long firstChangeSetId = 0;\r
149     protected SynchronizeContextI synchronizeContext;\r
150     \r
151     final UndoContextEx undoContext = new UndoContextEx("GraphSession");\r
152     final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();\r
153     private long lastChangeSetId = 0;\r
154     protected MetadataCache metadataCache = new MetadataCache();\r
155     \r
156     public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {\r
157         this.dbSession = dbSession;\r
158         if (null == dbSession)\r
159             throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");\r
160 //        if (!(sessionReference instanceof ProCoreSessionReference))\r
161 //            throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");\r
162         this.sessionReference = sessionReference;\r
163         this.session = sessionImpl;\r
164     }\r
165     void addChangeSetListener(ChangeSetListener csl) {\r
166         changeSetListeners.add(csl);\r
167     }\r
168     void removeChangeSetListener(ChangeSetListener csl) {\r
169         changeSetListeners.remove(csl);\r
170     }\r
171     SessionReference getSessionReference() {\r
172         return sessionReference;\r
173     }\r
174 \r
175     public void open() throws InternalException {\r
176         dbSession.open();\r
177     }\r
178 \r
179     public void close() throws InternalException {\r
180         dbSession.close();\r
181     }\r
182 \r
183     public long getSessionId() {\r
184         return sessionReference.getSessionId();\r
185     }\r
186 //\r
187 //    public String getHost() {\r
188 //        return sessionReference.serverReference.socketAddress.getHostName();\r
189 //    }\r
190 //\r
191 //    public int getPort() {\r
192 //        return sessionReference.serverReference.socketAddress.getPort();\r
193 //    }\r
194 \r
195     long getFirstChangeSetId() {\r
196         return firstChangeSetId;\r
197     }\r
198 \r
199     String getClusterFileName(long clusterId) {\r
200         String fileName = "cluster" + clusterId + ".dat";\r
201         return fileName;\r
202     }\r
203 \r
204     int computeClusterMemoryUse() {\r
205 \r
206         int size = 0;\r
207         TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
208         while (i.hasNext()) {\r
209             i.advance();\r
210             ClusterI c = i.value();\r
211             try {\r
212                 size += c.getUsedSpace();\r
213             } catch (DatabaseException e) {\r
214                 Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
215             }\r
216         }\r
217 \r
218         return size;\r
219 \r
220     }\r
221 \r
222     void printClusterSize() {\r
223 \r
224         long size = 0;\r
225 \r
226         long amount = 0;\r
227         TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
228         while (i.hasNext()) {\r
229             i.advance();\r
230             ClusterI c = i.value();\r
231             if (c.isLoaded()) {\r
232                 try {\r
233                     size += c.getUsedSpace();\r
234                 } catch (DatabaseException e) {\r
235                     Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
236                 }\r
237                 amount++;\r
238             }\r
239         }\r
240 \r
241         if (amount > 50)\r
242             loadedClusters.clear();\r
243         System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");\r
244 \r
245     }\r
246 \r
247     private HashSet<Long> loadedClusters = new HashSet<Long>();\r
248 \r
249     ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {\r
250         ClusterI c = clusterMap.remove(clusterId);\r
251         if (null != c)\r
252             c.releaseMemory();\r
253         return c;\r
254     }\r
255 \r
256     private void debugPrint(UpdateClusterFunction f) {\r
257         for (int i = 0; i < f.operation.length; ++i) {\r
258             System.out.println("op=" + f.operation[i]);\r
259         }\r
260     }\r
261 \r
262     long total = 0;\r
263 \r
264     public boolean updateCluster(UpdateClusterFunction f) {\r
265         session.clusterStream.markDirty();\r
266         assert (null != f);\r
267         try {\r
268             if (DEBUG) {\r
269                 System.out.println("update cluster");\r
270                 if (VERBOSE)\r
271                     debugPrint(f);\r
272             }\r
273 // long start = System.nanoTime();\r
274             dbSession.updateCluster(f.operation);\r
275 // long duration = System.nanoTime() - start;\r
276 // total += duration;\r
277 // System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);\r
278 // System.out.println("GraphSession.updateCluster done.");\r
279             return true; // ok\r
280         } catch (Throwable e) {\r
281             e.printStackTrace();\r
282             return false;\r
283         }\r
284     }\r
285     public byte[] getChangeSetContext(long id)\r
286     throws DatabaseException {\r
287         byte[] data = metadataCache.get(id);\r
288         if (null != data)\r
289             return data;\r
290         if (id < getServerInformation().firstChangeSetId) {\r
291             Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id);\r
292             return new byte[0];\r
293         }\r
294         return dbSession.getChangeSetMetadata(id);\r
295     }\r
296 \r
297     public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {\r
298         return new StatementImplOld(s, p, o);\r
299     }\r
300     public long newClusterId() throws DatabaseException {\r
301         long id = dbSession.reserveIds(1);\r
302         if (id <= ClusterUID.Builtin.second)\r
303             return newClusterId();\r
304         return id;\r
305     }\r
306 \r
307     long load    = 0;\r
308 \r
309     long inflate = 0;\r
310 \r
311 //    public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)\r
312 //    throws DatabaseException {\r
313 //        assert(ClusterUID.isLegal(clusterUID));\r
314 //\r
315 //              if(Development.DEVELOPMENT) {\r
316 //            if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {\r
317 //              ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);\r
318 //            }\r
319 //              }\r
320 //\r
321 //// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);\r
322 //        if (DEBUG)\r
323 //            System.out.println("DEBUG: loading cluster=" + clusterUID);\r
324 //// System.out.println("getClusterImpl " + clusterId);\r
325 //        long start = System.nanoTime();\r
326 //        Cluster t = dbSession.getCluster(clusterUID.asBytes());\r
327 //        long duration = System.nanoTime() - start;\r
328 //        load += duration;\r
329 //        //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);\r
330 //// if (Constants.NullClusterId == t.clusterId) {\r
331 //// throw new DatabaseException("Cluster " + t.clusterId + " not in server.");\r
332 //// }\r
333 //\r
334 //        ByteBuffer deflated = t.getDeflated();\r
335 //        start = System.nanoTime();\r
336 //        Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
337 //        duration = System.nanoTime() - start;\r
338 //        inflate += duration;\r
339 //        //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
340 //        long[] longs = (long[]) arrays[0];\r
341 //        int[] ints = (int[]) arrays[1];\r
342 //        byte[] bytes = (byte[]) arrays[2];\r
343 //\r
344 //// System.out.println("Got cluster " + clusterId + " from core.");\r
345 //        ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);\r
346 ////        System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));\r
347 //// task.finish();\r
348 //        return cluster;\r
349 //    }\r
350 \r
351     ClusterI getCluster(long clusterId) {\r
352         return clusterMap.get(clusterId);\r
353     }\r
354 \r
355     ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {\r
356         ClusterI c = this.getCluster(clusterId);\r
357         if (c == null)\r
358             throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");\r
359         return c;\r
360     }\r
361 \r
362     public String execute(String command) throws DatabaseException {\r
363         return dbSession.execute(command);\r
364     }\r
365 \r
366     public Collection<ChangeSetIdentifier> getChangeSets()\r
367     throws DatabaseException {\r
368         ChangeSetIds t = dbSession.getChangeSetIds();\r
369         return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());\r
370     }\r
371     public long getLastChangeSetId() {\r
372         return lastChangeSetId;\r
373     }\r
374     public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit)\r
375     throws DatabaseException {\r
376         if (from > to)\r
377             throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".");\r
378         long min = Math.max(from, 1);\r
379         long max = Math.min(to, upperLimit);\r
380         return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);\r
381     }\r
382 \r
383     public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context)\r
384     throws DatabaseException {\r
385         try {\r
386             synchronizeContext = context;\r
387             ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);\r
388             return !t.isOk();\r
389         } catch (Throwable e) {\r
390             if (DEBUG)\r
391                 e.printStackTrace();\r
392             if (e instanceof DatabaseException)\r
393                 throw (DatabaseException) e;\r
394             else\r
395                 throw new DatabaseException("GetChangeSetData call to server failed.");\r
396         } finally {\r
397             synchronizeContext = null;\r
398         }\r
399     }\r
400 \r
401     public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {\r
402         for(Operation o : operations) {\r
403                 for(ExternalOperation e : o.getExternalOperations()) {\r
404                         if(!e.isDisposed()) fn.apply(e);\r
405                 }\r
406         }\r
407     }\r
408 \r
409     public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context)\r
410     throws DatabaseException {\r
411         try {\r
412 \r
413                 forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {\r
414                         @Override\r
415                         public Boolean apply(ExternalOperation op) {\r
416                                 op.undo();\r
417                                 return true;\r
418                         }\r
419                 });\r
420 \r
421             long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);\r
422             synchronizeContext = context;\r
423             if (DEBUG)\r
424                 for (long id : cids)\r
425                     System.out.println("DEBUG: Undo cs=" + id);\r
426             return dbSession.undo(cids, context);\r
427         } catch (Throwable e) {\r
428             if (DEBUG)\r
429                 e.printStackTrace();\r
430             if (e instanceof DatabaseException)\r
431                 throw (DatabaseException) e;\r
432             else\r
433                 throw new DatabaseException("Undo call to server failed.", e);\r
434         } finally {\r
435             synchronizeContext = null;\r
436         }\r
437     }\r
438 \r
439     public ClusterUID[] getRefresh2(long csid) throws DatabaseException {\r
440         Refresh t = dbSession.getRefresh(csid);\r
441         if (DEBUG)\r
442           System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length);\r
443         ClusterUID[] clusters = new ClusterUID[t.getFirst().length];\r
444         for (int i=0; i<t.getFirst().length; ++i)\r
445             clusters[i] = new ClusterUID(t.getFirst()[i], t.getSecond()[i]);\r
446         return clusters;\r
447     }\r
448 \r
449     public ClusterChanges getClusterChanges(ClusterUID cluster, long csid) throws DatabaseException {\r
450         ClusterChanges t = dbSession.getClusterChanges(csid, cluster.asBytes());\r
451         if (DEBUG)\r
452             System.err.println("getClusterChanges old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " cluster=" + cluster + " statements=" + t.getPredicateIndex().length + " values=" + t.getValueIndex().length);\r
453         return t;\r
454     }\r
455     public long getCluster(int id) {\r
456         return session.getCluster(id);\r
457     }\r
458 \r
459     private static class ResourceSegment {\r
460         public long   valueSize;\r
461 \r
462         public byte[] bytes;\r
463 \r
464         ResourceSegment(long valueSize, byte[] bytes) {\r
465             this.valueSize = valueSize;\r
466             this.bytes = bytes;\r
467         }\r
468     }\r
469     public ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short size)\r
470     throws DatabaseException {\r
471         if (DEBUG)\r
472             System.out.println("DEBUG: getResourceSegment ri=" + resourceIndex + " cid=" + clusterUID + " offset=" + offset + " size=" + size);\r
473         org.simantics.db.Database.Session.ResourceSegment t = dbSession.getResourceSegment(clusterUID.asBytes(), resourceIndex, offset, size);\r
474         return new ResourceSegment(t.getValueSize(), t.getSegment());\r
475     }\r
476 \r
477     public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID)\r
478     throws DatabaseException {\r
479         return getResourceValue(resourceIndex, clusterUID, 0, 0);\r
480     }\r
481 \r
482     public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {\r
483 \r
484         if (DEBUG)\r
485             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
486         final int IMAX = 0xFFFF;\r
487         short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
488         final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
489         if (s.valueSize < 0)\r
490             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
491             " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
492         int ilen = (int)slen & 0xFFFF;\r
493         assert(s.bytes.length <= ilen);\r
494         if (0 == length) {\r
495             if (s.valueSize > Integer.MAX_VALUE)\r
496                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
497                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
498                 ". Value size=" + s.valueSize + " (2).");\r
499             length = (int)s.valueSize;\r
500         }\r
501         long rSize = s.valueSize - offset;\r
502         if (rSize < length)\r
503             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
504             " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
505             ". Value size=" + s.valueSize + " (3).");\r
506         else if (length <= IMAX)\r
507             return new ByteArrayInputStream(s.bytes);\r
508 \r
509         final int finalLength = length;\r
510 \r
511         return new InputStream() {\r
512 \r
513             int left = finalLength;\r
514             long valueOffset = 0;\r
515             int offset = 0;\r
516             ResourceSegment _s = s;\r
517 \r
518             @Override\r
519             public int read() throws IOException {\r
520 \r
521                 if(left <= 0) return -1;\r
522 \r
523                 if(offset == _s.bytes.length) {\r
524                     short slen = (short)Math.min(left, IMAX);\r
525                     valueOffset += _s.bytes.length;\r
526                     try {\r
527                         _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);\r
528                     } catch (DatabaseException e) {\r
529                         throw new IOException(e);\r
530                     }\r
531                     offset = 0;\r
532                 }\r
533 \r
534                 left--;\r
535                 int result = _s.bytes[offset++];\r
536                 if(result < 0) result += 256;\r
537                 return result;\r
538 \r
539             }\r
540 \r
541         };\r
542     }\r
543 \r
544 \r
545     public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length)\r
546     throws DatabaseException {\r
547         if (DEBUG)\r
548             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
549         final int IMAX = 0xFFFF;\r
550         short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
551         ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
552         final long VALUE_SIZE = s.valueSize;\r
553         if (s.valueSize < 0)\r
554             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
555             " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
556         int ilen = (int)slen & 0xFFFF;\r
557         assert(s.bytes.length <= ilen);\r
558         if (0 == length) {\r
559             if (s.valueSize > Integer.MAX_VALUE)\r
560                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
561                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
562                 ". Value size=" + s.valueSize + " (2).");\r
563             length = (int)s.valueSize;\r
564         }\r
565         long rSize = s.valueSize - offset;\r
566         if (rSize < length)\r
567             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
568             " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
569             ". Value size=" + s.valueSize + " (3).");\r
570         else if (length <= IMAX)\r
571             return s.bytes;\r
572         byte[] bytes = new byte[length];\r
573         int left = (int)length - s.bytes.length;\r
574         int cur = s.bytes.length;\r
575         offset += s.bytes.length;\r
576         System.arraycopy(s.bytes, 0, bytes, 0, cur);\r
577         while (left > 0) {\r
578             slen = (short)Math.min(left, IMAX);\r
579             s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
580             ilen = (int)slen & 0xFFFF;\r
581             if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)\r
582                 throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
583                 " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
584                 ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");\r
585             System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);\r
586             left -= s.bytes.length;\r
587             cur += s.bytes.length;\r
588             offset += s.bytes.length;\r
589         }\r
590         return bytes;\r
591     }\r
592 \r
593     public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID)\r
594     throws DatabaseException {\r
595         if (DEBUG)\r
596             System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);\r
597         final short SMAX = (short)0xFFFF;\r
598         ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);\r
599         if (s.valueSize < 0)\r
600             throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
601             " cluster=" + clusterUID + " size=" + s.valueSize);\r
602         return s.valueSize;\r
603     }\r
604     int wait4RequestsLess(int limit)\r
605     throws DatabaseException {\r
606         dbSession.execute("");\r
607         return 0;\r
608     }\r
609     interface Listener {\r
610         void onChangeSetId(int thread, long csid, boolean refresh);\r
611     }\r
612 \r
613     void setListener(Listener listener) {\r
614         this.listener = listener;\r
615     }\r
616 \r
617     protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {\r
618         if (null != listener)\r
619             listener.onChangeSetId(thread, csid, refresh);\r
620         if (csid > lastChangeSetId) {\r
621             lastChangeSetId = csid;\r
622             final Iterator<ChangeSetListener> it = changeSetListeners.iterator();\r
623             ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {\r
624                 @Override\r
625                 public void run() {\r
626                     while (it.hasNext()) {\r
627                         ChangeSetListener l = it.next();\r
628                         try {\r
629                             l.onChanged(csid);\r
630                         } catch (Throwable t) {\r
631                             Logger.defaultLogError(t);\r
632                         }\r
633                     }\r
634                 }\r
635             });\r
636         }\r
637     }\r
638     protected abstract ServerInformationImpl getServerInformation()\r
639     throws DatabaseException;\r
640     public abstract void acceptCommit(long transactionId, long csid, byte[] metadata)\r
641     throws DatabaseException;\r
642     public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context)\r
643     throws DatabaseException;\r
644     public abstract void endTransaction(long transactionId, boolean write)\r
645     throws DatabaseException;\r
646     public abstract long askWriteTransaction(int thread, long transactionId)\r
647     throws DatabaseException;\r
648     public abstract long askReadTransaction(int thread)\r
649     throws DatabaseException;\r
650     public abstract void stop()\r
651     throws DatabaseException;\r
652 \r
653     public abstract long reserveIds(int count)\r
654     throws DatabaseException;\r
655     static class MetadataCache {\r
656         private final boolean DEBUG = false;\r
657         private final int SIZE = 10;\r
658         private int lastInd; // index of last inserted element\r
659         private long lastId; // id of last inserted element\r
660         private int count; // number of inserted elements\r
661         Vector<byte[]> buffer;\r
662         MetadataCache() {\r
663             init();\r
664         }\r
665         public int clear() {\r
666             int ret = count;\r
667             init();\r
668             return ret;\r
669         }\r
670         private void init() {\r
671             if (DEBUG)\r
672                 System.out.println("DEBUG: MetadataCache init");\r
673             lastInd = -1;\r
674             lastId = 0;\r
675             count = 0;\r
676             buffer = new Vector<byte[]>();\r
677             buffer.setSize(SIZE);\r
678         }\r
679         private boolean incLastInd() {\r
680             ++lastInd;\r
681             if (lastInd >= SIZE) {\r
682                 lastInd = 0;\r
683                 return true;\r
684             } else\r
685                 return false;\r
686         }\r
687         synchronized void addNext(long id, byte[] data)\r
688         throws DatabaseException {\r
689             if (DEBUG)\r
690                 System.out.println("DEBUG: MetadataCache add id=" + id);\r
691             if (lastId != 0 && lastId != id - 1)\r
692                 init(); // Only continuous sequence supported.\r
693             incLastInd();\r
694             buffer.set(lastInd, data);\r
695             lastId = id;\r
696             if (count < SIZE)\r
697                 ++count;\r
698         }\r
699         synchronized byte[] get(long id) {\r
700             if (DEBUG)\r
701                 System.out.println("DEBUG: MetadataCache get id=" + id);\r
702             if (id > lastId || id <= lastId - count)\r
703                 return null;\r
704             int ind = lastInd - (int)(lastId - id);\r
705             if (ind < 0)\r
706                 ind = SIZE + ind;\r
707             byte[] found = buffer.get(ind);\r
708             if (DEBUG)\r
709                 if (null != found)\r
710                     System.out.println("DEBUG: MetadataCache found id=" + id);\r
711             return found;\r
712         }\r
713     }\r
714     public ClusterUID[] listClusters() throws InternalException {\r
715         ClusterIds t = dbSession.getClusterIds();\r
716         long[] first = t.getFirst();\r
717         long[] second = t.getSecond();\r
718         int N1 = (null == first) ? 0 : first.length;\r
719         N1 = Math.min(N1, t.getStatus());\r
720         int N2 = (null == second) ? 0 : second.length;\r
721         N2 = Math.min(N1, N2);\r
722         ClusterUID[] clusters = new ClusterUID[N2];\r
723         for (int i=0; i<N2; ++i)\r
724             clusters[i] = new ClusterUID(first[i], second[i]);\r
725         return clusters;\r
726     }\r
727     public boolean rolledback() {\r
728         return dbSession.rolledback();\r
729     }\r
730 }\r
731 class BuiltinData {\r
732     final long                  id;\r
733     final long                  cluster;\r
734     SoftReference<ResourceImpl> weakResource = null;\r
735     BuiltinData(long id, long cluster) {\r
736         this.id = id;\r
737         this.cluster = cluster;\r
738     }\r
739 }\r
740 class UpdateClusterFunction {\r
741     public byte[] operation = null;\r
742     public UpdateClusterFunction(byte[] operation) {\r
743         this.operation = operation;\r
744     }\r
745 }\r