--- /dev/null
+/*******************************************************************************\r
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * VTT Technical Research Centre of Finland - initial API and implementation\r
+ *******************************************************************************/\r
+package fi.vtt.simantics.procore.internal;\r
+\r
+import java.io.ByteArrayInputStream;\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.lang.ref.SoftReference;\r
+import java.util.ArrayList;\r
+import java.util.Collection;\r
+import java.util.HashSet;\r
+import java.util.Iterator;\r
+import java.util.Map;\r
+import java.util.TreeMap;\r
+import java.util.Vector;\r
+import java.util.concurrent.CopyOnWriteArrayList;\r
+\r
+import org.simantics.databoard.Bindings;\r
+import org.simantics.databoard.binding.impl.TreeMapBinding;\r
+import org.simantics.databoard.serialization.Serializer;\r
+import org.simantics.db.ChangeSetIdentifier;\r
+import org.simantics.db.Database;\r
+import org.simantics.db.Operation;\r
+import org.simantics.db.SessionReference;\r
+import org.simantics.db.Database.Session.ChangeSetData;\r
+import org.simantics.db.Database.Session.ChangeSetIds;\r
+import org.simantics.db.Database.Session.ClusterChanges;\r
+import org.simantics.db.Database.Session.ClusterIds;\r
+import org.simantics.db.Database.Session.Refresh;\r
+import org.simantics.db.common.UndoContextEx;\r
+import org.simantics.db.common.utils.Logger;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.exception.InternalException;\r
+import org.simantics.db.impl.ClusterI;\r
+import org.simantics.db.impl.ResourceImpl;\r
+import org.simantics.db.service.ClusterUID;\r
+import org.simantics.db.service.ExternalOperation;\r
+import org.simantics.db.service.ManagementSupport.ChangeSetListener;\r
+import org.simantics.scl.runtime.function.Function1;\r
+import org.simantics.scl.runtime.function.FunctionImpl1;\r
+import org.simantics.utils.threads.ThreadUtils;\r
+\r
+import fi.vtt.simantics.procore.ProCoreSessionReference;\r
+import gnu.trove.iterator.TLongObjectIterator;\r
+import gnu.trove.map.hash.THashMap;\r
+import gnu.trove.map.hash.TLongObjectHashMap;\r
+\r
+final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {\r
+ private final long id;\r
+ private final GraphSession graphSession;\r
+ private Map<String, byte[]> metadata;\r
+ ChangeSetIdentifierImpl(long id, GraphSession graphSession) {\r
+ this.id = id;\r
+ this.graphSession = graphSession;\r
+ this.metadata = null;\r
+ }\r
+ public final long getId() {\r
+ return id;\r
+ }\r
+ public final void setMetadata(Map<String, byte[]> metadata) {\r
+ this.metadata = metadata;\r
+ }\r
+ public final byte[] getContext()\r
+ throws DatabaseException {\r
+ if (null == graphSession)\r
+ return new byte[0];\r
+ return graphSession.getChangeSetContext(id);\r
+ }\r
+\r
+ static Serializer METADATA_SERIALIZER =\r
+ Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,\r
+ Bindings.BYTE_ARRAY));\r
+\r
+ @SuppressWarnings("unchecked")\r
+ public Map<String, byte[]> getMetadata() {\r
+ try {\r
+ if (metadata == null) {\r
+ byte[] data = getContext();\r
+ if (null != data && data.length > 0) {\r
+ metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);\r
+ }\r
+ }\r
+ } catch (DatabaseException e) {\r
+ Logger.defaultLogError(e);\r
+ e.printStackTrace();\r
+ } catch (IOException e) {\r
+ Logger.defaultLogError(e);\r
+ e.printStackTrace();\r
+ }\r
+ return metadata;\r
+ }\r
+\r
+ static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {\r
+ if (null == changeSetIds)\r
+ return new long[0];\r
+ long[] t = new long[changeSetIds.size()];\r
+ int i = -1;\r
+ for (ChangeSetIdentifier id : changeSetIds)\r
+ t[++i] = ((ChangeSetIdentifierImpl) id).getId();\r
+ return t;\r
+ }\r
+ static long[] operations2ints(Collection<Operation> ops) {\r
+ if (null == ops)\r
+ return new long[0];\r
+ long[] t = new long[ops.size()];\r
+ int i = -1;\r
+ for (Operation o : ops)\r
+ t[++i] = o.getCSId();\r
+ return t;\r
+ }\r
+ static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) {\r
+ ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
+ t.ensureCapacity(count);\r
+ for (int i=0; i<count; ++i)\r
+ t.add(new ChangeSetIdentifierImpl(firstChangeSetIds + i, gs));\r
+ return t;\r
+ }\r
+ static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {\r
+ ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
+ if (max<min)\r
+ return t;\r
+ long length = max - min + 1;\r
+ t.ensureCapacity((int)length);\r
+ for (int i=0; i<length; ++i) {\r
+ t.add(new ChangeSetIdentifierImpl(min+i, gs));\r
+ }\r
+ return t;\r
+ }\r
+}\r
+\r
+public abstract class GraphSession {\r
+ final protected boolean DEBUG = false;\r
+ final protected boolean VERBOSE = false;\r
+ protected Listener listener;\r
+ protected SessionImplSocket session;\r
+ protected final Database.Session dbSession;\r
+ private SessionReference sessionReference;\r
+ private TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();\r
+ protected THashMap<String, BuiltinData> builtinMap = null;\r
+ private long firstChangeSetId = 0;\r
+ protected SynchronizeContextI synchronizeContext;\r
+ \r
+ final UndoContextEx undoContext = new UndoContextEx("GraphSession");\r
+ final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();\r
+ private long lastChangeSetId = 0;\r
+ protected MetadataCache metadataCache = new MetadataCache();\r
+ \r
+ public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {\r
+ this.dbSession = dbSession;\r
+ if (null == dbSession)\r
+ throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");\r
+// if (!(sessionReference instanceof ProCoreSessionReference))\r
+// throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");\r
+ this.sessionReference = sessionReference;\r
+ this.session = sessionImpl;\r
+ }\r
+ void addChangeSetListener(ChangeSetListener csl) {\r
+ changeSetListeners.add(csl);\r
+ }\r
+ void removeChangeSetListener(ChangeSetListener csl) {\r
+ changeSetListeners.remove(csl);\r
+ }\r
+ SessionReference getSessionReference() {\r
+ return sessionReference;\r
+ }\r
+\r
+ public void open() throws InternalException {\r
+ dbSession.open();\r
+ }\r
+\r
+ public void close() throws InternalException {\r
+ dbSession.close();\r
+ }\r
+\r
+ public long getSessionId() {\r
+ return sessionReference.getSessionId();\r
+ }\r
+//\r
+// public String getHost() {\r
+// return sessionReference.serverReference.socketAddress.getHostName();\r
+// }\r
+//\r
+// public int getPort() {\r
+// return sessionReference.serverReference.socketAddress.getPort();\r
+// }\r
+\r
+ long getFirstChangeSetId() {\r
+ return firstChangeSetId;\r
+ }\r
+\r
+ String getClusterFileName(long clusterId) {\r
+ String fileName = "cluster" + clusterId + ".dat";\r
+ return fileName;\r
+ }\r
+\r
+ int computeClusterMemoryUse() {\r
+\r
+ int size = 0;\r
+ TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
+ while (i.hasNext()) {\r
+ i.advance();\r
+ ClusterI c = i.value();\r
+ try {\r
+ size += c.getUsedSpace();\r
+ } catch (DatabaseException e) {\r
+ Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
+ }\r
+ }\r
+\r
+ return size;\r
+\r
+ }\r
+\r
+ void printClusterSize() {\r
+\r
+ long size = 0;\r
+\r
+ long amount = 0;\r
+ TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
+ while (i.hasNext()) {\r
+ i.advance();\r
+ ClusterI c = i.value();\r
+ if (c.isLoaded()) {\r
+ try {\r
+ size += c.getUsedSpace();\r
+ } catch (DatabaseException e) {\r
+ Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
+ }\r
+ amount++;\r
+ }\r
+ }\r
+\r
+ if (amount > 50)\r
+ loadedClusters.clear();\r
+ System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");\r
+\r
+ }\r
+\r
+ private HashSet<Long> loadedClusters = new HashSet<Long>();\r
+\r
+ ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {\r
+ ClusterI c = clusterMap.remove(clusterId);\r
+ if (null != c)\r
+ c.releaseMemory();\r
+ return c;\r
+ }\r
+\r
+ private void debugPrint(UpdateClusterFunction f) {\r
+ for (int i = 0; i < f.operation.length; ++i) {\r
+ System.out.println("op=" + f.operation[i]);\r
+ }\r
+ }\r
+\r
+ long total = 0;\r
+\r
+ public boolean updateCluster(UpdateClusterFunction f) {\r
+ session.clusterStream.markDirty();\r
+ assert (null != f);\r
+ try {\r
+ if (DEBUG) {\r
+ System.out.println("update cluster");\r
+ if (VERBOSE)\r
+ debugPrint(f);\r
+ }\r
+// long start = System.nanoTime();\r
+ dbSession.updateCluster(f.operation);\r
+// long duration = System.nanoTime() - start;\r
+// total += duration;\r
+// System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);\r
+// System.out.println("GraphSession.updateCluster done.");\r
+ return true; // ok\r
+ } catch (Throwable e) {\r
+ e.printStackTrace();\r
+ return false;\r
+ }\r
+ }\r
+ public byte[] getChangeSetContext(long id)\r
+ throws DatabaseException {\r
+ byte[] data = metadataCache.get(id);\r
+ if (null != data)\r
+ return data;\r
+ if (id < getServerInformation().firstChangeSetId) {\r
+ Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id);\r
+ return new byte[0];\r
+ }\r
+ return dbSession.getChangeSetMetadata(id);\r
+ }\r
+\r
+ public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {\r
+ return new StatementImplOld(s, p, o);\r
+ }\r
+ public long newClusterId() throws DatabaseException {\r
+ long id = dbSession.reserveIds(1);\r
+ if (id <= ClusterUID.Builtin.second)\r
+ return newClusterId();\r
+ return id;\r
+ }\r
+\r
+ long load = 0;\r
+\r
+ long inflate = 0;\r
+\r
+// public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)\r
+// throws DatabaseException {\r
+// assert(ClusterUID.isLegal(clusterUID));\r
+//\r
+// if(Development.DEVELOPMENT) {\r
+// if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {\r
+// ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);\r
+// }\r
+// }\r
+//\r
+//// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);\r
+// if (DEBUG)\r
+// System.out.println("DEBUG: loading cluster=" + clusterUID);\r
+//// System.out.println("getClusterImpl " + clusterId);\r
+// long start = System.nanoTime();\r
+// Cluster t = dbSession.getCluster(clusterUID.asBytes());\r
+// long duration = System.nanoTime() - start;\r
+// load += duration;\r
+// //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);\r
+//// if (Constants.NullClusterId == t.clusterId) {\r
+//// throw new DatabaseException("Cluster " + t.clusterId + " not in server.");\r
+//// }\r
+//\r
+// ByteBuffer deflated = t.getDeflated();\r
+// start = System.nanoTime();\r
+// Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
+// duration = System.nanoTime() - start;\r
+// inflate += duration;\r
+// //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
+// long[] longs = (long[]) arrays[0];\r
+// int[] ints = (int[]) arrays[1];\r
+// byte[] bytes = (byte[]) arrays[2];\r
+//\r
+//// System.out.println("Got cluster " + clusterId + " from core.");\r
+// ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);\r
+//// System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));\r
+//// task.finish();\r
+// return cluster;\r
+// }\r
+\r
+ ClusterI getCluster(long clusterId) {\r
+ return clusterMap.get(clusterId);\r
+ }\r
+\r
+ ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {\r
+ ClusterI c = this.getCluster(clusterId);\r
+ if (c == null)\r
+ throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");\r
+ return c;\r
+ }\r
+\r
+ public String execute(String command) throws DatabaseException {\r
+ return dbSession.execute(command);\r
+ }\r
+\r
+ public Collection<ChangeSetIdentifier> getChangeSets()\r
+ throws DatabaseException {\r
+ ChangeSetIds t = dbSession.getChangeSetIds();\r
+ return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());\r
+ }\r
+ public long getLastChangeSetId() {\r
+ return lastChangeSetId;\r
+ }\r
+ public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit)\r
+ throws DatabaseException {\r
+ if (from > to)\r
+ throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".");\r
+ long min = Math.max(from, 1);\r
+ long max = Math.min(to, upperLimit);\r
+ return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);\r
+ }\r
+\r
+ public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context)\r
+ throws DatabaseException {\r
+ try {\r
+ synchronizeContext = context;\r
+ ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);\r
+ return !t.isOk();\r
+ } catch (Throwable e) {\r
+ if (DEBUG)\r
+ e.printStackTrace();\r
+ if (e instanceof DatabaseException)\r
+ throw (DatabaseException) e;\r
+ else\r
+ throw new DatabaseException("GetChangeSetData call to server failed.");\r
+ } finally {\r
+ synchronizeContext = null;\r
+ }\r
+ }\r
+\r
+ public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {\r
+ for(Operation o : operations) {\r
+ for(ExternalOperation e : o.getExternalOperations()) {\r
+ if(!e.isDisposed()) fn.apply(e);\r
+ }\r
+ }\r
+ }\r
+\r
+ public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context)\r
+ throws DatabaseException {\r
+ try {\r
+\r
+ forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {\r
+ @Override\r
+ public Boolean apply(ExternalOperation op) {\r
+ op.undo();\r
+ return true;\r
+ }\r
+ });\r
+\r
+ long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);\r
+ synchronizeContext = context;\r
+ if (DEBUG)\r
+ for (long id : cids)\r
+ System.out.println("DEBUG: Undo cs=" + id);\r
+ return dbSession.undo(cids, context);\r
+ } catch (Throwable e) {\r
+ if (DEBUG)\r
+ e.printStackTrace();\r
+ if (e instanceof DatabaseException)\r
+ throw (DatabaseException) e;\r
+ else\r
+ throw new DatabaseException("Undo call to server failed.", e);\r
+ } finally {\r
+ synchronizeContext = null;\r
+ }\r
+ }\r
+\r
+ public ClusterUID[] getRefresh2(long csid) throws DatabaseException {\r
+ Refresh t = dbSession.getRefresh(csid);\r
+ if (DEBUG)\r
+ System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length);\r
+ ClusterUID[] clusters = new ClusterUID[t.getFirst().length];\r
+ for (int i=0; i<t.getFirst().length; ++i)\r
+ clusters[i] = new ClusterUID(t.getFirst()[i], t.getSecond()[i]);\r
+ return clusters;\r
+ }\r
+\r
+ public ClusterChanges getClusterChanges(ClusterUID cluster, long csid) throws DatabaseException {\r
+ ClusterChanges t = dbSession.getClusterChanges(csid, cluster.asBytes());\r
+ if (DEBUG)\r
+ System.err.println("getClusterChanges old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " cluster=" + cluster + " statements=" + t.getPredicateIndex().length + " values=" + t.getValueIndex().length);\r
+ return t;\r
+ }\r
+ public long getCluster(int id) {\r
+ return session.getCluster(id);\r
+ }\r
+\r
+ private static class ResourceSegment {\r
+ public long valueSize;\r
+\r
+ public byte[] bytes;\r
+\r
+ ResourceSegment(long valueSize, byte[] bytes) {\r
+ this.valueSize = valueSize;\r
+ this.bytes = bytes;\r
+ }\r
+ }\r
+ public ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short size)\r
+ throws DatabaseException {\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: getResourceSegment ri=" + resourceIndex + " cid=" + clusterUID + " offset=" + offset + " size=" + size);\r
+ org.simantics.db.Database.Session.ResourceSegment t = dbSession.getResourceSegment(clusterUID.asBytes(), resourceIndex, offset, size);\r
+ return new ResourceSegment(t.getValueSize(), t.getSegment());\r
+ }\r
+\r
+ public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID)\r
+ throws DatabaseException {\r
+ return getResourceValue(resourceIndex, clusterUID, 0, 0);\r
+ }\r
+\r
+ public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {\r
+\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
+ final int IMAX = 0xFFFF;\r
+ short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
+ final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
+ if (s.valueSize < 0)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
+ int ilen = (int)slen & 0xFFFF;\r
+ assert(s.bytes.length <= ilen);\r
+ if (0 == length) {\r
+ if (s.valueSize > Integer.MAX_VALUE)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
+ ". Value size=" + s.valueSize + " (2).");\r
+ length = (int)s.valueSize;\r
+ }\r
+ long rSize = s.valueSize - offset;\r
+ if (rSize < length)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
+ ". Value size=" + s.valueSize + " (3).");\r
+ else if (length <= IMAX)\r
+ return new ByteArrayInputStream(s.bytes);\r
+\r
+ final int finalLength = length;\r
+\r
+ return new InputStream() {\r
+\r
+ int left = finalLength;\r
+ long valueOffset = 0;\r
+ int offset = 0;\r
+ ResourceSegment _s = s;\r
+\r
+ @Override\r
+ public int read() throws IOException {\r
+\r
+ if(left <= 0) throw new IllegalStateException();\r
+\r
+ if(offset == _s.bytes.length) {\r
+ short slen = (short)Math.min(left, IMAX);\r
+ valueOffset += _s.bytes.length;\r
+ try {\r
+ _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);\r
+ } catch (DatabaseException e) {\r
+ throw new IOException(e);\r
+ }\r
+ offset = 0;\r
+ }\r
+\r
+ left--;\r
+ int result = _s.bytes[offset++];\r
+ if(result < 0) result += 256;\r
+ return result;\r
+\r
+ }\r
+\r
+ };\r
+ }\r
+\r
+\r
+ public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length)\r
+ throws DatabaseException {\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
+ final int IMAX = 0xFFFF;\r
+ short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
+ ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
+ final long VALUE_SIZE = s.valueSize;\r
+ if (s.valueSize < 0)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
+ int ilen = (int)slen & 0xFFFF;\r
+ assert(s.bytes.length <= ilen);\r
+ if (0 == length) {\r
+ if (s.valueSize > Integer.MAX_VALUE)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
+ ". Value size=" + s.valueSize + " (2).");\r
+ length = (int)s.valueSize;\r
+ }\r
+ long rSize = s.valueSize - offset;\r
+ if (rSize < length)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
+ ". Value size=" + s.valueSize + " (3).");\r
+ else if (length <= IMAX)\r
+ return s.bytes;\r
+ byte[] bytes = new byte[length];\r
+ int left = (int)length - s.bytes.length;\r
+ int cur = s.bytes.length;\r
+ offset += s.bytes.length;\r
+ System.arraycopy(s.bytes, 0, bytes, 0, cur);\r
+ while (left > 0) {\r
+ slen = (short)Math.min(left, IMAX);\r
+ s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
+ ilen = (int)slen & 0xFFFF;\r
+ if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
+ ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");\r
+ System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);\r
+ left -= s.bytes.length;\r
+ cur += s.bytes.length;\r
+ offset += s.bytes.length;\r
+ }\r
+ return bytes;\r
+ }\r
+\r
+ public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID)\r
+ throws DatabaseException {\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);\r
+ final short SMAX = (short)0xFFFF;\r
+ ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);\r
+ if (s.valueSize < 0)\r
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
+ " cluster=" + clusterUID + " size=" + s.valueSize);\r
+ return s.valueSize;\r
+ }\r
+ int wait4RequestsLess(int limit)\r
+ throws DatabaseException {\r
+ dbSession.execute("");\r
+ return 0;\r
+ }\r
+ interface Listener {\r
+ void onChangeSetId(int thread, long csid, boolean refresh);\r
+ }\r
+\r
+ void setListener(Listener listener) {\r
+ this.listener = listener;\r
+ }\r
+\r
+ protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {\r
+ if (null != listener)\r
+ listener.onChangeSetId(thread, csid, refresh);\r
+ if (csid > lastChangeSetId) {\r
+ lastChangeSetId = csid;\r
+ final Iterator<ChangeSetListener> it = changeSetListeners.iterator();\r
+ ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {\r
+ @Override\r
+ public void run() {\r
+ while (it.hasNext()) {\r
+ ChangeSetListener l = it.next();\r
+ try {\r
+ l.onChanged(csid);\r
+ } catch (Throwable t) {\r
+ Logger.defaultLogError(t);\r
+ }\r
+ }\r
+ }\r
+ });\r
+ }\r
+ }\r
+ protected abstract ServerInformationImpl getServerInformation()\r
+ throws DatabaseException;\r
+ public abstract void acceptCommit(long transactionId, long csid, byte[] metadata)\r
+ throws DatabaseException;\r
+ public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context)\r
+ throws DatabaseException;\r
+ public abstract void endTransaction(long transactionId, boolean write)\r
+ throws DatabaseException;\r
+ public abstract long askWriteTransaction(int thread, long transactionId)\r
+ throws DatabaseException;\r
+ public abstract long askReadTransaction(int thread)\r
+ throws DatabaseException;\r
+ public abstract void stop()\r
+ throws DatabaseException;\r
+\r
+ public abstract long reserveIds(int count)\r
+ throws DatabaseException;\r
+ static class MetadataCache {\r
+ private final boolean DEBUG = false;\r
+ private final int SIZE = 10;\r
+ private int lastInd; // index of last inserted element\r
+ private long lastId; // id of last inserted element\r
+ private int count; // number of inserted elements\r
+ Vector<byte[]> buffer;\r
+ MetadataCache() {\r
+ init();\r
+ }\r
+ public int clear() {\r
+ int ret = count;\r
+ init();\r
+ return ret;\r
+ }\r
+ private void init() {\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: MetadataCache init");\r
+ lastInd = -1;\r
+ lastId = 0;\r
+ count = 0;\r
+ buffer = new Vector<byte[]>();\r
+ buffer.setSize(SIZE);\r
+ }\r
+ private boolean incLastInd() {\r
+ ++lastInd;\r
+ if (lastInd >= SIZE) {\r
+ lastInd = 0;\r
+ return true;\r
+ } else\r
+ return false;\r
+ }\r
+ synchronized void addNext(long id, byte[] data)\r
+ throws DatabaseException {\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: MetadataCache add id=" + id);\r
+ if (lastId != 0 && lastId != id - 1)\r
+ init(); // Only continuous sequence supported.\r
+ incLastInd();\r
+ buffer.set(lastInd, data);\r
+ lastId = id;\r
+ if (count < SIZE)\r
+ ++count;\r
+ }\r
+ synchronized byte[] get(long id) {\r
+ if (DEBUG)\r
+ System.out.println("DEBUG: MetadataCache get id=" + id);\r
+ if (id > lastId || id <= lastId - count)\r
+ return null;\r
+ int ind = lastInd - (int)(lastId - id);\r
+ if (ind < 0)\r
+ ind = SIZE + ind;\r
+ byte[] found = buffer.get(ind);\r
+ if (DEBUG)\r
+ if (null != found)\r
+ System.out.println("DEBUG: MetadataCache found id=" + id);\r
+ return found;\r
+ }\r
+ }\r
+ public ClusterUID[] listClusters() throws InternalException {\r
+ ClusterIds t = dbSession.getClusterIds();\r
+ long[] first = t.getFirst();\r
+ long[] second = t.getSecond();\r
+ int N1 = (null == first) ? 0 : first.length;\r
+ N1 = Math.min(N1, t.getStatus());\r
+ int N2 = (null == second) ? 0 : second.length;\r
+ N2 = Math.min(N1, N2);\r
+ ClusterUID[] clusters = new ClusterUID[N2];\r
+ for (int i=0; i<N2; ++i)\r
+ clusters[i] = new ClusterUID(first[i], second[i]);\r
+ return clusters;\r
+ }\r
+}\r
+class BuiltinData {\r
+ final long id;\r
+ final long cluster;\r
+ SoftReference<ResourceImpl> weakResource = null;\r
+ BuiltinData(long id, long cluster) {\r
+ this.id = id;\r
+ this.cluster = cluster;\r
+ }\r
+}\r
+class UpdateClusterFunction {\r
+ public byte[] operation = null;\r
+ public UpdateClusterFunction(byte[] operation) {\r
+ this.operation = operation;\r
+ }\r
+}\r