X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FGraphSession.java;h=8888657da2452088400d7d7ea3a6e453a4af344d;hp=054ee1d604e5ef8ddb6e0509cf043795a7612d90;hb=e19c37f84fd1ce2d946578f7c05f3e45444ba67a;hpb=969bd23cab98a79ca9101af33334000879fb60c5 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/GraphSession.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/GraphSession.java index 054ee1d60..8888657da 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/GraphSession.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/GraphSession.java @@ -1,743 +1,745 @@ -/******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package fi.vtt.simantics.procore.internal; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.lang.ref.SoftReference; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; -import java.util.Vector; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.simantics.databoard.Bindings; -import org.simantics.databoard.binding.impl.TreeMapBinding; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.db.ChangeSetIdentifier; -import org.simantics.db.Database; -import org.simantics.db.Operation; -import org.simantics.db.SessionReference; -import org.simantics.db.Database.Session.ChangeSetData; -import org.simantics.db.Database.Session.ChangeSetIds; -import org.simantics.db.Database.Session.ClusterChanges; -import org.simantics.db.Database.Session.ClusterIds; -import org.simantics.db.Database.Session.Refresh; -import org.simantics.db.common.UndoContextEx; -import org.simantics.db.common.utils.Logger; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.exception.InternalException; -import org.simantics.db.impl.ClusterI; -import org.simantics.db.impl.ResourceImpl; -import org.simantics.db.service.ClusterUID; -import org.simantics.db.service.ExternalOperation; -import org.simantics.db.service.ManagementSupport.ChangeSetListener; -import org.simantics.scl.runtime.function.Function1; -import org.simantics.scl.runtime.function.FunctionImpl1; -import org.simantics.utils.threads.ThreadUtils; - -import fi.vtt.simantics.procore.ProCoreSessionReference; -import gnu.trove.iterator.TLongObjectIterator; -import gnu.trove.map.hash.THashMap; -import gnu.trove.map.hash.TLongObjectHashMap; - -final class ChangeSetIdentifierImpl implements ChangeSetIdentifier { - private final long id; - private final GraphSession graphSession; - private Map metadata; - ChangeSetIdentifierImpl(long id, GraphSession graphSession) { - this.id = id; - this.graphSession = graphSession; - this.metadata = null; - } - public final long getId() { - return id; - } - public final void setMetadata(Map metadata) { - this.metadata = metadata; - } - public final byte[] getContext() - throws DatabaseException { - if (null == graphSession) - return new byte[0]; - return graphSession.getChangeSetContext(id); - } - - static Serializer METADATA_SERIALIZER = - Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING, - Bindings.BYTE_ARRAY)); - - @SuppressWarnings("unchecked") - public Map getMetadata() { - try { - if (metadata == null) { - byte[] data = getContext(); - if (null != data && data.length > 0) { - metadata = (TreeMap) METADATA_SERIALIZER.deserialize(data); - } - } - } catch (DatabaseException e) { - Logger.defaultLogError(e); - e.printStackTrace(); - } catch (IOException e) { - Logger.defaultLogError(e); - e.printStackTrace(); - } - return metadata; - } - - static long[] changeSetIds2ints(Collection changeSetIds) { - if (null == changeSetIds) - return new long[0]; - long[] t = new long[changeSetIds.size()]; - int i = -1; - for (ChangeSetIdentifier id : changeSetIds) - t[++i] = ((ChangeSetIdentifierImpl) id).getId(); - return t; - } - static long[] operations2ints(Collection ops) { - if (null == ops) - return new long[0]; - long[] t = new long[ops.size()]; - int i = -1; - for (Operation o : ops) - t[++i] = o.getCSId(); - return t; - } - static Collection longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) { - ArrayList t = new ArrayList(); - t.ensureCapacity(count); - for (int i=0; i longs2changeSetIds(GraphSession gs, long min, long max) { - ArrayList t = new ArrayList(); - if (max clusterMap = new TLongObjectHashMap(); - protected THashMap builtinMap = null; - private long firstChangeSetId = 0; - protected SynchronizeContextI synchronizeContext; - - final UndoContextEx undoContext = new UndoContextEx("GraphSession"); - final CopyOnWriteArrayList changeSetListeners = new CopyOnWriteArrayList(); - private long lastChangeSetId = 0; - protected MetadataCache metadataCache = new MetadataCache(); - - public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) { - this.dbSession = dbSession; - if (null == dbSession) - throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null."); -// if (!(sessionReference instanceof ProCoreSessionReference)) -// throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference."); - this.sessionReference = sessionReference; - this.session = sessionImpl; - } - void addChangeSetListener(ChangeSetListener csl) { - changeSetListeners.add(csl); - } - void removeChangeSetListener(ChangeSetListener csl) { - changeSetListeners.remove(csl); - } - SessionReference getSessionReference() { - return sessionReference; - } - - public void open() throws InternalException { - dbSession.open(); - } - - public void close() throws InternalException { - dbSession.close(); - } - - public long getSessionId() { - return sessionReference.getSessionId(); - } -// -// public String getHost() { -// return sessionReference.serverReference.socketAddress.getHostName(); -// } -// -// public int getPort() { -// return sessionReference.serverReference.socketAddress.getPort(); -// } - - long getFirstChangeSetId() { - return firstChangeSetId; - } - - String getClusterFileName(long clusterId) { - String fileName = "cluster" + clusterId + ".dat"; - return fileName; - } - - int computeClusterMemoryUse() { - - int size = 0; - TLongObjectIterator i = clusterMap.iterator(); - while (i.hasNext()) { - i.advance(); - ClusterI c = i.value(); - try { - size += c.getUsedSpace(); - } catch (DatabaseException e) { - Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c); - } - } - - return size; - - } - - void printClusterSize() { - - long size = 0; - - long amount = 0; - TLongObjectIterator i = clusterMap.iterator(); - while (i.hasNext()) { - i.advance(); - ClusterI c = i.value(); - if (c.isLoaded()) { - try { - size += c.getUsedSpace(); - } catch (DatabaseException e) { - Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c); - } - amount++; - } - } - - if (amount > 50) - loadedClusters.clear(); - System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters."); - - } - - private HashSet loadedClusters = new HashSet(); - - ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) { - ClusterI c = clusterMap.remove(clusterId); - if (null != c) - c.releaseMemory(); - return c; - } - - private void debugPrint(UpdateClusterFunction f) { - for (int i = 0; i < f.operation.length; ++i) { - System.out.println("op=" + f.operation[i]); - } - } - - long total = 0; - - public boolean updateCluster(UpdateClusterFunction f) { - session.clusterStream.markDirty(); - assert (null != f); - try { - if (DEBUG) { - System.out.println("update cluster"); - if (VERBOSE) - debugPrint(f); - } -// long start = System.nanoTime(); - dbSession.updateCluster(f.operation); -// long duration = System.nanoTime() - start; -// total += duration; -// System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total); -// System.out.println("GraphSession.updateCluster done."); - return true; // ok - } catch (Throwable e) { - e.printStackTrace(); - return false; - } - } - public byte[] getChangeSetContext(long id) - throws DatabaseException { - byte[] data = metadataCache.get(id); - if (null != data) - return data; - if (id < getServerInformation().firstChangeSetId) { - Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id); - return new byte[0]; - } - return dbSession.getChangeSetMetadata(id); - } - - public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) { - return new StatementImplOld(s, p, o); - } - public long newClusterId() throws DatabaseException { - long id = dbSession.reserveIds(1); - if (id <= ClusterUID.Builtin.second) - return newClusterId(); - return id; - } - - long load = 0; - - long inflate = 0; - -// public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey) -// throws DatabaseException { -// assert(ClusterUID.isLegal(clusterUID)); -// -// if(Development.DEVELOPMENT) { -// if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) { -// ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1); -// } -// } -// -//// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId); -// if (DEBUG) -// System.out.println("DEBUG: loading cluster=" + clusterUID); -//// System.out.println("getClusterImpl " + clusterId); -// long start = System.nanoTime(); -// Cluster t = dbSession.getCluster(clusterUID.asBytes()); -// long duration = System.nanoTime() - start; -// load += duration; -// //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load); -//// if (Constants.NullClusterId == t.clusterId) { -//// throw new DatabaseException("Cluster " + t.clusterId + " not in server."); -//// } -// -// ByteBuffer deflated = t.getDeflated(); -// start = System.nanoTime(); -// Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated); -// duration = System.nanoTime() - start; -// inflate += duration; -// //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate); -// long[] longs = (long[]) arrays[0]; -// int[] ints = (int[]) arrays[1]; -// byte[] bytes = (byte[]) arrays[2]; -// -//// System.out.println("Got cluster " + clusterId + " from core."); -// ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey); -//// System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1)); -//// task.finish(); -// return cluster; -// } - - ClusterI getCluster(long clusterId) { - return clusterMap.get(clusterId); - } - - ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException { - ClusterI c = this.getCluster(clusterId); - if (c == null) - throw new IllegalArgumentException("Cluster " + clusterId + " does not exist."); - return c; - } - - public String execute(String command) throws DatabaseException { - return dbSession.execute(command); - } - - public Collection getChangeSets() - throws DatabaseException { - ChangeSetIds t = dbSession.getChangeSetIds(); - return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount()); - } - public long getLastChangeSetId() { - return lastChangeSetId; - } - public Collection getChangeSets(long from, long to, long upperLimit) - throws DatabaseException { - if (from > to) - throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + "."); - long min = Math.max(from, 1); - long max = Math.min(to, upperLimit); - return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max); - } - - public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context) - throws DatabaseException { - try { - synchronizeContext = context; - ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context); - return !t.isOk(); - } catch (Throwable e) { - if (DEBUG) - e.printStackTrace(); - if (e instanceof DatabaseException) - throw (DatabaseException) e; - else - throw new DatabaseException("GetChangeSetData call to server failed."); - } finally { - synchronizeContext = null; - } - } - - public static void forExternals(Collection operations, Function1 fn) { - for(Operation o : operations) { - for(ExternalOperation e : o.getExternalOperations()) { - if(!e.isDisposed()) fn.apply(e); - } - } - } - - public boolean undo(Collection changeSetIds, SynchronizeContextI context) - throws DatabaseException { - try { - - forExternals(changeSetIds, new FunctionImpl1() { - @Override - public Boolean apply(ExternalOperation op) { - op.undo(); - return true; - } - }); - - long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds); - synchronizeContext = context; - if (DEBUG) - for (long id : cids) - System.out.println("DEBUG: Undo cs=" + id); - return dbSession.undo(cids, context); - } catch (Throwable e) { - if (DEBUG) - e.printStackTrace(); - if (e instanceof DatabaseException) - throw (DatabaseException) e; - else - throw new DatabaseException("Undo call to server failed.", e); - } finally { - synchronizeContext = null; - } - } - - public ClusterUID[] getRefresh2(long csid) throws DatabaseException { - Refresh t = dbSession.getRefresh(csid); - if (DEBUG) - System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length); - ClusterUID[] clusters = new ClusterUID[t.getFirst().length]; - for (int i=0; i Integer.MAX_VALUE) - throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + - " cluster=" + clusterUID + " off=" + offset + " len=" + length + - ". Value size=" + s.valueSize + " (2)."); - length = (int)s.valueSize; - } - long rSize = s.valueSize - offset; - if (rSize < length) - throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + - " cluster=" + clusterUID + " off=" + offset + " len=" + length + - ". Value size=" + s.valueSize + " (3)."); - else if (length <= IMAX) - return new ByteArrayInputStream(s.bytes); - - final int finalLength = length; - - return new InputStream() { - - int left = finalLength; - long valueOffset = 0; - int offset = 0; - ResourceSegment _s = s; - - @Override - public int read() throws IOException { - - if(left <= 0) throw new IllegalStateException(); - - if(offset == _s.bytes.length) { - short slen = (short)Math.min(left, IMAX); - valueOffset += _s.bytes.length; - try { - _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen); - } catch (DatabaseException e) { - throw new IOException(e); - } - offset = 0; - } - - left--; - int result = _s.bytes[offset++]; - if(result < 0) result += 256; - return result; - - } - - }; - } - - - public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length) - throws DatabaseException { - if (DEBUG) - System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length); - final int IMAX = 0xFFFF; - short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX); - ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen); - final long VALUE_SIZE = s.valueSize; - if (s.valueSize < 0) - throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + - " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1)."); - int ilen = (int)slen & 0xFFFF; - assert(s.bytes.length <= ilen); - if (0 == length) { - if (s.valueSize > Integer.MAX_VALUE) - throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + - " cluster=" + clusterUID + " off=" + offset + " len=" + length + - ". Value size=" + s.valueSize + " (2)."); - length = (int)s.valueSize; - } - long rSize = s.valueSize - offset; - if (rSize < length) - throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + - " cluster=" + clusterUID + " off=" + offset + " len=" + length + - ". Value size=" + s.valueSize + " (3)."); - else if (length <= IMAX) - return s.bytes; - byte[] bytes = new byte[length]; - int left = (int)length - s.bytes.length; - int cur = s.bytes.length; - offset += s.bytes.length; - System.arraycopy(s.bytes, 0, bytes, 0, cur); - while (left > 0) { - slen = (short)Math.min(left, IMAX); - s = getResourceSegment(resourceIndex, clusterUID, offset, slen); - ilen = (int)slen & 0xFFFF; - if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen) - throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + - " cluster=" + clusterUID + " off=" + offset + " len=" + length + - ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4)."); - System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length); - left -= s.bytes.length; - cur += s.bytes.length; - offset += s.bytes.length; - } - return bytes; - } - - public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID) - throws DatabaseException { - if (DEBUG) - System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID); - final short SMAX = (short)0xFFFF; - ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX); - if (s.valueSize < 0) - throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + - " cluster=" + clusterUID + " size=" + s.valueSize); - return s.valueSize; - } - int wait4RequestsLess(int limit) - throws DatabaseException { - dbSession.execute(""); - return 0; - } - interface Listener { - void onChangeSetId(int thread, long csid, boolean refresh); - } - - void setListener(Listener listener) { - this.listener = listener; - } - - protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) { - if (null != listener) - listener.onChangeSetId(thread, csid, refresh); - if (csid > lastChangeSetId) { - lastChangeSetId = csid; - final Iterator it = changeSetListeners.iterator(); - ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() { - @Override - public void run() { - while (it.hasNext()) { - ChangeSetListener l = it.next(); - try { - l.onChanged(csid); - } catch (Throwable t) { - Logger.defaultLogError(t); - } - } - } - }); - } - } - protected abstract ServerInformationImpl getServerInformation() - throws DatabaseException; - public abstract void acceptCommit(long transactionId, long csid, byte[] metadata) - throws DatabaseException; - public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context) - throws DatabaseException; - public abstract void endTransaction(long transactionId, boolean write) - throws DatabaseException; - public abstract long askWriteTransaction(int thread, long transactionId) - throws DatabaseException; - public abstract long askReadTransaction(int thread) - throws DatabaseException; - public abstract void stop() - throws DatabaseException; - - public abstract long reserveIds(int count) - throws DatabaseException; - static class MetadataCache { - private final boolean DEBUG = false; - private final int SIZE = 10; - private int lastInd; // index of last inserted element - private long lastId; // id of last inserted element - private int count; // number of inserted elements - Vector buffer; - MetadataCache() { - init(); - } - public int clear() { - int ret = count; - init(); - return ret; - } - private void init() { - if (DEBUG) - System.out.println("DEBUG: MetadataCache init"); - lastInd = -1; - lastId = 0; - count = 0; - buffer = new Vector(); - buffer.setSize(SIZE); - } - private boolean incLastInd() { - ++lastInd; - if (lastInd >= SIZE) { - lastInd = 0; - return true; - } else - return false; - } - synchronized void addNext(long id, byte[] data) - throws DatabaseException { - if (DEBUG) - System.out.println("DEBUG: MetadataCache add id=" + id); - if (lastId != 0 && lastId != id - 1) - init(); // Only continuous sequence supported. - incLastInd(); - buffer.set(lastInd, data); - lastId = id; - if (count < SIZE) - ++count; - } - synchronized byte[] get(long id) { - if (DEBUG) - System.out.println("DEBUG: MetadataCache get id=" + id); - if (id > lastId || id <= lastId - count) - return null; - int ind = lastInd - (int)(lastId - id); - if (ind < 0) - ind = SIZE + ind; - byte[] found = buffer.get(ind); - if (DEBUG) - if (null != found) - System.out.println("DEBUG: MetadataCache found id=" + id); - return found; - } - } - public ClusterUID[] listClusters() throws InternalException { - ClusterIds t = dbSession.getClusterIds(); - long[] first = t.getFirst(); - long[] second = t.getSecond(); - int N1 = (null == first) ? 0 : first.length; - N1 = Math.min(N1, t.getStatus()); - int N2 = (null == second) ? 0 : second.length; - N2 = Math.min(N1, N2); - ClusterUID[] clusters = new ClusterUID[N2]; - for (int i=0; i weakResource = null; - BuiltinData(long id, long cluster) { - this.id = id; - this.cluster = cluster; - } -} -class UpdateClusterFunction { - public byte[] operation = null; - public UpdateClusterFunction(byte[] operation) { - this.operation = operation; - } -} +/******************************************************************************* + * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package fi.vtt.simantics.procore.internal; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.ref.SoftReference; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.simantics.databoard.Bindings; +import org.simantics.databoard.binding.impl.TreeMapBinding; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.db.ChangeSetIdentifier; +import org.simantics.db.Database; +import org.simantics.db.Database.Session.ChangeSetData; +import org.simantics.db.Database.Session.ChangeSetIds; +import org.simantics.db.Database.Session.ClusterChanges; +import org.simantics.db.Database.Session.ClusterIds; +import org.simantics.db.Database.Session.Refresh; +import org.simantics.db.Operation; +import org.simantics.db.SessionReference; +import org.simantics.db.common.UndoContextEx; +import org.simantics.db.common.utils.Logger; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.exception.InternalException; +import org.simantics.db.impl.ClusterI; +import org.simantics.db.impl.ResourceImpl; +import org.simantics.db.service.ClusterUID; +import org.simantics.db.service.ExternalOperation; +import org.simantics.db.service.ManagementSupport.ChangeSetListener; +import org.simantics.scl.runtime.function.Function1; +import org.simantics.scl.runtime.function.FunctionImpl1; +import org.simantics.utils.threads.ThreadUtils; + +import gnu.trove.iterator.TLongObjectIterator; +import gnu.trove.map.hash.THashMap; +import gnu.trove.map.hash.TLongObjectHashMap; + +final class ChangeSetIdentifierImpl implements ChangeSetIdentifier { + private final long id; + private final GraphSession graphSession; + private Map metadata; + ChangeSetIdentifierImpl(long id, GraphSession graphSession) { + this.id = id; + this.graphSession = graphSession; + this.metadata = null; + } + public final long getId() { + return id; + } + public final void setMetadata(Map metadata) { + this.metadata = metadata; + } + public final byte[] getContext() + throws DatabaseException { + if (null == graphSession) + return new byte[0]; + return graphSession.getChangeSetContext(id); + } + + static Serializer METADATA_SERIALIZER = + Bindings.getSerializerUnchecked( new TreeMapBinding(Bindings.STRING, + Bindings.BYTE_ARRAY)); + + @SuppressWarnings("unchecked") + public Map getMetadata() { + try { + if (metadata == null) { + byte[] data = getContext(); + if (null != data && data.length > 0) { + metadata = (TreeMap) METADATA_SERIALIZER.deserialize(data); + } + } + } catch (DatabaseException e) { + Logger.defaultLogError(e); + e.printStackTrace(); + } catch (IOException e) { + Logger.defaultLogError(e); + e.printStackTrace(); + } + return metadata; + } + + static long[] changeSetIds2ints(Collection changeSetIds) { + if (null == changeSetIds) + return new long[0]; + long[] t = new long[changeSetIds.size()]; + int i = -1; + for (ChangeSetIdentifier id : changeSetIds) + t[++i] = ((ChangeSetIdentifierImpl) id).getId(); + return t; + } + static long[] operations2ints(Collection ops) { + if (null == ops) + return new long[0]; + long[] t = new long[ops.size()]; + int i = -1; + for (Operation o : ops) + t[++i] = o.getCSId(); + return t; + } + static Collection longs2changeSetIds(GraphSession gs, long firstChangeSetIds, int count) { + ArrayList t = new ArrayList(); + t.ensureCapacity(count); + for (int i=0; i longs2changeSetIds(GraphSession gs, long min, long max) { + ArrayList t = new ArrayList(); + if (max clusterMap = new TLongObjectHashMap(); + protected THashMap builtinMap = null; + private long firstChangeSetId = 0; + protected SynchronizeContextI synchronizeContext; + + final UndoContextEx undoContext = new UndoContextEx("GraphSession"); + final CopyOnWriteArrayList changeSetListeners = new CopyOnWriteArrayList(); + private long lastChangeSetId = 0; + protected MetadataCache metadataCache = new MetadataCache(); + + public GraphSession(SessionImplSocket sessionImpl, SessionReference sessionReference, Database.Session dbSession) { + this.dbSession = dbSession; + if (null == dbSession) + throw new RuntimeException("Failed to initialize GraphSession. Database.Session can not be null."); +// if (!(sessionReference instanceof ProCoreSessionReference)) +// throw new RuntimeException("Failed to initialize GraphSession. SessionReference must be instance of ProCoreSessionReference."); + this.sessionReference = sessionReference; + this.session = sessionImpl; + } + void addChangeSetListener(ChangeSetListener csl) { + changeSetListeners.add(csl); + } + void removeChangeSetListener(ChangeSetListener csl) { + changeSetListeners.remove(csl); + } + SessionReference getSessionReference() { + return sessionReference; + } + + public void open() throws InternalException { + dbSession.open(); + } + + public void close() throws InternalException { + dbSession.close(); + } + + public long getSessionId() { + return sessionReference.getSessionId(); + } +// +// public String getHost() { +// return sessionReference.serverReference.socketAddress.getHostName(); +// } +// +// public int getPort() { +// return sessionReference.serverReference.socketAddress.getPort(); +// } + + long getFirstChangeSetId() { + return firstChangeSetId; + } + + String getClusterFileName(long clusterId) { + String fileName = "cluster" + clusterId + ".dat"; + return fileName; + } + + int computeClusterMemoryUse() { + + int size = 0; + TLongObjectIterator i = clusterMap.iterator(); + while (i.hasNext()) { + i.advance(); + ClusterI c = i.value(); + try { + size += c.getUsedSpace(); + } catch (DatabaseException e) { + Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c); + } + } + + return size; + + } + + void printClusterSize() { + + long size = 0; + + long amount = 0; + TLongObjectIterator i = clusterMap.iterator(); + while (i.hasNext()) { + i.advance(); + ClusterI c = i.value(); + if (c.isLoaded()) { + try { + size += c.getUsedSpace(); + } catch (DatabaseException e) { + Logger.defaultLogInfo("GetUsedSpace faile for cluster " + c); + } + amount++; + } + } + + if (amount > 50) + loadedClusters.clear(); + System.out.println("" + size / (1024 * 1024) + "M in " + amount + " clusters."); + + } + + private HashSet loadedClusters = new HashSet(); + + ClusterI freeClusterFromCacheAndFreeClusterMemory(long clusterId) { + ClusterI c = clusterMap.remove(clusterId); + if (null != c) + c.releaseMemory(); + return c; + } + + private void debugPrint(UpdateClusterFunction f) { + for (int i = 0; i < f.operation.length; ++i) { + System.out.println("op=" + f.operation[i]); + } + } + + long total = 0; + + public boolean updateCluster(UpdateClusterFunction f) { + session.clusterStream.markDirty(); + assert (null != f); + try { + if (DEBUG) { + System.out.println("update cluster"); + if (VERBOSE) + debugPrint(f); + } +// long start = System.nanoTime(); + dbSession.updateCluster(f.operation); +// long duration = System.nanoTime() - start; +// total += duration; +// System.err.println("graphClient.call = " + 1e-9*duration + " " + 1e-9*total); +// System.out.println("GraphSession.updateCluster done."); + return true; // ok + } catch (Throwable e) { + e.printStackTrace(); + return false; + } + } + public byte[] getChangeSetContext(long id) + throws DatabaseException { + byte[] data = metadataCache.get(id); + if (null != data) + return data; + if (id < getServerInformation().firstChangeSetId) { + Logger.defaultLogInfo("Asking purged change set metadata. uid=" + id); + return new byte[0]; + } + return dbSession.getChangeSetMetadata(id); + } + + public StatementImpl getStatement(ResourceImpl s, ResourceImpl p, ResourceImpl o) { + return new StatementImplOld(s, p, o); + } + public long newClusterId() throws DatabaseException { + long id = dbSession.reserveIds(1); + if (id <= ClusterUID.Builtin.second) + return newClusterId(); + return id; + } + + long load = 0; + + long inflate = 0; + +// public ClusterI getClusterImpl(ClusterUID clusterUID, int clusterKey) +// throws DatabaseException { +// assert(ClusterUID.isLegal(clusterUID)); +// +// if(Development.DEVELOPMENT) { +// if(Development.isTrue(DevelopmentKeys.READGRAPH_COUNT)) { +// ReadGraphImpl.counters.adjustOrPutValue("load " + clusterUID + " " + clusterKey, 1, 1); +// } +// } +// +//// ITask task = ThreadLogger.getInstance().begin("Load cluster " + clusterId); +// if (DEBUG) +// System.out.println("DEBUG: loading cluster=" + clusterUID); +//// System.out.println("getClusterImpl " + clusterId); +// long start = System.nanoTime(); +// Cluster t = dbSession.getCluster(clusterUID.asBytes()); +// long duration = System.nanoTime() - start; +// load += duration; +// //System.out.println("loaded in " + 1e-9*duration + " total " + 1e-9*load); +//// if (Constants.NullClusterId == t.clusterId) { +//// throw new DatabaseException("Cluster " + t.clusterId + " not in server."); +//// } +// +// ByteBuffer deflated = t.getDeflated(); +// start = System.nanoTime(); +// Object[] arrays = clusterDecompressor.inflateCluster(t.getInflateSize(), deflated); +// duration = System.nanoTime() - start; +// inflate += duration; +// //System.out.println("inflated in " + 1e-9*duration + " total " + 1e-9*inflate); +// long[] longs = (long[]) arrays[0]; +// int[] ints = (int[]) arrays[1]; +// byte[] bytes = (byte[]) arrays[2]; +// +//// System.out.println("Got cluster " + clusterId + " from core."); +// ClusterImpl cluster = ClusterImpl.make(longs, ints, bytes, session.clusterTranslator, clusterKey); +//// System.out.println("cluster[ " + clusterId + "].size = " + cluster.getUsedSpace(-1)); +//// task.finish(); +// return cluster; +// } + + ClusterI getCluster(long clusterId) { + return clusterMap.get(clusterId); + } + + ClusterI getClusterOrThrow(long clusterId) throws IllegalArgumentException { + ClusterI c = this.getCluster(clusterId); + if (c == null) + throw new IllegalArgumentException("Cluster " + clusterId + " does not exist."); + return c; + } + + public String execute(String command) throws DatabaseException { + return dbSession.execute(command); + } + + public Collection getChangeSets() + throws DatabaseException { + ChangeSetIds t = dbSession.getChangeSetIds(); + return ChangeSetIdentifierImpl.longs2changeSetIds(this, t.getFirstChangeSetId(), t.getCount()); + } + public long getLastChangeSetId() { + return lastChangeSetId; + } + public Collection getChangeSets(long from, long to, long upperLimit) + throws DatabaseException { + if (from > to) + throw new DatabaseException("Illegal argument: from=" + from + " > to=" + to + "."); + long min = Math.max(from, 1); + long max = Math.min(to, upperLimit); + return ChangeSetIdentifierImpl.longs2changeSetIds(this, min, max); + } + + public boolean getChangeSets(long minCS, long maxCS, SynchronizeContextI context) + throws DatabaseException { + try { + synchronizeContext = context; + ChangeSetData t = dbSession.getChangeSetData(minCS, maxCS, context); + return !t.isOk(); + } catch (Throwable e) { + if (DEBUG) + e.printStackTrace(); + if (e instanceof DatabaseException) + throw (DatabaseException) e; + else + throw new DatabaseException("GetChangeSetData call to server failed."); + } finally { + synchronizeContext = null; + } + } + + public static void forExternals(Collection operations, Function1 fn) { + for(Operation o : operations) { + for(ExternalOperation e : o.getExternalOperations()) { + if(!e.isDisposed()) fn.apply(e); + } + } + } + + public boolean undo(Collection changeSetIds, SynchronizeContextI context) + throws DatabaseException { + try { + + forExternals(changeSetIds, new FunctionImpl1() { + @Override + public Boolean apply(ExternalOperation op) { + op.undo(); + return true; + } + }); + + long[] cids = ChangeSetIdentifierImpl.operations2ints(changeSetIds); + synchronizeContext = context; + if (DEBUG) + for (long id : cids) + System.out.println("DEBUG: Undo cs=" + id); + return dbSession.undo(cids, context); + } catch (Throwable e) { + if (DEBUG) + e.printStackTrace(); + if (e instanceof DatabaseException) + throw (DatabaseException) e; + else + throw new DatabaseException("Undo call to server failed.", e); + } finally { + synchronizeContext = null; + } + } + + public ClusterUID[] getRefresh2(long csid) throws DatabaseException { + Refresh t = dbSession.getRefresh(csid); + if (DEBUG) + System.err.println("getRefresh2 old cs=" + csid + " head cs=" + t.getHeadChangeSetId() + " chanaged clusters=" + t.getFirst().length); + ClusterUID[] clusters = new ClusterUID[t.getFirst().length]; + for (int i=0; i Integer.MAX_VALUE) + throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + + " cluster=" + clusterUID + " off=" + offset + " len=" + length + + ". Value size=" + s.valueSize + " (2)."); + length = (int)s.valueSize; + } + long rSize = s.valueSize - offset; + if (rSize < length) + throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + + " cluster=" + clusterUID + " off=" + offset + " len=" + length + + ". Value size=" + s.valueSize + " (3)."); + else if (length <= IMAX) + return new ByteArrayInputStream(s.bytes); + + final int finalLength = length; + + return new InputStream() { + + int left = finalLength; + long valueOffset = 0; + int offset = 0; + ResourceSegment _s = s; + + @Override + public int read() throws IOException { + + if(left <= 0) return -1; + + if(offset == _s.bytes.length) { + short slen = (short)Math.min(left, IMAX); + valueOffset += _s.bytes.length; + try { + _s = getResourceSegment(resourceIndex, clusterUID, valueOffset, slen); + } catch (DatabaseException e) { + throw new IOException(e); + } + offset = 0; + } + + left--; + int result = _s.bytes[offset++]; + if(result < 0) result += 256; + return result; + + } + + }; + } + + + public byte[] getResourceValue(int resourceIndex, ClusterUID clusterUID, long offset, int length) + throws DatabaseException { + if (DEBUG) + System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID + " off=" + offset + " len=" + length); + final int IMAX = 0xFFFF; + short slen = (short)Math.min(length != 0 ? length : IMAX, IMAX); + ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, offset, slen); + final long VALUE_SIZE = s.valueSize; + if (s.valueSize < 0) + throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + + " cluster=" + clusterUID + " off=" + offset + " len=" + length + " (1)."); + int ilen = (int)slen & 0xFFFF; + assert(s.bytes.length <= ilen); + if (0 == length) { + if (s.valueSize > Integer.MAX_VALUE) + throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + + " cluster=" + clusterUID + " off=" + offset + " len=" + length + + ". Value size=" + s.valueSize + " (2)."); + length = (int)s.valueSize; + } + long rSize = s.valueSize - offset; + if (rSize < length) + throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + + " cluster=" + clusterUID + " off=" + offset + " len=" + length + + ". Value size=" + s.valueSize + " (3)."); + else if (length <= IMAX) + return s.bytes; + byte[] bytes = new byte[length]; + int left = (int)length - s.bytes.length; + int cur = s.bytes.length; + offset += s.bytes.length; + System.arraycopy(s.bytes, 0, bytes, 0, cur); + while (left > 0) { + slen = (short)Math.min(left, IMAX); + s = getResourceSegment(resourceIndex, clusterUID, offset, slen); + ilen = (int)slen & 0xFFFF; + if (s.valueSize != VALUE_SIZE || s.bytes.length != ilen) + throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + + " cluster=" + clusterUID + " off=" + offset + " len=" + length + + ". Value size=" + s.valueSize + " blen=" + s.bytes.length + " (4)."); + System.arraycopy(s.bytes, 0, bytes, cur, s.bytes.length); + left -= s.bytes.length; + cur += s.bytes.length; + offset += s.bytes.length; + } + return bytes; + } + + public long getResourceValueSize(int resourceIndex, ClusterUID clusterUID) + throws DatabaseException { + if (DEBUG) + System.out.println("DEBUG: getResourceFile ri=" + resourceIndex + " cid=" + clusterUID); + final short SMAX = (short)0xFFFF; + ResourceSegment s = getResourceSegment(resourceIndex, clusterUID, 0, SMAX); + if (s.valueSize < 0) + throw new DatabaseException("Failed to get value for resource index=" + resourceIndex + + " cluster=" + clusterUID + " size=" + s.valueSize); + return s.valueSize; + } + int wait4RequestsLess(int limit) + throws DatabaseException { + dbSession.execute(""); + return 0; + } + interface Listener { + void onChangeSetId(int thread, long csid, boolean refresh); + } + + void setListener(Listener listener) { + this.listener = listener; + } + + protected void updateLastChangeSetId(int thread, final long csid, boolean refresh) { + if (null != listener) + listener.onChangeSetId(thread, csid, refresh); + if (csid > lastChangeSetId) { + lastChangeSetId = csid; + final Iterator it = changeSetListeners.iterator(); + ThreadUtils.getBlockingWorkExecutor().execute(new Runnable() { + @Override + public void run() { + while (it.hasNext()) { + ChangeSetListener l = it.next(); + try { + l.onChanged(csid); + } catch (Throwable t) { + Logger.defaultLogError(t); + } + } + } + }); + } + } + protected abstract ServerInformationImpl getServerInformation() + throws DatabaseException; + public abstract void acceptCommit(long transactionId, long csid, byte[] metadata) + throws DatabaseException; + public abstract void cancelCommit(long transactionId, long csid, byte[] metadata, SynchronizeContextI context) + throws DatabaseException; + public abstract void endTransaction(long transactionId, boolean write) + throws DatabaseException; + public abstract long askWriteTransaction(int thread, long transactionId) + throws DatabaseException; + public abstract long askReadTransaction(int thread) + throws DatabaseException; + public abstract void stop() + throws DatabaseException; + + public abstract long reserveIds(int count) + throws DatabaseException; + static class MetadataCache { + private final boolean DEBUG = false; + private final int SIZE = 10; + private int lastInd; // index of last inserted element + private long lastId; // id of last inserted element + private int count; // number of inserted elements + Vector buffer; + MetadataCache() { + init(); + } + public int clear() { + int ret = count; + init(); + return ret; + } + private void init() { + if (DEBUG) + System.out.println("DEBUG: MetadataCache init"); + lastInd = -1; + lastId = 0; + count = 0; + buffer = new Vector(); + buffer.setSize(SIZE); + } + private boolean incLastInd() { + ++lastInd; + if (lastInd >= SIZE) { + lastInd = 0; + return true; + } else + return false; + } + synchronized void addNext(long id, byte[] data) + throws DatabaseException { + if (DEBUG) + System.out.println("DEBUG: MetadataCache add id=" + id); + if (lastId != 0 && lastId != id - 1) + init(); // Only continuous sequence supported. + incLastInd(); + buffer.set(lastInd, data); + lastId = id; + if (count < SIZE) + ++count; + } + synchronized byte[] get(long id) { + if (DEBUG) + System.out.println("DEBUG: MetadataCache get id=" + id); + if (id > lastId || id <= lastId - count) + return null; + int ind = lastInd - (int)(lastId - id); + if (ind < 0) + ind = SIZE + ind; + byte[] found = buffer.get(ind); + if (DEBUG) + if (null != found) + System.out.println("DEBUG: MetadataCache found id=" + id); + return found; + } + } + public ClusterUID[] listClusters() throws InternalException { + ClusterIds t = dbSession.getClusterIds(); + long[] first = t.getFirst(); + long[] second = t.getSecond(); + int N1 = (null == first) ? 0 : first.length; + N1 = Math.min(N1, t.getStatus()); + int N2 = (null == second) ? 0 : second.length; + N2 = Math.min(N1, N2); + ClusterUID[] clusters = new ClusterUID[N2]; + for (int i=0; i weakResource = null; + BuiltinData(long id, long cluster) { + this.id = id; + this.cluster = cluster; + } +} +class UpdateClusterFunction { + public byte[] operation = null; + public UpdateClusterFunction(byte[] operation) { + this.operation = operation; + } +}