/******************************************************************************* * Copyright (c) 2007, 2011 Association for Decentralized Information Management in * Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * VTT Technical Research Centre of Finland - initial API and implementation *******************************************************************************/ package org.simantics.databoard.accessor.binary; import java.io.IOException; import java.lang.ref.WeakReference; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; import java.util.TreeMap; import org.simantics.databoard.accessor.Accessor; import org.simantics.databoard.accessor.ArrayAccessor; import org.simantics.databoard.accessor.CloseableAccessor; import org.simantics.databoard.accessor.StreamAccessor; import org.simantics.databoard.accessor.error.AccessorConstructionException; import org.simantics.databoard.accessor.error.AccessorException; import org.simantics.databoard.accessor.error.ReferenceException; import org.simantics.databoard.accessor.event.ArrayElementAdded; import org.simantics.databoard.accessor.event.ArrayElementRemoved; import org.simantics.databoard.accessor.event.Event; import org.simantics.databoard.accessor.event.ValueAssigned; import org.simantics.databoard.accessor.file.FileArrayAccessor; import org.simantics.databoard.accessor.impl.AccessorParams; import org.simantics.databoard.accessor.impl.ListenerEntry; import org.simantics.databoard.accessor.interestset.ArrayInterestSet; import org.simantics.databoard.accessor.interestset.InterestSet; import org.simantics.databoard.accessor.reference.ChildReference; import org.simantics.databoard.accessor.reference.IndexReference; 
import org.simantics.databoard.accessor.reference.LabelReference; import org.simantics.databoard.adapter.AdaptException; import org.simantics.databoard.binding.ArrayBinding; import org.simantics.databoard.binding.Binding; import org.simantics.databoard.binding.error.BindingException; import org.simantics.databoard.binding.mutable.MutableVariant; import org.simantics.databoard.serialization.Serializer; import org.simantics.databoard.serialization.SerializerConstructionException; import org.simantics.databoard.type.ArrayType; import org.simantics.databoard.type.Datatype; import org.simantics.databoard.util.binary.Blob; import org.simantics.databoard.util.binary.RandomAccessBinary.ByteSide; /** * Stream Array is accessor to an array where values can be added to * the end, and where element size is constant. * *

* The Binary format is different from the normal array format: there is no size
 * integer at the beginning of the binary data. Instead the size is derived
 * from the size of the binary data by dividing byte size with element size.
 * Therefore, the element size must be constant.
 *
 * NOTE(review): the text of this class appears to be extraction-damaged. Generic
 * type arguments are missing and several loop bodies / method headers are cut
 * out mid-statement. Each damaged spot is marked with a NOTE(review) comment;
 * restore the missing text from version control before relying on this file.
 *
 * @author Toni Kalajainen
 */
public class BinaryStreamArray extends BinaryObject implements ArrayAccessor, FileArrayAccessor, CloseableAccessor, ArrayAccessor.CloseableArrayAccessor, StreamAccessor {

    /**
     * Accessors to children, keyed by element index. Values are references
     * (WeakReference instances are stored by getAccessor) to sub-accessors.
     */
    // NOTE(review): the type arguments of this declaration are missing (only a
    // stray closing bracket is left); presumably Integer keys and Reference
    // values, judging by how the map is used below - confirm.
    TreeMap> children = new TreeMap>();

    /** Element Binding, resolved from the array's component type in the constructor */
    Binding cb;

    /** Element Serializer, resolved for {@link #cb} in the constructor */
    Serializer cs;

    /** Element size in bytes; the constant size reported by {@link #cs} */
    int elementSize;

    /**
     * Creates a stream array accessor over a blob.
     *
     * @param parent parent accessor, passed to the super constructor
     * @param blob binary data, passed to the super constructor
     * @param type data type; cast to ArrayType without a check
     * @param params accessor parameters supplying the binding and serializer schemes
     * @throws AccessorException if the element serializer does not report a constant size
     */
    public BinaryStreamArray(BinaryObject parent, Blob blob, Datatype type, AccessorParams params)
    throws AccessorException {
        super(parent, blob, type, params);
        ArrayType at = (ArrayType) type;
        cb = params.bindingScheme.getBindingUnchecked(at.componentType);
        cs = params.serializerScheme.getSerializerUnchecked( cb );
        // Element count is derived from blob length, so a variable-size element cannot be supported.
        Integer elementConstantSize = cs.getConstantSize();
        if (elementConstantSize == null) {
            throw new AccessorException("The size in an element of an AppendableArray must be constant.");
        }
        elementSize = elementConstantSize;
    }

    /**
     * @return the data type of this accessor, cast to ArrayType
     */
    public ArrayType type() {
        return (ArrayType) type;
    }

    /**
     * Get existing sub accessor.
     *
     * @param index element index
     * @return sub-accessor, or null if no entry is registered for the index or
     *         the stored reference no longer holds an object
     */
    BinaryObject getExistingAccessor(int index) {
        java.lang.ref.Reference ref = children.get(index);
        if (ref==null) return null;
        BinaryObject res = (BinaryObject) ref.get();
        // Stale entries are intentionally left in the map (removal is disabled):
//      if (res==null) children.remove(index);
        return res;
    }

    /**
     * Get start position of an element. There is no header, so the position
     * is simply the index multiplied by the element size.
     *
     * @param fieldIndex element index
     * @return byte offset of the element from the start of the blob
     * @throws AccessorException declared, but not thrown by this implementation
     */
    long getStartPosition(int fieldIndex) throws AccessorException {
        // The cast widens elementSize so the multiplication is done in long arithmetic.
        return fieldIndex * (long) (elementSize);
    }

    /**
     * Get the byte length of an element.
     *
     * @param index element index (ignored; every element has the same size)
     * @param pos element start position (ignored)
     * @return {@link #elementSize}
     * @throws AccessorException declared, but not thrown by this implementation
     */
    long getLength(int index, long pos) throws AccessorException {
        return elementSize;
    }

    /**
     * Overwrites the element at the given index without flushing the blob,
     * then emits a ValueAssigned event to listeners interested in that index.
     *
     * NOTE(review): no bounds check on index is visible in this method - confirm
     * whether callers or the blob are expected to guard against bad indices.
     *
     * @param index element index
     * @param rcb binding of the value
     * @param rcv the value to write
     * @throws AccessorException wraps IOException, AdaptException and
     *         SerializerConstructionException raised while writing or notifying
     */
    @Override
    public void setNoflush(int index, Binding rcb, Object rcv)
            throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            // Write
            Serializer rcs = params.serializerScheme.getSerializer( rcb );
            long pos = getStartPosition(index);
            b.position(pos);
            rcs.serialize(b, null, rcv);

            // Notify
            ListenerEntry le = listeners;
            while (le!=null) {
                ArrayInterestSet is = le.getInterestSet();
                if (is.inNotificationsOf(index)) {
                    // The value is attached only if the listener asked for values;
                    // it is cloned unless the binding is immutable.
                    MutableVariant newValue = null;
                    if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv));

                    Event e = new ValueAssigned(new IndexReference(index), newValue);
                    emitEvent(le, e);
                }
                le = le.next;
            }

        } catch (IOException e) {
            throw new AccessorException(e);
        } catch (AdaptException e) {
            throw new AccessorException(e);
        } catch (SerializerConstructionException e) {
            throw new AccessorException(e);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Set all values. Resizes the blob to hold the new element count and
     * rewrites the elements from position 0.
     *
     * NOTE(review): the middle and the end of this method are missing from the
     * text (see the markers below), so the remaining steps cannot be documented
     * reliably from here.
     *
     * @param arrayBinding binding of the new array; cast to ArrayBinding without a check
     * @param newArray the new array value
     */
    @Override
    public void setValueNoflush(Binding arrayBinding, Object newArray)
            throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            // Write
            ArrayBinding rb = ((ArrayBinding)arrayBinding);
            Binding rcb = rb.getComponentBinding();
            Serializer rcs = params.serializerScheme.getSerializer( rcb );
            int oldCount = _size();
            int newCount = rb.size(newArray);
            // NOTE(review): both operands are int, so the product is computed in
            // int arithmetic before being passed on - may overflow for large arrays; confirm.
            b.setLength( newCount * elementSize );

            // Serialize
            b.position(0L);
            // NOTE(review): text is missing inside the next statement. It looks like
            // the serialization loop and the header of a descending loop over the
            // removed indices were fused together.
            for (int index=0; index=newCount; index--) {
                // Invalidate and forget the sub-accessor of this index, if one exists
                BinaryObject sa = getExistingAccessor(index);
                if (sa!=null) {
                    sa.invalidatedNotification();
                    children.remove(index);
                    sa = null;
                }

                // Notify changes
                ListenerEntry le = listeners;
                while (le!=null) {
                    ArrayInterestSet is = le.getInterestSet();
                    if (is.inNotificationsOf(index)) {
                        Event e = new ArrayElementRemoved(index);
                        emitEvent(le, e);
                    }
                    le = le.next;
                }
            }

            // Notify new assignment
            if (listeners!=null) {
                // NOTE(review): text is missing inside the next statement. The rest of
                // setValueNoflush and the beginning of another method are gone;
                // presumably the following code belongs to
                // addNoflush(int index, Binding rcb, Object rcv), judging by the
                // names it uses and by the calls made elsewhere in this class - confirm.
                for (int index=0; indexoldCount) throw new AccessorException("Index out of range");

            // Make room for one element at the insertion position and write it
            long pos = getStartPosition(index);
            b.position(pos);
            b.insertBytes(elementSize, ByteSide.Left);
            rcs.serialize(b, null, rcv);

            // Update child map keys
            // Walks from the highest key downwards so that shifted entries do not
            // overwrite entries that have not been moved yet.
            if (!lastEntry && !children.isEmpty()) {
                Integer key = children.lastKey();
                while (key != null && key >= index) {
                    java.lang.ref.Reference value = children.remove(key);
                    // Entries whose referent is gone are dropped instead of moved
                    if (value.get()!=null) children.put(key+1, value);
                    key = children.lowerKey(key);
                }
            }

            // Notify Listeners
            ListenerEntry le = listeners;
            while (le!=null) {
                ArrayInterestSet is = le.getInterestSet();
                if (is.inNotifications()) {
                    MutableVariant newValue = null;
                    if (is.inValues()) newValue = new MutableVariant(rcb, rcb.isImmutable() ? rcv : rcb.clone(rcv));
                    ArrayElementAdded e = new ArrayElementAdded(index, newValue);
                    emitEvent(le, e);
                }

                // Update indices of interest sets
                // Keys at or after the insertion index move up by one; the map is
                // rebuilt only if at least one such key exists.
                if (is.componentInterests!=null) {
                    Map oldCis = is.componentInterests;
                    boolean needUpdates = false;
                    for (Integer i : oldCis.keySet()) {
                        needUpdates |= i>=index;
                        if (needUpdates) break;
                    }

                    if (needUpdates) {
                        Map newCis = new HashMap(oldCis.size());
                        for (Integer i : oldCis.keySet())
                        {
                            Integer oldKey = i;
                            Integer newKey = i>=index ? i+1 : i;
                            InterestSet oldValue = oldCis.get(oldKey);
                            newCis.put(newKey, oldValue);
                        }
                        is.componentInterests = newCis;
                    }
                }

                le = le.next;
            }

        } catch (IOException e) {
            throw new AccessorException(e);
        } catch (AdaptException e) {
            throw new AccessorException(e);
        } catch (SerializerConstructionException e) {
            throw new AccessorException(e);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Appends one element to the end of the array without flushing.
     * Delegates to the indexed addNoflush with the current element count.
     *
     * @param binding binding of the value
     * @param value the value to append
     */
    @Override
    public void addNoflush(Binding binding, Object value) throws AccessorException {
        addNoflush(_size(), binding, value);
    }

    /**
     * Appends several elements to the end of the array without flushing.
     * Delegates to the indexed addAllNoflush with the current element count.
     *
     * @param binding binding of the values
     * @param values the values to append
     */
    @Override
    public void addAllNoflush(Binding binding, Object[] values) throws AccessorException {
        addAllNoflush(size(), binding, values);
    }

    /**
     * Inserts several elements at the given index without flushing.
     *
     * NOTE(review): this method reads an int from position 0 as the old count and
     * writes the new count back to position 0, although the class documentation
     * states that this format has no size integer at the beginning and every
     * other method derives the count from the blob length. This looks like it
     * would overwrite element data - confirm and fix.
     *
     * NOTE(review): parts of this method are missing from the text (see markers).
     *
     * @param index insertion index; must be within 0..size()
     * @param rcb binding of the values
     * @param rcvs the values to insert
     * @throws AccessorException if the index is out of bounds, or wrapping a lower-level failure
     */
    @Override
    public void addAllNoflush(int index, Binding rcb, Object[] rcvs) throws AccessorException {
        if (index<0||index>size()) throw new AccessorException("Index out of bounds");
        assert b.isOpen();
        writeLock();
        try {
            Serializer rcs = params.serializerScheme.getSerializer( rcb );
            // Write
            b.position(0L);
            int oldCount = b.readInt();
            int newCount = oldCount + rcvs.length;
            if (index>oldCount) throw new AccessorException("Index out of range");
            b.position(0L);
            b.writeInt(newCount);
            boolean lastEntry = index == oldCount;

            int size = 0;
            // NOTE(review): text is missing inside the next statement. It looks like
            // the size computation, the write of the values and the header of the
            // child-key update loop were fused together.
            for (int i=0; i= index) {
                    java.lang.ref.Reference value = children.remove(key);
                    // Entries whose referent is gone are dropped instead of moved
                    if (value.get()!=null) children.put(key+rcvs.length, value);
                    key = children.lowerKey(key);
                }
            }

            // Notify Listeners
            ListenerEntry le = listeners;
            while (le!=null) {
                ArrayInterestSet is = le.getInterestSet();
                if (is.inNotifications()) {
                    // NOTE(review): text is missing inside the next statement. It looks
                    // like the event-emitting loop and the start of the interest-set
                    // update were fused together.
                    for (int i=0; i oldCis = is.componentInterests;
                    boolean needUpdates = false;
                    for (Integer i : oldCis.keySet()) {
                        needUpdates |= i>=index;
                        if (needUpdates) break;
                    }

                    if (needUpdates) {
                        // Keys at or after the insertion index move up by the number of inserted values
                        Map newCis = new HashMap(oldCis.size());
                        for (Integer i : oldCis.keySet())
                        {
                            Integer oldKey = i;
                            Integer newKey = i>=index ? i+rcvs.length : i;
                            InterestSet oldValue = oldCis.get(oldKey);
                            newCis.put(newKey, oldValue);
                        }
                        is.componentInterests = newCis;
                    }

                    // Add component interest listener
                    // NOTE(review): the text is damaged here. The commented-out loop on
                    // the next line opens a block comment whose terminator is missing,
                    // together with what looks like the end of this method and the
                    // header of the next one (presumably addRepeatNoflush, judging by
                    // the call made from the size-setting code further down and by the
                    // names obj and repeatCount used below). Until the text is restored,
                    // a Java lexer treats the code that follows as comment text.
                    /* for (int i = index; isize()) throw new AccessorException("Index out of bounds");
        assert b.isOpen();
        writeLock();
        try {
            Serializer rcs = params.serializerScheme.getSerializer( rcb );
            // Write
            int oldCount = _size();
            int newCount = oldCount + repeatCount;
            if (index>oldCount) throw new AccessorException("Index out of range");
            // NOTE(review): as in addAllNoflush, a count is written to position 0 even
            // though this format is documented to have no size header - confirm and fix.
            b.position(0L);
            b.writeInt(newCount);
            boolean lastEntry = index == oldCount;

            // Make room for repeatCount copies of obj at the insertion position
            int size = rcs.getSize(obj) * repeatCount;
            long pos = getStartPosition(index);
            b.position(pos);
            b.insertBytes(size, ByteSide.Right);

            b.position(pos);
            // NOTE(review): text is missing inside the next statement. It looks like
            // the write loop and the header of the child-key update loop were fused
            // together.
            for (int i=0; i= index) {
                    java.lang.ref.Reference value = children.remove(key);
                    // Entries whose referent is gone are dropped instead of moved
                    if (value.get()!=null) children.put(key+repeatCount, value);
                    key = children.lowerKey(key);
                }
            }

            // Notify Listeners
            ListenerEntry le = listeners;
            while (le!=null) {
                ArrayInterestSet is = le.getInterestSet();
                if (is.inNotifications()) {
                    // NOTE(review): text is missing inside the next statement. It looks
                    // like the event-emitting loop and the start of the interest-set
                    // update were fused together.
                    for (int i=0; i oldCis = is.componentInterests;
                    boolean needUpdates = false;
                    for (Integer i : oldCis.keySet()) {
                        needUpdates |= i>=index;
                        if (needUpdates) break;
                    }

                    if (needUpdates) {
                        // Keys at or after the insertion index move up by repeatCount
                        Map newCis = new HashMap(oldCis.size());
                        for (Integer i : oldCis.keySet())
                        {
                            Integer oldKey = i;
                            Integer newKey = i>=index ? i+repeatCount : i;
                            InterestSet oldValue = oldCis.get(oldKey);
                            newCis.put(newKey, oldValue);
                        }
                        is.componentInterests = newCis;
                    }
                }

                le = le.next;
            }

        } catch (IOException e) {
            throw new AccessorException(e);
        } catch (SerializerConstructionException e) {
            throw new AccessorException(e);
        } finally {
            writeUnlock();
        }
    }

    /**
     * Removes a run of elements without flushing: deletes their bytes from the
     * blob, invalidates the sub-accessors of the removed indices, shifts the
     * keys of the following sub-accessors down by count and notifies listeners.
     *
     * NOTE(review): several details look suspicious and should be confirmed:
     * lastEntry is computed as index == count (before the bounds check) rather
     * than from the old element count; only one ArrayElementRemoved event is
     * emitted regardless of count; and interest-set keys are shifted by 1
     * rather than by count, without dropping keys inside the removed range.
     *
     * @param index index of the first element to remove
     * @param count number of elements to remove
     * @throws AccessorException if the range is out of bounds, or wrapping an IOException
     */
    @Override
    public void removeNoflush(int index, int count) throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            // Write
            boolean lastEntry = index == count;
            int oldCount = _size();
            if (index<0||index+count>oldCount) throw new AccessorException("Index out of bounds");
            // Byte range to delete: from the start of the first removed element
            // to the end of the last removed element.
            long pos = getStartPosition(index);
            long lastPos = getStartPosition(index+count-1);
            long lastLen = getLength(index, lastPos);
            long end = lastPos + lastLen;
            long len = end - pos;
            b.position(pos);
            b.removeBytes(len, ByteSide.Right);

            // Remove children
            // NOTE(review): the type arguments of the next two declarations are
            // missing from the text.
            SortedMap> sm = children.subMap(index, true, index+count, false);
            for (Entry> e : sm.entrySet()) {
                BinaryObject bo = e.getValue().get();
                if (bo==null) continue;
                bo.invalidatedNotification();
            }
            // sm is obtained from children.subMap, so clearing it removes these entries from children
            sm.clear();

            // Update the keys of consecutive children
            if (!lastEntry && !children.isEmpty()) {
                Integer lastKey = children.lastKey();
                Integer key = children.higherKey(index);
                while (key != null && key <= lastKey) {
                    java.lang.ref.Reference value = children.remove(key);
                    // Entries whose referent is gone are dropped instead of moved
                    if (value.get()!=null) children.put(key-count, value);
                    key = children.higherKey(key);
                }
            }

            // Notify Listeners
            ListenerEntry le = listeners;
            while (le!=null) {
                ArrayInterestSet is = le.getInterestSet();
                if (is.inNotifications()) {
                    ArrayElementRemoved e = new ArrayElementRemoved(index);
                    emitEvent(le, e);
                }

                // Update indices of interest sets
                if (is.componentInterests!=null) {
                    Map oldCis = is.componentInterests;
                    boolean needUpdates = false;
                    for (Integer i : oldCis.keySet()) {
                        needUpdates |= i>=index;
                        if (needUpdates) break;
                    }

                    if (needUpdates) {
                        Map newCis = new HashMap(oldCis.size());
                        for (Integer i : oldCis.keySet())
                        {
                            Integer oldKey = i;
                            Integer newKey = i>=index ? i-1 : i;
                            InterestSet oldValue = oldCis.get(oldKey);
                            newCis.put(newKey, oldValue);
                        }
                        is.componentInterests = newCis;
                    }
                }
                le = le.next;
            }

        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Reads the element at the given index and returns it as a new object.
     * No bounds check is performed in this method.
     *
     * @param index element index
     * @param valueBinding binding used to select the deserializer
     * @return the deserialized element
     * @throws AccessorException wraps IOException and SerializerConstructionException
     */
    @Override
    public Object get(int index, Binding valueBinding) throws AccessorException {
        assert b.isOpen();
        readLock();
        try {
            long pos = getStartPosition(index);
            b.position(pos);
            Serializer s = params.serializerScheme.getSerializer(valueBinding);
            return s.deserialize(b);
        } catch (IOException e) {
            throw new AccessorException(e);
        } catch (SerializerConstructionException e) {
            throw new AccessorException(e);
        } finally {
            readUnlock();
        }
    }

    /**
     * Reads the element at the given index into an existing object.
     * No bounds check is performed in this method.
     *
     * @param index element index
     * @param valueBinding binding used to select the deserializer
     * @param dst object that receives the deserialized element
     * @throws AccessorException wraps IOException and SerializerConstructionException
     */
    @Override
    public void get(int index, Binding valueBinding, Object dst) throws AccessorException {
        assert b.isOpen();
        readLock();
        try {
            long pos = getStartPosition(index);
            b.position(pos);
            Serializer s = params.serializerScheme.getSerializer(valueBinding);
            s.deserializeTo(b, dst);
        } catch (IOException e) {
            throw new AccessorException(e);
        } catch (SerializerConstructionException e) {
            throw new AccessorException(e);
        } finally {
            readUnlock();
        }
    }

    /**
     * Returns the sub-accessor of an element, creating and registering it if
     * none exists. A newly created sub-accessor receives the generic and the
     * index-specific component interests of every registered listener.
     *
     * NOTE(review): the type-parameter declaration of this method is missing
     * from the text. Also, the children map is modified here while only the
     * read lock is held - confirm that this is safe.
     *
     * @param index element index
     * @return the sub-accessor, cast to the caller's expected type
     * @throws AccessorConstructionException a ReferenceException if the index is
     *         out of bounds; otherwise wraps an AccessorException
     */
    @SuppressWarnings("unchecked")
    @Override
    public T getAccessor(int index) throws AccessorConstructionException {
        assert b.isOpen();
        readLock();
        try {
            int count = _size();
            if (index<0 || index>=count) throw new ReferenceException("Element index ("+index+") out of bounds ("+count+")");

            // Get existing or create new
            BinaryObject sa = getExistingAccessor(index);
            if (sa==null) {
                long pos = getStartPosition(index);
                long len = getLength(index, pos);

                // Instantiate correct sub accessor.
                sa = createSubAccessor(cb.type(), pos, len, params);
                children.put(index, new WeakReference(sa) );

                // Add component interest sets
                ListenerEntry le = listeners;
                while (le!=null) {
                    ArrayInterestSet is = le.getInterestSet();

                    // Generic element interest
                    InterestSet gis = is.getComponentInterest();
                    if (gis != null) {
                        try {
                            ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) );
                            sa.addListener(le.listener, gis, childPath, le.executor);
                        } catch (AccessorException e) {
                            throw new AccessorConstructionException(e);
                        }
                    }

                    // Specific element interest
                    InterestSet cis = is.getComponentInterest(index);
                    if (cis != null) {
                        try {
                            ChildReference childPath = ChildReference.concatenate(le.path, new IndexReference(index) );
                            sa.addListener(le.listener, cis, childPath,le.executor);
                        } catch (AccessorException e) {
                            throw new AccessorConstructionException(e);
                        }
                    }

                    // Next listener
                    le = le.next;
                }
            }

            return (T) sa;
        } catch (AccessorException e) {
            throw new AccessorConstructionException(e);
        } finally {
            readUnlock();
        }
    }

    /**
     * Resolves a child reference to an accessor. A null reference resolves to
     * this accessor. A LabelReference is parsed as an integer index and an
     * IndexReference supplies the index directly; in both cases any nested
     * child reference is resolved on the element's accessor.
     *
     * NOTE(review): the type-parameter declaration of this method is missing
     * from the text. The label is parsed with the Integer(String) constructor,
     * which is deprecated in newer JDKs.
     *
     * @param reference child reference, or null for this accessor
     * @return the referenced accessor, cast to the caller's expected type
     * @throws AccessorConstructionException a ReferenceException if the label is
     *         not a number or the reference type is not supported here
     */
    @SuppressWarnings("unchecked")
    @Override
    public T getComponent(ChildReference reference)
            throws AccessorConstructionException {
        if (reference==null) return (T) this;
        if (reference instanceof LabelReference) {
            LabelReference lr = (LabelReference) reference;
            try {
                Integer index = new Integer( lr.label );
                Accessor result = getAccessor(index);
                if (reference.getChildReference() != null)
                    result = result.getComponent(reference.getChildReference());
                return (T) result;
            } catch ( NumberFormatException nfe ) {
                throw new ReferenceException(nfe);
            }
        } else if (reference instanceof IndexReference) {
            IndexReference ref = (IndexReference) reference;
            int index = ref.getIndex();
            Accessor result = getAccessor(index);
            if (reference.getChildReference() != null)
                result = result.getComponent(reference.getChildReference());
            return (T) result;
        } throw new ReferenceException(reference.getClass().getName()+" is not a reference of an array");
    }

    /**
     * Reads all elements into the given array, starting from blob position 0.
     *
     * NOTE(review): the end of this method and the header of the following one
     * are missing from the text (see marker below).
     *
     * @param valueBinding binding used to select the deserializer
     * @param array destination; must be at least as long as the element count
     * @throws AccessorException if the array is too short
     */
    @Override
    public void getAll(Binding valueBinding, Object[] array)
            throws AccessorException {
        assert b.isOpen();
        readLock();
        try {
            int size = _size();
            if (size > array.length) throw new AccessorException("Argument array too short");
            Serializer s = params.serializerScheme.getSerializer(valueBinding);
            b.position(0L);
            // NOTE(review): text is missing inside the next statement. The rest of this
            // method and the start of the next signature are gone; presumably what
            // follows is an overload of getAll that fills a collection parameter
            // named values - confirm.
            for (int i=0; i values) throws AccessorException {
        assert b.isOpen();
        readLock();
        try {
            b.position(0L);
            int size = _size();
            Serializer s = params.serializerScheme.getSerializer(valueBinding);
            // NOTE(review): text is missing inside the next statement. The rest of this
            // method and the beginning of another one are gone; presumably the
            // following code is the growing branch of setSizeNoflush(int newSize),
            // which pads the array with default-valued elements - confirm.
            for (int i=0; ioldSize) {
                Object dummy = cb.createDefault();
                int count = newSize-oldSize;
                addRepeatNoflush(oldSize, cb, dummy, count);
            }
        } catch (BindingException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Sets the element count and flushes the blob.
     *
     * @param newSize new element count
     * @throws AccessorException on failure; wraps an IOException raised by the flush
     */
    @Override
    public void setSize(int newSize) throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            setSizeNoflush(newSize);
            b.flush();
        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Returns the element count, computed under the read lock as the blob
     * length divided by the element size.
     *
     * @return number of elements
     * @throws AccessorException wraps an IOException raised while reading the blob length
     */
    @Override
    public int size() throws AccessorException {
        assert b.isOpen();
        readLock();
        try {
            return (int) ( b.length() / elementSize );
        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            readUnlock();
        }
    }

    /**
     * Same computation as {@link #size()}, but without taking a lock and
     * without the open-assertion.
     *
     * @return number of elements
     * @throws AccessorException wraps an IOException raised while reading the blob length
     */
    private int _size() throws AccessorException {
        try {
            return (int) ( b.length() / elementSize );
        } catch (IOException e) {
            throw new AccessorException( e );
        }
    }

    /**
     * Applies an event to this array without flushing. Supported events are
     * ValueAssigned (replaces the whole value), ArrayElementAdded (inserts one
     * element) and ArrayElementRemoved (removes one element).
     *
     * NOTE(review): the ValueAssigned rollback is built with the element binding
     * cb although it captures the value of the whole array - confirm that this
     * is the intended binding.
     *
     * @param e the event to apply
     * @param makeRollback whether to build an event that undoes the change
     * @return the rollback event, or null if makeRollback is false
     * @throws AccessorException if the event type is not supported or a removal
     *         index is out of bounds
     */
    @Override
    Event applyLocal(Event e, boolean makeRollback) throws AccessorException {
        Event rollback = null;
        if (e instanceof ValueAssigned) {
            ValueAssigned va = (ValueAssigned) e;
            // The old value is captured before it is overwritten
            if (makeRollback) rollback = new ValueAssigned(cb, getValue(cb));
            setValueNoflush(va.newValue.getBinding(), va.newValue.getValue());
        } else
        if (e instanceof ArrayElementAdded) {
            ArrayElementAdded aa = (ArrayElementAdded) e;
            addNoflush(aa.index, aa.value.getBinding(), aa.value.getValue());
            if (makeRollback) rollback = new ArrayElementRemoved(aa.index);
        } else if (e instanceof ArrayElementRemoved) {
            ArrayElementRemoved ar = (ArrayElementRemoved) e;
            if (ar.index<0 || ar.index >=size()) throw new AccessorException("Array index out of bounds");
            if (makeRollback) {
                // The element is read before removal so that the rollback can re-add it
                Object cv = get(ar.index, cb);
                rollback = new ArrayElementAdded(ar.index, new MutableVariant(cb, cv));
            }
            removeNoflush(ar.index, 1);
        } else {
            throw new AccessorException("Cannot apply "+e.getClass().getName()+" to Array");
        }

        return rollback;
    }

    /**
     * Appends one element and flushes the blob.
     *
     * @param binding binding of the value
     * @param value the value to append
     * @throws AccessorException on failure; wraps an IOException raised by the flush
     */
    @Override
    public void add(Binding binding, Object value) throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            addNoflush(binding, value);
            b.flush();
        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Inserts one element at the given index and flushes the blob.
     *
     * @param index insertion index
     * @param binding binding of the value
     * @param value the value to insert
     * @throws AccessorException on failure; wraps an IOException raised by the flush
     */
    @Override
    public void add(int index, Binding binding, Object value)
            throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            addNoflush(index, binding, value);
            b.flush();
        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Appends several elements and flushes the blob.
     *
     * @param binding binding of the values
     * @param values the values to append
     * @throws AccessorException on failure; wraps an IOException raised by the flush
     */
    @Override
    public void addAll(Binding binding, Object[] values)
            throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            addAllNoflush(binding, values);
            b.flush();
        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Inserts several elements at the given index and flushes the blob.
     *
     * @param index insertion index
     * @param binding binding of the values
     * @param values the values to insert
     * @throws AccessorException on failure; wraps an IOException raised by the flush
     */
    @Override
    public void addAll(int index, Binding binding, Object[] values)
            throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            addAllNoflush(index, binding, values);
            b.flush();
        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Removes a run of elements and flushes the blob.
     *
     * @param index index of the first element to remove
     * @param count number of elements to remove
     * @throws AccessorException on failure; wraps an IOException raised by the flush
     */
    @Override
    public void remove(int index, int count) throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            removeNoflush(index, count);
            b.flush();
        } catch (IOException e) {
            throw new AccessorException( e );
        } finally {
            writeUnlock();
        }
    }

    /**
     * Overwrites the element at the given index and flushes the blob.
     *
     * @param index element index
     * @param binding binding of the value
     * @param value the value to write
     * @throws AccessorException on failure; wraps an IOException raised by the flush
     */
    @Override
    public void set(int index, Binding binding, Object value)
            throws AccessorException {
        assert b.isOpen();
        writeLock();
        try {
            setNoflush(index, binding, value);
            b.flush();
        } catch (IOException e) {
            throw new AccessorException(e);
        } finally {
            writeUnlock();
        }
    }

}