X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=bundles%2Forg.simantics.databoard%2Fsrc%2Forg%2Fsimantics%2Fdataboard%2Faccessor%2Fbinary%2FBinaryStreamArray.java;h=8685c0d03b833b681be1baff5e8bf1863cb75dd0;hb=0ae2b770234dfc3cbb18bd38f324125cf0faca07;hp=a14cbfb87ee57854250016932ff7aa23020bee24;hpb=24e2b34260f219f0d1644ca7a138894980e25b14;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryStreamArray.java b/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryStreamArray.java index a14cbfb87..8685c0d03 100644 --- a/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryStreamArray.java +++ b/bundles/org.simantics.databoard/src/org/simantics/databoard/accessor/binary/BinaryStreamArray.java @@ -1,946 +1,946 @@ -/******************************************************************************* - * Copyright (c) 2007, 2011 Association for Decentralized Information Management in - * Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * VTT Technical Research Centre of Finland - initial API and implementation - *******************************************************************************/ -package org.simantics.databoard.accessor.binary; - -import java.io.IOException; -import java.lang.ref.WeakReference; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.SortedMap; -import java.util.TreeMap; - -import org.simantics.databoard.accessor.Accessor; -import org.simantics.databoard.accessor.ArrayAccessor; -import org.simantics.databoard.accessor.CloseableAccessor; -import org.simantics.databoard.accessor.StreamAccessor; -import org.simantics.databoard.accessor.error.AccessorConstructionException; -import org.simantics.databoard.accessor.error.AccessorException; -import org.simantics.databoard.accessor.error.ReferenceException; -import org.simantics.databoard.accessor.event.ArrayElementAdded; -import org.simantics.databoard.accessor.event.ArrayElementRemoved; -import org.simantics.databoard.accessor.event.Event; -import org.simantics.databoard.accessor.event.ValueAssigned; -import org.simantics.databoard.accessor.file.FileArrayAccessor; -import org.simantics.databoard.accessor.impl.AccessorParams; -import org.simantics.databoard.accessor.impl.ListenerEntry; -import org.simantics.databoard.accessor.interestset.ArrayInterestSet; -import org.simantics.databoard.accessor.interestset.InterestSet; -import org.simantics.databoard.accessor.reference.ChildReference; -import org.simantics.databoard.accessor.reference.IndexReference; -import org.simantics.databoard.accessor.reference.LabelReference; -import org.simantics.databoard.adapter.AdaptException; -import org.simantics.databoard.binding.ArrayBinding; -import org.simantics.databoard.binding.Binding; -import org.simantics.databoard.binding.error.BindingException; -import org.simantics.databoard.binding.mutable.MutableVariant; -import org.simantics.databoard.serialization.Serializer; -import org.simantics.databoard.serialization.SerializerConstructionException; -import org.simantics.databoard.type.ArrayType; -import org.simantics.databoard.type.Datatype; -import org.simantics.databoard.util.binary.Blob; -import org.simantics.databoard.util.binary.RandomAccessBinary.ByteSide; - -/** - * Stream Array is accessor to an array where values can be added to - * the end, and where element size is constant. - * - *

- * The Binary format is different from the normal array format, there is no size - * integer at the beginning of the binary data. Instead the size is derieved - * from the size of the binary data by dividing byte size with element size. - * Therefore, the element size must be constant. - * - * @author Toni Kalajainen - */ -public class BinaryStreamArray extends BinaryObject implements ArrayAccessor, FileArrayAccessor, CloseableAccessor, ArrayAccessor.CloseableArrayAccessor, StreamAccessor { - - /** Accessors to children */ - TreeMap> children = new TreeMap>(); - - /** Element Binding */ - Binding cb; - /** Element Serializer */ - Serializer cs; - /** Element size */ - int elementSize; - - public BinaryStreamArray(BinaryObject parent, Blob blob, Datatype type, AccessorParams params) - throws AccessorException - { - super(parent, blob, type, params); - ArrayType at = (ArrayType) type; - cb = params.bindingScheme.getBindingUnchecked(at.componentType); - cs = params.serializerScheme.getSerializerUnchecked( cb ); - Integer elementConstantSize = cs.getConstantSize(); - if (elementConstantSize == null) { - throw new AccessorException("The size in an element of an AppendableArray must be constant."); - } - elementSize = elementConstantSize; - } - - public ArrayType type() { - return (ArrayType) type; - } - - - /** - * Get existing sub accessor - * @param index - * @return sub-accessor or null - */ - BinaryObject getExistingAccessor(int index) - { - java.lang.ref.Reference ref = children.get(index); - if (ref==null) return null; - BinaryObject res = (BinaryObject) ref.get(); -// if (res==null) children.remove(index); - return res; - } - - /** - * Get start position of a field - * - * @param fieldIndex - * @return - * @throws AccessorException - */ - long getStartPosition(int fieldIndex) throws AccessorException { - return fieldIndex * (long) (elementSize); - } - - long getLength(int index, long pos) throws AccessorException { - return elementSize; - } - - @Override - public void setNoflush(int index, Binding rcb, Object rcv) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - - // Write - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - long pos = getStartPosition(index); - b.position(pos); - rcs.serialize(b, null, rcv); - - // Notify - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotificationsOf(index)) { - MutableVariant newValue = null; - if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); - - Event e = new ValueAssigned(new IndexReference(index), newValue); - emitEvent(le, e); - } - le = le.next; - } - - } catch (IOException e) { - throw new AccessorException(e); - } catch (AdaptException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - - } - - /** - * Set all values - * - * @param arrayBinding - * @param newArray - */ - @Override - public void setValueNoflush(Binding arrayBinding, Object newArray) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - // Write - ArrayBinding rb = ((ArrayBinding)arrayBinding); - Binding rcb = rb.getComponentBinding(); - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - int oldCount = _size(); - int newCount = rb.size(newArray); - b.setLength( newCount * elementSize ); - - // Serialize - b.position(0L); - for (int index=0; index=newCount; index--) { - BinaryObject sa = getExistingAccessor(index); - if (sa!=null) { - sa.invalidatedNotification(); - children.remove(index); - sa = null; - } - - // Notify changes - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotificationsOf(index)) { - Event e = new ArrayElementRemoved(index); - emitEvent(le, e); - } - le = le.next; - } - } - - // Notify new assignment - if (listeners!=null) { - for (int index=0; indexoldCount) throw new AccessorException("Index out of range"); - - long pos = getStartPosition(index); - b.position(pos); - b.insertBytes(elementSize, ByteSide.Left); - rcs.serialize(b, null, rcv); - - // Update child map keys - if (!lastEntry && !children.isEmpty()) { - Integer key = children.lastKey(); - while (key != null && key >= index) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key+1, value); - key = children.lowerKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - MutableVariant newValue = null; - if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); - ArrayElementAdded e = new ArrayElementAdded(index, newValue); - emitEvent(le, e); - } - - // Update indices of interest sets - if (is.componentInterests!=null) { - Map oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i+1 : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - } - - le = le.next; - } - - } catch (IOException e) { - throw new AccessorException(e); - } catch (AdaptException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - } - - @Override - public void addNoflush(Binding binding, Object value) - throws AccessorException { - addNoflush(_size(), binding, value); - } - - @Override - public void addAllNoflush(Binding binding, Object[] values) - throws AccessorException { - addAllNoflush(size(), binding, values); - } - - @Override - public void addAllNoflush(int index, Binding rcb, Object[] rcvs) - throws AccessorException { - if (index<0||index>size()) throw new AccessorException("Index out of bounds"); - assert b.isOpen(); - writeLock(); - try { - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - // Write - b.position(0L); - int oldCount = b.readInt(); - int newCount = oldCount + rcvs.length; - if (index>oldCount) throw new AccessorException("Index out of range"); - b.position(0L); - b.writeInt(newCount); - boolean lastEntry = index == oldCount; - - int size = 0; - for (int i=0; i= index) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key+rcvs.length, value); - key = children.lowerKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - for (int i=0; i oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i+rcvs.length : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - - // Add component interest listener - /* - for (int i = index; isize()) throw new AccessorException("Index out of bounds"); - assert b.isOpen(); - writeLock(); - try { - Serializer rcs = params.serializerScheme.getSerializer( rcb ); - // Write - int oldCount = _size(); - int newCount = oldCount + repeatCount; - if (index>oldCount) throw new AccessorException("Index out of range"); - b.position(0L); - b.writeInt(newCount); - boolean lastEntry = index == oldCount; - - int size = rcs.getSize(obj) * repeatCount; - long pos = getStartPosition(index); - b.position(pos); - b.insertBytes(size, ByteSide.Right); - - b.position(pos); - for (int i=0; i= index) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key+repeatCount, value); - key = children.lowerKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - for (int i=0; i oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i+repeatCount : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - - } - - le = le.next; - } - - } catch (IOException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - } - - @Override - public void removeNoflush(int index, int count) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - // Write - boolean lastEntry = index == count; - int oldCount = _size(); - if (index<0||index+count>oldCount) throw new AccessorException("Index out of bounds"); - long pos = getStartPosition(index); - long lastPos = getStartPosition(index+count-1); - long lastLen = getLength(index, lastPos); - long end = lastPos + lastLen; - long len = end - pos; - b.position(pos); - b.removeBytes(len, ByteSide.Right); - - // Remove children - SortedMap> sm = children.subMap(index, true, index+count, false); - for (Entry> e : sm.entrySet()) { - BinaryObject bo = e.getValue().get(); - if (bo==null) continue; - bo.invalidatedNotification(); - } - sm.clear(); - - // Update the keys of consecutive children - if (!lastEntry && !children.isEmpty()) { - Integer lastKey = children.lastKey(); - Integer key = children.higherKey(index); - while (key != null && key <= lastKey) { - java.lang.ref.Reference value = children.remove(key); - if (value.get()!=null) children.put(key-count, value); - key = children.higherKey(key); - } - } - - // Notify Listeners - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - if (is.inNotifications()) { - ArrayElementRemoved e = new ArrayElementRemoved(index); - emitEvent(le, e); - } - - // Update indices of interest sets - if (is.componentInterests!=null) { - Map oldCis = is.componentInterests; - boolean needUpdates = false; - for (Integer i : oldCis.keySet()) { - needUpdates |= i>=index; - if (needUpdates) break; - } - - if (needUpdates) { - Map newCis = new HashMap(oldCis.size()); - for (Integer i : oldCis.keySet()) - { - Integer oldKey = i; - Integer newKey = i>=index ? i-1 : i; - InterestSet oldValue = oldCis.get(oldKey); - newCis.put(newKey, oldValue); - } - is.componentInterests = newCis; - } - } - le = le.next; - } - - - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public Object get(int index, Binding valueBinding) throws AccessorException { - assert b.isOpen(); - readLock(); - try { - long pos = getStartPosition(index); - b.position(pos); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - return s.deserialize(b); - } catch (IOException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - readUnlock(); - } - } - - @Override - public void get(int index, Binding valueBinding, Object dst) throws AccessorException { - assert b.isOpen(); - readLock(); - try { - long pos = getStartPosition(index); - b.position(pos); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - s.deserializeTo(b, dst); - } catch (IOException e) { - throw new AccessorException(e); - } catch (SerializerConstructionException e) { - throw new AccessorException(e); - } finally { - readUnlock(); - } - } - - - @SuppressWarnings("unchecked") - @Override - public T getAccessor(int index) - throws AccessorConstructionException { - assert b.isOpen(); - readLock(); - try { - int count = _size(); - if (index<0 || index>=count) throw new ReferenceException("Element index ("+index+") out of bounds ("+count+")"); - - // Get existing or create new - BinaryObject sa = getExistingAccessor(index); - if (sa==null) { - long pos = getStartPosition(index); - long len = getLength(index, pos); - - // Instantiate correct sub accessor. - sa = createSubAccessor(cb.type(), pos, len, params); - children.put(index, new WeakReference(sa) ); - - // Add component interest sets - ListenerEntry le = listeners; - while (le!=null) { - ArrayInterestSet is = le.getInterestSet(); - - // Generic element interest - InterestSet gis = is.getComponentInterest(); - if (gis != null) { - try { - ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); - sa.addListener(le.listener, gis, childPath, le.executor); - } catch (AccessorException e) { - throw new AccessorConstructionException(e); - } - } - - // Specific element interest - InterestSet cis = is.getComponentInterest(index); - if (cis != null) { - try { - ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); - sa.addListener(le.listener, cis, childPath,le.executor); - } catch (AccessorException e) { - throw new AccessorConstructionException(e); - } - } - - // Next listener - le = le.next; - } - - } - - return (T) sa; - } catch (AccessorException e) { - throw new AccessorConstructionException(e); - } finally { - readUnlock(); - } - } - - @SuppressWarnings("unchecked") - @Override - public T getComponent(ChildReference reference) - throws AccessorConstructionException { - if (reference==null) return (T) this; - if (reference instanceof LabelReference) { - LabelReference lr = (LabelReference) reference; - try { - Integer index = new Integer( lr.label ); - Accessor result = getAccessor(index); - if (reference.getChildReference() != null) - result = result.getComponent(reference.getChildReference()); - return (T) result; - } catch ( NumberFormatException nfe ) { - throw new ReferenceException(nfe); - } - } else if (reference instanceof IndexReference) { - IndexReference ref = (IndexReference) reference; - int index = ref.getIndex(); - Accessor result = getAccessor(index); - if (reference.getChildReference() != null) - result = result.getComponent(reference.getChildReference()); - return (T) result; - } throw new ReferenceException(reference.getClass().getName()+" is not a reference of an array"); - } - - @Override - public void getAll(Binding valueBinding, Object[] array) - throws AccessorException { - assert b.isOpen(); - readLock(); - try { - int size = _size(); - if (size > array.length) throw new AccessorException("Argument array too short"); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - b.position(0L); - for (int i=0; i values) - throws AccessorException { - assert b.isOpen(); - readLock(); - try { - b.position(0L); - int size = _size(); - Serializer s = params.serializerScheme.getSerializer(valueBinding); - for (int i=0; ioldSize) { - Object dummy = cb.createDefault(); - int count = newSize-oldSize; - addRepeatNoflush(oldSize, cb, dummy, count); - } - - } catch (BindingException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void setSize(int newSize) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - setSizeNoflush(newSize); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public int size() throws AccessorException { - assert b.isOpen(); - readLock(); - try { - return (int) ( b.length() / elementSize ); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - readUnlock(); - } - } - - private int _size() throws AccessorException { - try { - return (int) ( b.length() / elementSize ); - } catch (IOException e) { - throw new AccessorException( e ); - } - } - - @Override - Event applyLocal(Event e, boolean makeRollback) throws AccessorException { - Event rollback = null; - if (e instanceof ValueAssigned) { - ValueAssigned va = (ValueAssigned) e; - if (makeRollback) rollback = new ValueAssigned(cb, getValue(cb)); - setValueNoflush(va.newValue.getBinding(), va.newValue.getValue()); - } else - if (e instanceof ArrayElementAdded) { - ArrayElementAdded aa = (ArrayElementAdded) e; - addNoflush(aa.index, aa.value.getBinding(), aa.value.getValue()); - if (makeRollback) rollback = new ArrayElementRemoved(aa.index); - } else if (e instanceof ArrayElementRemoved) { - ArrayElementRemoved ar = (ArrayElementRemoved) e; - if (ar.index<0 || ar.index >=size()) throw new AccessorException("Array index out of bounds"); - if (makeRollback) { - Object cv = get(ar.index, cb); - rollback = new ArrayElementAdded(ar.index, new MutableVariant(cb, cv)); - } - removeNoflush(ar.index, 1); - } else { - throw new AccessorException("Cannot apply "+e.getClass().getName()+" to Array"); - } - - return rollback; - } - - - - @Override - public void add(Binding binding, Object value) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addNoflush(binding, value); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void add(int index, Binding binding, Object value) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addNoflush(index, binding, value); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void addAll(Binding binding, Object[] values) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addAllNoflush(binding, values); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void addAll(int index, Binding binding, Object[] values) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - addAllNoflush(index, binding, values); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void remove(int index, int count) throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - removeNoflush(index, count); - b.flush(); - } catch (IOException e) { - throw new AccessorException( e ); - } finally { - writeUnlock(); - } - } - - @Override - public void set(int index, Binding binding, Object value) - throws AccessorException { - assert b.isOpen(); - writeLock(); - try { - setNoflush(index, binding, value); - b.flush(); - } catch (IOException e) { - throw new AccessorException(e); - } finally { - writeUnlock(); - } - } - -} - +/******************************************************************************* + * Copyright (c) 2007, 2011 Association for Decentralized Information Management in + * Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * VTT Technical Research Centre of Finland - initial API and implementation + *******************************************************************************/ +package org.simantics.databoard.accessor.binary; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.simantics.databoard.accessor.Accessor; +import org.simantics.databoard.accessor.ArrayAccessor; +import org.simantics.databoard.accessor.CloseableAccessor; +import org.simantics.databoard.accessor.StreamAccessor; +import org.simantics.databoard.accessor.error.AccessorConstructionException; +import org.simantics.databoard.accessor.error.AccessorException; +import org.simantics.databoard.accessor.error.ReferenceException; +import org.simantics.databoard.accessor.event.ArrayElementAdded; +import org.simantics.databoard.accessor.event.ArrayElementRemoved; +import org.simantics.databoard.accessor.event.Event; +import org.simantics.databoard.accessor.event.ValueAssigned; +import org.simantics.databoard.accessor.file.FileArrayAccessor; +import org.simantics.databoard.accessor.impl.AccessorParams; +import org.simantics.databoard.accessor.impl.ListenerEntry; +import org.simantics.databoard.accessor.interestset.ArrayInterestSet; +import org.simantics.databoard.accessor.interestset.InterestSet; +import org.simantics.databoard.accessor.reference.ChildReference; +import org.simantics.databoard.accessor.reference.IndexReference; +import org.simantics.databoard.accessor.reference.LabelReference; +import org.simantics.databoard.adapter.AdaptException; +import org.simantics.databoard.binding.ArrayBinding; +import org.simantics.databoard.binding.Binding; +import org.simantics.databoard.binding.error.BindingException; +import org.simantics.databoard.binding.mutable.MutableVariant; +import org.simantics.databoard.serialization.Serializer; +import org.simantics.databoard.serialization.SerializerConstructionException; +import org.simantics.databoard.type.ArrayType; +import org.simantics.databoard.type.Datatype; +import org.simantics.databoard.util.binary.Blob; +import org.simantics.databoard.util.binary.RandomAccessBinary.ByteSide; + +/** + * Stream Array is accessor to an array where values can be added to + * the end, and where element size is constant. + * + *

+ * The Binary format is different from the normal array format, there is no size + * integer at the beginning of the binary data. Instead the size is derieved + * from the size of the binary data by dividing byte size with element size. + * Therefore, the element size must be constant. + * + * @author Toni Kalajainen + */ +public class BinaryStreamArray extends BinaryObject implements ArrayAccessor, FileArrayAccessor, CloseableAccessor, ArrayAccessor.CloseableArrayAccessor, StreamAccessor { + + /** Accessors to children */ + TreeMap> children = new TreeMap>(); + + /** Element Binding */ + Binding cb; + /** Element Serializer */ + Serializer cs; + /** Element size */ + int elementSize; + + public BinaryStreamArray(BinaryObject parent, Blob blob, Datatype type, AccessorParams params) + throws AccessorException + { + super(parent, blob, type, params); + ArrayType at = (ArrayType) type; + cb = params.bindingScheme.getBindingUnchecked(at.componentType); + cs = params.serializerScheme.getSerializerUnchecked( cb ); + Integer elementConstantSize = cs.getConstantSize(); + if (elementConstantSize == null) { + throw new AccessorException("The size in an element of an AppendableArray must be constant."); + } + elementSize = elementConstantSize; + } + + public ArrayType type() { + return (ArrayType) type; + } + + + /** + * Get existing sub accessor + * @param index + * @return sub-accessor or null + */ + BinaryObject getExistingAccessor(int index) + { + java.lang.ref.Reference ref = children.get(index); + if (ref==null) return null; + BinaryObject res = (BinaryObject) ref.get(); +// if (res==null) children.remove(index); + return res; + } + + /** + * Get start position of a field + * + * @param fieldIndex + * @return + * @throws AccessorException + */ + long getStartPosition(int fieldIndex) throws AccessorException { + return fieldIndex * (long) (elementSize); + } + + long getLength(int index, long pos) throws AccessorException { + return elementSize; + } + + @Override + public void setNoflush(int index, Binding rcb, Object rcv) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + + // Write + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + long pos = getStartPosition(index); + b.position(pos); + rcs.serialize(b, null, rcv); + + // Notify + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotificationsOf(index)) { + MutableVariant newValue = null; + if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); + + Event e = new ValueAssigned(new IndexReference(index), newValue); + emitEvent(le, e); + } + le = le.next; + } + + } catch (IOException e) { + throw new AccessorException(e); + } catch (AdaptException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + + } + + /** + * Set all values + * + * @param arrayBinding + * @param newArray + */ + @Override + public void setValueNoflush(Binding arrayBinding, Object newArray) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + // Write + ArrayBinding rb = ((ArrayBinding)arrayBinding); + Binding rcb = rb.getComponentBinding(); + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + int oldCount = _size(); + int newCount = rb.size(newArray); + b.setLength( newCount * elementSize ); + + // Serialize + b.position(0L); + for (int index=0; index=newCount; index--) { + BinaryObject sa = getExistingAccessor(index); + if (sa!=null) { + sa.invalidatedNotification(); + children.remove(index); + sa = null; + } + + // Notify changes + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotificationsOf(index)) { + Event e = new ArrayElementRemoved(index); + emitEvent(le, e); + } + le = le.next; + } + } + + // Notify new assignment + if (listeners!=null) { + for (int index=0; indexoldCount) throw new AccessorException("Index out of range"); + + long pos = getStartPosition(index); + b.position(pos); + b.insertBytes(elementSize, ByteSide.Left); + rcs.serialize(b, null, rcv); + + // Update child map keys + if (!lastEntry && !children.isEmpty()) { + Integer key = children.lastKey(); + while (key != null && key >= index) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key+1, value); + key = children.lowerKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + MutableVariant newValue = null; + if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv)); + ArrayElementAdded e = new ArrayElementAdded(index, newValue); + emitEvent(le, e); + } + + // Update indices of interest sets + if (is.componentInterests!=null) { + Map oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i+1 : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + } + + le = le.next; + } + + } catch (IOException e) { + throw new AccessorException(e); + } catch (AdaptException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + } + + @Override + public void addNoflush(Binding binding, Object value) + throws AccessorException { + addNoflush(_size(), binding, value); + } + + @Override + public void addAllNoflush(Binding binding, Object[] values) + throws AccessorException { + addAllNoflush(size(), binding, values); + } + + @Override + public void addAllNoflush(int index, Binding rcb, Object[] rcvs) + throws AccessorException { + if (index<0||index>size()) throw new AccessorException("Index out of bounds"); + assert b.isOpen(); + writeLock(); + try { + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + // Write + b.position(0L); + int oldCount = b.readInt(); + int newCount = oldCount + rcvs.length; + if (index>oldCount) throw new AccessorException("Index out of range"); + b.position(0L); + b.writeInt(newCount); + boolean lastEntry = index == oldCount; + + int size = 0; + for (int i=0; i= index) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key+rcvs.length, value); + key = children.lowerKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + for (int i=0; i oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i+rcvs.length : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + + // Add component interest listener + /* + for (int i = index; isize()) throw new AccessorException("Index out of bounds"); + assert b.isOpen(); + writeLock(); + try { + Serializer rcs = params.serializerScheme.getSerializer( rcb ); + // Write + int oldCount = _size(); + int newCount = oldCount + repeatCount; + if (index>oldCount) throw new AccessorException("Index out of range"); + b.position(0L); + b.writeInt(newCount); + boolean lastEntry = index == oldCount; + + int size = rcs.getSize(obj) * repeatCount; + long pos = getStartPosition(index); + b.position(pos); + b.insertBytes(size, ByteSide.Right); + + b.position(pos); + for (int i=0; i= index) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key+repeatCount, value); + key = children.lowerKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + for (int i=0; i oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i+repeatCount : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + + } + + le = le.next; + } + + } catch (IOException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + } + + @Override + public void removeNoflush(int index, int count) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + // Write + boolean lastEntry = index == count; + int oldCount = _size(); + if (index<0||index+count>oldCount) throw new AccessorException("Index out of bounds"); + long pos = getStartPosition(index); + long lastPos = getStartPosition(index+count-1); + long lastLen = getLength(index, lastPos); + long end = lastPos + lastLen; + long len = end - pos; + b.position(pos); + b.removeBytes(len, ByteSide.Right); + + // Remove children + SortedMap> sm = children.subMap(index, true, index+count, false); + for (Entry> e : sm.entrySet()) { + BinaryObject bo = e.getValue().get(); + if (bo==null) continue; + bo.invalidatedNotification(); + } + sm.clear(); + + // Update the keys of consecutive children + if (!lastEntry && !children.isEmpty()) { + Integer lastKey = children.lastKey(); + Integer key = children.higherKey(index); + while (key != null && key <= lastKey) { + java.lang.ref.Reference value = children.remove(key); + if (value.get()!=null) children.put(key-count, value); + key = children.higherKey(key); + } + } + + // Notify Listeners + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + if (is.inNotifications()) { + ArrayElementRemoved e = new ArrayElementRemoved(index); + emitEvent(le, e); + } + + // Update indices of interest sets + if (is.componentInterests!=null) { + Map oldCis = is.componentInterests; + boolean needUpdates = false; + for (Integer i : oldCis.keySet()) { + needUpdates |= i>=index; + if (needUpdates) break; + } + + if (needUpdates) { + Map newCis = new HashMap(oldCis.size()); + for (Integer i : oldCis.keySet()) + { + Integer oldKey = i; + Integer newKey = i>=index ? i-1 : i; + InterestSet oldValue = oldCis.get(oldKey); + newCis.put(newKey, oldValue); + } + is.componentInterests = newCis; + } + } + le = le.next; + } + + + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public Object get(int index, Binding valueBinding) throws AccessorException { + assert b.isOpen(); + readLock(); + try { + long pos = getStartPosition(index); + b.position(pos); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + return s.deserialize(b); + } catch (IOException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + readUnlock(); + } + } + + @Override + public void get(int index, Binding valueBinding, Object dst) throws AccessorException { + assert b.isOpen(); + readLock(); + try { + long pos = getStartPosition(index); + b.position(pos); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + s.deserializeTo(b, dst); + } catch (IOException e) { + throw new AccessorException(e); + } catch (SerializerConstructionException e) { + throw new AccessorException(e); + } finally { + readUnlock(); + } + } + + + @SuppressWarnings("unchecked") + @Override + public T getAccessor(int index) + throws AccessorConstructionException { + assert b.isOpen(); + readLock(); + try { + int count = _size(); + if (index<0 || index>=count) throw new ReferenceException("Element index ("+index+") out of bounds ("+count+")"); + + // Get existing or create new + BinaryObject sa = getExistingAccessor(index); + if (sa==null) { + long pos = getStartPosition(index); + long len = getLength(index, pos); + + // Instantiate correct sub accessor. + sa = createSubAccessor(cb.type(), pos, len, params); + children.put(index, new WeakReference(sa) ); + + // Add component interest sets + ListenerEntry le = listeners; + while (le!=null) { + ArrayInterestSet is = le.getInterestSet(); + + // Generic element interest + InterestSet gis = is.getComponentInterest(); + if (gis != null) { + try { + ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); + sa.addListener(le.listener, gis, childPath, le.executor); + } catch (AccessorException e) { + throw new AccessorConstructionException(e); + } + } + + // Specific element interest + InterestSet cis = is.getComponentInterest(index); + if (cis != null) { + try { + ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) ); + sa.addListener(le.listener, cis, childPath,le.executor); + } catch (AccessorException e) { + throw new AccessorConstructionException(e); + } + } + + // Next listener + le = le.next; + } + + } + + return (T) sa; + } catch (AccessorException e) { + throw new AccessorConstructionException(e); + } finally { + readUnlock(); + } + } + + @SuppressWarnings("unchecked") + @Override + public T getComponent(ChildReference reference) + throws AccessorConstructionException { + if (reference==null) return (T) this; + if (reference instanceof LabelReference) { + LabelReference lr = (LabelReference) reference; + try { + Integer index = new Integer( lr.label ); + Accessor result = getAccessor(index); + if (reference.getChildReference() != null) + result = result.getComponent(reference.getChildReference()); + return (T) result; + } catch ( NumberFormatException nfe ) { + throw new ReferenceException(nfe); + } + } else if (reference instanceof IndexReference) { + IndexReference ref = (IndexReference) reference; + int index = ref.getIndex(); + Accessor result = getAccessor(index); + if (reference.getChildReference() != null) + result = result.getComponent(reference.getChildReference()); + return (T) result; + } throw new ReferenceException(reference.getClass().getName()+" is not a reference of an array"); + } + + @Override + public void getAll(Binding valueBinding, Object[] array) + throws AccessorException { + assert b.isOpen(); + readLock(); + try { + int size = _size(); + if (size > array.length) throw new AccessorException("Argument array too short"); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + b.position(0L); + for (int i=0; i values) + throws AccessorException { + assert b.isOpen(); + readLock(); + try { + b.position(0L); + int size = _size(); + Serializer s = params.serializerScheme.getSerializer(valueBinding); + for (int i=0; ioldSize) { + Object dummy = cb.createDefault(); + int count = newSize-oldSize; + addRepeatNoflush(oldSize, cb, dummy, count); + } + + } catch (BindingException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void setSize(int newSize) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + setSizeNoflush(newSize); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public int size() throws AccessorException { + assert b.isOpen(); + readLock(); + try { + return (int) ( b.length() / elementSize ); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + readUnlock(); + } + } + + private int _size() throws AccessorException { + try { + return (int) ( b.length() / elementSize ); + } catch (IOException e) { + throw new AccessorException( e ); + } + } + + @Override + Event applyLocal(Event e, boolean makeRollback) throws AccessorException { + Event rollback = null; + if (e instanceof ValueAssigned) { + ValueAssigned va = (ValueAssigned) e; + if (makeRollback) rollback = new ValueAssigned(cb, getValue(cb)); + setValueNoflush(va.newValue.getBinding(), va.newValue.getValue()); + } else + if (e instanceof ArrayElementAdded) { + ArrayElementAdded aa = (ArrayElementAdded) e; + addNoflush(aa.index, aa.value.getBinding(), aa.value.getValue()); + if (makeRollback) rollback = new ArrayElementRemoved(aa.index); + } else if (e instanceof ArrayElementRemoved) { + ArrayElementRemoved ar = (ArrayElementRemoved) e; + if (ar.index<0 || ar.index >=size()) throw new AccessorException("Array index out of bounds"); + if (makeRollback) { + Object cv = get(ar.index, cb); + rollback = new ArrayElementAdded(ar.index, new MutableVariant(cb, cv)); + } + removeNoflush(ar.index, 1); + } else { + throw new AccessorException("Cannot apply "+e.getClass().getName()+" to Array"); + } + + return rollback; + } + + + + @Override + public void add(Binding binding, Object value) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addNoflush(binding, value); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void add(int index, Binding binding, Object value) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addNoflush(index, binding, value); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void addAll(Binding binding, Object[] values) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addAllNoflush(binding, values); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void addAll(int index, Binding binding, Object[] values) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + addAllNoflush(index, binding, values); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void remove(int index, int count) throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + removeNoflush(index, count); + b.flush(); + } catch (IOException e) { + throw new AccessorException( e ); + } finally { + writeUnlock(); + } + } + + @Override + public void set(int index, Binding binding, Object value) + throws AccessorException { + assert b.isOpen(); + writeLock(); + try { + setNoflush(index, binding, value); + b.flush(); + } catch (IOException e) { + throw new AccessorException(e); + } finally { + writeUnlock(); + } + } + +} +