-/*******************************************************************************\r
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- * VTT Technical Research Centre of Finland - initial API and implementation\r
- *******************************************************************************/\r
-package fi.vtt.simantics.procore.internal;\r
-\r
-import java.io.ByteArrayInputStream;\r
-import java.io.IOException;\r
-import java.io.InputStream;\r
-import java.lang.ref.SoftReference;\r
-import java.util.ArrayList;\r
-import java.util.Collection;\r
-import java.util.HashSet;\r
-import java.util.Iterator;\r
-import java.util.Map;\r
-import java.util.TreeMap;\r
-import java.util.Vector;\r
-import java.util.concurrent.CopyOnWriteArrayList;\r
-\r
-import org.simantics.databoard.Bindings;\r
-import org.simantics.databoard.binding.impl.TreeMapBinding;\r
-import org.simantics.databoard.serialization.Serializer;\r
-import org.simantics.db.ChangeSetIdentifier;\r
-import org.simantics.db.Database;\r
-import org.simantics.db.Database.Session.ChangeSetData;\r
-import org.simantics.db.Database.Session.ChangeSetIds;\r
-import org.simantics.db.Database.Session.ClusterChanges;\r
-import org.simantics.db.Database.Session.ClusterIds;\r
-import org.simantics.db.Database.Session.Refresh;\r
-import org.simantics.db.Operation;\r
-import org.simantics.db.SessionReference;\r
-import org.simantics.db.common.UndoContextEx;\r
-import org.simantics.db.common.utils.Logger;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.exception.InternalException;\r
-import org.simantics.db.impl.ClusterI;\r
-import org.simantics.db.impl.ResourceImpl;\r
-import org.simantics.db.service.ClusterUID;\r
-import org.simantics.db.service.ExternalOperation;\r
-import org.simantics.db.service.ManagementSupport.ChangeSetListener;\r
-import org.simantics.scl.runtime.function.Function1;\r
-import org.simantics.scl.runtime.function.FunctionImpl1;\r
-import org.simantics.utils.threads.ThreadUtils;\r
-\r
-import gnu.trove.iterator.TLongObjectIterator;\r
-import gnu.trove.map.hash.THashMap;\r
-import gnu.trove.map.hash.TLongObjectHashMap;\r
-\r
-final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {\r
- private final long id;\r
- private final GraphSession graphSession;\r
- private Map<String, byte[]> metadata;\r
- ChangeSetIdentifierImpl(long id, GraphSession graphSession) {\r
- this.id = id;\r
- this.graphSession = graphSession;\r
- this.metadata = null;\r
- }\r
- public final long getId() {\r
- return id;\r
- }\r
- public final void setMetadata(Map<String, byte[]> metadata) {\r
- this.metadata = metadata;\r
- }\r
- public final byte[] getContext()\r
- throws DatabaseException {\r
- if (null == graphSession)\r
- return new byte[0];\r
- return graphSession.getChangeSetContext(id);\r
- }\r
-\r
- static Serializer METADATA_SERIALIZER =\r
- Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,\r
- Bindings.BYTE_ARRAY));\r
-\r
- @SuppressWarnings("unchecked")\r
- public Map<String, byte[]> getMetadata() {\r
- try {\r
- if (metadata == null) {\r
- byte[] data = getContext();\r
- if (null != data && data.length > 0) {\r
- metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);\r
- }\r
- }\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogError(e);\r
- e.printStackTrace();\r
- } catch (IOException e) {\r
- Logger.defaultLogError(e);\r
- e.printStackTrace();\r
- }\r
- return metadata;\r
- }\r
-\r
- static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {\r
- if (null == changeSetIds)\r
- return new long[0];\r
- long[] t = new long[changeSetIds.size()];\r
- int i = -1;\r
- for (ChangeSetIdentifier id : changeSetIds)\r
- t[++i] = ((ChangeSetIdentifierImpl) id).getId();\r
- return t;\r
- }\r
- static long[] operations2ints(Collection<Operation> ops) {\r
- if (null == ops)\r
- return new long[0];\r
- long[] t = new long[ops.size()];\r
- int i = -1;\r
- for (Operation o : ops)\r
- t[++i] = o.getCSId();\r
- return t;\r
- }\r
- static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) {\r
- ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
- t.ensureCapacity(count);\r
- for (int i=0; i<count; ++i)\r
- t.add(new ChangeSetIdentifierImpl(firstChangeSetIds + i, gs));\r
- return t;\r
- }\r
- static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {\r
- ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();\r
- if (max<min)\r
- return t;\r
- long length = max - min + 1;\r
- t.ensureCapacity((int)length);\r
- for (int i=0; i<length; ++i) {\r
- t.add(new ChangeSetIdentifierImpl(min+i, gs));\r
- }\r
- return t;\r
- }\r
-}\r
-\r
-public abstract class GraphSession {\r
- final protected boolean DEBUG = false;\r
- final protected boolean VERBOSE = false;\r
- protected Listener listener;\r
- protected SessionImplSocket session;\r
- protected final Database.Session dbSession;\r
- private SessionReference sessionReference;\r
- private TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();\r
- protected THashMap<String, BuiltinData> builtinMap = null;\r
- private long firstChangeSetId = 0;\r
- protected SynchronizeContextI synchronizeContext;\r
- \r
- final UndoContextEx undoContext = new UndoContextEx("GraphSession");\r
- final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();\r
- private long lastChangeSetId = 0;\r
- protected MetadataCache metadataCache = new MetadataCache();\r
- \r
- public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {\r
- this.dbSession = dbSession;\r
- if (null == dbSession)\r
- throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");\r
-// if (!(sessionReference instanceof ProCoreSessionReference))\r
-// throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");\r
- this.sessionReference = sessionReference;\r
- this.session = sessionImpl;\r
- }\r
- void addChangeSetListener(ChangeSetListener csl) {\r
- changeSetListeners.add(csl);\r
- }\r
- void removeChangeSetListener(ChangeSetListener csl) {\r
- changeSetListeners.remove(csl);\r
- }\r
- SessionReference getSessionReference() {\r
- return sessionReference;\r
- }\r
-\r
- public void open() throws InternalException {\r
- dbSession.open();\r
- }\r
-\r
- public void close() throws InternalException {\r
- dbSession.close();\r
- }\r
-\r
- public long getSessionId() {\r
- return sessionReference.getSessionId();\r
- }\r
-//\r
-// public String getHost() {\r
-// return sessionReference.serverReference.socketAddress.getHostName();\r
-// }\r
-//\r
-// public int getPort() {\r
-// return sessionReference.serverReference.socketAddress.getPort();\r
-// }\r
-\r
- long getFirstChangeSetId() {\r
- return firstChangeSetId;\r
- }\r
-\r
- String getClusterFileName(long clusterId) {\r
- String fileName = "cluster" + clusterId + ".dat";\r
- return fileName;\r
- }\r
-\r
- int computeClusterMemoryUse() {\r
-\r
- int size = 0;\r
- TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
- while (i.hasNext()) {\r
- i.advance();\r
- ClusterI c = i.value();\r
- try {\r
- size += c.getUsedSpace();\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
- }\r
- }\r
-\r
- return size;\r
-\r
- }\r
-\r
- void printClusterSize() {\r
-\r
- long size = 0;\r
-\r
- long amount = 0;\r
- TLongObjectIterator<ClusterI> i = clusterMap.iterator();\r
- while (i.hasNext()) {\r
- i.advance();\r
- ClusterI c = i.value();\r
- if (c.isLoaded()) {\r
- try {\r
- size += c.getUsedSpace();\r
- } catch (DatabaseException e) {\r
- Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);\r
- }\r
- amount++;\r
- }\r
- }\r
-\r
- if (amount > 50)\r
- loadedClusters.clear();\r
- System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");\r
-\r
- }\r
-\r
- private HashSet<Long> loadedClusters = new HashSet<Long>();\r
-\r
- ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {\r
- ClusterI c = clusterMap.remove(clusterId);\r
- if (null != c)\r
- c.releaseMemory();\r
- return c;\r
- }\r
-\r
- private void debugPrint(UpdateClusterFunction f) {\r
- for (int i = 0; i < f.operation.length; ++i) {\r
- System.out.println("op=" + f.operation[i]);\r
- }\r
- }\r
-\r
- long total = 0;\r
-\r
- public boolean updateCluster(UpdateClusterFunction f) {\r
- session.clusterStream.markDirty();\r
- assert (null != f);\r
- try {\r
- if (DEBUG) {\r
- System.out.println("update cluster");\r
- if (VERBOSE)\r
- debugPrint(f);\r
- }\r
-// long start = System.nanoTime();\r
- dbSession.updateCluster(f.operation);\r
-// long duration = System.nanoTime() - start;\r
-// total += duration;\r
-// System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);\r
-// System.out.println("GraphSession.updateCluster done.");\r
- return true; // ok\r
- } catch (Throwable e) {\r
- e.printStackTrace();\r
- return false;\r
- }\r
- }\r
- public byte[] getChangeSetContext(long id)\r
- throws DatabaseException {\r
- byte[] data = metadataCache.get(id);\r
- if (null != data)\r
- return data;\r
- if (id < getServerInformation().firstChangeSetId) {\r
- Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id);\r
- return new byte[0];\r
- }\r
- return dbSession.getChangeSetMetadata(id);\r
- }\r
-\r
- public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {\r
- return new StatementImplOld(s, p, o);\r
- }\r
- public long newClusterId() throws DatabaseException {\r
- long id = dbSession.reserveIds(1);\r
- if (id <= ClusterUID.Builtin.second)\r
- return newClusterId();\r
- return id;\r
- }\r
-\r
- long load = 0;\r
-\r
- long inflate = 0;\r
-\r
-// public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)\r
-// throws DatabaseException {\r
-// assert(ClusterUID.isLegal(clusterUID));\r
-//\r
-// if(Development.DEVELOPMENT) {\r
-// if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {\r
-// ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);\r
-// }\r
-// }\r
-//\r
-//// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);\r
-// if (DEBUG)\r
-// System.out.println("DEBUG: loading cluster=" + clusterUID);\r
-//// System.out.println("getClusterImpl " + clusterId);\r
-// long start = System.nanoTime();\r
-// Cluster t = dbSession.getCluster(clusterUID.asBytes());\r
-// long duration = System.nanoTime() - start;\r
-// load += duration;\r
-// //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);\r
-//// if (Constants.NullClusterId == t.clusterId) {\r
-//// throw new DatabaseException("Cluster " + t.clusterId + " not in server.");\r
-//// }\r
-//\r
-// ByteBuffer deflated = t.getDeflated();\r
-// start = System.nanoTime();\r
-// Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);\r
-// duration = System.nanoTime() - start;\r
-// inflate += duration;\r
-// //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);\r
-// long[] longs = (long[]) arrays[0];\r
-// int[] ints = (int[]) arrays[1];\r
-// byte[] bytes = (byte[]) arrays[2];\r
-//\r
-//// System.out.println("Got cluster " + clusterId + " from core.");\r
-// ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);\r
-//// System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));\r
-//// task.finish();\r
-// return cluster;\r
-// }\r
-\r
- ClusterI getCluster(long clusterId) {\r
- return clusterMap.get(clusterId);\r
- }\r
-\r
- ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {\r
- ClusterI c = this.getCluster(clusterId);\r
- if (c == null)\r
- throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");\r
- return c;\r
- }\r
-\r
- public String execute(String command) throws DatabaseException {\r
- return dbSession.execute(command);\r
- }\r
-\r
- public Collection<ChangeSetIdentifier> getChangeSets()\r
- throws DatabaseException {\r
- ChangeSetIds t = dbSession.getChangeSetIds();\r
- return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());\r
- }\r
- public long getLastChangeSetId() {\r
- return lastChangeSetId;\r
- }\r
- public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit)\r
- throws DatabaseException {\r
- if (from > to)\r
- throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".");\r
- long min = Math.max(from, 1);\r
- long max = Math.min(to, upperLimit);\r
- return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);\r
- }\r
-\r
- public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context)\r
- throws DatabaseException {\r
- try {\r
- synchronizeContext = context;\r
- ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);\r
- return !t.isOk();\r
- } catch (Throwable e) {\r
- if (DEBUG)\r
- e.printStackTrace();\r
- if (e instanceof DatabaseException)\r
- throw (DatabaseException) e;\r
- else\r
- throw new DatabaseException("GetChangeSetData call to server failed.");\r
- } finally {\r
- synchronizeContext = null;\r
- }\r
- }\r
-\r
- public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {\r
- for(Operation o : operations) {\r
- for(ExternalOperation e : o.getExternalOperations()) {\r
- if(!e.isDisposed()) fn.apply(e);\r
- }\r
- }\r
- }\r
-\r
- public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context)\r
- throws DatabaseException {\r
- try {\r
-\r
- forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {\r
- @Override\r
- public Boolean apply(ExternalOperation op) {\r
- op.undo();\r
- return true;\r
- }\r
- });\r
-\r
- long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);\r
- synchronizeContext = context;\r
- if (DEBUG)\r
- for (long id : cids)\r
- System.out.println("DEBUG: Undo cs=" + id);\r
- return dbSession.undo(cids, context);\r
- } catch (Throwable e) {\r
- if (DEBUG)\r
- e.printStackTrace();\r
- if (e instanceof DatabaseException)\r
- throw (DatabaseException) e;\r
- else\r
- throw new DatabaseException("Undo call to server failed.", e);\r
- } finally {\r
- synchronizeContext = null;\r
- }\r
- }\r
-\r
- public ClusterUID[] getRefresh2(long csid) throws DatabaseException {\r
- Refresh t = dbSession.getRefresh(csid);\r
- if (DEBUG)\r
- System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length);\r
- ClusterUID[] clusters = new ClusterUID[t.getFirst().length];\r
- for (int i=0; i<t.getFirst().length; ++i)\r
- clusters[i] = new ClusterUID(t.getFirst()[i], t.getSecond()[i]);\r
- return clusters;\r
- }\r
-\r
- public ClusterChanges getClusterChanges(ClusterUID cluster, long csid) throws DatabaseException {\r
- ClusterChanges t = dbSession.getClusterChanges(csid, cluster.asBytes());\r
- if (DEBUG)\r
- System.err.println("getClusterChanges old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " cluster=" + cluster + " statements=" + t.getPredicateIndex().length + " values=" + t.getValueIndex().length);\r
- return t;\r
- }\r
- public long getCluster(int id) {\r
- return session.getCluster(id);\r
- }\r
-\r
- private static class ResourceSegment {\r
- public long valueSize;\r
-\r
- public byte[] bytes;\r
-\r
- ResourceSegment(long valueSize, byte[] bytes) {\r
- this.valueSize = valueSize;\r
- this.bytes = bytes;\r
- }\r
- }\r
- public ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short size)\r
- throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: getResourceSegment ri=" + resourceIndex + " cid=" + clusterUID + " offset=" + offset + " size=" + size);\r
- org.simantics.db.Database.Session.ResourceSegment t = dbSession.getResourceSegment(clusterUID.asBytes(), resourceIndex, offset, size);\r
- return new ResourceSegment(t.getValueSize(), t.getSegment());\r
- }\r
-\r
- public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID)\r
- throws DatabaseException {\r
- return getResourceValue(resourceIndex, clusterUID, 0, 0);\r
- }\r
-\r
- public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {\r
-\r
- if (DEBUG)\r
- System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
- final int IMAX = 0xFFFF;\r
- short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
- final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
- if (s.valueSize < 0)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
- int ilen = (int)slen & 0xFFFF;\r
- assert(s.bytes.length <= ilen);\r
- if (0 == length) {\r
- if (s.valueSize > Integer.MAX_VALUE)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
- ". Value size=" + s.valueSize + " (2).");\r
- length = (int)s.valueSize;\r
- }\r
- long rSize = s.valueSize - offset;\r
- if (rSize < length)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
- ". Value size=" + s.valueSize + " (3).");\r
- else if (length <= IMAX)\r
- return new ByteArrayInputStream(s.bytes);\r
-\r
- final int finalLength = length;\r
-\r
- return new InputStream() {\r
-\r
- int left = finalLength;\r
- long valueOffset = 0;\r
- int offset = 0;\r
- ResourceSegment _s = s;\r
-\r
- @Override\r
- public int read() throws IOException {\r
-\r
- if(left <= 0) throw new IllegalStateException();\r
-\r
- if(offset == _s.bytes.length) {\r
- short slen = (short)Math.min(left, IMAX);\r
- valueOffset += _s.bytes.length;\r
- try {\r
- _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);\r
- } catch (DatabaseException e) {\r
- throw new IOException(e);\r
- }\r
- offset = 0;\r
- }\r
-\r
- left--;\r
- int result = _s.bytes[offset++];\r
- if(result < 0) result += 256;\r
- return result;\r
-\r
- }\r
-\r
- };\r
- }\r
-\r
-\r
- public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length)\r
- throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);\r
- final int IMAX = 0xFFFF;\r
- short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);\r
- ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
- final long VALUE_SIZE = s.valueSize;\r
- if (s.valueSize < 0)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");\r
- int ilen = (int)slen & 0xFFFF;\r
- assert(s.bytes.length <= ilen);\r
- if (0 == length) {\r
- if (s.valueSize > Integer.MAX_VALUE)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
- ". Value size=" + s.valueSize + " (2).");\r
- length = (int)s.valueSize;\r
- }\r
- long rSize = s.valueSize - offset;\r
- if (rSize < length)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
- ". Value size=" + s.valueSize + " (3).");\r
- else if (length <= IMAX)\r
- return s.bytes;\r
- byte[] bytes = new byte[length];\r
- int left = (int)length - s.bytes.length;\r
- int cur = s.bytes.length;\r
- offset += s.bytes.length;\r
- System.arraycopy(s.bytes, 0, bytes, 0, cur);\r
- while (left > 0) {\r
- slen = (short)Math.min(left, IMAX);\r
- s = getResourceSegment(resourceIndex, clusterUID, offset, slen);\r
- ilen = (int)slen & 0xFFFF;\r
- if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " off=" + offset + " len=" + length +\r
- ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");\r
- System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);\r
- left -= s.bytes.length;\r
- cur += s.bytes.length;\r
- offset += s.bytes.length;\r
- }\r
- return bytes;\r
- }\r
-\r
- public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID)\r
- throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);\r
- final short SMAX = (short)0xFFFF;\r
- ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);\r
- if (s.valueSize < 0)\r
- throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +\r
- " cluster=" + clusterUID + " size=" + s.valueSize);\r
- return s.valueSize;\r
- }\r
- int wait4RequestsLess(int limit)\r
- throws DatabaseException {\r
- dbSession.execute("");\r
- return 0;\r
- }\r
- interface Listener {\r
- void onChangeSetId(int thread, long csid, boolean refresh);\r
- }\r
-\r
- void setListener(Listener listener) {\r
- this.listener = listener;\r
- }\r
-\r
- protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {\r
- if (null != listener)\r
- listener.onChangeSetId(thread, csid, refresh);\r
- if (csid > lastChangeSetId) {\r
- lastChangeSetId = csid;\r
- final Iterator<ChangeSetListener> it = changeSetListeners.iterator();\r
- ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {\r
- @Override\r
- public void run() {\r
- while (it.hasNext()) {\r
- ChangeSetListener l = it.next();\r
- try {\r
- l.onChanged(csid);\r
- } catch (Throwable t) {\r
- Logger.defaultLogError(t);\r
- }\r
- }\r
- }\r
- });\r
- }\r
- }\r
- protected abstract ServerInformationImpl getServerInformation()\r
- throws DatabaseException;\r
- public abstract void acceptCommit(long transactionId, long csid, byte[] metadata)\r
- throws DatabaseException;\r
- public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context)\r
- throws DatabaseException;\r
- public abstract void endTransaction(long transactionId, boolean write)\r
- throws DatabaseException;\r
- public abstract long askWriteTransaction(int thread, long transactionId)\r
- throws DatabaseException;\r
- public abstract long askReadTransaction(int thread)\r
- throws DatabaseException;\r
- public abstract void stop()\r
- throws DatabaseException;\r
-\r
- public abstract long reserveIds(int count)\r
- throws DatabaseException;\r
- static class MetadataCache {\r
- private final boolean DEBUG = false;\r
- private final int SIZE = 10;\r
- private int lastInd; // index of last inserted element\r
- private long lastId; // id of last inserted element\r
- private int count; // number of inserted elements\r
- Vector<byte[]> buffer;\r
- MetadataCache() {\r
- init();\r
- }\r
- public int clear() {\r
- int ret = count;\r
- init();\r
- return ret;\r
- }\r
- private void init() {\r
- if (DEBUG)\r
- System.out.println("DEBUG: MetadataCache init");\r
- lastInd = -1;\r
- lastId = 0;\r
- count = 0;\r
- buffer = new Vector<byte[]>();\r
- buffer.setSize(SIZE);\r
- }\r
- private boolean incLastInd() {\r
- ++lastInd;\r
- if (lastInd >= SIZE) {\r
- lastInd = 0;\r
- return true;\r
- } else\r
- return false;\r
- }\r
- synchronized void addNext(long id, byte[] data)\r
- throws DatabaseException {\r
- if (DEBUG)\r
- System.out.println("DEBUG: MetadataCache add id=" + id);\r
- if (lastId != 0 && lastId != id - 1)\r
- init(); // Only continuous sequence supported.\r
- incLastInd();\r
- buffer.set(lastInd, data);\r
- lastId = id;\r
- if (count < SIZE)\r
- ++count;\r
- }\r
- synchronized byte[] get(long id) {\r
- if (DEBUG)\r
- System.out.println("DEBUG: MetadataCache get id=" + id);\r
- if (id > lastId || id <= lastId - count)\r
- return null;\r
- int ind = lastInd - (int)(lastId - id);\r
- if (ind < 0)\r
- ind = SIZE + ind;\r
- byte[] found = buffer.get(ind);\r
- if (DEBUG)\r
- if (null != found)\r
- System.out.println("DEBUG: MetadataCache found id=" + id);\r
- return found;\r
- }\r
- }\r
- public ClusterUID[] listClusters() throws InternalException {\r
- ClusterIds t = dbSession.getClusterIds();\r
- long[] first = t.getFirst();\r
- long[] second = t.getSecond();\r
- int N1 = (null == first) ? 0 : first.length;\r
- N1 = Math.min(N1, t.getStatus());\r
- int N2 = (null == second) ? 0 : second.length;\r
- N2 = Math.min(N1, N2);\r
- ClusterUID[] clusters = new ClusterUID[N2];\r
- for (int i=0; i<N2; ++i)\r
- clusters[i] = new ClusterUID(first[i], second[i]);\r
- return clusters;\r
- }\r
- public boolean rolledback() {\r
- return dbSession.rolledback();\r
- }\r
-}\r
-class BuiltinData {\r
- final long id;\r
- final long cluster;\r
- SoftReference<ResourceImpl> weakResource = null;\r
- BuiltinData(long id, long cluster) {\r
- this.id = id;\r
- this.cluster = cluster;\r
- }\r
-}\r
-class UpdateClusterFunction {\r
- public byte[] operation = null;\r
- public UpdateClusterFunction(byte[] operation) {\r
- this.operation = operation;\r
- }\r
-}\r
+/*******************************************************************************
+ * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * VTT Technical Research Centre of Finland - initial API and implementation
+ *******************************************************************************/
+package fi.vtt.simantics.procore.internal;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.Vector;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.simantics.databoard.Bindings;
+import org.simantics.databoard.binding.impl.TreeMapBinding;
+import org.simantics.databoard.serialization.Serializer;
+import org.simantics.db.ChangeSetIdentifier;
+import org.simantics.db.Database;
+import org.simantics.db.Database.Session.ChangeSetData;
+import org.simantics.db.Database.Session.ChangeSetIds;
+import org.simantics.db.Database.Session.ClusterChanges;
+import org.simantics.db.Database.Session.ClusterIds;
+import org.simantics.db.Database.Session.Refresh;
+import org.simantics.db.Operation;
+import org.simantics.db.SessionReference;
+import org.simantics.db.common.UndoContextEx;
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.exception.InternalException;
+import org.simantics.db.impl.ClusterI;
+import org.simantics.db.impl.ResourceImpl;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.db.service.ExternalOperation;
+import org.simantics.db.service.ManagementSupport.ChangeSetListener;
+import org.simantics.scl.runtime.function.Function1;
+import org.simantics.scl.runtime.function.FunctionImpl1;
+import org.simantics.utils.threads.ThreadUtils;
+
+import gnu.trove.iterator.TLongObjectIterator;
+import gnu.trove.map.hash.THashMap;
+import gnu.trove.map.hash.TLongObjectHashMap;
+
+final class ChangeSetIdentifierImpl implements ChangeSetIdentifier {
+ private final long id;
+ private final GraphSession graphSession;
+ private Map<String, byte[]> metadata;
+ ChangeSetIdentifierImpl(long id, GraphSession graphSession) {
+ this.id = id;
+ this.graphSession = graphSession;
+ this.metadata = null;
+ }
+ public final long getId() {
+ return id;
+ }
+ public final void setMetadata(Map<String, byte[]> metadata) {
+ this.metadata = metadata;
+ }
+ public final byte[] getContext()
+ throws DatabaseException {
+ if (null == graphSession)
+ return new byte[0];
+ return graphSession.getChangeSetContext(id);
+ }
+
+ static Serializer METADATA_SERIALIZER =
+ Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING,
+ Bindings.BYTE_ARRAY));
+
+ @SuppressWarnings("unchecked")
+ public Map<String, byte[]> getMetadata() {
+ try {
+ if (metadata == null) {
+ byte[] data = getContext();
+ if (null != data && data.length > 0) {
+ metadata = (TreeMap<String, byte[]>) METADATA_SERIALIZER.deserialize(data);
+ }
+ }
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ e.printStackTrace();
+ } catch (IOException e) {
+ Logger.defaultLogError(e);
+ e.printStackTrace();
+ }
+ return metadata;
+ }
+
+ static long[] changeSetIds2ints(Collection<ChangeSetIdentifier> changeSetIds) {
+ if (null == changeSetIds)
+ return new long[0];
+ long[] t = new long[changeSetIds.size()];
+ int i = -1;
+ for (ChangeSetIdentifier id : changeSetIds)
+ t[++i] = ((ChangeSetIdentifierImpl) id).getId();
+ return t;
+ }
+ static long[] operations2ints(Collection<Operation> ops) {
+ if (null == ops)
+ return new long[0];
+ long[] t = new long[ops.size()];
+ int i = -1;
+ for (Operation o : ops)
+ t[++i] = o.getCSId();
+ return t;
+ }
+ static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) {
+ ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
+ t.ensureCapacity(count);
+ for (int i=0; i<count; ++i)
+ t.add(new ChangeSetIdentifierImpl(firstChangeSetIds + i, gs));
+ return t;
+ }
+ static Collection<ChangeSetIdentifier> longs2changeSetIds(GraphSession gs, long min, long max) {
+ ArrayList<ChangeSetIdentifier> t = new ArrayList<ChangeSetIdentifier>();
+ if (max<min)
+ return t;
+ long length = max - min + 1;
+ t.ensureCapacity((int)length);
+ for (int i=0; i<length; ++i) {
+ t.add(new ChangeSetIdentifierImpl(min+i, gs));
+ }
+ return t;
+ }
+}
+
+public abstract class GraphSession {
+ final protected boolean DEBUG = false;
+ final protected boolean VERBOSE = false;
+ protected Listener listener;
+ protected SessionImplSocket session;
+ protected final Database.Session dbSession;
+ private SessionReference sessionReference;
+ private TLongObjectHashMap<ClusterI> clusterMap = new TLongObjectHashMap<ClusterI>();
+ protected THashMap<String, BuiltinData> builtinMap = null;
+ private long firstChangeSetId = 0;
+ protected SynchronizeContextI synchronizeContext;
+
+ final UndoContextEx undoContext = new UndoContextEx("GraphSession");
+ final CopyOnWriteArrayList<ChangeSetListener> changeSetListeners = new CopyOnWriteArrayList<ChangeSetListener>();
+ private long lastChangeSetId = 0;
+ protected MetadataCache metadataCache = new MetadataCache();
+
+ public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) {
+ this.dbSession = dbSession;
+ if (null == dbSession)
+ throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null.");
+// if (!(sessionReference instanceof ProCoreSessionReference))
+// throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference.");
+ this.sessionReference = sessionReference;
+ this.session = sessionImpl;
+ }
+ void addChangeSetListener(ChangeSetListener csl) {
+ changeSetListeners.add(csl);
+ }
+ void removeChangeSetListener(ChangeSetListener csl) {
+ changeSetListeners.remove(csl);
+ }
+ SessionReference getSessionReference() {
+ return sessionReference;
+ }
+
+ public void open() throws InternalException {
+ dbSession.open();
+ }
+
+ public void close() throws InternalException {
+ dbSession.close();
+ }
+
+ public long getSessionId() {
+ return sessionReference.getSessionId();
+ }
+//
+// public String getHost() {
+// return sessionReference.serverReference.socketAddress.getHostName();
+// }
+//
+// public int getPort() {
+// return sessionReference.serverReference.socketAddress.getPort();
+// }
+
+ long getFirstChangeSetId() {
+ return firstChangeSetId;
+ }
+
+ String getClusterFileName(long clusterId) {
+ String fileName = "cluster" + clusterId + ".dat";
+ return fileName;
+ }
+
+ int computeClusterMemoryUse() {
+
+ int size = 0;
+ TLongObjectIterator<ClusterI> i = clusterMap.iterator();
+ while (i.hasNext()) {
+ i.advance();
+ ClusterI c = i.value();
+ try {
+ size += c.getUsedSpace();
+ } catch (DatabaseException e) {
+ Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);
+ }
+ }
+
+ return size;
+
+ }
+
+ void printClusterSize() {
+
+ long size = 0;
+
+ long amount = 0;
+ TLongObjectIterator<ClusterI> i = clusterMap.iterator();
+ while (i.hasNext()) {
+ i.advance();
+ ClusterI c = i.value();
+ if (c.isLoaded()) {
+ try {
+ size += c.getUsedSpace();
+ } catch (DatabaseException e) {
+ Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c);
+ }
+ amount++;
+ }
+ }
+
+ if (amount > 50)
+ loadedClusters.clear();
+ System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters.");
+
+ }
+
+ private HashSet<Long> loadedClusters = new HashSet<Long>();
+
+ ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) {
+ ClusterI c = clusterMap.remove(clusterId);
+ if (null != c)
+ c.releaseMemory();
+ return c;
+ }
+
+ private void debugPrint(UpdateClusterFunction f) {
+ for (int i = 0; i < f.operation.length; ++i) {
+ System.out.println("op=" + f.operation[i]);
+ }
+ }
+
+ long total = 0;
+
+ public boolean updateCluster(UpdateClusterFunction f) {
+ session.clusterStream.markDirty();
+ assert (null != f);
+ try {
+ if (DEBUG) {
+ System.out.println("update cluster");
+ if (VERBOSE)
+ debugPrint(f);
+ }
+// long start = System.nanoTime();
+ dbSession.updateCluster(f.operation);
+// long duration = System.nanoTime() - start;
+// total += duration;
+// System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total);
+// System.out.println("GraphSession.updateCluster done.");
+ return true; // ok
+ } catch (Throwable e) {
+ e.printStackTrace();
+ return false;
+ }
+ }
+ public byte[] getChangeSetContext(long id)
+ throws DatabaseException {
+ byte[] data = metadataCache.get(id);
+ if (null != data)
+ return data;
+ if (id < getServerInformation().firstChangeSetId) {
+ Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id);
+ return new byte[0];
+ }
+ return dbSession.getChangeSetMetadata(id);
+ }
+
+ public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) {
+ return new StatementImplOld(s, p, o);
+ }
+ public long newClusterId() throws DatabaseException {
+ long id = dbSession.reserveIds(1);
+ if (id <= ClusterUID.Builtin.second)
+ return newClusterId();
+ return id;
+ }
+
+ long load = 0;
+
+ long inflate = 0;
+
+// public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey)
+// throws DatabaseException {
+// assert(ClusterUID.isLegal(clusterUID));
+//
+// if(Development.DEVELOPMENT) {
+// if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) {
+// ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1);
+// }
+// }
+//
+//// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId);
+// if (DEBUG)
+// System.out.println("DEBUG: loading cluster=" + clusterUID);
+//// System.out.println("getClusterImpl " + clusterId);
+// long start = System.nanoTime();
+// Cluster t = dbSession.getCluster(clusterUID.asBytes());
+// long duration = System.nanoTime() - start;
+// load += duration;
+// //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load);
+//// if (Constants.NullClusterId == t.clusterId) {
+//// throw new DatabaseException("Cluster " + t.clusterId + " not in server.");
+//// }
+//
+// ByteBuffer deflated = t.getDeflated();
+// start = System.nanoTime();
+// Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated);
+// duration = System.nanoTime() - start;
+// inflate += duration;
+// //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate);
+// long[] longs = (long[]) arrays[0];
+// int[] ints = (int[]) arrays[1];
+// byte[] bytes = (byte[]) arrays[2];
+//
+//// System.out.println("Got cluster " + clusterId + " from core.");
+// ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey);
+//// System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1));
+//// task.finish();
+// return cluster;
+// }
+
+ ClusterI getCluster(long clusterId) {
+ return clusterMap.get(clusterId);
+ }
+
+ ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException {
+ ClusterI c = this.getCluster(clusterId);
+ if (c == null)
+ throw new IllegalArgumentException("Cluster " + clusterId + " does not exist.");
+ return c;
+ }
+
+ public String execute(String command) throws DatabaseException {
+ return dbSession.execute(command);
+ }
+
+ public Collection<ChangeSetIdentifier> getChangeSets()
+ throws DatabaseException {
+ ChangeSetIds t = dbSession.getChangeSetIds();
+ return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount());
+ }
+ public long getLastChangeSetId() {
+ return lastChangeSetId;
+ }
+ public Collection<ChangeSetIdentifier> getChangeSets(long from, long to, long upperLimit)
+ throws DatabaseException {
+ if (from > to)
+ throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + ".");
+ long min = Math.max(from, 1);
+ long max = Math.min(to, upperLimit);
+ return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max);
+ }
+
+ public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context)
+ throws DatabaseException {
+ try {
+ synchronizeContext = context;
+ ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context);
+ return !t.isOk();
+ } catch (Throwable e) {
+ if (DEBUG)
+ e.printStackTrace();
+ if (e instanceof DatabaseException)
+ throw (DatabaseException) e;
+ else
+ throw new DatabaseException("GetChangeSetData call to server failed.");
+ } finally {
+ synchronizeContext = null;
+ }
+ }
+
+ public static void forExternals(Collection<Operation> operations, Function1<ExternalOperation, Boolean> fn) {
+ for(Operation o : operations) {
+ for(ExternalOperation e : o.getExternalOperations()) {
+ if(!e.isDisposed()) fn.apply(e);
+ }
+ }
+ }
+
+ public boolean undo(Collection<Operation> changeSetIds, SynchronizeContextI context)
+ throws DatabaseException {
+ try {
+
+ forExternals(changeSetIds, new FunctionImpl1<ExternalOperation, Boolean>() {
+ @Override
+ public Boolean apply(ExternalOperation op) {
+ op.undo();
+ return true;
+ }
+ });
+
+ long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds);
+ synchronizeContext = context;
+ if (DEBUG)
+ for (long id : cids)
+ System.out.println("DEBUG: Undo cs=" + id);
+ return dbSession.undo(cids, context);
+ } catch (Throwable e) {
+ if (DEBUG)
+ e.printStackTrace();
+ if (e instanceof DatabaseException)
+ throw (DatabaseException) e;
+ else
+ throw new DatabaseException("Undo call to server failed.", e);
+ } finally {
+ synchronizeContext = null;
+ }
+ }
+
+ public ClusterUID[] getRefresh2(long csid) throws DatabaseException {
+ Refresh t = dbSession.getRefresh(csid);
+ if (DEBUG)
+ System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length);
+ ClusterUID[] clusters = new ClusterUID[t.getFirst().length];
+ for (int i=0; i<t.getFirst().length; ++i)
+ clusters[i] = new ClusterUID(t.getFirst()[i], t.getSecond()[i]);
+ return clusters;
+ }
+
+ public ClusterChanges getClusterChanges(ClusterUID cluster, long csid) throws DatabaseException {
+ ClusterChanges t = dbSession.getClusterChanges(csid, cluster.asBytes());
+ if (DEBUG)
+ System.err.println("getClusterChanges old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " cluster=" + cluster + " statements=" + t.getPredicateIndex().length + " values=" + t.getValueIndex().length);
+ return t;
+ }
+ public long getCluster(int id) {
+ return session.getCluster(id);
+ }
+
+ private static class ResourceSegment {
+ public long valueSize;
+
+ public byte[] bytes;
+
+ ResourceSegment(long valueSize, byte[] bytes) {
+ this.valueSize = valueSize;
+ this.bytes = bytes;
+ }
+ }
+ public ResourceSegment getResourceSegment(int resourceIndex, ClusterUID clusterUID, long offset, short size)
+ throws DatabaseException {
+ if (DEBUG)
+ System.out.println("DEBUG: getResourceSegment ri=" + resourceIndex + " cid=" + clusterUID + " offset=" + offset + " size=" + size);
+ org.simantics.db.Database.Session.ResourceSegment t = dbSession.getResourceSegment(clusterUID.asBytes(), resourceIndex, offset, size);
+ return new ResourceSegment(t.getValueSize(), t.getSegment());
+ }
+
+ public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID)
+ throws DatabaseException {
+ return getResourceValue(resourceIndex, clusterUID, 0, 0);
+ }
+
+ public InputStream getResourceValueStream(final int resourceIndex, final ClusterUID clusterUID, long offset, int length) throws DatabaseException {
+
+ if (DEBUG)
+ System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
+ final int IMAX = 0xFFFF;
+ short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);
+ final ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
+ if (s.valueSize < 0)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
+ int ilen = (int)slen & 0xFFFF;
+ assert(s.bytes.length <= ilen);
+ if (0 == length) {
+ if (s.valueSize > Integer.MAX_VALUE)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +
+ ". Value size=" + s.valueSize + " (2).");
+ length = (int)s.valueSize;
+ }
+ long rSize = s.valueSize - offset;
+ if (rSize < length)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +
+ ". Value size=" + s.valueSize + " (3).");
+ else if (length <= IMAX)
+ return new ByteArrayInputStream(s.bytes);
+
+ final int finalLength = length;
+
+ return new InputStream() {
+
+ int left = finalLength;
+ long valueOffset = 0;
+ int offset = 0;
+ ResourceSegment _s = s;
+
+ @Override
+ public int read() throws IOException {
+
+ if(left <= 0) return -1;
+
+ if(offset == _s.bytes.length) {
+ short slen = (short)Math.min(left, IMAX);
+ valueOffset += _s.bytes.length;
+ try {
+ _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen);
+ } catch (DatabaseException e) {
+ throw new IOException(e);
+ }
+ offset = 0;
+ }
+
+ left--;
+ int result = _s.bytes[offset++];
+ if(result < 0) result += 256;
+ return result;
+
+ }
+
+ };
+ }
+
+
+ public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length)
+ throws DatabaseException {
+ if (DEBUG)
+ System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length);
+ final int IMAX = 0xFFFF;
+ short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX);
+ ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
+ final long VALUE_SIZE = s.valueSize;
+ if (s.valueSize < 0)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1).");
+ int ilen = (int)slen & 0xFFFF;
+ assert(s.bytes.length <= ilen);
+ if (0 == length) {
+ if (s.valueSize > Integer.MAX_VALUE)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +
+ ". Value size=" + s.valueSize + " (2).");
+ length = (int)s.valueSize;
+ }
+ long rSize = s.valueSize - offset;
+ if (rSize < length)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +
+ ". Value size=" + s.valueSize + " (3).");
+ else if (length <= IMAX)
+ return s.bytes;
+ byte[] bytes = new byte[length];
+ int left = (int)length - s.bytes.length;
+ int cur = s.bytes.length;
+ offset += s.bytes.length;
+ System.arraycopy(s.bytes, 0, bytes, 0, cur);
+ while (left > 0) {
+ slen = (short)Math.min(left, IMAX);
+ s = getResourceSegment(resourceIndex, clusterUID, offset, slen);
+ ilen = (int)slen & 0xFFFF;
+ if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " off=" + offset + " len=" + length +
+ ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4).");
+ System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length);
+ left -= s.bytes.length;
+ cur += s.bytes.length;
+ offset += s.bytes.length;
+ }
+ return bytes;
+ }
+
+ public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID)
+ throws DatabaseException {
+ if (DEBUG)
+ System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID);
+ final short SMAX = (short)0xFFFF;
+ ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX);
+ if (s.valueSize < 0)
+ throw new DatabaseException("Failed to get value for resource index=" + resourceIndex +
+ " cluster=" + clusterUID + " size=" + s.valueSize);
+ return s.valueSize;
+ }
+ int wait4RequestsLess(int limit)
+ throws DatabaseException {
+ dbSession.execute("");
+ return 0;
+ }
+ interface Listener {
+ void onChangeSetId(int thread, long csid, boolean refresh);
+ }
+
+ void setListener(Listener listener) {
+ this.listener = listener;
+ }
+
+ protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) {
+ if (null != listener)
+ listener.onChangeSetId(thread, csid, refresh);
+ if (csid > lastChangeSetId) {
+ lastChangeSetId = csid;
+ final Iterator<ChangeSetListener> it = changeSetListeners.iterator();
+ ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ while (it.hasNext()) {
+ ChangeSetListener l = it.next();
+ try {
+ l.onChanged(csid);
+ } catch (Throwable t) {
+ Logger.defaultLogError(t);
+ }
+ }
+ }
+ });
+ }
+ }
+ protected abstract ServerInformationImpl getServerInformation()
+ throws DatabaseException;
+ public abstract void acceptCommit(long transactionId, long csid, byte[] metadata)
+ throws DatabaseException;
+ public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context)
+ throws DatabaseException;
+ public abstract void endTransaction(long transactionId, boolean write)
+ throws DatabaseException;
+ public abstract long askWriteTransaction(int thread, long transactionId)
+ throws DatabaseException;
+ public abstract long askReadTransaction(int thread)
+ throws DatabaseException;
+ public abstract void stop()
+ throws DatabaseException;
+
+ public abstract long reserveIds(int count)
+ throws DatabaseException;
+ static class MetadataCache {
+ private final boolean DEBUG = false;
+ private final int SIZE = 10;
+ private int lastInd; // index of last inserted element
+ private long lastId; // id of last inserted element
+ private int count; // number of inserted elements
+ Vector<byte[]> buffer;
+ MetadataCache() {
+ init();
+ }
+ public int clear() {
+ int ret = count;
+ init();
+ return ret;
+ }
+ private void init() {
+ if (DEBUG)
+ System.out.println("DEBUG: MetadataCache init");
+ lastInd = -1;
+ lastId = 0;
+ count = 0;
+ buffer = new Vector<byte[]>();
+ buffer.setSize(SIZE);
+ }
+ private boolean incLastInd() {
+ ++lastInd;
+ if (lastInd >= SIZE) {
+ lastInd = 0;
+ return true;
+ } else
+ return false;
+ }
+ synchronized void addNext(long id, byte[] data)
+ throws DatabaseException {
+ if (DEBUG)
+ System.out.println("DEBUG: MetadataCache add id=" + id);
+ if (lastId != 0 && lastId != id - 1)
+ init(); // Only continuous sequence supported.
+ incLastInd();
+ buffer.set(lastInd, data);
+ lastId = id;
+ if (count < SIZE)
+ ++count;
+ }
+ synchronized byte[] get(long id) {
+ if (DEBUG)
+ System.out.println("DEBUG: MetadataCache get id=" + id);
+ if (id > lastId || id <= lastId - count)
+ return null;
+ int ind = lastInd - (int)(lastId - id);
+ if (ind < 0)
+ ind = SIZE + ind;
+ byte[] found = buffer.get(ind);
+ if (DEBUG)
+ if (null != found)
+ System.out.println("DEBUG: MetadataCache found id=" + id);
+ return found;
+ }
+ }
+ public ClusterUID[] listClusters() throws InternalException {
+ ClusterIds t = dbSession.getClusterIds();
+ long[] first = t.getFirst();
+ long[] second = t.getSecond();
+ int N1 = (null == first) ? 0 : first.length;
+ N1 = Math.min(N1, t.getStatus());
+ int N2 = (null == second) ? 0 : second.length;
+ N2 = Math.min(N1, N2);
+ ClusterUID[] clusters = new ClusterUID[N2];
+ for (int i=0; i<N2; ++i)
+ clusters[i] = new ClusterUID(first[i], second[i]);
+ return clusters;
+ }
+ public boolean rolledback() {
+ return dbSession.rolledback();
+ }
+}
+class BuiltinData {
+ final long id;
+ final long cluster;
+ SoftReference<ResourceImpl> weakResource = null;
+ BuiltinData(long id, long cluster) {
+ this.id = id;
+ this.cluster = cluster;
+ }
+}
/**
 * Value holder for the serialized cluster-update operation bytes that are
 * passed to {@code Database.Session.updateCluster}. The byte array is stored
 * as-is (no defensive copy), matching existing caller expectations.
 */
class UpdateClusterFunction {
    public byte[] operation = null;
    public UpdateClusterFunction(byte[] bytes) {
        operation = bytes;
    }
}